Skip to content

Commit b54e867

Browse files
authored
[Remote Publication] Add remote download stats (opensearch-project#15291)
--------- Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
1 parent c37d5f6 commit b54e867

15 files changed

+412
-119
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4040
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
4141
- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527))
4242
- Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426))
43+
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291)))
4344

4445
### Dependencies
4546
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java

+9-5
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Base64;
3232
import java.util.List;
3333
import java.util.Map;
34+
import java.util.Objects;
3435
import java.util.concurrent.TimeUnit;
3536
import java.util.concurrent.atomic.AtomicLong;
3637

@@ -40,6 +41,7 @@
4041
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
4142
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
4243
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
44+
import static org.opensearch.gateway.remote.RemoteUploadStats.REMOTE_UPLOAD;
4345
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
4446
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
4547
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
@@ -253,11 +255,13 @@ private void verifyIndexRoutingFilesDeletion(
253255
DiscoveryStats discoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
254256
assertNotNull(discoveryStats.getClusterStateStats());
255257
for (PersistedStateStats persistedStateStats : discoveryStats.getClusterStateStats().getPersistenceStats()) {
256-
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
257-
assertTrue(extendedFields.containsKey(RemotePersistenceStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
258-
long cleanupAttemptFailedCount = extendedFields.get(RemotePersistenceStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT)
259-
.get();
260-
assertEquals(0, cleanupAttemptFailedCount);
258+
if (Objects.equals(persistedStateStats.getStatsName(), REMOTE_UPLOAD)) {
259+
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
260+
assertTrue(extendedFields.containsKey(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
261+
long cleanupAttemptFailedCount = extendedFields.get(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT)
262+
.get();
263+
assertEquals(0, cleanupAttemptFailedCount);
264+
}
261265
}
262266
}
263267

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java

+35
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,15 @@
88

99
package org.opensearch.gateway.remote;
1010

11+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
12+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
1113
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
1214
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
1315
import org.opensearch.client.Client;
1416
import org.opensearch.common.blobstore.BlobPath;
1517
import org.opensearch.common.settings.Settings;
1618
import org.opensearch.common.util.FeatureFlags;
19+
import org.opensearch.discovery.DiscoveryStats;
1720
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
1821
import org.opensearch.indices.recovery.RecoverySettings;
1922
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
@@ -155,6 +158,38 @@ public void testRemotePublicationDisableIfRemoteStateDisabled() {
155158
assertNull(internalCluster().getCurrentClusterManagerNodeInstance(RemoteClusterStateService.class));
156159
}
157160

161+
public void testRemotePublicationDownloadStats() {
162+
int shardCount = randomIntBetween(1, 2);
163+
int replicaCount = 1;
164+
int dataNodeCount = shardCount * (replicaCount + 1);
165+
int clusterManagerNodeCount = 1;
166+
prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
167+
String dataNode = internalCluster().getDataNodeNames().stream().collect(Collectors.toList()).get(0);
168+
169+
NodesStatsResponse nodesStatsResponseDataNode = client().admin()
170+
.cluster()
171+
.prepareNodesStats(dataNode)
172+
.addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName())
173+
.get();
174+
175+
assertDataNodeDownloadStats(nodesStatsResponseDataNode);
176+
177+
}
178+
179+
private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) {
180+
// assert cluster state stats for data node
181+
DiscoveryStats dataNodeDiscoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
182+
assertNotNull(dataNodeDiscoveryStats.getClusterStateStats());
183+
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getUpdateSuccess());
184+
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getSuccessCount() > 0);
185+
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getFailedCount());
186+
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getTotalTimeInMillis() > 0);
187+
188+
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getSuccessCount() > 0);
189+
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getFailedCount());
190+
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getTotalTimeInMillis() > 0);
191+
}
192+
158193
private Map<String, Integer> getMetadataFiles(BlobStoreRepository repository, String subDirectory) throws IOException {
159194
BlobPath metadataPath = repository.basePath()
160195
.add(

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

+4
Original file line numberDiff line numberDiff line change
@@ -901,6 +901,10 @@ public DiscoveryStats stats() {
901901
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
902902
}
903903
});
904+
if (coordinationState.get().isRemotePublicationEnabled()) {
905+
stats.add(publicationHandler.getFullDownloadStats());
906+
stats.add(publicationHandler.getDiffDownloadStats());
907+
}
904908
clusterStateStats.setPersistenceStats(stats);
905909
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);
906910
}

server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java

+78-61
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,14 @@ public PublishClusterStateStats stats() {
178178
);
179179
}
180180

181+
public PersistedStateStats getFullDownloadStats() {
182+
return remoteClusterStateService.getFullDownloadStats();
183+
}
184+
185+
public PersistedStateStats getDiffDownloadStats() {
186+
return remoteClusterStateService.getDiffDownloadStats();
187+
}
188+
181189
private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
182190
try (StreamInput in = CompressedStreamUtils.decompressBytes(request, namedWriteableRegistry)) {
183191
ClusterState incomingState;
@@ -231,69 +239,78 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
231239
}
232240

