Skip to content

Commit 8ddb3ee

Browse files
Add success and failure count OTel metrics for async shard fetch (opensearch-project#15976)
Signed-off-by: Rahul Karajgikar <karajgik@amazon.com>
1 parent 7ba8b78 commit 8ddb3ee

12 files changed

+344
-44
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1414
- Remove identity-related feature flagged code from the RestController ([#15430](https://github.com/opensearch-project/OpenSearch/pull/15430))
1515
- Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923))
1616
- Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718))
17+
- Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976))
1718

1819
### Dependencies
1920
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))

server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java

+21-6
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.opensearch.action.FailedNodeException;
3838
import org.opensearch.action.support.ActionFilters;
3939
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeReadAction;
40+
import org.opensearch.cluster.ClusterManagerMetrics;
4041
import org.opensearch.cluster.ClusterState;
4142
import org.opensearch.cluster.block.ClusterBlockException;
4243
import org.opensearch.cluster.block.ClusterBlockLevel;
@@ -88,6 +89,7 @@ public class TransportIndicesShardStoresAction extends TransportClusterManagerNo
8889
private static final Logger logger = LogManager.getLogger(TransportIndicesShardStoresAction.class);
8990

9091
private final TransportNodesListGatewayStartedShards listShardStoresInfo;
92+
private final ClusterManagerMetrics clusterManagerMetrics;
9193

9294
@Inject
9395
public TransportIndicesShardStoresAction(
@@ -96,7 +98,8 @@ public TransportIndicesShardStoresAction(
9698
ThreadPool threadPool,
9799
ActionFilters actionFilters,
98100
IndexNameExpressionResolver indexNameExpressionResolver,
99-
TransportNodesListGatewayStartedShards listShardStoresInfo
101+
TransportNodesListGatewayStartedShards listShardStoresInfo,
102+
ClusterManagerMetrics clusterManagerMetrics
100103
) {
101104
super(
102105
IndicesShardStoresAction.NAME,
@@ -109,6 +112,7 @@ public TransportIndicesShardStoresAction(
109112
true
110113
);
111114
this.listShardStoresInfo = listShardStoresInfo;
115+
this.clusterManagerMetrics = clusterManagerMetrics;
112116
}
113117

114118
@Override
@@ -154,7 +158,7 @@ protected void clusterManagerOperation(
154158
// we could fetch all shard store info from every node once (nNodes requests)
155159
// we have to implement a TransportNodesAction instead of using TransportNodesListGatewayStartedShards
156160
// for fetching shard stores info, that operates on a list of shards instead of a single shard
157-
new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, shardsToFetch, listener).start();
161+
new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, shardsToFetch, listener, clusterManagerMetrics).start();
158162
}
159163

