Skip to content

Commit be9f942

Browse files
authored
[SnapshotV2] Add timestamp of last successful fetch of pinned timestamps in node stats (#15611)
--------- Signed-off-by: Lakshya Taragi <lakshya.taragi@gmail.com>
1 parent 1ef6444 commit be9f942

File tree

13 files changed

+187
-7
lines changed

13 files changed

+187
-7
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1515
- Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923))
1616
- Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718))
1717
- Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976))
18+
- Add new metric REMOTE_STORE to NodeStats API response ([#15611](https://github.com/opensearch-project/OpenSearch/pull/15611))
1819

1920
### Dependencies
2021
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java

+41
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
package org.opensearch.remotestore;
1010

1111
import org.opensearch.action.LatchedActionListener;
12+
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
13+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
1214
import org.opensearch.common.collect.Tuple;
1315
import org.opensearch.common.settings.Settings;
1416
import org.opensearch.common.unit.TimeValue;
@@ -20,6 +22,8 @@
2022
import java.util.Set;
2123
import java.util.concurrent.CountDownLatch;
2224

25+
import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.REMOTE_STORE;
26+
2327
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
2428
public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase {
2529
static final String INDEX_NAME = "remote-store-test-idx-1";
@@ -180,4 +184,41 @@ public void onFailure(Exception e) {
180184
assertBusy(() -> assertEquals(Set.of(timestamp2, timestamp3), RemoteStorePinnedTimestampService.getPinnedTimestamps().v2()));
181185
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
182186
}
187+
188+
public void testLastSuccessfulFetchOfPinnedTimestampsPresentInNodeStats() throws Exception {
189+
logger.info("Starting up cluster manager");
190+
logger.info("cluster.remote_store.pinned_timestamps.enabled set to true");
191+
logger.info("cluster.remote_store.pinned_timestamps.scheduler_interval set to minimum value of 1minute");
192+
Settings pinnedTimestampEnabledSettings = Settings.builder()
193+
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
194+
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL.getKey(), "1m")
195+
.build();
196+
internalCluster().startClusterManagerOnlyNode(pinnedTimestampEnabledSettings);
197+
String remoteNodeName = internalCluster().startDataOnlyNodes(1, pinnedTimestampEnabledSettings).get(0);
198+
ensureStableCluster(2);
199+
RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
200+
RemoteStorePinnedTimestampService.class,
201+
remoteNodeName
202+
);
203+
204+
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
205+
206+
assertBusy(() -> {
207+
long lastSuccessfulFetchOfPinnedTimestamps = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1();
208+
assertTrue(lastSuccessfulFetchOfPinnedTimestamps > 0L);
209+
NodesStatsResponse nodesStatsResponse = internalCluster().client()
210+
.admin()
211+
.cluster()
212+
.prepareNodesStats()
213+
.addMetric(REMOTE_STORE.metricName())
214+
.execute()
215+
.actionGet();
216+
for (NodeStats nodeStats : nodesStatsResponse.getNodes()) {
217+
long lastRecordedFetch = nodeStats.getRemoteStoreNodeStats().getLastSuccessfulFetchOfPinnedTimestamps();
218+
assertTrue(lastRecordedFetch >= lastSuccessfulFetchOfPinnedTimestamps);
219+
}
220+
});
221+
222+
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
223+
}
183224
}

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java

+25-1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.opensearch.monitor.process.ProcessStats;
6060
import org.opensearch.node.AdaptiveSelectionStats;
6161
import org.opensearch.node.NodesResourceUsageStats;
62+
import org.opensearch.node.remotestore.RemoteStoreNodeStats;
6263
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats;
6364
import org.opensearch.repositories.RepositoriesStats;
6465
import org.opensearch.script.ScriptCacheStats;
@@ -162,6 +163,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
162163
@Nullable
163164
private NodeCacheStats nodeCacheStats;
164165

