
Commit 2a12b13

Used data size instead of entry size of compaction queue (apache#5252)
Modified the compaction queue limit to be based on the total data size of the queued compaction jobs instead of the number of jobs, bounding how much job data is buffered per resource group. Fixes apache#5186
1 parent 0cc90ef commit 2a12b13

10 files changed: +285, -56 lines


core/src/main/java/org/apache/accumulo/core/conf/Property.java

+7-9
@@ -465,14 +465,12 @@ public enum Property {
       "The number of threads used to seed fate split task, the actual split work is done by fate"
           + " threads.",
       "4.0.0"),
-
-  MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE(
-      "manager.compaction.major.service.queue.initial.size", "10000", PropertyType.COUNT,
-      "The initial size of each resource groups compaction job priority queue.", "4.0.0"),
-  MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE_FACTOR(
-      "manager.compaction.major.service.queue.size.factor", "3.0", PropertyType.FRACTION,
-      "The dynamic resizing of the compaction job priority queue is based on"
-          + " the number of compactors for the group multiplied by this factor.",
+  MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE("manager.compaction.major.service.queue.size",
+      "1M", PropertyType.MEMORY,
+      "The data size of each resource groups compaction job priority queue. The memory size of "
+          + "each compaction job is estimated and the sum of these sizes per resource group will not "
+          + "exceed this setting. When the size is exceeded the lowest priority jobs are dropped as "
+          + "needed.",
       "4.0.0"),
   SPLIT_PREFIX("split.", null, PropertyType.PREFIX,
       "System wide properties related to splitting tablets.", "3.1.0"),

@@ -1460,7 +1458,7 @@ public static boolean isValidTablePropertyKey(String key) {
       RPC_MAX_MESSAGE_SIZE,

       // compaction coordiantor properties
-      MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE,
+      MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE,

       // block cache options
       GENERAL_CACHE_MANAGER_IMPL, TSERV_DATACACHE_SIZE, TSERV_INDEXCACHE_SIZE,
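
The replacement property is MEMORY-typed, so its value is a size string such as the default "1M" rather than a count of jobs. Below is a minimal, self-contained sketch of the K/M/G suffix convention; it assumes the usual binary (1024-based) multipliers and is illustrative only, not Accumulo's own configuration parser, and the class name MemorySizeExample is made up for the example.

// Illustrative sketch only: maps a MEMORY-style value such as "1M" to bytes, assuming the
// conventional binary multipliers for the K/M/G suffixes.
public class MemorySizeExample {

  static long toBytes(String value) {
    char suffix = value.charAt(value.length() - 1);
    if (Character.isDigit(suffix)) {
      return Long.parseLong(value);
    }
    long base = Long.parseLong(value.substring(0, value.length() - 1));
    switch (Character.toUpperCase(suffix)) {
      case 'K':
        return base * 1024L;
      case 'M':
        return base * 1024L * 1024L;
      case 'G':
        return base * 1024L * 1024L * 1024L;
      default:
        throw new IllegalArgumentException("Unknown size suffix: " + suffix);
    }
  }

  public static void main(String[] args) {
    // The default "1M" for manager.compaction.major.service.queue.size corresponds to 1048576 bytes.
    System.out.println(toBytes("1M"));
  }
}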

minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java

+2-2
@@ -188,8 +188,8 @@ MiniAccumuloConfigImpl initialize() {

     mergeProp(Property.COMPACTOR_PORTSEARCH.getKey(), "true");

-    mergeProp(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE.getKey(),
-        Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE.getDefaultValue());
+    mergeProp(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE.getKey(),
+        Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE.getDefaultValue());
     mergeProp(Property.COMPACTION_SERVICE_DEFAULT_PLANNER.getKey(),
         Property.COMPACTION_SERVICE_DEFAULT_PLANNER.getDefaultValue());

server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java

+3-14
@@ -194,7 +194,7 @@ public class CompactionCoordinator
   private final Manager manager;

   private final LoadingCache<String,Integer> compactorCounts;
-  private final int jobQueueInitialSize;
+  private final long jobQueueInitialSize;

   private volatile long coordinatorStartTime;

@@ -208,8 +208,8 @@ public CompactionCoordinator(ServerContext ctx, SecurityOperation security,
     this.security = security;
     this.manager = Objects.requireNonNull(manager);

-    this.jobQueueInitialSize = ctx.getConfiguration()
-        .getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE);
+    this.jobQueueInitialSize =
+        ctx.getConfiguration().getAsBytes(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE);

     this.jobQueues = new CompactionJobQueues(jobQueueInitialSize);

@@ -1121,8 +1121,6 @@ private void cleanUpEmptyCompactorPathInZK() {
     final String compactorQueuesPath = this.ctx.getZooKeeperRoot() + Constants.ZCOMPACTORS;

     final var zoorw = this.ctx.getZooSession().asReaderWriter();
-    final double queueSizeFactor = ctx.getConfiguration()
-        .getFraction(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE_FACTOR);

     try {
       var groups = zoorw.getChildren(compactorQueuesPath);

@@ -1139,7 +1137,6 @@ private void cleanUpEmptyCompactorPathInZK() {
           CompactionJobPriorityQueue queue = getJobQueues().getQueue(cgid);
           if (queue != null) {
             queue.clearIfInactive(Duration.ofMinutes(10));
-            queue.setMaxSize(this.jobQueueInitialSize);
           }
         } else {
           int aliveCompactorsForGroup = 0;

@@ -1152,16 +1149,8 @@ private void cleanUpEmptyCompactorPathInZK() {
              aliveCompactorsForGroup++;
            }
          }
-          CompactionJobPriorityQueue queue = getJobQueues().getQueue(cgid);
-          if (queue != null) {
-            queue.setMaxSize(Math.min(
-                Math.max(1, (int) (aliveCompactorsForGroup * queueSizeFactor)), Integer.MAX_VALUE));
-          }
-
         }
-
       }
-
     } catch (KeeperException | RuntimeException e) {
       LOG.warn("Failed to clean up compactors", e);
     } catch (InterruptedException e) {

server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java

+10-11
@@ -31,11 +31,9 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;

@@ -116,8 +114,8 @@ public boolean equals(Object o) {
   // behavior is not supported with a PriorityQueue. Second a PriorityQueue does not support
   // efficiently removing entries from anywhere in the queue. Efficient removal is needed for the
   // case where tablets decided to issues different compaction jobs than what is currently queued.
-  private final TreeMap<CjpqKey,CompactionJobQueues.MetaJob> jobQueue;
-  private final AtomicInteger maxSize;
+  private final SizeTrackingTreeMap<CjpqKey,CompactionJobQueues.MetaJob> jobQueue;
+  private final AtomicLong maxSize;
   private final AtomicLong rejectedJobs;
   private final AtomicLong dequeuedJobs;
   private final ArrayDeque<CompletableFuture<CompactionJobQueues.MetaJob>> futures;

@@ -142,9 +140,10 @@ private TabletJobs(long generation, HashSet<CjpqKey> jobs) {

   private final AtomicLong nextSeq = new AtomicLong(0);

-  public CompactionJobPriorityQueue(CompactorGroupId groupId, int maxSize) {
-    this.jobQueue = new TreeMap<>();
-    this.maxSize = new AtomicInteger(maxSize);
+  public CompactionJobPriorityQueue(CompactorGroupId groupId, long maxSize,
+      SizeTrackingTreeMap.Weigher<CompactionJobQueues.MetaJob> weigher) {
+    this.jobQueue = new SizeTrackingTreeMap<>(weigher);
+    this.maxSize = new AtomicLong(maxSize);
     this.tabletJobs = new HashMap<>();
     this.groupId = groupId;
     this.rejectedJobs = new AtomicLong(0);

@@ -230,11 +229,11 @@ public synchronized int add(TabletMetadata tabletMetadata, Collection<Compaction
     return jobsAdded;
   }

-  public synchronized int getMaxSize() {
+  public synchronized long getMaxSize() {
     return maxSize.get();
   }

-  public synchronized void setMaxSize(int maxSize) {
+  public synchronized void setMaxSize(long maxSize) {
     Preconditions.checkArgument(maxSize > 0,
         "Maximum size of the Compaction job priority queue must be greater than 0");
     this.maxSize.set(maxSize);

@@ -249,7 +248,7 @@ public long getDequeuedJobs() {
   }

   public synchronized long getQueuedJobs() {
-    return jobQueue.size();
+    return jobQueue.entrySize();
   }

   public synchronized long getLowestPriority() {

@@ -332,7 +331,7 @@ private void removePreviousSubmissions(KeyExtent extent, boolean removeJobAges)
   }

   private CjpqKey addJobToQueue(TabletMetadata tabletMetadata, CompactionJob job) {
-    if (jobQueue.size() >= maxSize.get()) {
+    if (jobQueue.dataSize() >= maxSize.get()) {
       var lastEntry = jobQueue.lastKey();
       if (job.getPriority() <= lastEntry.job.getPriority()) {
         // the queue is full and this job has a lower or same priority than the lowest job in the
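
The enqueue path above now compares the queue's tracked data size against the configured byte limit and, when the budget is exceeded, drops the lowest priority jobs rather than capping the entry count. The following standalone sketch shows that general pattern; it is not the Accumulo class, and the key ordering, the names, and the assumption that keys are unique are all illustrative.

// Standalone sketch of size-bounded, priority-ordered buffering: the first key is the highest
// priority, a weigher estimates each value's size, and inserts that exceed the byte budget
// evict from the low-priority end (or are rejected if they would be the lowest themselves).
import java.util.TreeMap;
import java.util.function.ToLongFunction;

class SizeBoundedQueueSketch<K extends Comparable<K>,V> {
  private final TreeMap<K,V> queue = new TreeMap<>(); // keys assumed unique in this sketch
  private final ToLongFunction<V> weigher;
  private final long maxBytes;
  private long dataSize = 0;

  SizeBoundedQueueSketch(long maxBytes, ToLongFunction<V> weigher) {
    this.maxBytes = maxBytes;
    this.weigher = weigher;
  }

  synchronized boolean offer(K key, V value) {
    long weight = weigher.applyAsLong(value);
    while (dataSize + weight > maxBytes && !queue.isEmpty()) {
      if (key.compareTo(queue.lastKey()) >= 0) {
        return false; // the new entry is the lowest priority of all, reject it
      }
      var evicted = queue.pollLastEntry(); // drop the current lowest priority entry
      dataSize -= weigher.applyAsLong(evicted.getValue());
    }
    if (dataSize + weight > maxBytes) {
      return false; // a single entry larger than the entire budget
    }
    queue.put(key, value);
    dataSize += weight;
    return true;
  }
}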

server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java

+7-5
@@ -48,18 +48,20 @@ public class CompactionJobQueues {
   private final ConcurrentHashMap<CompactorGroupId,CompactionJobPriorityQueue> priorityQueues =
       new ConcurrentHashMap<>();

-  private final int queueSize;
+  private final long queueSize;

   private final Map<DataLevel,AtomicLong> currentGenerations;

-  public CompactionJobQueues(int queueSize) {
+  private SizeTrackingTreeMap.Weigher<MetaJob> weigher =
+      val -> val.getTabletMetadata().toString().length() + val.getJob().toString().length();
+
+  public CompactionJobQueues(long queueSize) {
     this.queueSize = queueSize;
     Map<DataLevel,AtomicLong> cg = new EnumMap<>(DataLevel.class);
     for (var level : DataLevel.values()) {
       cg.put(level, new AtomicLong());
     }
     currentGenerations = Collections.unmodifiableMap(cg);
-
   }

   public void beginFullScan(DataLevel level) {

@@ -164,7 +166,7 @@ public TabletMetadata getTabletMetadata() {
    */
   public CompletableFuture<MetaJob> getAsync(CompactorGroupId groupId) {
     var pq = priorityQueues.computeIfAbsent(groupId,
-        gid -> new CompactionJobPriorityQueue(gid, queueSize));
+        gid -> new CompactionJobPriorityQueue(gid, queueSize, weigher));
     return pq.getAsync();
   }

@@ -187,7 +189,7 @@ private void add(TabletMetadata tabletMetadata, CompactorGroupId groupId,
     }

     var pq = priorityQueues.computeIfAbsent(groupId,
-        gid -> new CompactionJobPriorityQueue(gid, queueSize));
+        gid -> new CompactionJobPriorityQueue(gid, queueSize, weigher));
     pq.add(tabletMetadata, jobs,
         currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get());
   }
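
The weigher added above estimates each queued job's memory footprint from the toString() lengths of its tablet metadata and compaction job. A toy illustration of that idea follows; JobStub and its fields are made-up stand-ins, not Accumulo types.

// Toy weigher example: charge each entry roughly the length of its string form. The real
// weigher does the same with MetaJob's TabletMetadata and CompactionJob; JobStub is fictional.
import java.util.function.ToLongFunction;

class WeigherExample {

  record JobStub(String tabletMetadata, String job) {}

  public static void main(String[] args) {
    ToLongFunction<JobStub> weigher =
        stub -> stub.tabletMetadata().length() + stub.job().length();

    JobStub stub = new JobStub("2;row_a;row_b<", "CompactionJob{files=3, priority=17}");
    // Prints 49: a rough, cheap estimate used purely for queue size accounting.
    System.out.println(weigher.applyAsLong(stub));
  }
}
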
server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java

+129-0
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.compaction.queue;
+
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class wraps a treemap and tracks the data size of everything added and removed from the
+ * treemap.
+ */
+class SizeTrackingTreeMap<K,V> {
+
+  private static class ValueWrapper<V2> {
+    final V2 val;
+    final long computedSize;
+
+    private ValueWrapper(V2 val, long computedSize) {
+      this.val = val;
+      this.computedSize = computedSize;
+    }
+  }
+
+  private final TreeMap<K,ValueWrapper<V>> map = new TreeMap<>();
+  private long dataSize = 0;
+  private Weigher<V> weigher;
+
+  private Map.Entry<K,V> unwrap(Map.Entry<K,ValueWrapper<V>> wrapperEntry) {
+    if (wrapperEntry == null) {
+      return null;
+    }
+    return new AbstractMap.SimpleImmutableEntry<>(wrapperEntry.getKey(),
+        wrapperEntry.getValue().val);
+  }
+
+  private void incrementDataSize(ValueWrapper<V> val) {
+    Preconditions.checkState(dataSize >= 0);
+    dataSize += val.computedSize;
+  }
+
+  private void decrementDataSize(Map.Entry<K,ValueWrapper<V>> entry) {
+    if (entry != null) {
+      decrementDataSize(entry.getValue());
+    }
+  }
+
+  private void decrementDataSize(ValueWrapper<V> val) {
+    if (val != null) {
+      Preconditions.checkState(dataSize >= val.computedSize);
+      dataSize -= val.computedSize;
+    }
+  }
+
+  interface Weigher<V2> {
+    long weigh(V2 val);
+  }
+
+  public SizeTrackingTreeMap(Weigher<V> weigher) {
+    this.weigher = weigher;
+  }
+
+  public boolean isEmpty() {
+    return map.isEmpty();
+  }
+
+  public long dataSize() {
+    return dataSize;
+  }
+
+  public int entrySize() {
+    return map.size();
+  }
+
+  public K lastKey() {
+    return map.lastKey();
+  }
+
+  public Map.Entry<K,V> firstEntry() {
+    return unwrap(map.firstEntry());
+  }
+
+  public void remove(K key) {
+    var prev = map.remove(key);
+    decrementDataSize(prev);
+  }
+
+  public Map.Entry<K,V> pollFirstEntry() {
+    var first = map.pollFirstEntry();
+    decrementDataSize(first);
+    return unwrap(first);
+  }
+
+  public Map.Entry<K,V> pollLastEntry() {
+    var last = map.pollLastEntry();
+    decrementDataSize(last);
+    return unwrap(last);
+  }
+
+  public void put(K key, V val) {
+    var wrapped = new ValueWrapper<>(val, weigher.weigh(val));
+    var prev = map.put(key, wrapped);
+    decrementDataSize(prev);
+    incrementDataSize(wrapped);
+  }
+
+  public void clear() {
+    map.clear();
+    dataSize = 0;
+  }
+}
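
For context, here is a minimal usage sketch of the class above. It assumes a caller in the same package (the class is package-private), the weigher simply charges each value the length of its string, and the class name SizeTrackingTreeMapExample is made up for the example.

package org.apache.accumulo.manager.compaction.queue;

// Minimal usage sketch: the weigher charges each value its string length, and the map keeps
// a running total of those weights as entries are added and removed.
class SizeTrackingTreeMapExample {

  public static void main(String[] args) {
    SizeTrackingTreeMap<Integer,String> map = new SizeTrackingTreeMap<>(val -> val.length());

    map.put(1, "small job");                                 // weighs 9
    map.put(2, "a much larger compaction job description");  // weighs 40
    System.out.println(map.entrySize()); // 2 entries
    System.out.println(map.dataSize());  // 49 bytes tracked

    map.pollLastEntry();                 // removing an entry also shrinks dataSize
    System.out.println(map.dataSize());  // back to 9
  }
}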
