Skip to content

Commit d4ed2e8

Browse files
gargharsh3134 (Harsh Garg)
authored and committed
[Backport] ClusterManager metrics instrumentation changes (opensearch-project#14118)
--------- Signed-off-by: Harsh Garg <gkharsh@amazon.com> Co-authored-by: Harsh Garg <gkharsh@amazon.com> Signed-off-by: kkewwei <kkewwei@163.com>
1 parent 33c26a1 commit d4ed2e8

File tree

56 files changed

+848
-137
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+848
-137
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55

66
## [Unreleased 2.x]
77
### Added
8+
- Add leader and follower check failure counter metrics ([#12439](https://github.com/opensearch-project/OpenSearch/pull/12439))
9+
- Add latency metrics for instrumenting critical clusterManager code paths ([#12333](https://github.com/opensearch-project/OpenSearch/pull/12333))
810
- Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423))
911
- Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478))
1012
- Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293))

plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
2222
import org.opensearch.plugins.ActionPlugin;
2323
import org.opensearch.rest.RestHandler;
24+
import org.opensearch.test.ClusterServiceUtils;
2425
import org.opensearch.test.OpenSearchTestCase;
2526
import org.opensearch.threadpool.ExecutorBuilder;
2627
import org.opensearch.threadpool.ScalingExecutorBuilder;
@@ -51,8 +52,7 @@ public void setup() {
5152
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE);
5253
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS);
5354

54-
clusterService = new ClusterService(settings, clusterSettings, threadPool);
55-
55+
clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, threadPool);
5656
}
5757

5858
public void testGetSettings() {

plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.opensearch.search.aggregations.support.ValueType;
2929
import org.opensearch.search.builder.SearchSourceBuilder;
3030
import org.opensearch.tasks.Task;
31+
import org.opensearch.test.ClusterServiceUtils;
3132
import org.opensearch.test.OpenSearchTestCase;
3233
import org.opensearch.threadpool.ThreadPool;
3334
import org.junit.Before;
@@ -69,7 +70,7 @@ public void setup() {
6970
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED);
7071
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE);
7172
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE);
72-
clusterService = new ClusterService(settings, clusterSettings, null);
73+
clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null);
7374
when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true);
7475
when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService);
7576

plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesActionTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse;
1818
import org.opensearch.plugin.insights.rules.model.MetricType;
1919
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
20+
import org.opensearch.test.ClusterServiceUtils;
2021
import org.opensearch.test.OpenSearchTestCase;
2122
import org.opensearch.threadpool.ThreadPool;
2223
import org.opensearch.transport.TransportService;
@@ -33,7 +34,7 @@ public class TransportTopQueriesActionTests extends OpenSearchTestCase {
3334
private final Settings.Builder settingsBuilder = Settings.builder();
3435
private final Settings settings = settingsBuilder.build();
3536
private final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
36-
private final ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool);
37+
private final ClusterService clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, threadPool);
3738
private final TransportService transportService = mock(TransportService.class);
3839
private final QueryInsightsService topQueriesByLatencyService = mock(QueryInsightsService.class);
3940
private final ActionFilters actionFilters = mock(ActionFilters.class);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster;
10+
11+
import org.opensearch.telemetry.metrics.Counter;
12+
import org.opensearch.telemetry.metrics.Histogram;
13+
import org.opensearch.telemetry.metrics.MetricsRegistry;
14+
import org.opensearch.telemetry.metrics.tags.Tags;
15+
16+
import java.util.Objects;
17+
import java.util.Optional;
18+
19+
/**
20+
* Class containing metrics (counters/latency) specific to ClusterManager.
21+
*
22+
* @opensearch.internal
23+
*/
24+
public final class ClusterManagerMetrics {
25+
26+
private static final String LATENCY_METRIC_UNIT_MS = "ms";
27+
private static final String COUNTER_METRICS_UNIT = "1";
28+
29+
public final Histogram clusterStateAppliersHistogram;
30+
public final Histogram clusterStateListenersHistogram;
31+
public final Histogram rerouteHistogram;
32+
public final Histogram clusterStateComputeHistogram;
33+
public final Histogram clusterStatePublishHistogram;
34+
35+
public final Counter leaderCheckFailureCounter;
36+
public final Counter followerChecksFailureCounter;
37+
38+
public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
39+
clusterStateAppliersHistogram = metricsRegistry.createHistogram(
40+
"cluster.state.appliers.latency",
41+
"Histogram for tracking the latency of cluster state appliers",
42+
LATENCY_METRIC_UNIT_MS
43+
);
44+
clusterStateListenersHistogram = metricsRegistry.createHistogram(
45+
"cluster.state.listeners.latency",
46+
"Histogram for tracking the latency of cluster state listeners",
47+
LATENCY_METRIC_UNIT_MS
48+
);
49+
rerouteHistogram = metricsRegistry.createHistogram(
50+
"allocation.reroute.latency",
51+
"Histogram for recording latency of shard re-routing",
52+
LATENCY_METRIC_UNIT_MS
53+
);
54+
clusterStateComputeHistogram = metricsRegistry.createHistogram(
55+
"cluster.state.new.compute.latency",
56+
"Histogram for recording time taken to compute new cluster state",
57+
LATENCY_METRIC_UNIT_MS
58+
);
59+
clusterStatePublishHistogram = metricsRegistry.createHistogram(
60+
"cluster.state.publish.success.latency",
61+
"Histogram for recording time taken to publish a new cluster state",
62+
LATENCY_METRIC_UNIT_MS
63+
);
64+
followerChecksFailureCounter = metricsRegistry.createCounter(
65+
"followers.checker.failure.count",
66+
"Counter for number of failed follower checks",
67+
COUNTER_METRICS_UNIT
68+
);
69+
leaderCheckFailureCounter = metricsRegistry.createCounter(
70+
"leader.checker.failure.count",
71+
"Counter for number of failed leader checks",
72+
COUNTER_METRICS_UNIT
73+
);
74+
}
75+
76+
public void recordLatency(Histogram histogram, Double value) {
77+
histogram.record(value);
78+
}
79+
80+
public void recordLatency(Histogram histogram, Double value, Optional<Tags> tags) {
81+
if (Objects.isNull(tags) || tags.isEmpty()) {
82+
histogram.record(value);
83+
return;
84+
}
85+
histogram.record(value, tags.get());
86+
}
87+
88+
public void incrementCounter(Counter counter, Double value) {
89+
incrementCounter(counter, value, Optional.empty());
90+
}
91+
92+
public void incrementCounter(Counter counter, Double value, Optional<Tags> tags) {
93+
if (Objects.isNull(tags) || tags.isEmpty()) {
94+
counter.add(value);
95+
return;
96+
}
97+
counter.add(value, tags.get());
98+
}
99+
}

