Skip to content

Commit 52f7f17

Browse files
committed
Small improvements
1 parent f977c0b commit 52f7f17

File tree

7 files changed

+74
-82
lines changed

7 files changed

+74
-82
lines changed

core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@
3030

3131
public interface MetricsInfo {
3232

33-
public static final String INSTANCE_NAME_TAG_KEY = "instance.name";
34-
public static final String PROCESS_NAME_TAG_KEY = "process.name";
35-
public static final String RESOURCE_GROUP_TAG_KEY = "resource.group";
36-
public static final String HOST_TAG_KEY = "host";
37-
public static final String PORT_TAG_KEY = "port";
33+
String INSTANCE_NAME_TAG_KEY = "instance.name";
34+
String PROCESS_NAME_TAG_KEY = "process.name";
35+
String RESOURCE_GROUP_TAG_KEY = "resource.group";
36+
String HOST_TAG_KEY = "host";
37+
String PORT_TAG_KEY = "port";
3838

3939
/**
4040
* Convenience method to create tag name / value pair for the instance name

server/base/src/main/java/org/apache/accumulo/server/metrics/MetricResponseWrapper.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import java.nio.ByteBuffer;
2828
import java.util.List;
29+
import java.util.Set;
2930
import java.util.concurrent.TimeUnit;
3031
import java.util.function.Consumer;
3132
import java.util.stream.Collectors;
@@ -81,11 +82,11 @@ public MetricResponseWrapper(FlatBufferBuilder builder) {
8182
* Remove tags from the Metric that duplicate other information found in the MetricResponse
8283
*/
8384
private List<Tag> reduceTags(List<Tag> tags, List<Tag> extraTags) {
84-
return Stream.concat(tags.stream(), extraTags.stream()).filter(t -> {
85-
return !t.getKey().equals(INSTANCE_NAME_TAG_KEY) && !t.getKey().equals(PROCESS_NAME_TAG_KEY)
86-
&& !t.getKey().equals(RESOURCE_GROUP_TAG_KEY) && !t.getKey().equals(HOST_TAG_KEY)
87-
&& !t.getKey().equals(PORT_TAG_KEY);
88-
}).collect(Collectors.toList());
85+
final Set<String> existingTags = Set.of(INSTANCE_NAME_TAG_KEY, PROCESS_NAME_TAG_KEY,
86+
RESOURCE_GROUP_TAG_KEY, HOST_TAG_KEY, PORT_TAG_KEY);
87+
88+
return Stream.concat(tags.stream(), extraTags.stream())
89+
.filter(t -> !existingTags.contains(t.getKey())).collect(Collectors.toList());
8990
}
9091

9192
private void parseAndCreateCommonInfo(Meter.Id id, List<Tag> extraTags) {

server/base/src/main/java/org/apache/accumulo/server/metrics/MetricServiceHandler.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,7 @@ public void setHost(HostAndPort host) {
5656
}
5757

5858
@Override
59-
public MetricResponse getMetrics(TInfo tinfo, TCredentials credentials)
60-
throws ThriftSecurityException, TException {
59+
public MetricResponse getMetrics(TInfo tinfo, TCredentials credentials) throws TException {
6160

6261
if (!(ctx.getSecurityOperation().isSystemUser(credentials)
6362
&& ctx.getSecurityOperation().authenticateUser(credentials, credentials))) {

server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java

+13-18
Original file line numberDiff line numberDiff line change
@@ -172,18 +172,11 @@ public static class TimeOrderedRunningCompactionSet {
172172

173173
private static final int UPPER_LIMIT = 50;
174174

175+
Comparator<RunningCompaction> oldestFirstComparator =
176+
Comparator.comparingLong(RunningCompaction::getStartTime)
177+
.thenComparing(rc -> rc.getJob().getExternalCompactionId());
175178
private final ConcurrentSkipListSet<RunningCompaction> compactions =
176-
new ConcurrentSkipListSet<>(new Comparator<RunningCompaction>() {
177-
@Override
178-
public int compare(RunningCompaction rc1, RunningCompaction rc2) {
179-
int result = Long.compare(rc1.getStartTime(), rc2.getStartTime());
180-
if (result == 0) {
181-
result = rc1.getJob().getExternalCompactionId()
182-
.compareTo(rc2.getJob().getExternalCompactionId());
183-
}
184-
return result;
185-
}
186-
});
179+
new ConcurrentSkipListSet<>(oldestFirstComparator);
187180

188181
// Tracking size here as ConcurrentSkipListSet.size() is not constant time
189182
private final AtomicInteger size = new AtomicInteger(0);
@@ -381,9 +374,7 @@ public void run() {
381374
update.setState(TCompactionState.IN_PROGRESS);
382375
update.setMessage(RESTART_UPDATE_MSG);
383376
rc.addUpdate(System.currentTimeMillis(), update);
384-
if (!rc.isStartTimeSet()) {
385-
rc.setStartTime(this.coordinatorStartTime);
386-
}
377+
rc.setStartTime(this.coordinatorStartTime);
387378
RUNNING_CACHE.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()), rc);
388379
LONG_RUNNING_COMPACTIONS_BY_RG
389380
.computeIfAbsent(rc.getGroupName(), k -> new TimeOrderedRunningCompactionSet()).add(rc);
@@ -1069,8 +1060,10 @@ public void updateCompactionStatus(TInfo tinfo, TCredentials credentials,
10691060
case CANCELLED:
10701061
case FAILED:
10711062
case SUCCEEDED:
1072-
LONG_RUNNING_COMPACTIONS_BY_RG
1073-
.getOrDefault(rc.getGroupName(), new TimeOrderedRunningCompactionSet()).remove(rc);
1063+
var compactionSet = LONG_RUNNING_COMPACTIONS_BY_RG.get(rc.getGroupName());
1064+
if (compactionSet != null) {
1065+
compactionSet.remove(rc);
1066+
}
10741067
break;
10751068
case ASSIGNED:
10761069
case IN_PROGRESS:
@@ -1086,8 +1079,10 @@ public void recordCompletion(ExternalCompactionId ecid) {
10861079
var rc = RUNNING_CACHE.remove(ecid);
10871080
if (rc != null) {
10881081
completed.put(ecid, rc);
1089-
LONG_RUNNING_COMPACTIONS_BY_RG
1090-
.getOrDefault(rc.getGroupName(), new TimeOrderedRunningCompactionSet()).remove(rc);
1082+
var compactionSet = LONG_RUNNING_COMPACTIONS_BY_RG.get(rc.getGroupName());
1083+
if (compactionSet != null) {
1084+
compactionSet.remove(rc);
1085+
}
10911086
}
10921087
}
10931088

server/monitor/src/main/java/org/apache/accumulo/monitor/next/Endpoints.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ private void validateResourceGroup(String resourceGroup) {
7979
@Description("Returns a list of the available endpoints and a description for each")
8080
public Map<String,String> getEndpoints(@Context HttpServletRequest request) {
8181

82-
/**
83-
* Attemtped to use OpenAPI annotation for use with Swagger-UI, but ran into potential
82+
/*
83+
* Attempted to use OpenAPI annotation for use with Swagger-UI, but ran into potential
8484
* dependency convergence issues as we were using newer version of some of the same
8585
* dependencies.
8686
*/
@@ -159,7 +159,7 @@ public InstanceSummary getInstanceSummary() {
159159
return new InstanceSummary(monitor.getContext().getInstanceName(),
160160
monitor.getContext().instanceOperations().getInstanceId().canonical(),
161161
Set.of(monitor.getContext().getZooKeepers().split(",")),
162-
monitor.getContext().getVolumeManager().getVolumes().stream().map(v -> v.toString())
162+
monitor.getContext().getVolumeManager().getVolumes().stream().map(Object::toString)
163163
.collect(Collectors.toSet()),
164164
Constants.VERSION);
165165
}

server/monitor/src/main/java/org/apache/accumulo/monitor/next/InformationFetcher.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -264,10 +264,10 @@ public void run() {
264264

265265
while (true) {
266266

267-
// Don't fetch new data if there are no connections
268-
// On an initial connection, no data may be displayed
269-
// If a connection has not been made in a while, stale data may be displayed
270-
// Only refresh every 5s (old monitor logic)
267+
// Don't fetch new data if there are no connections.
268+
// On an initial connection, no data may be displayed.
269+
// If a connection has not been made in a while, stale data may be displayed.
270+
// Only refresh every 5s (old monitor logic).
271271
while (!newConnectionEvent.get() && connectionCount.get() == 0
272272
&& NanoTime.millisElapsed(refreshTime, NanoTime.now()) > 5000) {
273273
Thread.onSpinWait();
@@ -302,7 +302,7 @@ public void run() {
302302
ctx.getConfiguration().getTimeInMillis(Property.MONITOR_FETCH_TIMEOUT);
303303
long allFuturesAdded = NanoTime.now();
304304
boolean tookToLong = false;
305-
while (futures.size() > 0) {
305+
while (!futures.isEmpty()) {
306306

307307
if (NanoTime.millisElapsed(allFuturesAdded, NanoTime.now()) > monitorFetchTimeout) {
308308
tookToLong = true;
@@ -322,7 +322,7 @@ public void run() {
322322
}
323323
}
324324
}
325-
if (futures.size() > 0) {
325+
if (!futures.isEmpty()) {
326326
UtilWaitThread.sleep(3_000);
327327
}
328328
}

test/src/main/java/org/apache/accumulo/test/metrics/MetricsThriftRpcIT.java

+40-43
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
package org.apache.accumulo.test.metrics;
2020

2121
import static org.junit.jupiter.api.Assertions.assertEquals;
22+
import static org.junit.jupiter.api.Assertions.assertNotNull;
2223
import static org.junit.jupiter.api.Assertions.assertTrue;
23-
import static org.junit.jupiter.api.Assertions.fail;
2424

2525
import java.nio.ByteBuffer;
2626
import java.time.Duration;
@@ -50,11 +50,15 @@
5050
import org.apache.accumulo.test.functional.ConfigurableMacBase;
5151
import org.apache.hadoop.conf.Configuration;
5252
import org.junit.jupiter.api.Test;
53+
import org.slf4j.Logger;
54+
import org.slf4j.LoggerFactory;
5355

5456
import com.google.common.net.HostAndPort;
5557

5658
public class MetricsThriftRpcIT extends ConfigurableMacBase {
5759

60+
private static final Logger log = LoggerFactory.getLogger(MetricsThriftRpcIT.class);
61+
5862
@Override
5963
protected Duration defaultTimeout() {
6064
return Duration.ofMinutes(3);
@@ -79,8 +83,8 @@ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSit
7983

8084
private int handleMetrics(final MetricResponse response) {
8185
if (response.getMetricsSize() == 0) {
82-
System.out.println("type: " + response.getServerType() + ", host: " + response.getServer()
83-
+ ", group: " + response.getResourceGroup() + "has no metrics");
86+
log.info("type: {}, host: {}, group: {} has no metrics", response.getServerType(),
87+
response.getServer(), response.getResourceGroup());
8488
return response.getMetricsSize();
8589
}
8690
for (final ByteBuffer binary : response.getMetrics()) {
@@ -90,10 +94,11 @@ private int handleMetrics(final MetricResponse response) {
9094
FTag t = fm.tags(i);
9195
tags.add(t.key() + " = " + t.value());
9296
}
93-
System.out.println("type: " + response.getServerType() + ", host: " + response.getServer()
94-
+ ", group: " + response.getResourceGroup() + ", time: " + response.getTimestamp()
95-
+ ", name: " + fm.name() + ", type: " + fm.type() + ", tags: " + tags + ", dval: "
96-
+ fm.dvalue() + ", ival: " + fm.ivalue() + ", lval: " + fm.lvalue());
97+
log.info(
98+
"type: {}, host: {}, group: {}, time: {}, name: {}, type: {}, tags: {}, dval: {}, ival: {}, lval: {}",
99+
response.getServerType(), response.getServer(), response.getResourceGroup(),
100+
response.getTimestamp(), fm.name(), fm.type(), tags, fm.dvalue(), fm.ivalue(),
101+
fm.lvalue());
97102
}
98103
return response.getMetricsSize();
99104
}
@@ -107,47 +112,39 @@ public void testRpc() throws Exception {
107112
ClientContext cc = (ClientContext) client;
108113
Set<ServerId> managers = client.instanceOperations().getServers(ServerId.Type.MANAGER);
109114
assertEquals(1, managers.size());
110-
for (ServerId server : managers) {
111-
Client metricsClient = ThriftUtil.getClient(ThriftClientTypes.METRICS,
112-
HostAndPort.fromParts(server.getHost(), server.getPort()), cc);
113-
try {
114-
MetricResponse response = metricsClient.getMetrics(TraceUtil.traceInfo(),
115-
getCluster().getServerContext().rpcCreds());
116-
assertEquals(server.getResourceGroup(), response.getResourceGroup());
117-
assertEquals(MetricSource.MANAGER, response.getServerType());
118-
assertTrue(handleMetrics(response) > 0);
119-
} finally {
120-
ThriftUtil.returnClient(metricsClient, cc);
121-
}
115+
ServerId managerServer = managers.iterator().next();
116+
Client metricsClient = ThriftUtil.getClient(ThriftClientTypes.METRICS,
117+
HostAndPort.fromParts(managerServer.getHost(), managerServer.getPort()), cc);
118+
try {
119+
MetricResponse response = metricsClient.getMetrics(TraceUtil.traceInfo(),
120+
getCluster().getServerContext().rpcCreds());
121+
assertEquals(managerServer.getResourceGroup(), response.getResourceGroup());
122+
assertEquals(MetricSource.MANAGER, response.getServerType());
123+
assertTrue(handleMetrics(response) > 0);
124+
} finally {
125+
ThriftUtil.returnClient(metricsClient, cc);
122126
}
123127
ServiceLockPath zgcPath = cc.getServerPaths().getGarbageCollector(true);
124-
if (zgcPath != null) {
125-
Optional<ServiceLockData> sld = cc.getZooCache().getLockData(zgcPath);
126-
String location = null;
127-
if (sld.isPresent()) {
128-
location = sld.orElseThrow().getAddressString(ThriftService.GC);
129-
HostAndPort hp = HostAndPort.fromString(location);
130-
Client metricsClient = ThriftUtil.getClient(ThriftClientTypes.METRICS, hp, cc);
131-
try {
132-
MetricResponse response = metricsClient.getMetrics(TraceUtil.traceInfo(),
133-
getCluster().getServerContext().rpcCreds());
134-
assertEquals(Constants.DEFAULT_RESOURCE_GROUP_NAME, response.getResourceGroup());
135-
assertEquals(MetricSource.GARBAGE_COLLECTOR, response.getServerType());
136-
assertTrue(handleMetrics(response) > 0);
137-
} finally {
138-
ThriftUtil.returnClient(metricsClient, cc);
139-
}
140-
} else {
141-
fail("Garbage Collector ZooKeeper lock data not found");
142-
}
143-
} else {
144-
fail("Garbage Collector not found in ZooKeeper");
128+
assertNotNull(zgcPath, "Garbage Collector not found in ZooKeeper");
129+
Optional<ServiceLockData> sld = cc.getZooCache().getLockData(zgcPath);
130+
assertTrue(sld.isPresent(), "Garbage Collector ZooKeeper lock data not found");
131+
String location = sld.orElseThrow().getAddressString(ThriftService.GC);
132+
HostAndPort hp = HostAndPort.fromString(location);
133+
metricsClient = ThriftUtil.getClient(ThriftClientTypes.METRICS, hp, cc);
134+
try {
135+
MetricResponse response = metricsClient.getMetrics(TraceUtil.traceInfo(),
136+
getCluster().getServerContext().rpcCreds());
137+
assertEquals(Constants.DEFAULT_RESOURCE_GROUP_NAME, response.getResourceGroup());
138+
assertEquals(MetricSource.GARBAGE_COLLECTOR, response.getServerType());
139+
assertTrue(handleMetrics(response) > 0);
140+
} finally {
141+
ThriftUtil.returnClient(metricsClient, cc);
145142
}
146143

147144
Set<ServerId> compactors = client.instanceOperations().getServers(ServerId.Type.COMPACTOR);
148145
assertEquals(4, compactors.size());
149146
for (ServerId server : compactors) {
150-
Client metricsClient = ThriftUtil.getClient(ThriftClientTypes.METRICS,
147+
metricsClient = ThriftUtil.getClient(ThriftClientTypes.METRICS,
151148
HostAndPort.fromParts(server.getHost(), server.getPort()), cc);
152149
try {
153150
MetricResponse response = metricsClient.getMetrics(TraceUtil.traceInfo(),
@@ -162,7 +159,7 @@ public void testRpc() throws Exception {
162159
Set<ServerId> sservers = client.instanceOperations().getServers(ServerId.Type.SCAN_SERVER);
163160
assertEquals(3, sservers.size());
164161
for (ServerId server : sservers) {
165-
Client metricsClient = ThriftUtil.getClient(ThriftClientTypes.METRICS,
162+
metricsClient = ThriftUtil.getClient(ThriftClientTypes.METRICS,
166163
HostAndPort.fromParts(server.getHost(), server.getPort()), cc);
167164
try {
168165
MetricResponse response = metricsClient.getMetrics(TraceUtil.traceInfo(),
@@ -177,7 +174,7 @@ public void testRpc() throws Exception {
177174
Set<ServerId> tservers = client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER);
178175
assertEquals(2, tservers.size());
179176
for (ServerId server : tservers) {
180-
Client metricsClient = ThriftUtil.getClient(ThriftClientTypes.METRICS,
177+
metricsClient = ThriftUtil.getClient(ThriftClientTypes.METRICS,
181178
HostAndPort.fromParts(server.getHost(), server.getPort()), cc);
182179
try {
183180
MetricResponse response = metricsClient.getMetrics(TraceUtil.traceInfo(),

0 commit comments

Comments
 (0)