Skip to content

Commit a254aa9

Browse files
Adds latency metrics for ClusterState Appliers and Listeners (#12333)
* Adds latency metrics for ClusterState Appliers and Listeners. Signed-off-by: Harsh Garg <gkharsh@amazon.com>
1 parent d26cd46 commit a254aa9

File tree

48 files changed

+495
-101
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

48 files changed

+495
-101
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55

66
## [Unreleased 2.x]
77
### Added
8+
- Add latency metrics for instrumenting critical clusterManager code paths ([#12333](https://github.com/opensearch-project/OpenSearch/pull/12333))
89
- Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423))
910
- Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478))
1011
- Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293))

plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
2222
import org.opensearch.plugins.ActionPlugin;
2323
import org.opensearch.rest.RestHandler;
24+
import org.opensearch.test.ClusterServiceUtils;
2425
import org.opensearch.test.OpenSearchTestCase;
2526
import org.opensearch.threadpool.ExecutorBuilder;
2627
import org.opensearch.threadpool.ScalingExecutorBuilder;
@@ -50,8 +51,7 @@ public void setup() {
5051
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE);
5152
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE);
5253

53-
clusterService = new ClusterService(settings, clusterSettings, threadPool);
54-
54+
clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, threadPool);
5555
}
5656

5757
public void testGetSettings() {

plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
2323
import org.opensearch.search.aggregations.support.ValueType;
2424
import org.opensearch.search.builder.SearchSourceBuilder;
25+
import org.opensearch.test.ClusterServiceUtils;
2526
import org.opensearch.test.OpenSearchTestCase;
2627
import org.junit.Before;
2728

@@ -57,7 +58,7 @@ public void setup() {
5758
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED);
5859
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE);
5960
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE);
60-
clusterService = new ClusterService(settings, clusterSettings, null);
61+
clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null);
6162
when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true);
6263
when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService);
6364
}

plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesActionTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse;
1818
import org.opensearch.plugin.insights.rules.model.MetricType;
1919
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
20+
import org.opensearch.test.ClusterServiceUtils;
2021
import org.opensearch.test.OpenSearchTestCase;
2122
import org.opensearch.threadpool.ThreadPool;
2223
import org.opensearch.transport.TransportService;
@@ -33,7 +34,7 @@ public class TransportTopQueriesActionTests extends OpenSearchTestCase {
3334
private final Settings.Builder settingsBuilder = Settings.builder();
3435
private final Settings settings = settingsBuilder.build();
3536
private final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
36-
private final ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool);
37+
private final ClusterService clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, threadPool);
3738
private final TransportService transportService = mock(TransportService.class);
3839
private final QueryInsightsService topQueriesByLatencyService = mock(QueryInsightsService.class);
3940
private final ActionFilters actionFilters = mock(ActionFilters.class);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster;
10+
11+
import org.opensearch.telemetry.metrics.Histogram;
12+
import org.opensearch.telemetry.metrics.MetricsRegistry;
13+
import org.opensearch.telemetry.metrics.tags.Tags;
14+
15+
import java.util.Objects;
16+
import java.util.Optional;
17+
18+
/**
19+
* Class containing metrics (counters/latency) specific to ClusterManager.
20+
*
21+
* @opensearch.internal
22+
*/
23+
public final class ClusterManagerMetrics {
24+
25+
private static final String LATENCY_METRIC_UNIT_MS = "ms";
26+
27+
public final Histogram clusterStateAppliersHistogram;
28+
public final Histogram clusterStateListenersHistogram;
29+
public final Histogram rerouteHistogram;
30+
public final Histogram clusterStateComputeHistogram;
31+
public final Histogram clusterStatePublishHistogram;
32+
33+
public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
34+
clusterStateAppliersHistogram = metricsRegistry.createHistogram(
35+
"cluster.state.appliers.latency",
36+
"Histogram for tracking the latency of cluster state appliers",
37+
LATENCY_METRIC_UNIT_MS
38+
);
39+
clusterStateListenersHistogram = metricsRegistry.createHistogram(
40+
"cluster.state.listeners.latency",
41+
"Histogram for tracking the latency of cluster state listeners",
42+
LATENCY_METRIC_UNIT_MS
43+
);
44+
rerouteHistogram = metricsRegistry.createHistogram(
45+
"allocation.reroute.latency",
46+
"Histogram for recording latency of shard re-routing",
47+
LATENCY_METRIC_UNIT_MS
48+
);
49+
clusterStateComputeHistogram = metricsRegistry.createHistogram(
50+
"cluster.state.new.compute.latency",
51+
"Histogram for recording time taken to compute new cluster state",
52+
LATENCY_METRIC_UNIT_MS
53+
);
54+
clusterStatePublishHistogram = metricsRegistry.createHistogram(
55+
"cluster.state.publish.success.latency",
56+
"Histogram for recording time taken to publish a new cluster state",
57+
LATENCY_METRIC_UNIT_MS
58+
);
59+
}
60+
61+
public void recordLatency(Histogram histogram, Double value) {
62+
histogram.record(value);
63+
}
64+
65+
public void recordLatency(Histogram histogram, Double value, Optional<Tags> tags) {
66+
if (Objects.isNull(tags) || tags.isEmpty()) {
67+
histogram.record(value);
68+
return;
69+
}
70+
histogram.record(value, tags.get());
71+
}
72+
}