server/src/main/java/org/opensearch/cluster/ClusterModule.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@ public ClusterModule(
144144
List<ClusterPlugin> clusterPlugins,
145145
ClusterInfoService clusterInfoService,
146146
SnapshotsInfoService snapshotsInfoService,
147-
ThreadContext threadContext
147+
ThreadContext threadContext,
148+
ClusterManagerMetrics clusterManagerMetrics
148149
) {
149150
this.clusterPlugins = clusterPlugins;
150151
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
@@ -157,7 +158,8 @@ public ClusterModule(
157158
shardsAllocator,
158159
clusterInfoService,
159160
snapshotsInfoService,
160-
settings
161+
settings,
162+
clusterManagerMetrics
161163
);
162164
}
163165

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.logging.log4j.message.ParameterizedMessage;
3838
import org.opensearch.LegacyESVersion;
3939
import org.opensearch.cluster.ClusterChangedEvent;
40+
import org.opensearch.cluster.ClusterManagerMetrics;
4041
import org.opensearch.cluster.ClusterName;
4142
import org.opensearch.cluster.ClusterState;
4243
import org.opensearch.cluster.ClusterStateTaskConfig;
@@ -208,7 +209,8 @@ public Coordinator(
208209
ElectionStrategy electionStrategy,
209210
NodeHealthService nodeHealthService,
210211
PersistedStateRegistry persistedStateRegistry,
211-
RemoteStoreNodeService remoteStoreNodeService
212+
RemoteStoreNodeService remoteStoreNodeService,
213+
ClusterManagerMetrics clusterManagerMetrics
212214
) {
213215
this.settings = settings;
214216
this.transportService = transportService;
@@ -262,14 +264,22 @@ public Coordinator(
262264
this::handlePublishRequest,
263265
this::handleApplyCommit
264266
);
265-
this.leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, this::onLeaderFailure, nodeHealthService);
267+
this.leaderChecker = new LeaderChecker(
268+
settings,
269+
clusterSettings,
270+
transportService,
271+
this::onLeaderFailure,
272+
nodeHealthService,
273+
clusterManagerMetrics
274+
);
266275
this.followersChecker = new FollowersChecker(
267276
settings,
268277
clusterSettings,
269278
transportService,
270279
this::onFollowerCheckRequest,
271280
this::removeNode,
272-
nodeHealthService
281+
nodeHealthService,
282+
clusterManagerMetrics
273283
);
274284
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
275285
this.clusterApplier = clusterApplier;