160164
@Override
@@ -175,27 +179,37 @@ private class AsyncShardStoresInfoFetches {
175179
private final ActionListener<IndicesShardStoresResponse> listener;
176180
private CountDown expectedOps;
177181
private final Queue<InternalAsyncFetch.Response> fetchResponses;
182+
private final ClusterManagerMetrics clusterManagerMetrics;
178183

179184
AsyncShardStoresInfoFetches(
180185
DiscoveryNodes nodes,
181186
RoutingNodes routingNodes,
182187
Set<Tuple<ShardId, String>> shards,
183-
ActionListener<IndicesShardStoresResponse> listener
188+
ActionListener<IndicesShardStoresResponse> listener,
189+
ClusterManagerMetrics clusterManagerMetrics
184190
) {
185191
this.nodes = nodes;
186192
this.routingNodes = routingNodes;
187193
this.shards = shards;
188194
this.listener = listener;
189195
this.fetchResponses = new ConcurrentLinkedQueue<>();
190196
this.expectedOps = new CountDown(shards.size());
197+
this.clusterManagerMetrics = clusterManagerMetrics;
191198
}
192199

193200
void start() {
194201
if (shards.isEmpty()) {
195202
listener.onResponse(new IndicesShardStoresResponse());
196203
} else {
197204
for (Tuple<ShardId, String> shard : shards) {
198-
InternalAsyncFetch fetch = new InternalAsyncFetch(logger, "shard_stores", shard.v1(), shard.v2(), listShardStoresInfo);
205+
InternalAsyncFetch fetch = new InternalAsyncFetch(
206+
logger,
207+
"shard_stores",
208+
shard.v1(),
209+
shard.v2(),
210+
listShardStoresInfo,
211+
clusterManagerMetrics
212+
);
199213
fetch.fetchData(nodes, Collections.emptyMap());
200214
}
201215
}
@@ -213,9 +227,10 @@ private class InternalAsyncFetch extends AsyncShardFetch<NodeGatewayStartedShard
213227
String type,
214228
ShardId shardId,
215229
String customDataPath,
216-
TransportNodesListGatewayStartedShards action
230+
TransportNodesListGatewayStartedShards action,
231+
ClusterManagerMetrics clusterManagerMetrics
217232
) {
218-
super(logger, type, shardId, customDataPath, action);
233+
super(logger, type, shardId, customDataPath, action, clusterManagerMetrics);
219234
}
220235

221236
@Override

server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java

+13
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ public final class ClusterManagerMetrics {
3434

3535
public final Counter leaderCheckFailureCounter;
3636
public final Counter followerChecksFailureCounter;
37+
public final Counter asyncFetchFailureCounter;
38+
public final Counter asyncFetchSuccessCounter;
3739

3840
public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
3941
clusterStateAppliersHistogram = metricsRegistry.createHistogram(
@@ -71,6 +73,17 @@ public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
7173
"Counter for number of failed leader checks",
7274
COUNTER_METRICS_UNIT
7375
);
76+
asyncFetchFailureCounter = metricsRegistry.createCounter(
77+
"async.fetch.failure.count",
78+
"Counter for number of failed async fetches",
79+
COUNTER_METRICS_UNIT
80+
);
81+
asyncFetchSuccessCounter = metricsRegistry.createCounter(
82+
"async.fetch.success.count",
83+
"Counter for number of successful async fetches",
84+
COUNTER_METRICS_UNIT
85+
);
86+
7487
}
7588

7689
public void recordLatency(Histogram histogram, Double value) {

server/src/main/java/org/opensearch/cluster/ClusterModule.java

+3
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ public class ClusterModule extends AbstractModule {
142142
// pkg private for tests
143143
final Collection<AllocationDecider> deciderList;
144144
final ShardsAllocator shardsAllocator;
145+
private final ClusterManagerMetrics clusterManagerMetrics;
145146

146147
public ClusterModule(
147148
Settings settings,
@@ -166,6 +167,7 @@ public ClusterModule(
166167
settings,
167168
clusterManagerMetrics
168169
);
170+
this.clusterManagerMetrics = clusterManagerMetrics;
169171
}
170172

171173
public static List<Entry> getNamedWriteables() {
@@ -456,6 +458,7 @@ protected void configure() {
456458
bind(TaskResultsService.class).asEagerSingleton();
457459
bind(AllocationDeciders.class).toInstance(allocationDeciders);
458460
bind(ShardsAllocator.class).toInstance(shardsAllocator);
461+
bind(ClusterManagerMetrics.class).toInstance(clusterManagerMetrics);
459462
}
460463

461464
public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator, ShardsBatchGatewayAllocator shardsBatchGatewayAllocator) {

server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.logging.log4j.Logger;
1212
import org.opensearch.action.support.nodes.BaseNodeResponse;
1313
import org.opensearch.action.support.nodes.BaseNodesResponse;
14+
import org.opensearch.cluster.ClusterManagerMetrics;
1415
import org.opensearch.cluster.node.DiscoveryNode;
1516
import org.opensearch.common.logging.Loggers;
1617
import org.opensearch.core.index.shard.ShardId;
@@ -48,7 +49,8 @@ public abstract class AsyncShardBatchFetch<T extends BaseNodeResponse, V> extend
4849
Class<V> clazz,
4950
V emptyShardResponse,
5051
Predicate<V> emptyShardResponsePredicate,
51-
ShardBatchResponseFactory<T, V> responseFactory
52+
ShardBatchResponseFactory<T, V> responseFactory,
53+
ClusterManagerMetrics clusterManagerMetrics
5254
) {
5355
super(
5456
logger,
@@ -64,7 +66,8 @@ public abstract class AsyncShardBatchFetch<T extends BaseNodeResponse, V> extend
6466
clazz,
6567
emptyShardResponse,
6668
emptyShardResponsePredicate,
67-
responseFactory
69+
responseFactory,
70+
clusterManagerMetrics
6871
)
6972
);
7073
}
@@ -116,9 +119,10 @@ public ShardBatchCache(
116119
Class<V> clazz,
117120
V emptyResponse,
118121
Predicate<V> emptyShardResponsePredicate,
119-
ShardBatchResponseFactory<T, V> responseFactory
122+
ShardBatchResponseFactory<T, V> responseFactory,
123+
ClusterManagerMetrics clusterManagerMetrics
120124
) {
121-
super(Loggers.getLogger(logger, "_" + logKey), type);
125+
super(Loggers.getLogger(logger, "_" + logKey), type, clusterManagerMetrics);
122126
this.batchSize = shardAttributesMap.size();
123127
this.emptyShardResponsePredicate = emptyShardResponsePredicate;
124128
cache = new HashMap<>();

server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.opensearch.action.FailedNodeException;
3636
import org.opensearch.action.support.nodes.BaseNodeResponse;
3737
import org.opensearch.action.support.nodes.BaseNodesResponse;
38+
import org.opensearch.cluster.ClusterManagerMetrics;
3839
import org.opensearch.cluster.node.DiscoveryNode;
3940
import org.opensearch.cluster.node.DiscoveryNodes;
4041
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
@@ -94,15 +95,16 @@ protected AsyncShardFetch(
9495
String type,
9596
ShardId shardId,
9697
String customDataPath,
97-
Lister<? extends BaseNodesResponse<T>, T> action
98+
Lister<? extends BaseNodesResponse<T>, T> action,
99+
ClusterManagerMetrics clusterManagerMetrics
98100
) {
99101
this.logger = logger;
100102
this.type = type;
101103
shardAttributesMap = new HashMap<>();
102104
shardAttributesMap.put(shardId, new ShardAttributes(customDataPath));
103105
this.action = (Lister<BaseNodesResponse<T>, T>) action;
104106
this.reroutingKey = "ShardId=[" + shardId.toString() + "]";
105-
cache = new ShardCache<>(logger, reroutingKey, type);
107+
cache = new ShardCache<>(logger, reroutingKey, type, clusterManagerMetrics);
106108
}
107109

108110
/**
@@ -284,8 +286,8 @@ static class ShardCache<K extends BaseNodeResponse> extends AsyncShardFetchCache
284286

285287
private final Map<String, NodeEntry<K>> cache;
286288

287-
public ShardCache(Logger logger, String logKey, String type) {
288-
super(Loggers.getLogger(logger, "_" + logKey), type);
289+
public ShardCache(Logger logger, String logKey, String type, ClusterManagerMetrics clusterManagerMetrics) {
290+
super(Loggers.getLogger(logger, "_" + logKey), type, clusterManagerMetrics);
289291
cache = new HashMap<>();
290292
}
291293

server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.opensearch.OpenSearchTimeoutException;
1515
import org.opensearch.action.FailedNodeException;
1616
import org.opensearch.action.support.nodes.BaseNodeResponse;
17+
import org.opensearch.cluster.ClusterManagerMetrics;
1718
import org.opensearch.cluster.node.DiscoveryNode;
1819
import org.opensearch.cluster.node.DiscoveryNodes;
1920
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
@@ -51,10 +52,12 @@ public abstract class AsyncShardFetchCache<K extends BaseNodeResponse> {
5152

5253
private final Logger logger;
5354
private final String type;
55+
private final ClusterManagerMetrics clusterManagerMetrics;
5456

55-
protected AsyncShardFetchCache(Logger logger, String type) {
57+
protected AsyncShardFetchCache(Logger logger, String type, ClusterManagerMetrics clusterManagerMetrics) {
5658
this.logger = logger;
5759
this.type = type;
60+
this.clusterManagerMetrics = clusterManagerMetrics;
5861
}
5962

6063
abstract void initData(DiscoveryNode node);
@@ -162,6 +165,7 @@ Map<DiscoveryNode, K> getCacheData(DiscoveryNodes nodes, Set<String> failedNodes
162165
}
163166

164167
void processResponses(List<K> responses, long fetchingRound) {
168+
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchSuccessCounter, Double.valueOf(responses.size()));
165169
for (K response : responses) {
166170
BaseNodeEntry nodeEntry = getCache().get(response.getNode().getId());
167171
if (nodeEntry != null) {
@@ -222,6 +226,7 @@ boolean retryableException(Throwable unwrappedCause) {
222226
}
223227

224228
void processFailures(List<FailedNodeException> failures, long fetchingRound) {
229+
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchFailureCounter, Double.valueOf(failures.size()));
225230
for (FailedNodeException failure : failures) {
226231
logger.trace("processing failure {} for [{}]", failure, type);
227232
BaseNodeEntry nodeEntry = getCache().get(failure.nodeId());

server/src/main/java/org/opensearch/gateway/GatewayAllocator.java

+18-9
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.logging.log4j.message.ParameterizedMessage;
3838
import org.opensearch.action.support.nodes.BaseNodeResponse;
3939
import org.opensearch.action.support.nodes.BaseNodesResponse;
40+
import org.opensearch.cluster.ClusterManagerMetrics;
4041
import org.opensearch.cluster.metadata.IndexMetadata;
4142
import org.opensearch.cluster.node.DiscoveryNode;
4243
import org.opensearch.cluster.node.DiscoveryNodes;
@@ -92,11 +93,12 @@ public class GatewayAllocator implements ExistingShardsAllocator {
9293
public GatewayAllocator(
9394
RerouteService rerouteService,
9495
TransportNodesListGatewayStartedShards startedAction,
95-
TransportNodesListShardStoreMetadata storeAction
96+
TransportNodesListShardStoreMetadata storeAction,
97+
ClusterManagerMetrics clusterManagerMetrics
9698
) {
9799
this.rerouteService = rerouteService;
98-
this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction);
99-
this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction);
100+
this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction, clusterManagerMetrics);
101+
this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction, clusterManagerMetrics);
100102
}
101103

102104
@Override
@@ -251,9 +253,10 @@ class InternalAsyncFetch<T extends BaseNodeResponse> extends AsyncShardFetch<T>
251253
String type,
252254
ShardId shardId,
253255
String customDataPath,
254-
Lister<? extends BaseNodesResponse<T>, T> action
256+
Lister<? extends BaseNodesResponse<T>, T> action,
257+
ClusterManagerMetrics clusterManagerMetrics
255258
) {
256-
super(logger, type, shardId, customDataPath, action);
259+
super(logger, type, shardId, customDataPath, action, clusterManagerMetrics);
257260
}
258261

259262
@Override
@@ -274,9 +277,11 @@ protected void reroute(String reroutingKey, String reason) {
274277
class InternalPrimaryShardAllocator extends PrimaryShardAllocator {
275278

276279
private final TransportNodesListGatewayStartedShards startedAction;
280+
private final ClusterManagerMetrics clusterManagerMetrics;
277281

278-
InternalPrimaryShardAllocator(TransportNodesListGatewayStartedShards startedAction) {
282+
InternalPrimaryShardAllocator(TransportNodesListGatewayStartedShards startedAction, ClusterManagerMetrics clusterManagerMetrics) {
279283
this.startedAction = startedAction;
284+
this.clusterManagerMetrics = clusterManagerMetrics;
280285
}
281286

282287
@Override
@@ -291,7 +296,8 @@ protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.Nod
291296
"shard_started",
292297
shardId,
293298
IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shard.index()).getSettings()),
294-
startedAction
299+
startedAction,
300+
clusterManagerMetrics
295301
)
296302
);
297303
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetch.fetchData(
@@ -313,9 +319,11 @@ protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.Nod
313319
class InternalReplicaShardAllocator extends ReplicaShardAllocator {
314320

315321
private final TransportNodesListShardStoreMetadata storeAction;
322+
private final ClusterManagerMetrics clusterManagerMetrics;
316323

317-
InternalReplicaShardAllocator(TransportNodesListShardStoreMetadata storeAction) {
324+
InternalReplicaShardAllocator(TransportNodesListShardStoreMetadata storeAction, ClusterManagerMetrics clusterManagerMetrics) {
318325
this.storeAction = storeAction;
326+
this.clusterManagerMetrics = clusterManagerMetrics;
319327
}
320328

321329
@Override
@@ -330,7 +338,8 @@ protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadata.NodeS
330338
"shard_store",
331339
shard.shardId(),
332340
IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shard.index()).getSettings()),
333-
storeAction
341+
storeAction,
342+
clusterManagerMetrics
334343
)
335344
);
336345
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata> shardStores = fetch.fetchData(

0 commit comments

Comments
 (0)