Skip to content

Commit def0c0c

Browse files
github-actions[bot]Pranshu-S
authored andcommitted
Optimize NodeIndicesStats output behind flag (opensearch-project#14454)
* Optimize NodeIndicesStats output behind flag Signed-off-by: Pranshu Shukla <pranshushukla06@gmail.com> (cherry picked from commit e146f13) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 175cbd0 commit def0c0c

File tree

8 files changed

+904
-30
lines changed

8 files changed

+904
-30
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4444
- Add canRemain method to TargetPoolAllocationDecider to move shards from local to remote pool for hot to warm tiering ([#15010](https://github.com/opensearch-project/OpenSearch/pull/15010))
4545
- Add support for pluggable deciders for concurrent search ([#15363](https://github.com/opensearch-project/OpenSearch/pull/15363))
4646
- [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218))
47+
- Optimize NodeIndicesStats output behind flag ([#14454](https://github.com/opensearch-project/OpenSearch/pull/14454))
4748

4849
### Dependencies
4950
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))

server/src/internalClusterTest/java/org/opensearch/nodestats/NodeStatsIT.java

+309
Large diffs are not rendered by default.

server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java

+15
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public class CommonStatsFlags implements Writeable, Cloneable {
6868
// Used for metric CACHE_STATS, to determine which caches to report stats for
6969
private EnumSet<CacheType> includeCaches = EnumSet.noneOf(CacheType.class);
7070
private String[] levels = new String[0];
71+
private boolean includeIndicesStatsByLevel = false;
7172

7273
/**
7374
* @param flags flags to set. If no flags are supplied, default flags will be set.
@@ -106,6 +107,9 @@ public CommonStatsFlags(StreamInput in) throws IOException {
106107
includeCaches = in.readEnumSet(CacheType.class);
107108
levels = in.readStringArray();
108109
}
110+
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
111+
includeIndicesStatsByLevel = in.readBoolean();
112+
}
109113
}
110114

111115
@Override
@@ -135,6 +139,9 @@ public void writeTo(StreamOutput out) throws IOException {
135139
out.writeEnumSet(includeCaches);
136140
out.writeStringArrayNullable(levels);
137141
}
142+
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
143+
out.writeBoolean(includeIndicesStatsByLevel);
144+
}
138145
}
139146

140147
/**
@@ -273,6 +280,14 @@ public boolean includeSegmentFileSizes() {
273280
return this.includeSegmentFileSizes;
274281
}
275282

283+
public void setIncludeIndicesStatsByLevel(boolean includeIndicesStatsByLevel) {
284+
this.includeIndicesStatsByLevel = includeIndicesStatsByLevel;
285+
}
286+
287+
public boolean getIncludeIndicesStatsByLevel() {
288+
return this.includeIndicesStatsByLevel;
289+
}
290+
276291
public boolean isSet(Flag flag) {
277292
return flags.contains(flag);
278293
}

server/src/main/java/org/opensearch/indices/IndicesService.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -758,8 +758,12 @@ public NodeIndicesStats stats(CommonStatsFlags flags) {
758758
break;
759759
}
760760
}
761-
762-
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats);
761+
if (flags.getIncludeIndicesStatsByLevel()) {
762+
NodeIndicesStats.StatsLevel statsLevel = NodeIndicesStats.getAcceptedLevel(flags.getLevels());
763+
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats, statsLevel);
764+
} else {
765+
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats);
766+
}
763767
}
764768

765769
Map<Index, List<IndexShardStats>> statsByShard(final IndicesService indicesService, final CommonStatsFlags flags) {

server/src/main/java/org/opensearch/indices/NodeIndicesStats.java

+171-28
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.indices;
3434

35+
import org.opensearch.Version;
3536
import org.opensearch.action.admin.indices.stats.CommonStats;
3637
import org.opensearch.action.admin.indices.stats.IndexShardStats;
3738
import org.opensearch.action.admin.indices.stats.ShardStats;
@@ -63,9 +64,11 @@
6364

6465
import java.io.IOException;
6566
import java.util.ArrayList;
67+
import java.util.Arrays;
6668
import java.util.HashMap;
6769
import java.util.List;
6870
import java.util.Map;
71+
import java.util.Optional;
6972

7073
/**
7174
* Global information on indices stats running on a specific node.
@@ -74,26 +77,27 @@
7477
*/
7578
@PublicApi(since = "1.0.0")
7679
public class NodeIndicesStats implements Writeable, ToXContentFragment {
77-
private CommonStats stats;
78-
private Map<Index, List<IndexShardStats>> statsByShard;
80+
protected CommonStats stats;
81+
protected Map<Index, CommonStats> statsByIndex;
82+
protected Map<Index, List<IndexShardStats>> statsByShard;
7983

8084
public NodeIndicesStats(StreamInput in) throws IOException {
8185
stats = new CommonStats(in);
82-
if (in.readBoolean()) {
83-
int entries = in.readVInt();
84-
statsByShard = new HashMap<>();
85-
for (int i = 0; i < entries; i++) {
86-
Index index = new Index(in);
87-
int indexShardListSize = in.readVInt();
88-
List<IndexShardStats> indexShardStats = new ArrayList<>(indexShardListSize);
89-
for (int j = 0; j < indexShardListSize; j++) {
90-
indexShardStats.add(new IndexShardStats(in));
91-
}
92-
statsByShard.put(index, indexShardStats);
86+
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
87+
// contains statsByIndex
88+
if (in.readBoolean()) {
89+
statsByIndex = readStatsByIndex(in);
9390
}
9491
}
92+
if (in.readBoolean()) {
93+
statsByShard = readStatsByShard(in);
94+
}
9595
}
9696

97+
/**
98+
* Without passing the information of the levels to the constructor, we return the Node-level aggregated stats as
99+
* {@link CommonStats} along with a hash-map containing Index to List of Shard Stats.
100+
*/
97101
public NodeIndicesStats(CommonStats oldStats, Map<Index, List<IndexShardStats>> statsByShard, SearchRequestStats searchRequestStats) {
98102
// this.stats = stats;
99103
this.statsByShard = statsByShard;
@@ -112,6 +116,90 @@ public NodeIndicesStats(CommonStats oldStats, Map<Index, List<IndexShardStats>>
112116
}
113117
}
114118

119+
/**
120+
* Passing the level information to the nodes allows us to aggregate the stats based on the level passed. This
121+
* allows us to aggregate based on NodeLevel (default - if no level is passed) or Index level if `indices` level is
122+
* passed and finally return the statsByShards map if `shards` level is passed. This allows us to reduce ser/de of
123+
* stats and return only the information that is required while returning to the client.
124+
*/
125+
public NodeIndicesStats(
126+
CommonStats oldStats,
127+
Map<Index, List<IndexShardStats>> statsByShard,
128+
SearchRequestStats searchRequestStats,
129+
StatsLevel level
130+
) {
131+
// make a total common stats from old ones and current ones
132+
this.stats = oldStats;
133+
for (List<IndexShardStats> shardStatsList : statsByShard.values()) {
134+
for (IndexShardStats indexShardStats : shardStatsList) {
135+
for (ShardStats shardStats : indexShardStats.getShards()) {
136+
stats.add(shardStats.getStats());
137+
}
138+
}
139+
}
140+
141+
if (this.stats.search != null) {
142+
this.stats.search.setSearchRequestStats(searchRequestStats);
143+
}
144+
145+
if (level != null) {
146+
switch (level) {
147+
case INDICES:
148+
this.statsByIndex = createStatsByIndex(statsByShard);
149+
break;
150+
case SHARDS:
151+
this.statsByShard = statsByShard;
152+
break;
153+
}
154+
}
155+
}
156+
157+
/**
158+
* By default, the levels passed from the transport action will be a list of strings, since NodeIndicesStats can
159+
* only aggregate on one level, we pick the first accepted level else we ignore if no known level is passed. Level is
160+
* selected based on enum defined in {@link StatsLevel}
161+
*
162+
* Note - we are picking the first level as multiple levels are not supported in the previous versions.
163+
* @param levels - levels sent in the request.
164+
*
165+
* @return Corresponding identified enum {@link StatsLevel}
166+
*/
167+
public static StatsLevel getAcceptedLevel(String[] levels) {
168+
if (levels != null && levels.length > 0) {
169+
Optional<StatsLevel> level = Arrays.stream(StatsLevel.values())
170+
.filter(field -> field.getRestName().equals(levels[0]))
171+
.findFirst();
172+
return level.orElseThrow(() -> new IllegalArgumentException("Level provided is not supported by NodeIndicesStats"));
173+
}
174+
return null;
175+
}
176+
177+
private Map<Index, CommonStats> readStatsByIndex(StreamInput in) throws IOException {
178+
Map<Index, CommonStats> statsByIndex = new HashMap<>();
179+
int indexEntries = in.readVInt();
180+
for (int i = 0; i < indexEntries; i++) {
181+
Index index = new Index(in);
182+
CommonStats commonStats = new CommonStats(in);
183+
statsByIndex.put(index, commonStats);
184+
}
185+
return statsByIndex;
186+
}
187+
188+
private Map<Index, List<IndexShardStats>> readStatsByShard(StreamInput in) throws IOException {
189+
Map<Index, List<IndexShardStats>> statsByShard = new HashMap<>();
190+
int entries = in.readVInt();
191+
for (int i = 0; i < entries; i++) {
192+
Index index = new Index(in);
193+
int indexShardListSize = in.readVInt();
194+
List<IndexShardStats> indexShardStats = new ArrayList<>(indexShardListSize);
195+
for (int j = 0; j < indexShardListSize; j++) {
196+
indexShardStats.add(new IndexShardStats(in));
197+
}
198+
statsByShard.put(index, indexShardStats);
199+
}
200+
return statsByShard;
201+
}
202+
115203
@Nullable
116204
public StoreStats getStore() {
117205
return stats.getStore();
@@ -195,7 +283,31 @@ public RecoveryStats getRecoveryStats() {
195283
@Override
196284
public void writeTo(StreamOutput out) throws IOException {
197285
stats.writeTo(out);
286+
287+
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
288+
out.writeBoolean(statsByIndex != null);
289+
if (statsByIndex != null) {
290+
writeStatsByIndex(out);
291+
}
292+
}
293+
198294
out.writeBoolean(statsByShard != null);
295+
if (statsByShard != null) {
296+
writeStatsByShards(out);
297+
}
298+
}
299+
300+
private void writeStatsByIndex(StreamOutput out) throws IOException {
301+
if (statsByIndex != null) {
302+
out.writeVInt(statsByIndex.size());
303+
for (Map.Entry<Index, CommonStats> entry : statsByIndex.entrySet()) {
304+
entry.getKey().writeTo(out);
305+
entry.getValue().writeTo(out);
306+
}
307+
}
308+
}
309+
310+
private void writeStatsByShards(StreamOutput out) throws IOException {
199311
if (statsByShard != null) {
200312
out.writeVInt(statsByShard.size());
201313
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
@@ -210,29 +322,46 @@ public void writeTo(StreamOutput out) throws IOException {
210322

211323
@Override
212324
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
213-
final String level = params.param("level", "node");
214-
final boolean isLevelValid = "indices".equalsIgnoreCase(level)
215-
|| "node".equalsIgnoreCase(level)
216-
|| "shards".equalsIgnoreCase(level);
325+
final String level = params.param("level", StatsLevel.NODE.getRestName());
326+
final boolean isLevelValid = StatsLevel.NODE.getRestName().equalsIgnoreCase(level)
327+
|| StatsLevel.INDICES.getRestName().equalsIgnoreCase(level)
328+
|| StatsLevel.SHARDS.getRestName().equalsIgnoreCase(level);
217329
if (!isLevelValid) {
218-
throw new IllegalArgumentException("level parameter must be one of [indices] or [node] or [shards] but was [" + level + "]");
330+
throw new IllegalArgumentException(
331+
"level parameter must be one of ["
332+
+ StatsLevel.INDICES.getRestName()
333+
+ "] or ["
334+
+ StatsLevel.NODE.getRestName()
335+
+ "] or ["
336+
+ StatsLevel.SHARDS.getRestName()
337+
+ "] but was ["
338+
+ level
339+
+ "]"
340+
);
219341
}
220342

221343
// "node" level
222-
builder.startObject(Fields.INDICES);
344+
builder.startObject(StatsLevel.INDICES.getRestName());
223345
stats.toXContent(builder, params);
224346

225-
if ("indices".equals(level)) {
226-
Map<Index, CommonStats> indexStats = createStatsByIndex();
227-
builder.startObject(Fields.INDICES);
228-
for (Map.Entry<Index, CommonStats> entry : indexStats.entrySet()) {
347+
if (StatsLevel.INDICES.getRestName().equals(level)) {
348+
assert statsByIndex != null || statsByShard != null : "Expected shard stats or index stats in response for generating ["
349+
+ StatsLevel.INDICES
350+
+ "] field";
351+
if (statsByIndex == null) {
352+
statsByIndex = createStatsByIndex(statsByShard);
353+
}
354+
355+
builder.startObject(StatsLevel.INDICES.getRestName());
356+
for (Map.Entry<Index, CommonStats> entry : statsByIndex.entrySet()) {
229357
builder.startObject(entry.getKey().getName());
230358
entry.getValue().toXContent(builder, params);
231359
builder.endObject();
232360
}
233361
builder.endObject();
234-
} else if ("shards".equals(level)) {
235-
builder.startObject("shards");
362+
} else if (StatsLevel.SHARDS.getRestName().equals(level)) {
363+
builder.startObject(StatsLevel.SHARDS.getRestName());
364+
assert statsByShard != null : "Expected shard stats in response for generating [" + StatsLevel.SHARDS + "] field";
236365
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
237366
builder.startArray(entry.getKey().getName());
238367
for (IndexShardStats indexShardStats : entry.getValue()) {
@@ -251,7 +380,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
251380
return builder;
252381
}
253382

254-
private Map<Index, CommonStats> createStatsByIndex() {
383+
private Map<Index, CommonStats> createStatsByIndex(Map<Index, List<IndexShardStats>> statsByShard) {
255384
Map<Index, CommonStats> statsMap = new HashMap<>();
256385
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
257386
if (!statsMap.containsKey(entry.getKey())) {
@@ -281,7 +410,21 @@ public List<IndexShardStats> getShardStats(Index index) {
281410
*
282411
* @opensearch.internal
283412
*/
284-
static final class Fields {
285-
static final String INDICES = "indices";
413+
@PublicApi(since = "3.0.0")
414+
public enum StatsLevel {
415+
INDICES("indices"),
416+
SHARDS("shards"),
417+
NODE("node");
418+
419+
private final String restName;
420+
421+
StatsLevel(String restName) {
422+
this.restName = restName;
423+
}
424+
425+
public String getRestName() {
426+
return restName;
427+
}
428+
286429
}
287430
}

server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java

+1
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
233233
String[] levels = Strings.splitStringByCommaToArray(request.param("level"));
234234
nodesStatsRequest.indices().setLevels(levels);
235235
nodesStatsRequest.setIncludeDiscoveryNodes(false);
236+
nodesStatsRequest.indices().setIncludeIndicesStatsByLevel(true);
236237

237238
return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
238239
}

server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java

+1
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ public void processResponse(final NodesInfoResponse nodesInfoResponse) {
148148
NodesStatsRequest.Metric.PROCESS.metricName(),
149149
NodesStatsRequest.Metric.SCRIPT.metricName()
150150
);
151+
nodesStatsRequest.indices().setIncludeIndicesStatsByLevel(true);
151152
client.admin().cluster().nodesStats(nodesStatsRequest, new RestResponseListener<NodesStatsResponse>(channel) {
152153
@Override
153154
public RestResponse buildResponse(NodesStatsResponse nodesStatsResponse) throws Exception {

0 commit comments

Comments
 (0)