server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.logging.log4j.LogManager;
3636
import org.apache.logging.log4j.Logger;
3737
import org.apache.logging.log4j.message.ParameterizedMessage;
38+
import org.opensearch.cluster.ClusterManagerMetrics;
3839
import org.opensearch.cluster.coordination.Coordinator.Mode;
3940
import org.opensearch.cluster.node.DiscoveryNode;
4041
import org.opensearch.cluster.node.DiscoveryNodes;
@@ -127,14 +128,16 @@ public class FollowersChecker {
127128
private final TransportService transportService;
128129
private final NodeHealthService nodeHealthService;
129130
private volatile FastResponseState fastResponseState;
131+
private ClusterManagerMetrics clusterManagerMetrics;
130132

131133
public FollowersChecker(
132134
Settings settings,
133135
ClusterSettings clusterSettings,
134136
TransportService transportService,
135137
Consumer<FollowerCheckRequest> handleRequestAndUpdateState,
136138
BiConsumer<DiscoveryNode, String> onNodeFailure,
137-
NodeHealthService nodeHealthService
139+
NodeHealthService nodeHealthService,
140+
ClusterManagerMetrics clusterManagerMetrics
138141
) {
139142
this.settings = settings;
140143
this.transportService = transportService;
@@ -161,6 +164,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
161164
handleDisconnectedNode(node);
162165
}
163166
});
167+
this.clusterManagerMetrics = clusterManagerMetrics;
164168
}
165169

166170
private void setFollowerCheckTimeout(TimeValue followerCheckTimeout) {
@@ -413,6 +417,7 @@ public String executor() {
413417
}
414418

415419
void failNode(String reason) {
420+
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.followerChecksFailureCounter, 1.0);
416421
transportService.getThreadPool().generic().execute(new Runnable() {
417422
@Override
418423
public void run() {

server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.logging.log4j.Logger;
3737
import org.apache.logging.log4j.message.ParameterizedMessage;
3838
import org.opensearch.OpenSearchException;
39+
import org.opensearch.cluster.ClusterManagerMetrics;
3940
import org.opensearch.cluster.node.DiscoveryNode;
4041
import org.opensearch.cluster.node.DiscoveryNodes;
4142
import org.opensearch.common.Nullable;
@@ -119,17 +120,17 @@ public class LeaderChecker {
119120
private final TransportService transportService;
120121
private final Consumer<Exception> onLeaderFailure;
121122
private final NodeHealthService nodeHealthService;
122-
123123
private AtomicReference<CheckScheduler> currentChecker = new AtomicReference<>();
124-
125124
private volatile DiscoveryNodes discoveryNodes;
125+
private final ClusterManagerMetrics clusterManagerMetrics;
126126

127127
LeaderChecker(
128128
final Settings settings,
129129
final ClusterSettings clusterSettings,
130130
final TransportService transportService,
131131
final Consumer<Exception> onLeaderFailure,
132-
NodeHealthService nodeHealthService
132+
NodeHealthService nodeHealthService,
133+
final ClusterManagerMetrics clusterManagerMetrics
133134
) {
134135
this.settings = settings;
135136
leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings);
@@ -138,6 +139,7 @@ public class LeaderChecker {
138139
this.transportService = transportService;
139140
this.onLeaderFailure = onLeaderFailure;
140141
this.nodeHealthService = nodeHealthService;
142+
this.clusterManagerMetrics = clusterManagerMetrics;
141143
clusterSettings.addSettingsUpdateConsumer(LEADER_CHECK_TIMEOUT_SETTING, this::setLeaderCheckTimeout);
142144

143145
transportService.registerRequestHandler(
@@ -293,7 +295,6 @@ public void handleResponse(Empty response) {
293295
logger.debug("closed check scheduler received a response, doing nothing");
294296
return;
295297
}
296-
297298
failureCountSinceLastSuccess.set(0);
298299
scheduleNextWakeUp(); // logs trace message indicating success
299300
}
@@ -304,7 +305,6 @@ public void handleException(TransportException exp) {
304305
logger.debug("closed check scheduler received a response, doing nothing");
305306
return;
306307
}
307-
308308
if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
309309
logger.debug(new ParameterizedMessage("leader [{}] disconnected during check", leader), exp);
310310
leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp));
@@ -355,6 +355,7 @@ public String executor() {
355355

356356
void leaderFailed(Exception e) {
357357
if (isClosed.compareAndSet(false, true)) {
358+
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.leaderCheckFailureCounter, 1.0);
358359
transportService.getThreadPool().generic().execute(new Runnable() {
359360
@Override
360361
public void run() {

0 commit comments

Comments
 (0)