server/src/main/java/org/opensearch/cluster/ClusterModule.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,8 @@ public ClusterModule(
146146
List<ClusterPlugin> clusterPlugins,
147147
ClusterInfoService clusterInfoService,
148148
SnapshotsInfoService snapshotsInfoService,
149-
ThreadContext threadContext
149+
ThreadContext threadContext,
150+
ClusterManagerMetrics clusterManagerMetrics
150151
) {
151152
this.clusterPlugins = clusterPlugins;
152153
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
@@ -159,7 +160,8 @@ public ClusterModule(
159160
shardsAllocator,
160161
clusterInfoService,
161162
snapshotsInfoService,
162-
settings
163+
settings,
164+
clusterManagerMetrics
163165
);
164166
}
165167

server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java

+22-6
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.logging.log4j.message.ParameterizedMessage;
3838
import org.opensearch.Version;
3939
import org.opensearch.cluster.ClusterInfoService;
40+
import org.opensearch.cluster.ClusterManagerMetrics;
4041
import org.opensearch.cluster.ClusterState;
4142
import org.opensearch.cluster.RestoreInProgress;
4243
import org.opensearch.cluster.health.ClusterHealthStatus;
@@ -56,10 +57,12 @@
5657
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
5758
import org.opensearch.cluster.routing.allocation.decider.Decision;
5859
import org.opensearch.common.settings.Settings;
60+
import org.opensearch.common.unit.TimeValue;
5961
import org.opensearch.gateway.GatewayAllocator;
6062
import org.opensearch.gateway.PriorityComparator;
6163
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
6264
import org.opensearch.snapshots.SnapshotsInfoService;
65+
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
6366

6467
import java.util.ArrayList;
6568
import java.util.Collections;
@@ -96,6 +99,7 @@ public class AllocationService {
9699
private final ShardsAllocator shardsAllocator;
97100
private final ClusterInfoService clusterInfoService;
98101
private SnapshotsInfoService snapshotsInfoService;
102+
private final ClusterManagerMetrics clusterManagerMetrics;
99103

100104
// only for tests that use the GatewayAllocator as the unique ExistingShardsAllocator
101105
public AllocationService(
@@ -105,32 +109,40 @@ public AllocationService(
105109
ClusterInfoService clusterInfoService,
106110
SnapshotsInfoService snapshotsInfoService
107111
) {
108-
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService);
112+
this(
113+
allocationDeciders,
114+
shardsAllocator,
115+
clusterInfoService,
116+
snapshotsInfoService,
117+
new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)
118+
);
109119
setExistingShardsAllocators(Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, gatewayAllocator));
110120
}
111121