166+
@Nullable
167+
private RemoteStoreNodeStats remoteStoreNodeStats;
168+
165169
public NodeStats(StreamInput in) throws IOException {
166170
super(in);
167171
timestamp = in.readVLong();
@@ -243,6 +247,12 @@ public NodeStats(StreamInput in) throws IOException {
243247
} else {
244248
nodeCacheStats = null;
245249
}
250+
// TODO: change version to V_2_18_0
251+
if (in.getVersion().onOrAfter(Version.CURRENT)) {
252+
remoteStoreNodeStats = in.readOptionalWriteable(RemoteStoreNodeStats::new);
253+
} else {
254+
remoteStoreNodeStats = null;
255+
}
246256
}
247257

248258
public NodeStats(
@@ -274,7 +284,8 @@ public NodeStats(
274284
@Nullable SegmentReplicationRejectionStats segmentReplicationRejectionStats,
275285
@Nullable RepositoriesStats repositoriesStats,
276286
@Nullable AdmissionControlStats admissionControlStats,
277-
@Nullable NodeCacheStats nodeCacheStats
287+
@Nullable NodeCacheStats nodeCacheStats,
288+
@Nullable RemoteStoreNodeStats remoteStoreNodeStats
278289
) {
279290
super(node);
280291
this.timestamp = timestamp;
@@ -305,6 +316,7 @@ public NodeStats(
305316
this.repositoriesStats = repositoriesStats;
306317
this.admissionControlStats = admissionControlStats;
307318
this.nodeCacheStats = nodeCacheStats;
319+
this.remoteStoreNodeStats = remoteStoreNodeStats;
308320
}
309321

310322
public long getTimestamp() {
@@ -467,6 +479,11 @@ public NodeCacheStats getNodeCacheStats() {
467479
return nodeCacheStats;
468480
}
469481

482+
@Nullable
483+
public RemoteStoreNodeStats getRemoteStoreNodeStats() {
484+
return remoteStoreNodeStats;
485+
}
486+
470487
@Override
471488
public void writeTo(StreamOutput out) throws IOException {
472489
super.writeTo(out);
@@ -525,6 +542,10 @@ public void writeTo(StreamOutput out) throws IOException {
525542
if (out.getVersion().onOrAfter(Version.V_2_14_0)) {
526543
out.writeOptionalWriteable(nodeCacheStats);
527544
}
545+
// TODO: change version to V_2_18_0
546+
if (out.getVersion().onOrAfter(Version.CURRENT)) {
547+
out.writeOptionalWriteable(remoteStoreNodeStats);
548+
}
528549
}
529550

530551
@Override
@@ -631,6 +652,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
631652
if (getNodeCacheStats() != null) {
632653
getNodeCacheStats().toXContent(builder, params);
633654
}
655+
if (getRemoteStoreNodeStats() != null) {
656+
getRemoteStoreNodeStats().toXContent(builder, params);
657+
}
634658
return builder;
635659
}
636660
}

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,8 @@ public enum Metric {
220220
SEGMENT_REPLICATION_BACKPRESSURE("segment_replication_backpressure"),
221221
REPOSITORIES("repositories"),
222222
ADMISSION_CONTROL("admission_control"),
223-
CACHE_STATS("caches");
223+
CACHE_STATS("caches"),
224+
REMOTE_STORE("remote_store");
224225

225226
private String metricName;
226227

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
129129
NodesStatsRequest.Metric.SEGMENT_REPLICATION_BACKPRESSURE.containedIn(metrics),
130130
NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics),
131131
NodesStatsRequest.Metric.ADMISSION_CONTROL.containedIn(metrics),
132-
NodesStatsRequest.Metric.CACHE_STATS.containedIn(metrics)
132+
NodesStatsRequest.Metric.CACHE_STATS.containedIn(metrics),
133+
NodesStatsRequest.Metric.REMOTE_STORE.containedIn(metrics)
133134
);
134135
}
135136

server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java

+1
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
174174
false,
175175
false,
176176
false,
177+
false,
177178
false
178179
);
179180
List<ShardStats> shardsStats = new ArrayList<>();