233241
// package private for testing
234-
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException {
235-
if (transportService.getLocalNode().equals(request.getSourceNode())) {
236-
return acceptRemoteStateOnLocalNode(request);
237-
}
238-
// TODO Make cluster state download non-blocking: https://github.com/opensearch-project/OpenSearch/issues/14102
239-
ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(
240-
request.getClusterUUID(),
241-
request.getManifestFile()
242-
);
243-
if (manifest == null) {
244-
throw new IllegalStateException("Publication failed as manifest was not found for " + request);
245-
}
242+
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, IllegalStateException {
246243
boolean applyFullState = false;
247-
final ClusterState lastSeen = lastSeenClusterState.get();
248-
if (lastSeen == null) {
249-
logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
250-
applyFullState = true;
251-
} else if (manifest.getDiffManifest() == null) {
252-
logger.trace(() -> "There is no diff in the manifest");
253-
applyFullState = true;
254-
} else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) {
255-
logger.debug(() -> "Last cluster state not compatible with the diff");
256-
applyFullState = true;
257-
}
258-
259-
if (applyFullState == true) {
260-
logger.debug(
261-
() -> new ParameterizedMessage(
262-
"Downloading full cluster state for term {}, version {}, stateUUID {}",
263-
manifest.getClusterTerm(),
264-
manifest.getStateVersion(),
265-
manifest.getStateUUID()
266-
)
267-
);
268-
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(
269-
request.getClusterName(),
270-
manifest,
271-
transportService.getLocalNode().getId(),
272-
true
273-
);
274-
fullClusterStateReceivedCount.incrementAndGet();
275-
final PublishWithJoinResponse response = acceptState(clusterState);
276-
lastSeenClusterState.set(clusterState);
277-
return response;
278-
} else {
279-
logger.debug(
280-
() -> new ParameterizedMessage(
281-
"Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
282-
manifest.getClusterTerm(),
283-
manifest.getStateVersion(),
284-
manifest.getDiffManifest().getFromStateUUID(),
285-
manifest.getStateUUID()
286-
)
287-
);
288-
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(
289-
manifest,
290-
lastSeen,
291-
transportService.getLocalNode().getId()
244+
try {
245+
if (transportService.getLocalNode().equals(request.getSourceNode())) {
246+
return acceptRemoteStateOnLocalNode(request);
247+
}
248+
// TODO Make cluster state download non-blocking: https://github.com/opensearch-project/OpenSearch/issues/14102
249+
ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(
250+
request.getClusterUUID(),
251+
request.getManifestFile()
292252
);
293-
compatibleClusterStateDiffReceivedCount.incrementAndGet();
294-
final PublishWithJoinResponse response = acceptState(clusterState);
295-
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
296-
return response;
253+
if (manifest == null) {
254+
throw new IllegalStateException("Publication failed as manifest was not found for " + request);
255+
}
256+
final ClusterState lastSeen = lastSeenClusterState.get();
257+
if (lastSeen == null) {
258+
logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
259+
applyFullState = true;
260+
} else if (manifest.getDiffManifest() == null) {
261+
logger.debug(() -> "There is no diff in the manifest");
262+
applyFullState = true;
263+
} else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) {
264+
logger.debug(() -> "Last cluster state not compatible with the diff");
265+
applyFullState = true;
266+
}
267+
268+
if (applyFullState == true) {
269+
logger.debug(
270+
() -> new ParameterizedMessage(
271+
"Downloading full cluster state for term {}, version {}, stateUUID {}",
272+
manifest.getClusterTerm(),
273+
manifest.getStateVersion(),
274+
manifest.getStateUUID()
275+
)
276+
);
277+
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(
278+
request.getClusterName(),
279+
manifest,
280+
transportService.getLocalNode().getId(),
281+
true
282+
);
283+
fullClusterStateReceivedCount.incrementAndGet();
284+
final PublishWithJoinResponse response = acceptState(clusterState);
285+
lastSeenClusterState.set(clusterState);
286+
return response;
287+
} else {
288+
logger.debug(
289+
() -> new ParameterizedMessage(
290+
"Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
291+
manifest.getClusterTerm(),
292+
manifest.getStateVersion(),
293+
manifest.getDiffManifest().getFromStateUUID(),
294+
manifest.getStateUUID()
295+
)
296+
);
297+
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(
298+
manifest,
299+
lastSeen,
300+
transportService.getLocalNode().getId()
301+
);
302+
compatibleClusterStateDiffReceivedCount.incrementAndGet();
303+
final PublishWithJoinResponse response = acceptState(clusterState);
304+
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
305+
return response;
306+
}
307+
} catch (Exception e) {
308+
if (applyFullState) {
309+
remoteClusterStateService.fullDownloadFailed();
310+
} else {
311+
remoteClusterStateService.diffDownloadFailed();
312+
}
313+
throw e;
297314
}
298315
}
299316

server/src/main/java/org/opensearch/gateway/GatewayMetaState.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -746,7 +746,7 @@ assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(
746746

747747
@Override
748748
public PersistedStateStats getStats() {
749-
return remoteClusterStateService.getStats();
749+
return remoteClusterStateService.getUploadStats();
750750
}
751751

752752
private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest, ClusterState clusterState) {

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public RemoteClusterStateCleanupManager(
8181
RemoteRoutingTableService remoteRoutingTableService
8282
) {
8383
this.remoteClusterStateService = remoteClusterStateService;
84-
this.remoteStateStats = remoteClusterStateService.getStats();
84+
this.remoteStateStats = remoteClusterStateService.getRemoteStateStats();
8585
ClusterSettings clusterSettings = clusterService.getClusterSettings();
8686
this.clusterApplierService = clusterService.getClusterApplierService();
8787
this.staleFileCleanupInterval = clusterSettings.get(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING);

0 commit comments

Comments
 (0)