Skip to content

Commit 0d9e974

Browse files
authored
[Backport 2.16] Optimized ClusterStatsIndices to precompute shard stats (#14426) (#14913)
* Optimized ClusterStatsIndices to precompute shard stats (#14426) Signed-off-by: Pranshu Shukla <pranshushukla06@gmail.com>
1 parent 9231468 commit 0d9e974

File tree

9 files changed

+578
-38
lines changed

9 files changed

+578
-38
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2828
- Refactor remote-routing-table service in line with remote state interfaces ([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))
2929
- Add rest, transport layer changes for hot to warm tiering - dedicated setup ([#13980](https://github.com/opensearch-project/OpenSearch/pull/13980))
3030
- Enabling term version check on local state for all ClusterManager Read Transport Actions ([#14273](https://github.com/opensearch-project/OpenSearch/pull/14273))
31+
- Optimize Cluster Stats Indices to precompute node level stats ([#14426](https://github.com/opensearch-project/OpenSearch/pull/14426))
3132

3233
### Dependencies
3334
- Update to Apache Lucene 9.11.1 ([#14042](https://github.com/opensearch-project/OpenSearch/pull/14042), [#14576](https://github.com/opensearch-project/OpenSearch/pull/14576))

server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIT.java

+102-17
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,11 @@ public void testNodeCounts() {
8888
Map<String, Integer> expectedCounts = getExpectedCounts(1, 1, 1, 1, 1, 0, 0);
8989
int numNodes = randomIntBetween(1, 5);
9090

91-
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
91+
ClusterStatsResponse response = client().admin()
92+
.cluster()
93+
.prepareClusterStats()
94+
.useAggregatedNodeLevelResponses(randomBoolean())
95+
.get();
9296
assertCounts(response.getNodesStats().getCounts(), total, expectedCounts);
9397

9498
for (int i = 0; i < numNodes; i++) {
@@ -153,7 +157,11 @@ public void testNodeCountsWithDeprecatedMasterRole() throws ExecutionException,
153157
Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 0, 0, 0);
154158

155159
Client client = client();
156-
ClusterStatsResponse response = client.admin().cluster().prepareClusterStats().get();
160+
ClusterStatsResponse response = client.admin()
161+
.cluster()
162+
.prepareClusterStats()
163+
.useAggregatedNodeLevelResponses(randomBoolean())
164+
.get();
157165
assertCounts(response.getNodesStats().getCounts(), total, expectedCounts);
158166

159167
Set<String> expectedRoles = Set.of(DiscoveryNodeRole.MASTER_ROLE.roleName());
@@ -176,15 +184,60 @@ private void assertShardStats(ClusterStatsIndices.ShardStats stats, int indices,
176184
assertThat(stats.getReplication(), Matchers.equalTo(replicationFactor));
177185
}
178186

179-
public void testIndicesShardStats() throws ExecutionException, InterruptedException {
187+
public void testIndicesShardStatsWithoutNodeLevelAggregations() {
188+
internalCluster().startNode();
189+
ensureGreen();
190+
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(false).get();
191+
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
192+
193+
prepareCreate("test1").setSettings(Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 1)).get();
194+
195+
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(false).get();
196+
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.YELLOW));
197+
assertThat(response.indicesStats.getDocs().getCount(), Matchers.equalTo(0L));
198+
assertThat(response.indicesStats.getIndexCount(), Matchers.equalTo(1));
199+
assertShardStats(response.getIndicesStats().getShards(), 1, 2, 2, 0.0);
200+
201+
// add another node, replicas should get assigned
202+
internalCluster().startNode();
203+
ensureGreen();
204+
index("test1", "type", "1", "f", "f");
205+
refresh(); // make the doc visible
206+
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(false).get();
207+
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
208+
assertThat(response.indicesStats.getDocs().getCount(), Matchers.equalTo(1L));
209+
assertShardStats(response.getIndicesStats().getShards(), 1, 4, 2, 1.0);
210+
211+
prepareCreate("test2").setSettings(Settings.builder().put("number_of_shards", 3).put("number_of_replicas", 0)).get();
212+
ensureGreen();
213+
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(false).get();
214+
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
215+
assertThat(response.indicesStats.getIndexCount(), Matchers.equalTo(2));
216+
assertShardStats(response.getIndicesStats().getShards(), 2, 7, 5, 2.0 / 5);
217+
218+
assertThat(response.getIndicesStats().getShards().getAvgIndexPrimaryShards(), Matchers.equalTo(2.5));
219+
assertThat(response.getIndicesStats().getShards().getMinIndexPrimaryShards(), Matchers.equalTo(2));
220+
assertThat(response.getIndicesStats().getShards().getMaxIndexPrimaryShards(), Matchers.equalTo(3));
221+
222+
assertThat(response.getIndicesStats().getShards().getAvgIndexShards(), Matchers.equalTo(3.5));
223+
assertThat(response.getIndicesStats().getShards().getMinIndexShards(), Matchers.equalTo(3));
224+
assertThat(response.getIndicesStats().getShards().getMaxIndexShards(), Matchers.equalTo(4));
225+
226+
assertThat(response.getIndicesStats().getShards().getAvgIndexReplication(), Matchers.equalTo(0.5));
227+
assertThat(response.getIndicesStats().getShards().getMinIndexReplication(), Matchers.equalTo(0.0));
228+
assertThat(response.getIndicesStats().getShards().getMaxIndexReplication(), Matchers.equalTo(1.0));
229+
230+
}
231+
232+
public void testIndicesShardStatsWithNodeLevelAggregations() {
180233
internalCluster().startNode();
181234
ensureGreen();
182-
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
235+
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(true).get();
183236
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
184237

185238
prepareCreate("test1").setSettings(Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 1)).get();
186239

187-
response = client().admin().cluster().prepareClusterStats().get();
240+
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(true).get();
188241
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.YELLOW));
189242
assertThat(response.indicesStats.getDocs().getCount(), Matchers.equalTo(0L));
190243
assertThat(response.indicesStats.getIndexCount(), Matchers.equalTo(1));
@@ -195,14 +248,14 @@ public void testIndicesShardStats() throws ExecutionException, InterruptedExcept
195248
ensureGreen();
196249
index("test1", "type", "1", "f", "f");
197250
refresh(); // make the doc visible
198-
response = client().admin().cluster().prepareClusterStats().get();
251+
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(true).get();
199252
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
200253
assertThat(response.indicesStats.getDocs().getCount(), Matchers.equalTo(1L));
201254
assertShardStats(response.getIndicesStats().getShards(), 1, 4, 2, 1.0);
202255

203256
prepareCreate("test2").setSettings(Settings.builder().put("number_of_shards", 3).put("number_of_replicas", 0)).get();
204257
ensureGreen();
205-
response = client().admin().cluster().prepareClusterStats().get();
258+
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(true).get();
206259
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
207260
assertThat(response.indicesStats.getIndexCount(), Matchers.equalTo(2));
208261
assertShardStats(response.getIndicesStats().getShards(), 2, 7, 5, 2.0 / 5);
@@ -225,7 +278,11 @@ public void testValuesSmokeScreen() throws IOException, ExecutionException, Inte
225278
internalCluster().startNodes(randomIntBetween(1, 3));
226279
index("test1", "type", "1", "f", "f");
227280

228-
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
281+
ClusterStatsResponse response = client().admin()
282+
.cluster()
283+
.prepareClusterStats()
284+
.useAggregatedNodeLevelResponses(randomBoolean())
285+
.get();
229286
String msg = response.toString();
230287
assertThat(msg, response.getTimestamp(), Matchers.greaterThan(946681200000L)); // 1 Jan 2000
231288
assertThat(msg, response.indicesStats.getStore().getSizeInBytes(), Matchers.greaterThan(0L));
@@ -265,13 +322,21 @@ public void testAllocatedProcessors() throws Exception {
265322
internalCluster().startNode(Settings.builder().put(OpenSearchExecutors.NODE_PROCESSORS_SETTING.getKey(), 7).build());
266323
waitForNodes(1);
267324

268-
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
325+
ClusterStatsResponse response = client().admin()
326+
.cluster()
327+
.prepareClusterStats()
328+
.useAggregatedNodeLevelResponses(randomBoolean())
329+
.get();
269330
assertThat(response.getNodesStats().getOs().getAllocatedProcessors(), equalTo(7));
270331
}
271332

272333
public void testClusterStatusWhenStateNotRecovered() throws Exception {
273334
internalCluster().startClusterManagerOnlyNode(Settings.builder().put("gateway.recover_after_nodes", 2).build());
274-
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
335+
ClusterStatsResponse response = client().admin()
336+
.cluster()
337+
.prepareClusterStats()
338+
.useAggregatedNodeLevelResponses(randomBoolean())
339+
.get();
275340
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.RED));
276341

277342
if (randomBoolean()) {
@@ -281,14 +346,18 @@ public void testClusterStatusWhenStateNotRecovered() throws Exception {
281346
}
282347
// wait for the cluster status to settle
283348
ensureGreen();
284-
response = client().admin().cluster().prepareClusterStats().get();
349+
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(randomBoolean()).get();
285350
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.GREEN));
286351
}
287352

288353
public void testFieldTypes() {
289354
internalCluster().startNode();
290355
ensureGreen();
291-
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
356+
ClusterStatsResponse response = client().admin()
357+
.cluster()
358+
.prepareClusterStats()
359+
.useAggregatedNodeLevelResponses(randomBoolean())
360+
.get();
292361
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
293362
assertTrue(response.getIndicesStats().getMappings().getFieldTypeStats().isEmpty());
294363

@@ -301,7 +370,7 @@ public void testFieldTypes() {
301370
+ "\"eggplant\":{\"type\":\"integer\"}}}}}"
302371
)
303372
.get();
304-
response = client().admin().cluster().prepareClusterStats().get();
373+
response = client().admin().cluster().prepareClusterStats().useAggregatedNodeLevelResponses(randomBoolean()).get();
305374
assertThat(response.getIndicesStats().getMappings().getFieldTypeStats().size(), equalTo(3));
306375
Set<IndexFeatureStats> stats = response.getIndicesStats().getMappings().getFieldTypeStats();
307376
for (IndexFeatureStats stat : stats) {
@@ -329,7 +398,11 @@ public void testNodeRolesWithMasterLegacySettings() throws ExecutionException, I
329398
Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 1, 0, 0);
330399

331400
Client client = client();
332-
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
401+
ClusterStatsResponse clusterStatsResponse = client.admin()
402+
.cluster()
403+
.prepareClusterStats()
404+
.useAggregatedNodeLevelResponses(randomBoolean())
405+
.get();
333406
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedCounts);
334407

335408
Set<String> expectedRoles = Set.of(
@@ -359,7 +432,11 @@ public void testNodeRolesWithClusterManagerRole() throws ExecutionException, Int
359432
Map<String, Integer> expectedCounts = getExpectedCounts(0, 1, 1, 0, 1, 0, 0);
360433

361434
Client client = client();
362-
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
435+
ClusterStatsResponse clusterStatsResponse = client.admin()
436+
.cluster()
437+
.prepareClusterStats()
438+
.useAggregatedNodeLevelResponses(randomBoolean())
439+
.get();
363440
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedCounts);
364441

365442
Set<String> expectedRoles = Set.of(
@@ -383,7 +460,11 @@ public void testNodeRolesWithSeedDataNodeLegacySettings() throws ExecutionExcept
383460
Map<String, Integer> expectedRoleCounts = getExpectedCounts(1, 1, 1, 0, 1, 0, 0);
384461

385462
Client client = client();
386-
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
463+
ClusterStatsResponse clusterStatsResponse = client.admin()
464+
.cluster()
465+
.prepareClusterStats()
466+
.useAggregatedNodeLevelResponses(randomBoolean())
467+
.get();
387468
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedRoleCounts);
388469

389470
Set<String> expectedRoles = Set.of(
@@ -410,7 +491,11 @@ public void testNodeRolesWithDataNodeLegacySettings() throws ExecutionException,
410491
Map<String, Integer> expectedRoleCounts = getExpectedCounts(1, 1, 1, 0, 1, 0, 0);
411492

412493
Client client = client();
413-
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get();
494+
ClusterStatsResponse clusterStatsResponse = client.admin()
495+
.cluster()
496+
.prepareClusterStats()
497+
.useAggregatedNodeLevelResponses(randomBoolean())
498+
.get();
414499
assertCounts(clusterStatsResponse.getNodesStats().getCounts(), total, expectedRoleCounts);
415500

416501
Set<Set<String>> expectedNodesRoles = Set.of(

server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIndices.java

+50-17
Original file line numberDiff line numberDiff line change
@@ -78,26 +78,49 @@ public ClusterStatsIndices(List<ClusterStatsNodeResponse> nodeResponses, Mapping
7878
this.segments = new SegmentsStats();
7979

8080
for (ClusterStatsNodeResponse r : nodeResponses) {
81-
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) {
82-
ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndexName());
83-
if (indexShardStats == null) {
84-
indexShardStats = new ShardStats();
85-
countsPerIndex.put(shardStats.getShardRouting().getIndexName(), indexShardStats);
81+
// Aggregated response from the node
82+
if (r.getAggregatedNodeLevelStats() != null) {
83+
84+
for (Map.Entry<String, ClusterStatsNodeResponse.AggregatedIndexStats> entry : r.getAggregatedNodeLevelStats().indexStatsMap
85+
.entrySet()) {
86+
ShardStats indexShardStats = countsPerIndex.get(entry.getKey());
87+
if (indexShardStats == null) {
88+
indexShardStats = new ShardStats(entry.getValue());
89+
countsPerIndex.put(entry.getKey(), indexShardStats);
90+
} else {
91+
indexShardStats.addStatsFrom(entry.getValue());
92+
}
8693
}
8794

88-
indexShardStats.total++;
89-
90-
CommonStats shardCommonStats = shardStats.getStats();
91-
92-
if (shardStats.getShardRouting().primary()) {
93-
indexShardStats.primaries++;
94-
docs.add(shardCommonStats.docs);
95+
docs.add(r.getAggregatedNodeLevelStats().commonStats.docs);
96+
store.add(r.getAggregatedNodeLevelStats().commonStats.store);
97+
fieldData.add(r.getAggregatedNodeLevelStats().commonStats.fieldData);
98+
queryCache.add(r.getAggregatedNodeLevelStats().commonStats.queryCache);
99+
completion.add(r.getAggregatedNodeLevelStats().commonStats.completion);
100+
segments.add(r.getAggregatedNodeLevelStats().commonStats.segments);
101+
} else {
102+
// Default response from the node
103+
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) {
104+
ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndexName());
105+
if (indexShardStats == null) {
106+
indexShardStats = new ShardStats();
107+
countsPerIndex.put(shardStats.getShardRouting().getIndexName(), indexShardStats);
108+
}
109+
110+
indexShardStats.total++;
111+
112+
CommonStats shardCommonStats = shardStats.getStats();
113+
114+
if (shardStats.getShardRouting().primary()) {
115+
indexShardStats.primaries++;
116+
docs.add(shardCommonStats.docs);
117+
}
118+
store.add(shardCommonStats.store);
119+
fieldData.add(shardCommonStats.fieldData);
120+
queryCache.add(shardCommonStats.queryCache);
121+
completion.add(shardCommonStats.completion);
122+
segments.add(shardCommonStats.segments);
95123
}
96-
store.add(shardCommonStats.store);
97-
fieldData.add(shardCommonStats.fieldData);
98-
queryCache.add(shardCommonStats.queryCache);
99-
completion.add(shardCommonStats.completion);
100-
segments.add(shardCommonStats.segments);
101124
}
102125
}
103126

@@ -202,6 +225,11 @@ public static class ShardStats implements ToXContentFragment {
202225

203226
public ShardStats() {}
204227

228+
public ShardStats(ClusterStatsNodeResponse.AggregatedIndexStats aggregatedIndexStats) {
229+
this.total = aggregatedIndexStats.total;
230+
this.primaries = aggregatedIndexStats.primaries;
231+
}
232+
205233
/**
206234
* number of indices in the cluster
207235
*/
@@ -329,6 +357,11 @@ public void addIndexShardCount(ShardStats indexShardCount) {
329357
}
330358
}
331359

360+
public void addStatsFrom(ClusterStatsNodeResponse.AggregatedIndexStats incomingStats) {
361+
this.total += incomingStats.total;
362+
this.primaries += incomingStats.primaries;
363+
}
364+
332365
/**
333366
* Inner Fields used for creating XContent and parsing
334367
*

0 commit comments

Comments
 (0)