server/src/main/java/org/opensearch/node/NodeService.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.opensearch.indices.IndicesService;
5555
import org.opensearch.ingest.IngestService;
5656
import org.opensearch.monitor.MonitorService;
57+
import org.opensearch.node.remotestore.RemoteStoreNodeStats;
5758
import org.opensearch.plugins.PluginsService;
5859
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService;
5960
import org.opensearch.repositories.RepositoriesService;
@@ -241,7 +242,8 @@ public NodeStats stats(
241242
boolean segmentReplicationTrackerStats,
242243
boolean repositoriesStats,
243244
boolean admissionControl,
244-
boolean cacheService
245+
boolean cacheService,
246+
boolean remoteStoreNodeStats
245247
) {
246248
// for indices stats we want to include previous allocated shards stats as well (it will
247249
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
@@ -274,7 +276,8 @@ public NodeStats stats(
274276
segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null,
275277
repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null,
276278
admissionControl ? this.admissionControlService.stats() : null,
277-
cacheService ? this.cacheService.stats(indices) : null
279+
cacheService ? this.cacheService.stats(indices) : null,
280+
remoteStoreNodeStats ? new RemoteStoreNodeStats() : null
278281
);
279282
}
280283

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.node.remotestore;
10+
11+
import org.opensearch.core.common.io.stream.StreamInput;
12+
import org.opensearch.core.common.io.stream.StreamOutput;
13+
import org.opensearch.core.common.io.stream.Writeable;
14+
import org.opensearch.core.xcontent.ToXContentFragment;
15+
import org.opensearch.core.xcontent.XContentBuilder;
16+
17+
import java.io.IOException;
18+
import java.util.Objects;
19+
20+
/**
21+
* Node level remote store stats
22+
* @opensearch.internal
23+
*/
24+
public class RemoteStoreNodeStats implements Writeable, ToXContentFragment {
25+
26+
public static final String STATS_NAME = "remote_store";
27+
public static final String LAST_SUCCESSFUL_FETCH_OF_PINNED_TIMESTAMPS = "last_successful_fetch_of_pinned_timestamps";
28+
29+
/**
30+
* Time stamp for the last successful fetch of pinned timestamps by the {@linkplain RemoteStorePinnedTimestampService}
31+
*/
32+
private final long lastSuccessfulFetchOfPinnedTimestamps;
33+
34+
public RemoteStoreNodeStats() {
35+
this.lastSuccessfulFetchOfPinnedTimestamps = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1();
36+
}
37+
38+
public long getLastSuccessfulFetchOfPinnedTimestamps() {
39+
return this.lastSuccessfulFetchOfPinnedTimestamps;
40+
}
41+
42+
public RemoteStoreNodeStats(StreamInput in) throws IOException {
43+
this.lastSuccessfulFetchOfPinnedTimestamps = in.readLong();
44+
}
45+
46+
@Override
47+
public void writeTo(StreamOutput out) throws IOException {
48+
out.writeLong(this.lastSuccessfulFetchOfPinnedTimestamps);
49+
}
50+
51+
@Override
52+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
53+
builder.startObject(STATS_NAME);
54+
builder.field(LAST_SUCCESSFUL_FETCH_OF_PINNED_TIMESTAMPS, this.lastSuccessfulFetchOfPinnedTimestamps);
55+
return builder.endObject();
56+
}
57+
58+
@Override
59+
public String toString() {
60+
return "RemoteStoreNodeStats{ lastSuccessfulFetchOfPinnedTimestamps=" + lastSuccessfulFetchOfPinnedTimestamps + "}";
61+
}
62+
63+
@Override
64+
public boolean equals(Object o) {
65+
if (o == null) {
66+
return false;
67+
}
68+
if (o.getClass() != RemoteStoreNodeStats.class) {
69+
return false;
70+
}
71+
RemoteStoreNodeStats other = (RemoteStoreNodeStats) o;
72+
return this.lastSuccessfulFetchOfPinnedTimestamps == other.lastSuccessfulFetchOfPinnedTimestamps;
73+
}
74+
75+
@Override
76+
public int hashCode() {
77+
return Objects.hash(lastSuccessfulFetchOfPinnedTimestamps);
78+
}
79+
}

