
Commit 3aecd36

Merge branch 'main' into 4973-new-monitor-metrics
2 parents: 82adf95 + a907748

File tree: 59 files changed, +1521 / -1985 lines


core/src/main/java/org/apache/accumulo/core/Constants.java (-2)

@@ -74,8 +74,6 @@ public class Constants {
   public static final String ZDEAD = "/dead";
   public static final String ZDEADTSERVERS = ZDEAD + "/tservers";
 
-  public static final String ZPROBLEMS = "/problems";
-
   public static final String ZFATE = "/fate";
 
   public static final String ZNEXT_FILE = "/next_file";

core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java (+17 -9)

@@ -278,18 +278,23 @@ public List<ActiveScan> getActiveScans(String tserver)
   @Override
   public List<ActiveScan> getActiveScans(Collection<ServerId> servers)
       throws AccumuloException, AccumuloSecurityException {
+    servers.forEach(InstanceOperationsImpl::checkActiveScanServer);
     return queryServers(servers, this::getActiveScans, INSTANCE_OPS_SCANS_FINDER_POOL);
   }
 
-  private List<ActiveScan> getActiveScans(ServerId server)
-      throws AccumuloException, AccumuloSecurityException {
-
+  private static void checkActiveScanServer(ServerId server) {
     Objects.requireNonNull(server);
     Preconditions.checkArgument(
         server.getType() == ServerId.Type.SCAN_SERVER
             || server.getType() == ServerId.Type.TABLET_SERVER,
         "Server type %s is not %s or %s.", server.getType(), ServerId.Type.SCAN_SERVER,
         ServerId.Type.TABLET_SERVER);
+  }
+
+  private List<ActiveScan> getActiveScans(ServerId server)
+      throws AccumuloException, AccumuloSecurityException {
+
+    checkActiveScanServer(server);
 
     final var parsedTserver = HostAndPort.fromParts(server.getHost(), server.getPort());
     TabletScanClientService.Client rpcClient = null;

@@ -335,12 +340,7 @@ public List<ActiveCompaction> getActiveCompactions(String server)
   private List<ActiveCompaction> getActiveCompactions(ServerId server)
       throws AccumuloException, AccumuloSecurityException {
 
-    Objects.requireNonNull(server);
-    Preconditions.checkArgument(
-        server.getType() == ServerId.Type.COMPACTOR
-            || server.getType() == ServerId.Type.TABLET_SERVER,
-        "Server type %s is not %s or %s.", server.getType(), ServerId.Type.COMPACTOR,
-        ServerId.Type.TABLET_SERVER);
+    checkActiveCompactionServer(server);
 
     final HostAndPort serverHostAndPort = HostAndPort.fromParts(server.getHost(), server.getPort());
     final List<ActiveCompaction> as = new ArrayList<>();

@@ -371,6 +371,13 @@ private List<ActiveCompaction> getActiveCompactions(ServerId server)
     }
   }
 
+  private static void checkActiveCompactionServer(ServerId server) {
+    Objects.requireNonNull(server);
+    Preconditions.checkArgument(
+        server.getType() == Type.COMPACTOR || server.getType() == Type.TABLET_SERVER,
+        "Server type %s is not %s or %s.", server.getType(), Type.COMPACTOR, Type.TABLET_SERVER);
+  }
+
   @Override
   @Deprecated
   public List<ActiveCompaction> getActiveCompactions()

@@ -386,6 +393,7 @@ public List<ActiveCompaction> getActiveCompactions()
   @Override
   public List<ActiveCompaction> getActiveCompactions(Collection<ServerId> compactionServers)
       throws AccumuloException, AccumuloSecurityException {
+    compactionServers.forEach(InstanceOperationsImpl::checkActiveCompactionServer);
     return queryServers(compactionServers, this::getActiveCompactions,
         INSTANCE_OPS_COMPACTIONS_FINDER_POOL);
   }
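
Note: the refactor above pulls the ServerId type checks into static helpers and applies them to the whole collection up front, so an invalid server type now fails fast before any per-server RPC work is submitted to the finder pools. A minimal caller-side sketch of that behavior follows; the client bootstrap, the getServers lookup, and the package locations are assumptions drawn from the public client API, not part of this diff.

import java.util.Set;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.admin.ActiveScan;
import org.apache.accumulo.core.client.admin.servers.ServerId;

public class ActiveScanExample {
  public static void main(String[] args) throws Exception {
    // Assumed client bootstrap; the properties file path is a placeholder.
    try (AccumuloClient client =
        Accumulo.newClient().from("/path/to/accumulo-client.properties").build()) {
      // Assumed lookup of all tablet servers; SCAN_SERVER ids are also accepted.
      Set<ServerId> tservers =
          client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER);
      // With this commit, every ServerId is validated before any per-server RPC is
      // submitted; a COMPACTOR id in this collection would throw
      // IllegalArgumentException immediately instead of failing inside a worker.
      for (ActiveScan scan : client.instanceOperations().getActiveScans(tservers)) {
        System.out.println(scan.getTable() + " " + scan.getState());
      }
    }
  }
}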

core/src/main/java/org/apache/accumulo/core/conf/Property.java (+40 -14)

@@ -362,6 +362,11 @@ public enum Property {
   GENERAL_PROCESS_BIND_ADDRESS("general.process.bind.addr", "0.0.0.0", PropertyType.STRING,
       "The local IP address to which this server should bind for sending and receiving network traffic.",
       "3.0.0"),
+  GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL("general.server.lock.verification.interval", "2m",
+      PropertyType.TIMEDURATION,
+      "Interval at which the Manager and TabletServer should verify their server locks. A value of zero"
+          + " disables this check. The default value change from 0 to 2m in 3.1.0.",
+      "2.1.4"),
   // properties that are specific to manager server behavior
   MANAGER_PREFIX("manager.", null, PropertyType.PREFIX,
       "Properties in this category affect the behavior of the manager server.", "2.1.0"),

@@ -425,6 +430,12 @@ public enum Property {
       "The number of threads used to run fault-tolerant executions (FATE)."
           + " These are primarily table operations like merge.",
       "1.4.3"),
+  MANAGER_FATE_IDLE_CHECK_INTERVAL("manager.fate.idle.check.interval", "60m",
+      PropertyType.TIMEDURATION,
+      "The interval at which to check if the number of idle Fate threads has consistently been zero."
+          + " The way this is checked is an approximation. Logs a warning in the Manager log to increase"
+          + " MANAGER_FATE_THREADPOOL_SIZE. A value of zero disables this check and has a maximum value of 60m.",
+      "4.0.0"),
   MANAGER_STATUS_THREAD_POOL_SIZE("manager.status.threadpool.size", "0", PropertyType.COUNT,
       "The number of threads to use when fetching the tablet server status for balancing. Zero "
           + "indicates an unlimited number of threads will be used.",

@@ -921,6 +932,14 @@ public enum Property {
       "The maximum amount of memory that will be used to cache results of a client query/scan. "
           + "Once this limit is reached, the buffered data is sent to the client.",
       "1.3.5"),
+  TABLE_BULK_MAX_TABLETS("table.bulk.max.tablets", "100", PropertyType.COUNT,
+      "The maximum number of tablets allowed for one bulk import file. Value of 0 is Unlimited.",
+      "2.1.0"),
+  TABLE_BULK_MAX_TABLET_FILES("table.bulk.max.tablet.files", "100", PropertyType.COUNT,
+      "The maximum number of files a bulk import can add to a single tablet. When this property "
+          + "is exceeded for any tablet the entire bulk import operation will fail before making any "
+          + "changes. Value of 0 is unlimited.",
+      "4.0.0"),
   TABLE_FILE_TYPE("table.file.type", RFile.EXTENSION, PropertyType.FILENAME_EXT,
       "Change the type of file a table writes.", "1.3.5"),
   TABLE_LOAD_BALANCER("table.balancer", "org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer",

@@ -947,17 +966,28 @@
           + " defaults are used.",
       "1.3.5"),
   TABLE_FILE_MAX("table.file.max", "15", PropertyType.COUNT,
-      "The maximum number of RFiles each tablet in a table can have. When"
-          + " adjusting this property you may want to consider adjusting"
-          + " table.compaction.major.ratio also. Setting this property to 0 will make"
-          + " it default to tserver.scan.files.open.max-1, this will prevent a tablet"
-          + " from having more RFiles than can be opened. Prior to 2.1.0 this property"
-          + " was used to trigger merging minor compactions, but merging minor compactions"
-          + " were removed in 2.1.0. Now this property is only used by the"
-          + " DefaultCompactionStrategy and the RatioBasedCompactionPlanner."
-          + " The RatioBasedCompactionPlanner started using this property in 2.1.3, before"
-          + " that it did not use the property.",
+      "This property is used to signal to the compaction planner that it should be more "
+          + "aggressive for compacting tablets that exceed this limit. The "
+          + "RatioBasedCompactionPlanner will lower the compaction ratio and increase the "
+          + "priority for tablets that exceed this limit. When adjusting this property you may "
+          + "want to consider adjusting table.compaction.major.ratio also. Setting this property "
+          + "to 0 will make it default to tserver.scan.files.open.max-1, this will prevent a tablet"
+          + " from having more RFiles than can be opened by a scan.",
       "1.4.0"),
+  TABLE_FILE_PAUSE("table.file.pause", "100", PropertyType.COUNT,
+      "When a tablet has more than this number of files, bulk imports and minor compactions "
+          + "will wait until the tablet has less files before proceeding. This will cause back "
+          + "pressure on bulk imports and writes to tables when compactions are not keeping up. "
+          + "Only the number of files a tablet currently has is considered for pausing, the "
+          + "number of files a bulk import will add is not considered. This means a bulk import "
+          + "can surge above this limit once causing future bulk imports or minor compactions to "
+          + "pause until compactions can catch up. This property plus "
+          + TABLE_BULK_MAX_TABLET_FILES.getKey()
+          + " determines the total number of files a tablet could temporarily surge to based on bulk "
+          + "imports. Ideally this property would be set higher than " + TABLE_FILE_MAX.getKey()
+          + " so that compactions are more aggressive prior to reaching the pause point. Value of 0 is "
+          + "unlimited.",
+      "4.0.0"),
   TABLE_MERGE_FILE_MAX("table.merge.file.max", "10000", PropertyType.COUNT,
       "The maximum number of files that a merge operation will process. Before "
           + "merging a sum of the number of files in the merge range is computed and if it "

@@ -994,10 +1024,6 @@
       "1.3.5"),
   TABLE_BLOOM_HASHTYPE("table.bloom.hash.type", "murmur", PropertyType.STRING,
       "The bloom filter hash type.", "1.3.5"),
-  TABLE_BULK_MAX_TABLETS("table.bulk.max.tablets", "0", PropertyType.COUNT,
-      "The maximum number of tablets allowed for one bulk import file. Value of 0 is Unlimited. "
-          + "This property is only enforced in the new bulk import API.",
-      "2.1.0"),
   TABLE_DURABILITY("table.durability", "sync", PropertyType.DURABILITY,
       "The durability used to write to the write-ahead log. Legal values are:"
           + " none, which skips the write-ahead log; log, which sends the data to the"

core/src/main/java/org/apache/accumulo/core/fate/Fate.java (+40 -1)

@@ -34,6 +34,7 @@
 import java.time.Duration;
 import java.util.EnumSet;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedTransferQueue;
 import java.util.concurrent.RejectedExecutionException;

@@ -83,6 +84,7 @@ public class Fate<T> {
   private final AtomicBoolean keepRunning = new AtomicBoolean(true);
   private final TransferQueue<FateId> workQueue;
   private final Thread workFinder;
+  private final ConcurrentLinkedQueue<Integer> idleCountHistory = new ConcurrentLinkedQueue<>();
 
   public enum TxInfo {
     TX_NAME, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE

@@ -355,7 +357,8 @@ public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner,
       // resize the pool if the property changed
       ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE);
       // If the pool grew, then ensure that there is a TransactionRunner for each thread
-      int needed = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE) - pool.getQueue().size();
+      final int configured = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
+      final int needed = configured - pool.getQueue().size();
       if (needed > 0) {
         for (int i = 0; i < needed; i++) {
           try {

@@ -372,6 +375,41 @@
             break;
           }
         }
+        idleCountHistory.clear();
+      } else {
+        // The property did not change, but should it based on idle Fate threads? Maintain
+        // count of the last X minutes of idle Fate threads. If zero 95% of the time, then suggest
+        // that the
+        // MANAGER_FATE_THREADPOOL_SIZE be increased.
+        final long interval = Math.min(60, TimeUnit.MILLISECONDS
+            .toMinutes(conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL)));
+        if (interval == 0) {
+          idleCountHistory.clear();
+        } else {
+          if (idleCountHistory.size() >= interval * 2) { // this task runs every 30s
+            int zeroFateThreadsIdleCount = 0;
+            for (Integer idleConsumerCount : idleCountHistory) {
+              if (idleConsumerCount == 0) {
+                zeroFateThreadsIdleCount++;
+              }
+            }
+            boolean needMoreThreads =
+                (zeroFateThreadsIdleCount / (double) idleCountHistory.size()) >= 0.95;
+            if (needMoreThreads) {
+              log.warn(
+                  "All Fate threads appear to be busy for the last {} minutes,"
+                      + " consider increasing property: {}",
+                  interval, Property.MANAGER_FATE_THREADPOOL_SIZE.getKey());
+              // Clear the history so that we don't log for interval minutes.
+              idleCountHistory.clear();
+            } else {
+              while (idleCountHistory.size() >= interval * 2) {
+                idleCountHistory.remove();
+              }
+            }
+          }
+          idleCountHistory.add(workQueue.getWaitingConsumerCount());
+        }
       }
     }, 3, 30, SECONDS));
     this.transactionExecutor = pool;

@@ -611,5 +649,6 @@ public void shutdown(long timeout, TimeUnit timeUnit) {
     if (deadResCleanerExecutor != null) {
       deadResCleanerExecutor.shutdownNow();
     }
+    idleCountHistory.clear();
   }
 }
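
Note: the block above samples workQueue.getWaitingConsumerCount() every 30 seconds and warns once at least 95% of the samples in the configured window (capped at 60 minutes) are zero, then clears the history so the warning is not repeated for a full window. A standalone sketch of that sliding-window heuristic, with the Accumulo-specific types stripped out, is below; the class and method names are hypothetical.

import java.util.concurrent.ConcurrentLinkedQueue;

public class IdleHistorySketch {
  private final ConcurrentLinkedQueue<Integer> idleCountHistory = new ConcurrentLinkedQueue<>();

  /** Called every 30 seconds with the current number of idle worker threads. */
  boolean sample(int idleWorkers, long intervalMinutes) {
    boolean suggestMoreThreads = false;
    // Two samples per minute, so a full window holds intervalMinutes * 2 entries.
    if (idleCountHistory.size() >= intervalMinutes * 2) {
      long zeroIdleSamples = idleCountHistory.stream().filter(c -> c == 0).count();
      suggestMoreThreads = zeroIdleSamples / (double) idleCountHistory.size() >= 0.95;
      if (suggestMoreThreads) {
        idleCountHistory.clear(); // suppress repeated suggestions for a full window
      } else {
        while (idleCountHistory.size() >= intervalMinutes * 2) {
          idleCountHistory.remove(); // drop oldest samples to keep the window bounded
        }
      }
    }
    idleCountHistory.add(idleWorkers);
    return suggestMoreThreads;
  }

  public static void main(String[] args) {
    IdleHistorySketch sketch = new IdleHistorySketch();
    // Simulate 2 hours of 30s samples where every worker thread is always busy (0 idle).
    for (int i = 0; i < 240; i++) {
      if (sketch.sample(0, 60)) {
        System.out.println("sample " + i + ": consider increasing the thread pool size");
      }
    }
  }
}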
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZcNode.java (new file, +142)

@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate.zookeeper;
+
+import java.util.List;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Immutable data class used by zoo cache to hold what it is caching for single zookeeper node. Data
+ * and children are obtained from zookeeper at different times. This class is structured so that
+ * data can be obtained first and then children added later or visa veras.
+ *
+ * <p>
+ * Four distinct states can be cached for a zookeeper node.
+ * <ul>
+ * <li>Can cache that a node does not exist in zookeeper. This state is represented by data, state,
+ * and children all being null.</li>
+ * <li>Can cache only the data for a zookeeper node. For this state data and stat are non-null while
+ * children is null. Calling getChildren on node in this state will throw an exception.</li>
+ * <li>Can cache only the children for a zookeeper node. For this state children is non-null while
+ * data and stat are null. Calling getData or getStat on node in this state will throw an
+ * exception.</li>
+ * <li>Can cache the children and data for a zookeeper node. For this state data,stat, and children
+ * are non-null.</li>
+ * </ul>
+ * <p>
+ *
+ */
+class ZcNode {
+
+  private final byte[] data;
+  private final ZooCache.ZcStat stat;
+  private final List<String> children;
+
+  static final ZcNode NON_EXISTENT = new ZcNode();
+
+  private ZcNode() {
+    this.data = null;
+    this.stat = null;
+    this.children = null;
+  }
+
+  /**
+   * Creates a new ZcNode that combines the data and stat from an existing ZcNode and sets the
+   * children.
+   */
+  ZcNode(List<String> children, ZcNode existing) {
+    Objects.requireNonNull(children);
+    if (existing == null) {
+      this.data = null;
+      this.stat = null;
+    } else {
+      this.data = existing.data;
+      this.stat = existing.stat;
+    }
+
+    this.children = List.copyOf(children);
+  }
+
+  /**
+   * Creates a new ZcNode that combines the children from an existing ZcNode and sets the data and
+   * stat.
+   */
+  ZcNode(byte[] data, ZooCache.ZcStat zstat, ZcNode existing) {
+    this.data = Objects.requireNonNull(data);
+    this.stat = Objects.requireNonNull(zstat);
+    if (existing == null) {
+      this.children = null;
+    } else {
+      this.children = existing.children;
+    }
+  }
+
+  /**
+   * @return the data if the node exists and the data was set OR return null when the node does not
+   *         exist
+   * @throws IllegalStateException in the case where the node exists and the data was never set
+   */
+  byte[] getData() {
+    Preconditions.checkState(cachedData());
+    return data;
+  }
+
+  /**
+   * @return the stat if the node exists and the stat was set OR return null when the node does not
+   *         exist
+   * @throws IllegalStateException in the case where the node exists and the data was never set
+   */
+  ZooCache.ZcStat getStat() {
+    Preconditions.checkState(cachedData());
+    return stat;
+  }
+
+  /**
+   * @return the children if the node exists and the children were set OR return null when the node
+   *         does not exist exists
+   * @throws IllegalStateException in the case where the node exists and the children were never set
+   */
+  List<String> getChildren() {
+    Preconditions.checkState(cachedChildren());
+    return children;
+  }
+
+  /**
+   * @return true if the node does not exists or it exists and children are cached.
+   */
+  boolean cachedChildren() {
+    return children != null || notExists();
+  }
+
+  /**
+   * @return true if the node does not exists or it exists and data and stat cached.
+   */
+  boolean cachedData() {
+    return data != null || notExists();
+  }
+
+  /**
+   * @return true if the node does not exists in zookeeper
+   */
+  boolean notExists() {
+    return stat == null && data == null && children == null;
+  }
+}
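
Note: the javadoc above describes four cacheable states per zookeeper node. The sketch below walks a node through those states; it is illustrative test-style code only, it assumes ZooCache.ZcStat has a no-arg constructor, and it would have to live in this package since ZcNode and its constructors are package-private.

// Run with -ea so the assertions execute.
package org.apache.accumulo.core.fate.zookeeper;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class ZcNodeStatesSketch {
  public static void main(String[] args) {
    // State 1: the node is known not to exist; data, stat, and children are all null.
    ZcNode missing = ZcNode.NON_EXISTENT;
    assert missing.notExists() && missing.getData() == null && missing.getChildren() == null;

    // State 2: only data and stat are cached; getChildren() would throw IllegalStateException.
    byte[] bytes = "hello".getBytes(StandardCharsets.UTF_8);
    ZcNode dataOnly = new ZcNode(bytes, new ZooCache.ZcStat(), null);
    assert dataOnly.cachedData() && !dataOnly.cachedChildren();

    // State 3: only children are cached; getData() or getStat() would throw IllegalStateException.
    ZcNode childrenOnly = new ZcNode(List.of("child1", "child2"), null);
    assert childrenOnly.cachedChildren() && !childrenOnly.cachedData();

    // State 4: children layered onto the existing data-only node; everything is cached.
    ZcNode both = new ZcNode(List.of("child1", "child2"), dataOnly);
    assert both.cachedData() && both.cachedChildren() && both.getChildren().size() == 2;
  }
}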
