Skip to content

Commit 36df8ce

Browse files
authored
removed queued jobs for a tablet if no new jobs are seen (apache#4394)
fixes apache#3528
1 parent 59b873b commit 36df8ce

File tree

6 files changed

+314
-23
lines changed

6 files changed

+314
-23
lines changed

server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java

+2
Original file line numberDiff line numberDiff line change
@@ -678,7 +678,9 @@ public void run() {
678678
eventHandler.clearNeedsFullScan();
679679

680680
iter = store.iterator(tableMgmtParams);
681+
manager.getCompactionCoordinator().getJobQueues().beginFullScan(store.getLevel());
681682
var tabletMgmtStats = manageTablets(iter, tableMgmtParams, currentTServers, true);
683+
manager.getCompactionCoordinator().getJobQueues().endFullScan(store.getLevel());
682684

683685
// If currently looking for volume replacements, determine if the next round needs to look.
684686
if (lookForTabletsNeedingVolReplacement) {

server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java

+48-8
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,23 @@
2323
import java.util.ArrayList;
2424
import java.util.Collection;
2525
import java.util.HashMap;
26+
import java.util.HashSet;
2627
import java.util.List;
2728
import java.util.Map;
2829
import java.util.Objects;
30+
import java.util.Set;
2931
import java.util.TreeMap;
3032
import java.util.concurrent.atomic.AtomicBoolean;
3133
import java.util.concurrent.atomic.AtomicLong;
3234

3335
import org.apache.accumulo.core.dataImpl.KeyExtent;
36+
import org.apache.accumulo.core.metadata.schema.Ample;
3437
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
3538
import org.apache.accumulo.core.spi.compaction.CompactionJob;
3639
import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
3740
import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer;
41+
import org.slf4j.Logger;
42+
import org.slf4j.LoggerFactory;
3843

3944
import com.google.common.base.Preconditions;
4045

@@ -49,6 +54,8 @@
4954
*/
5055
public class CompactionJobPriorityQueue {
5156

57+
private static final Logger log = LoggerFactory.getLogger(CompactionJobPriorityQueue.class);
58+
5259
private final CompactorGroupId groupId;
5360

5461
private class CjpqKey implements Comparable<CjpqKey> {
@@ -99,9 +106,19 @@ public boolean equals(Object o) {
99106
private final AtomicLong rejectedJobs;
100107
private final AtomicLong dequeuedJobs;
101108

109+
private static class TabletJobs {
110+
final long generation;
111+
final HashSet<CjpqKey> jobs;
112+
113+
private TabletJobs(long generation, HashSet<CjpqKey> jobs) {
114+
this.generation = generation;
115+
this.jobs = jobs;
116+
}
117+
}
118+
102119
// This map tracks what jobs a tablet currently has in the queue. Its used to efficiently remove
103120
// jobs in the queue when new jobs are queued for a tablet.
104-
private final Map<KeyExtent,List<CjpqKey>> tabletJobs;
121+
private final Map<KeyExtent,TabletJobs> tabletJobs;
105122

106123
private final AtomicLong nextSeq = new AtomicLong(0);
107124

@@ -116,24 +133,46 @@ public CompactionJobPriorityQueue(CompactorGroupId groupId, int maxSize) {
116133
this.dequeuedJobs = new AtomicLong(0);
117134
}
118135

136+
public synchronized void removeOlderGenerations(Ample.DataLevel level, long currGeneration) {
137+
if (closed.get()) {
138+
return;
139+
}
140+
141+
List<KeyExtent> removals = new ArrayList<>();
142+
143+
tabletJobs.forEach((extent, jobs) -> {
144+
if (Ample.DataLevel.of(extent.tableId()) == level && jobs.generation < currGeneration) {
145+
removals.add(extent);
146+
}
147+
});
148+
149+
if (!removals.isEmpty()) {
150+
log.trace("Removed {} queued tablets that no longer need compaction for {} {}",
151+
removals.size(), groupId, level);
152+
}
153+
154+
removals.forEach(this::removePreviousSubmissions);
155+
}
156+
119157
/**
120158
* @return the number of jobs added. If the queue is closed returns -1
121159
*/
122-
public synchronized int add(TabletMetadata tabletMetadata, Collection<CompactionJob> jobs) {
160+
public synchronized int add(TabletMetadata tabletMetadata, Collection<CompactionJob> jobs,
161+
long generation) {
123162
Preconditions.checkArgument(jobs.stream().allMatch(job -> job.getGroup().equals(groupId)));
124163
if (closed.get()) {
125164
return -1;
126165
}
127166

128167
removePreviousSubmissions(tabletMetadata.getExtent());
129168

130-
List<CjpqKey> newEntries = new ArrayList<>(jobs.size());
169+
HashSet<CjpqKey> newEntries = new HashSet<>(jobs.size());
131170

132171
int jobsAdded = 0;
133172
for (CompactionJob job : jobs) {
134173
CjpqKey cjqpKey = addJobToQueue(tabletMetadata, job);
135174
if (cjqpKey != null) {
136-
newEntries.add(cjqpKey);
175+
checkState(newEntries.add(cjqpKey));
137176
jobsAdded++;
138177
} else {
139178
// The priority for this job was lower than all other priorities and not added
@@ -143,7 +182,8 @@ public synchronized int add(TabletMetadata tabletMetadata, Collection<Compaction
143182
}
144183

145184
if (!newEntries.isEmpty()) {
146-
checkState(tabletJobs.put(tabletMetadata.getExtent(), newEntries) == null);
185+
checkState(tabletJobs.put(tabletMetadata.getExtent(), new TabletJobs(generation, newEntries))
186+
== null);
147187
}
148188

149189
return jobsAdded;
@@ -178,7 +218,7 @@ public synchronized CompactionJobQueues.MetaJob poll() {
178218
if (first != null) {
179219
dequeuedJobs.getAndIncrement();
180220
var extent = first.getValue().getTabletMetadata().getExtent();
181-
List<CjpqKey> jobs = tabletJobs.get(extent);
221+
Set<CjpqKey> jobs = tabletJobs.get(extent).jobs;
182222
checkState(jobs.remove(first.getKey()));
183223
if (jobs.isEmpty()) {
184224
tabletJobs.remove(extent);
@@ -207,9 +247,9 @@ public synchronized boolean closeIfEmpty() {
207247
}
208248

209249
private void removePreviousSubmissions(KeyExtent extent) {
210-
List<CjpqKey> prevJobs = tabletJobs.get(extent);
250+
TabletJobs prevJobs = tabletJobs.get(extent);
211251
if (prevJobs != null) {
212-
prevJobs.forEach(jobQueue::remove);
252+
prevJobs.jobs.forEach(jobQueue::remove);
213253
tabletJobs.remove(extent);
214254
}
215255
}

server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java

+34-1
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,15 @@
1919
package org.apache.accumulo.manager.compaction.queue;
2020

2121
import java.util.Collection;
22+
import java.util.Collections;
23+
import java.util.EnumMap;
24+
import java.util.Map;
2225
import java.util.concurrent.ConcurrentHashMap;
2326
import java.util.concurrent.ConcurrentHashMap.KeySetView;
27+
import java.util.concurrent.atomic.AtomicLong;
2428
import java.util.stream.Collectors;
2529

30+
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
2631
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
2732
import org.apache.accumulo.core.spi.compaction.CompactionJob;
2833
import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
@@ -46,8 +51,35 @@ public class CompactionJobQueues {
4651

4752
private final int queueSize;
4853

54+
private final Map<DataLevel,AtomicLong> currentGenerations;
55+
4956
public CompactionJobQueues(int queueSize) {
5057
this.queueSize = queueSize;
58+
Map<DataLevel,AtomicLong> cg = new EnumMap<>(DataLevel.class);
59+
for (var level : DataLevel.values()) {
60+
cg.put(level, new AtomicLong());
61+
}
62+
currentGenerations = Collections.unmodifiableMap(cg);
63+
64+
}
65+
66+
public void beginFullScan(DataLevel level) {
67+
currentGenerations.get(level).incrementAndGet();
68+
}
69+
70+
/**
71+
* The purpose of this method is to remove any tablets that were added before beginFullScan() was
72+
* called. The purpose of this is to handle tablets that were queued for compaction for a while
73+
* and because of some change no longer need to compact. If a full scan of the metadata table does
74+
* not find any new work for tablet, then any previously queued work for that tablet should be
75+
* discarded.
76+
*
77+
* @param level full metadata scans are done independently per DataLevel, so the tracking what
78+
* needs to be removed must be done per DataLevel
79+
*/
80+
public void endFullScan(DataLevel level) {
81+
priorityQueues.values()
82+
.forEach(pq -> pq.removeOlderGenerations(level, currentGenerations.get(level).get()));
5183
}
5284

5385
public void add(TabletMetadata tabletMetadata, Collection<CompactionJob> jobs) {
@@ -155,7 +187,8 @@ private void add(TabletMetadata tabletMetadata, CompactorGroupId groupId,
155187

156188
var pq = priorityQueues.computeIfAbsent(groupId,
157189
gid -> new CompactionJobPriorityQueue(gid, queueSize));
158-
while (pq.add(tabletMetadata, jobs) < 0) {
190+
while (pq.add(tabletMetadata, jobs,
191+
currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get()) < 0) {
159192
// When entering this loop its expected the queue is closed
160193
Preconditions.checkState(pq.isClosed());
161194
// This loop handles race condition where poll() closes empty priority queues. The queue could

server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ public void testGetCompactionJob() throws Exception {
294294
TabletMetadata tm = EasyMock.createNiceMock(TabletMetadata.class);
295295
expect(tm.getExtent()).andReturn(ke).anyTimes();
296296
expect(tm.getFiles()).andReturn(Collections.emptySet()).anyTimes();
297+
expect(tm.getTableId()).andReturn(ke.tableId()).anyTimes();
297298

298299
EasyMock.replay(tconf, context, creds, tm, security);
299300

server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public void testTabletFileReplacement() {
7171
EasyMock.replay(tm, cj1, cj2);
7272

7373
CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2);
74-
assertEquals(1, queue.add(tm, List.of(cj1)));
74+
assertEquals(1, queue.add(tm, List.of(cj1), 1L));
7575

7676
MetaJob job = queue.peek();
7777
assertEquals(cj1, job.getJob());
@@ -84,7 +84,7 @@ public void testTabletFileReplacement() {
8484
assertEquals(1, queue.getQueuedJobs());
8585

8686
// replace the files for the same tablet
87-
assertEquals(1, queue.add(tm, List.of(cj2)));
87+
assertEquals(1, queue.add(tm, List.of(cj2), 1L));
8888

8989
job = queue.peek();
9090
assertEquals(cj2, job.getJob());
@@ -126,7 +126,7 @@ public void testAddEqualToMaxSize() {
126126
EasyMock.replay(tm, cj1, cj2);
127127

128128
CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2);
129-
assertEquals(2, queue.add(tm, List.of(cj1, cj2)));
129+
assertEquals(2, queue.add(tm, List.of(cj1, cj2), 1L));
130130

131131
EasyMock.verify(tm, cj1, cj2);
132132

@@ -183,7 +183,7 @@ public void testAddMoreThanMax() {
183183
EasyMock.replay(tm, cj1, cj2, cj3);
184184

185185
CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2);
186-
assertEquals(2, queue.add(tm, List.of(cj1, cj2, cj3)));
186+
assertEquals(2, queue.add(tm, List.of(cj1, cj2, cj3), 1L));
187187

188188
EasyMock.verify(tm, cj1, cj2, cj3);
189189

@@ -234,7 +234,7 @@ public void testAddAfterClose() {
234234
EasyMock.replay(tm, cj1, cj2);
235235

236236
CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2);
237-
assertEquals(2, queue.add(tm, List.of(cj1, cj2)));
237+
assertEquals(2, queue.add(tm, List.of(cj1, cj2), 1L));
238238

239239
assertFalse(queue.closeIfEmpty());
240240

@@ -257,7 +257,7 @@ public void testAddAfterClose() {
257257

258258
assertTrue(queue.closeIfEmpty());
259259

260-
assertEquals(-1, queue.add(tm, List.of(cj1, cj2)));
260+
assertEquals(-1, queue.add(tm, List.of(cj1, cj2), 1L));
261261

262262
}
263263

@@ -295,7 +295,7 @@ public void test() {
295295
// create and add 1000 jobs
296296
for (int x = 0; x < 1000; x++) {
297297
Pair<TabletMetadata,CompactionJob> pair = createJob();
298-
queue.add(pair.getFirst(), Set.of(pair.getSecond()));
298+
queue.add(pair.getFirst(), Set.of(pair.getSecond()), 1L);
299299
expected.add(pair.getSecond());
300300
}
301301

0 commit comments

Comments
 (0)