server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java

+21-1
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@
9595
import org.opensearch.node.NodeResourceUsageStats;
9696
import org.opensearch.node.NodesResourceUsageStats;
9797
import org.opensearch.node.ResponseCollectorService;
98+
import org.opensearch.node.remotestore.RemoteStoreNodeStats;
9899
import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController;
99100
import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController;
100101
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
@@ -614,6 +615,14 @@ public void testSerialization() throws IOException {
614615
} else {
615616
assertEquals(nodeCacheStats, deserializedNodeCacheStats);
616617
}
618+
619+
RemoteStoreNodeStats remoteStoreNodeStats = nodeStats.getRemoteStoreNodeStats();
620+
RemoteStoreNodeStats deserializedRemoteStoreNodeStats = deserializedNodeStats.getRemoteStoreNodeStats();
621+
if (remoteStoreNodeStats == null) {
622+
assertNull(deserializedRemoteStoreNodeStats);
623+
} else {
624+
assertEquals(remoteStoreNodeStats, deserializedRemoteStoreNodeStats);
625+
}
617626
}
618627
}
619628
}
@@ -996,6 +1005,16 @@ public void apply(String action, AdmissionControlActionType admissionControlActi
9961005
nodeCacheStats = new NodeCacheStats(cacheStatsMap, flags);
9971006
}
9981007

1008+
RemoteStoreNodeStats remoteStoreNodeStats = null;
1009+
if (frequently()) {
1010+
remoteStoreNodeStats = new RemoteStoreNodeStats() {
1011+
@Override
1012+
public long getLastSuccessfulFetchOfPinnedTimestamps() {
1013+
return 123456L;
1014+
}
1015+
};
1016+
}
1017+
9991018
// TODO: Only remote_store based aspects of NodeIndicesStats are being tested here.
10001019
// It is possible to test other metrics in NodeIndicesStats as well since it extends Writeable now
10011020
return new NodeStats(
@@ -1027,7 +1046,8 @@ public void apply(String action, AdmissionControlActionType admissionControlActi
10271046
segmentReplicationRejectionStats,
10281047
null,
10291048
admissionControlStats,
1030-
nodeCacheStats
1049+
nodeCacheStats,
1050+
remoteStoreNodeStats
10311051
);
10321052
}
10331053

server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,7 @@ private ClusterStatsNodeResponse createClusterStatsNodeResponse(
349349
null,
350350
null,
351351
null,
352+
null,
352353
null
353354
);
354355
if (defaultBehavior) {

server/src/test/java/org/opensearch/cluster/DiskUsageTests.java

+6
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ public void testFillDiskUsage() {
195195
null,
196196
null,
197197
null,
198+
null,
198199
null
199200
),
200201
new NodeStats(
@@ -226,6 +227,7 @@ public void testFillDiskUsage() {
226227
null,
227228
null,
228229
null,
230+
null,
229231
null
230232
),
231233
new NodeStats(
@@ -257,6 +259,7 @@ public void testFillDiskUsage() {
257259
null,
258260
null,
259261
null,
262+
null,
260263
null
261264
)
262265
);
@@ -319,6 +322,7 @@ public void testFillDiskUsageSomeInvalidValues() {
319322
null,
320323
null,
321324
null,
325+
null,
322326
null
323327
),
324328
new NodeStats(
@@ -350,6 +354,7 @@ public void testFillDiskUsageSomeInvalidValues() {
350354
null,
351355
null,
352356
null,
357+
null,
353358
null
354359
),
355360
new NodeStats(
@@ -381,6 +386,7 @@ public void testFillDiskUsageSomeInvalidValues() {
381386
null,
382387
null,
383388
null,
389+
null,
384390
null
385391
)
386392
);

0 commit comments

Comments
 (0)