112122
public AllocationService(
113123
AllocationDeciders allocationDeciders,
114124
ShardsAllocator shardsAllocator,
115125
ClusterInfoService clusterInfoService,
116-
SnapshotsInfoService snapshotsInfoService
126+
SnapshotsInfoService snapshotsInfoService,
127+
ClusterManagerMetrics clusterManagerMetrics
117128
) {
118-
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, Settings.EMPTY);
129+
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, Settings.EMPTY, clusterManagerMetrics);
119130
}
120131

121132
public AllocationService(
122133
AllocationDeciders allocationDeciders,
123134
ShardsAllocator shardsAllocator,
124135
ClusterInfoService clusterInfoService,
125136
SnapshotsInfoService snapshotsInfoService,
126-
Settings settings
127-
137+
Settings settings,
138+
ClusterManagerMetrics clusterManagerMetrics
128139
) {
129140
this.allocationDeciders = allocationDeciders;
130141
this.shardsAllocator = shardsAllocator;
131142
this.clusterInfoService = clusterInfoService;
132143
this.snapshotsInfoService = snapshotsInfoService;
133144
this.settings = settings;
145+
this.clusterManagerMetrics = clusterManagerMetrics;
134146
}
135147

136148
/**
@@ -550,11 +562,15 @@ private void reroute(RoutingAllocation allocation) {
550562
assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metadata(), allocation).isEmpty()
551563
: "auto-expand replicas out of sync with number of nodes in the cluster";
552564
assert assertInitialized();
553-
565+
long rerouteStartTimeNS = System.nanoTime();
554566
removeDelayMarkers(allocation);
555567

556568
allocateExistingUnassignedShards(allocation); // try to allocate existing shard copies first
557569
shardsAllocator.allocate(allocation);
570+
clusterManagerMetrics.recordLatency(
571+
clusterManagerMetrics.rerouteHistogram,
572+
(double) Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - rerouteStartTimeNS))
573+
);
558574
assert RoutingNodes.assertShardStats(allocation.routingNodes());
559575
}
560576

server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java

+26-3
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.logging.log4j.Logger;
3737
import org.apache.logging.log4j.message.ParameterizedMessage;
3838
import org.opensearch.cluster.ClusterChangedEvent;
39+
import org.opensearch.cluster.ClusterManagerMetrics;
3940
import org.opensearch.cluster.ClusterState;
4041
import org.opensearch.cluster.ClusterStateApplier;
4142
import org.opensearch.cluster.ClusterStateListener;
@@ -61,13 +62,15 @@
6162
import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor;
6263
import org.opensearch.common.util.concurrent.ThreadContext;
6364
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
65+
import org.opensearch.telemetry.metrics.tags.Tags;
6466
import org.opensearch.threadpool.Scheduler;
6567
import org.opensearch.threadpool.ThreadPool;
6668

6769
import java.util.Arrays;
6870
import java.util.Collection;
6971
import java.util.Map;
7072
import java.util.Objects;
73+
import java.util.Optional;
7174
import java.util.concurrent.ConcurrentHashMap;
7275
import java.util.concurrent.CopyOnWriteArrayList;
7376
import java.util.concurrent.CountDownLatch;
@@ -120,8 +123,15 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
120123
private final String nodeName;
121124

122125
private NodeConnectionsService nodeConnectionsService;
123-
124-
public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
126+
private final ClusterManagerMetrics clusterManagerMetrics;
127+
128+
public ClusterApplierService(
129+
String nodeName,
130+
Settings settings,
131+
ClusterSettings clusterSettings,
132+
ThreadPool threadPool,
133+
ClusterManagerMetrics clusterManagerMetrics
134+
) {
125135
this.clusterSettings = clusterSettings;
126136
this.threadPool = threadPool;
127137
this.state = new AtomicReference<>();
@@ -132,6 +142,7 @@ public ClusterApplierService(String nodeName, Settings settings, ClusterSettings
132142
CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
133143
this::setSlowTaskLoggingThreshold
134144
);
145+
this.clusterManagerMetrics = clusterManagerMetrics;
135146
}
136147

137148
private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
@@ -597,15 +608,21 @@ private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, S
597608
callClusterStateAppliers(clusterChangedEvent, stopWatch, lowPriorityStateAppliers);
598609
}
599610

600-
private static void callClusterStateAppliers(
611+
private void callClusterStateAppliers(
601612
ClusterChangedEvent clusterChangedEvent,
602613
StopWatch stopWatch,
603614
Collection<ClusterStateApplier> clusterStateAppliers
604615
) {
605616
for (ClusterStateApplier applier : clusterStateAppliers) {
606617
logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
607618
try (TimingHandle ignored = stopWatch.timing("running applier [" + applier + "]")) {
619+
long applierStartTimeNS = System.nanoTime();
608620
applier.applyClusterState(clusterChangedEvent);
621+
clusterManagerMetrics.recordLatency(
622+
clusterManagerMetrics.clusterStateAppliersHistogram,
623+
(double) Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - applierStartTimeNS)),
624+
Optional.of(Tags.create().addTag("Operation", applier.getClass().getSimpleName()))
625+
);
609626
}
610627
}
611628
}
@@ -624,7 +641,13 @@ private void callClusterStateListener(
624641
try {
625642
logger.trace("calling [{}] with change to version [{}]", listener, clusterChangedEvent.state().version());
626643
try (TimingHandle ignored = stopWatch.timing("notifying listener [" + listener + "]")) {
644+
long listenerStartTimeNS = System.nanoTime();
627645
listener.clusterChanged(clusterChangedEvent);
646+
clusterManagerMetrics.recordLatency(
647+
clusterManagerMetrics.clusterStateListenersHistogram,
648+
(double) Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - listenerStartTimeNS)),
649+
Optional.of(Tags.create().addTag("Operation", listener.getClass().getSimpleName()))
650+
);
628651
}
629652
} catch (Exception ex) {
630653
logger.warn("failed to notify ClusterStateListener", ex);

server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.cluster.service;
1010

11+
import org.opensearch.cluster.ClusterManagerMetrics;
1112
import org.opensearch.common.annotation.PublicApi;
1213
import org.opensearch.common.settings.ClusterSettings;
1314
import org.opensearch.common.settings.Settings;
@@ -20,7 +21,12 @@
2021
*/
2122
@PublicApi(since = "2.2.0")
2223
public class ClusterManagerService extends MasterService {
23-
public ClusterManagerService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
24-
super(settings, clusterSettings, threadPool);
24+
public ClusterManagerService(
25+
Settings settings,
26+
ClusterSettings clusterSettings,
27+
ThreadPool threadPool,
28+
ClusterManagerMetrics clusterManagerMetrics
29+
) {
30+
super(settings, clusterSettings, threadPool, clusterManagerMetrics);
2531
}
2632
}

server/src/main/java/org/opensearch/cluster/service/ClusterService.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.cluster.service;
3434

35+
import org.opensearch.cluster.ClusterManagerMetrics;
3536
import org.opensearch.cluster.ClusterName;
3637
import org.opensearch.cluster.ClusterState;
3738
import org.opensearch.cluster.ClusterStateApplier;
@@ -91,12 +92,17 @@ public class ClusterService extends AbstractLifecycleComponent {
9192

9293
private IndexingPressureService indexingPressureService;
9394

94-
public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
95+
public ClusterService(
96+
Settings settings,
97+
ClusterSettings clusterSettings,
98+
ThreadPool threadPool,
99+
ClusterManagerMetrics clusterManagerMetrics
100+
) {
95101
this(
96102
settings,
97103
clusterSettings,
98-
new ClusterManagerService(settings, clusterSettings, threadPool),
99-
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool)
104+
new ClusterManagerService(settings, clusterSettings, threadPool, clusterManagerMetrics),
105+
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool, clusterManagerMetrics)
100106
);
101107
}
102108

0 commit comments

Comments
 (0)