Skip to content

Commit b926c89

Browse files
Remove CompactorGroupIdImpl in favor of CompactorGroupId, add cache (apache#4313)
* Remove CompactorGroupIdImpl in favor of CompactorGroupId, use cache --------- Co-authored-by: Keith Turner <kturner@apache.org>
1 parent cca3683 commit b926c89

File tree

13 files changed

+49
-74
lines changed

13 files changed

+49
-74
lines changed

core/src/main/java/org/apache/accumulo/core/metadata/schema/CompactionMetadata.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.accumulo.core.metadata.StoredTabletFile;
3232
import org.apache.accumulo.core.spi.compaction.CompactionKind;
3333
import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
34-
import org.apache.accumulo.core.util.compaction.CompactorGroupIdImpl;
3534

3635
public class CompactionMetadata {
3736

@@ -126,9 +125,8 @@ public static CompactionMetadata fromJson(String json) {
126125

127126
return new CompactionMetadata(jData.inputs.stream().map(StoredTabletFile::new).collect(toSet()),
128127
StoredTabletFile.of(jData.tmp).getTabletFile(), jData.compactor,
129-
CompactionKind.valueOf(jData.kind), jData.priority,
130-
CompactorGroupIdImpl.groupId(jData.groupId), jData.propDels,
131-
jData.fateId == null ? null : FateId.from(jData.fateId));
128+
CompactionKind.valueOf(jData.kind), jData.priority, CompactorGroupId.of(jData.groupId),
129+
jData.propDels, jData.fateId == null ? null : FateId.from(jData.fateId));
132130
}
133131

134132
@Override

core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactorGroupId.java

+18-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
package org.apache.accumulo.core.spi.compaction;
2020

2121
import org.apache.accumulo.core.data.AbstractId;
22+
import org.apache.accumulo.core.util.cache.Caches;
23+
24+
import com.github.benmanes.caffeine.cache.Cache;
2225

2326
/**
2427
* A unique identifier for a compactor group that a {@link CompactionPlanner} can schedule
@@ -28,10 +31,23 @@
2831
* @see org.apache.accumulo.core.spi.compaction
2932
*/
3033
public class CompactorGroupId extends AbstractId<CompactorGroupId> {
31-
// ELASTICITY_TODO make this cache ids like TableId. This will help save manager memory.
3234
private static final long serialVersionUID = 1L;
3335

34-
protected CompactorGroupId(String canonical) {
36+
static final Cache<String,CompactorGroupId> cache = Caches.getInstance()
37+
.createNewBuilder(Caches.CacheName.COMPACTOR_GROUP_ID, false).weakValues().build();
38+
39+
private CompactorGroupId(String canonical) {
3540
super(canonical);
3641
}
42+
43+
/**
44+
* Get a CompactorGroupId object for the provided canonical string. This is guaranteed to be
45+
* non-null.
46+
*
47+
* @param canonical compactor group ID string
48+
* @return CompactorGroupId object
49+
*/
50+
public static CompactorGroupId of(String canonical) {
51+
return cache.get(canonical, CompactorGroupId::new);
52+
}
3753
}

core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java

+1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public enum CacheName {
4242
COMPACTION_CONFIGS,
4343
COMPACTION_DIR_CACHE,
4444
COMPACTION_DISPATCHERS,
45+
COMPACTOR_GROUP_ID,
4546
COMPRESSION_ALGORITHM,
4647
CRYPT_PASSWORDS,
4748
HOST_REGEX_BALANCER_TABLE_REGEX,

core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public GroupManager getGroupManager() {
6767

6868
@Override
6969
public CompactorGroupId getGroup(String name) {
70-
var cgid = CompactorGroupIdImpl.groupId(name);
70+
var cgid = CompactorGroupId.of(name);
7171
Preconditions.checkArgument(!getRequestedGroups().contains(cgid),
7272
"Duplicate compactor group for group: " + name);
7373
getRequestedGroups().add(cgid);

core/src/main/java/org/apache/accumulo/core/util/compaction/CompactorGroupIdImpl.java

-34
This file was deleted.

core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@
7676
import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
7777
import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
7878
import org.apache.accumulo.core.spi.compaction.CompactionKind;
79+
import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
7980
import org.apache.accumulo.core.tabletserver.log.LogEntry;
80-
import org.apache.accumulo.core.util.compaction.CompactorGroupIdImpl;
8181
import org.apache.hadoop.fs.Path;
8282
import org.apache.hadoop.io.Text;
8383
import org.junit.jupiter.api.Test;
@@ -431,9 +431,8 @@ public void testBuilder() {
431431
assertThrows(IllegalStateException.class, tm2::getCompacted);
432432

433433
var ecid1 = ExternalCompactionId.generate(UUID.randomUUID());
434-
CompactionMetadata ecm =
435-
new CompactionMetadata(Set.of(sf1, sf2), rf1, "cid1", CompactionKind.USER, (short) 3,
436-
CompactorGroupIdImpl.groupId("Q1"), true, FateId.from(type, 99L));
434+
CompactionMetadata ecm = new CompactionMetadata(Set.of(sf1, sf2), rf1, "cid1",
435+
CompactionKind.USER, (short) 3, CompactorGroupId.of("Q1"), true, FateId.from(type, 99L));
437436

438437
LogEntry le1 = LogEntry.fromPath("localhost+8020/" + UUID.randomUUID());
439438
LogEntry le2 = LogEntry.fromPath("localhost+8020/" + UUID.randomUUID());

core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java

+15-16
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer;
5555
import org.apache.accumulo.core.util.compaction.CompactionPlanImpl;
5656
import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
57-
import org.apache.accumulo.core.util.compaction.CompactorGroupIdImpl;
5857
import org.easymock.EasyMock;
5958
import org.junit.jupiter.api.Test;
6059

@@ -178,7 +177,7 @@ public void testRunningCompaction() {
178177
// planner should compact.
179178
var job = getOnlyElement(plan.getJobs());
180179
assertEquals(candidates, job.getFiles());
181-
assertEquals(CompactorGroupIdImpl.groupId("medium"), job.getGroup());
180+
assertEquals(CompactorGroupId.of("medium"), job.getGroup());
182181
}
183182

184183
@Test
@@ -201,7 +200,7 @@ public void testUserCompaction() {
201200
// a running non-user compaction should not prevent a user compaction
202201
var job = getOnlyElement(plan.getJobs());
203202
assertEquals(candidates, job.getFiles());
204-
assertEquals(CompactorGroupIdImpl.groupId("medium"), job.getGroup());
203+
assertEquals(CompactorGroupId.of("medium"), job.getGroup());
205204
assertEquals(CompactionJobPrioritizer.createPriority(TableId.of("42"), CompactionKind.USER,
206205
all.size(), job.getFiles().size()), job.getPriority());
207206

@@ -221,7 +220,7 @@ public void testUserCompaction() {
221220
plan = planner.makePlan(params);
222221
job = getOnlyElement(plan.getJobs());
223222
assertEquals(createCFs("F1", "1M", "F2", "2M", "F3", "4M"), job.getFiles());
224-
assertEquals(CompactorGroupIdImpl.groupId("small"), job.getGroup());
223+
assertEquals(CompactorGroupId.of("small"), job.getGroup());
225224

226225
// should compact all 15
227226
all = createCFs("FI", "7M", "F4", "8M", "F5", "16M", "F6", "32M", "F7", "64M", "F8", "128M",
@@ -231,7 +230,7 @@ public void testUserCompaction() {
231230
plan = planner.makePlan(params);
232231
job = getOnlyElement(plan.getJobs());
233232
assertEquals(all, job.getFiles());
234-
assertEquals(CompactorGroupIdImpl.groupId("huge"), job.getGroup());
233+
assertEquals(CompactorGroupId.of("huge"), job.getGroup());
235234

236235
// For user compaction, can compact a subset that meets the compaction ratio if there is also a
237236
// larger set of files that meets the compaction ratio
@@ -241,7 +240,7 @@ public void testUserCompaction() {
241240
plan = planner.makePlan(params);
242241
job = getOnlyElement(plan.getJobs());
243242
assertEquals(createCFs("F1", "3M", "F2", "4M", "F3", "5M", "F4", "6M"), job.getFiles());
244-
assertEquals(CompactorGroupIdImpl.groupId("small"), job.getGroup());
243+
assertEquals(CompactorGroupId.of("small"), job.getGroup());
245244

246245
// There is a subset of small files that meets the compaction ratio, but the larger set does not
247246
// so compact everything to avoid doing more than logarithmic work
@@ -250,7 +249,7 @@ public void testUserCompaction() {
250249
plan = planner.makePlan(params);
251250
job = getOnlyElement(plan.getJobs());
252251
assertEquals(all, job.getFiles());
253-
assertEquals(CompactorGroupIdImpl.groupId("medium"), job.getGroup());
252+
assertEquals(CompactorGroupId.of("medium"), job.getGroup());
254253

255254
}
256255

@@ -267,14 +266,14 @@ public void testMaxSize() {
267266
// should only compact files less than max size
268267
var job = getOnlyElement(plan.getJobs());
269268
assertEquals(createCFs("F1", "128M", "F2", "129M", "F3", "130M"), job.getFiles());
270-
assertEquals(CompactorGroupIdImpl.groupId("large"), job.getGroup());
269+
assertEquals(CompactorGroupId.of("large"), job.getGroup());
271270

272271
// user compaction can exceed the max size
273272
params = createPlanningParams(all, all, Set.of(), 2, CompactionKind.USER);
274273
plan = planner.makePlan(params);
275274
job = getOnlyElement(plan.getJobs());
276275
assertEquals(all, job.getFiles());
277-
assertEquals(CompactorGroupIdImpl.groupId("large"), job.getGroup());
276+
assertEquals(CompactorGroupId.of("large"), job.getGroup());
278277
}
279278

280279
@Test
@@ -300,7 +299,7 @@ public void testMultipleCompactions() {
300299
plan.getJobs().forEach(job -> {
301300
assertEquals(15, job.getFiles().size());
302301
assertEquals(kind, job.getKind());
303-
assertEquals(CompactorGroupIdImpl.groupId("small"), job.getGroup());
302+
assertEquals(CompactorGroupId.of("small"), job.getGroup());
304303
// ensure the files across all of the jobs are disjoint
305304
job.getFiles().forEach(cf -> assertTrue(filesSeen.add(cf)));
306305
});
@@ -375,7 +374,7 @@ public void testMultipleCompactionsAndRunningCompactions() {
375374
plan.getJobs().forEach(job -> {
376375
assertEquals(15, job.getFiles().size());
377376
assertEquals(kind, job.getKind());
378-
assertEquals(CompactorGroupIdImpl.groupId("small"), job.getGroup());
377+
assertEquals(CompactorGroupId.of("small"), job.getGroup());
379378
// ensure the files across all of the jobs are disjoint
380379
job.getFiles().forEach(cf -> assertTrue(filesSeen.add(cf)));
381380
});
@@ -444,15 +443,15 @@ public void testQueueCreation() {
444443

445444
var job = getOnlyElement(plan.getJobs());
446445
assertEquals(all, job.getFiles());
447-
assertEquals(CompactorGroupIdImpl.groupId("small"), job.getGroup());
446+
assertEquals(CompactorGroupId.of("small"), job.getGroup());
448447

449448
all = createCFs("F1", "100M", "F2", "100M", "F3", "100M", "F4", "100M");
450449
params = createPlanningParams(all, all, Set.of(), 2, CompactionKind.SYSTEM);
451450
plan = planner.makePlan(params);
452451

453452
job = getOnlyElement(plan.getJobs());
454453
assertEquals(all, job.getFiles());
455-
assertEquals(CompactorGroupIdImpl.groupId("midsize"), job.getGroup());
454+
assertEquals(CompactorGroupId.of("midsize"), job.getGroup());
456455
}
457456

458457
/**
@@ -634,8 +633,8 @@ public void testMaxTabletFilesNoCompaction() {
634633
// ensure when a compaction is running and we are over files max but below the compaction ratio
635634
// that a compaction is not planned
636635
all = createCFs(1_000, 2, 2, 2, 2, 2, 2, 2);
637-
var job = new CompactionJobImpl((short) 1, CompactorGroupIdImpl.groupId("ee1"),
638-
createCFs("F1", "1000"), CompactionKind.SYSTEM, Optional.of(false));
636+
var job = new CompactionJobImpl((short) 1, CompactorGroupId.of("ee1"), createCFs("F1", "1000"),
637+
CompactionKind.SYSTEM, Optional.of(false));
639638
params = createPlanningParams(all, all, Set.of(job), 3, CompactionKind.SYSTEM, conf);
640639
plan = planner.makePlan(params);
641640

@@ -672,7 +671,7 @@ public void testMaxTableFilesFallback() {
672671
private CompactionJob createJob(CompactionKind kind, Set<CompactableFile> all,
673672
Set<CompactableFile> files) {
674673
return new CompactionPlanImpl.BuilderImpl(kind, all, all)
675-
.addJob((short) all.size(), CompactorGroupIdImpl.groupId("small"), files).build().getJobs()
674+
.addJob((short) all.size(), CompactorGroupId.of("small"), files).build().getJobs()
676675
.iterator().next();
677676
}
678677

core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionPrioritizerTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.accumulo.core.metadata.AccumuloTable;
3636
import org.apache.accumulo.core.spi.compaction.CompactionJob;
3737
import org.apache.accumulo.core.spi.compaction.CompactionKind;
38+
import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
3839
import org.junit.jupiter.api.Test;
3940

4041
public class CompactionPrioritizerTest {
@@ -48,7 +49,7 @@ public CompactionJob createJob(CompactionKind kind, String tablet, int numFiles,
4849
}
4950
// TODO pass numFiles
5051
return new CompactionJobImpl(createPriority(TableId.of("1"), kind, totalFiles, numFiles),
51-
CompactorGroupIdImpl.groupId("test"), files, kind, Optional.of(false));
52+
CompactorGroupId.of("test"), files, kind, Optional.of(false));
5253
}
5354

5455
@Test

server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@
9797
import org.apache.accumulo.core.util.Retry;
9898
import org.apache.accumulo.core.util.UtilWaitThread;
9999
import org.apache.accumulo.core.util.cache.Caches.CacheName;
100-
import org.apache.accumulo.core.util.compaction.CompactorGroupIdImpl;
101100
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
102101
import org.apache.accumulo.core.util.compaction.RunningCompaction;
103102
import org.apache.accumulo.core.util.threads.ThreadPools;
@@ -326,7 +325,7 @@ public TExternalCompactionJob getCompactionJob(TInfo tinfo, TCredentials credent
326325
throw new AccumuloSecurityException(credentials.getPrincipal(),
327326
SecurityErrorCode.PERMISSION_DENIED).asThriftException();
328327
}
329-
CompactorGroupId groupId = CompactorGroupIdImpl.groupId(groupName);
328+
CompactorGroupId groupId = CompactorGroupId.of(groupName);
330329
LOG.trace("getCompactionJob called for group {} by compactor {}", groupId, compactorAddress);
331330
TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis());
332331

@@ -365,7 +364,7 @@ public TExternalCompactionJob getCompactionJob(TInfo tinfo, TCredentials credent
365364
LOG.debug(
366365
"Unable to reserve compaction job for {}, pulling another off the queue for group {}",
367366
metaJob.getTabletMetadata().getExtent(), groupName);
368-
metaJob = jobQueues.poll(CompactorGroupIdImpl.groupId(groupName));
367+
metaJob = jobQueues.poll(CompactorGroupId.of(groupName));
369368
}
370369
}
371370

server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@
6363
import org.apache.accumulo.core.trace.TraceUtil;
6464
import org.apache.accumulo.core.util.cache.Caches;
6565
import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
66-
import org.apache.accumulo.core.util.compaction.CompactorGroupIdImpl;
6766
import org.apache.accumulo.core.util.compaction.RunningCompaction;
6867
import org.apache.accumulo.manager.Manager;
6968
import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator;
@@ -86,7 +85,7 @@ public class CompactionCoordinatorTest {
8685
private static final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances =
8786
new AtomicReference<>(Map.of());
8887

89-
private static final CompactorGroupId GROUP_ID = CompactorGroupIdImpl.groupId("R2DQ");
88+
private static final CompactorGroupId GROUP_ID = CompactorGroupId.of("R2DQ");
9089

9190
public class TestCoordinator extends CompactionCoordinator {
9291

server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,14 @@
3737
import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
3838
import org.apache.accumulo.core.util.Pair;
3939
import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer;
40-
import org.apache.accumulo.core.util.compaction.CompactorGroupIdImpl;
4140
import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues.MetaJob;
4241
import org.apache.hadoop.io.Text;
4342
import org.easymock.EasyMock;
4443
import org.junit.jupiter.api.Test;
4544

4645
public class CompactionJobPriorityQueueTest {
4746

48-
private static final CompactorGroupId GROUP = CompactorGroupIdImpl.groupId("TEST");
47+
private static final CompactorGroupId GROUP = CompactorGroupId.of("TEST");
4948

5049
@Test
5150
public void testTabletFileReplacement() {

server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.apache.accumulo.core.spi.compaction.CompactionKind;
4040
import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
4141
import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
42-
import org.apache.accumulo.core.util.compaction.CompactorGroupIdImpl;
4342
import org.apache.hadoop.io.Text;
4443
import org.junit.jupiter.api.Test;
4544

@@ -57,8 +56,8 @@ public void testAddPollRaceCondition() throws Exception {
5756
final int numToAdd = 100_000;
5857

5958
CompactionJobQueues jobQueues = new CompactionJobQueues(numToAdd + 1);
60-
CompactorGroupId[] groups = Stream.of("G1", "G2", "G3")
61-
.map(s -> CompactorGroupIdImpl.groupId(s)).toArray(l -> new CompactorGroupId[l]);
59+
CompactorGroupId[] groups =
60+
Stream.of("G1", "G2", "G3").map(CompactorGroupId::of).toArray(CompactorGroupId[]::new);
6261

6362
var executor = Executors.newFixedThreadPool(groups.length);
6463

test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@
7373
import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
7474
import org.apache.accumulo.core.util.FastFormat;
7575
import org.apache.accumulo.core.util.Merge;
76-
import org.apache.accumulo.core.util.compaction.CompactorGroupIdImpl;
7776
import org.apache.accumulo.harness.AccumuloClusterHarness;
7877
import org.apache.accumulo.test.TestIngest;
7978
import org.apache.accumulo.test.TestIngest.IngestParams;
@@ -697,7 +696,7 @@ public void testCompactionMetadata() throws Exception {
697696

698697
ReferencedTabletFile tmpFile =
699698
ReferencedTabletFile.of(new Path("file:///accumulo/tables/t-0/b-0/c1.rf"));
700-
CompactorGroupId ceid = CompactorGroupIdImpl.groupId("G1");
699+
CompactorGroupId ceid = CompactorGroupId.of("G1");
701700
Set<StoredTabletFile> jobFiles =
702701
Set.of(StoredTabletFile.of(new Path("file:///accumulo/tables/t-0/b-0/b2.rf")));
703702
CompactionMetadata ecMeta = new CompactionMetadata(jobFiles, tmpFile, "localhost:4444",

0 commit comments

Comments
 (0)