Skip to content

Commit 3de5d3c

Browse files
authored
Optimize remote state stale file deletion (opensearch-project#13995)
* Optimize remote state stale file deletion Signed-off-by: Shivansh Arora <hishiv@amazon.com>
1 parent 156eca3 commit 3de5d3c

File tree

10 files changed

+1071
-443
lines changed

10 files changed

+1071
-443
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.gateway.remote;
10+
11+
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
12+
import org.opensearch.common.blobstore.BlobPath;
13+
import org.opensearch.common.settings.Settings;
14+
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
15+
import org.opensearch.repositories.RepositoriesService;
16+
import org.opensearch.repositories.blobstore.BlobStoreRepository;
17+
import org.opensearch.test.OpenSearchIntegTestCase;
18+
import org.junit.Before;
19+
20+
import java.nio.charset.StandardCharsets;
21+
import java.util.Base64;
22+
import java.util.Map;
23+
import java.util.concurrent.TimeUnit;
24+
25+
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT;
26+
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING;
27+
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
28+
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
29+
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
30+
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
31+
32+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
33+
public class RemoteClusterStateCleanupManagerIT extends RemoteStoreBaseIntegTestCase {
34+
35+
private static final String INDEX_NAME = "test-index";
36+
37+
@Before
38+
public void setup() {
39+
asyncUploadMockFsRepo = false;
40+
}
41+
42+
@Override
43+
protected Settings nodeSettings(int nodeOrdinal) {
44+
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
45+
}
46+
47+
private Map<String, Long> initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) {
48+
prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
49+
Map<String, Long> indexStats = indexData(1, false, INDEX_NAME);
50+
assertEquals(shardCount * (replicaCount + 1), getNumShards(INDEX_NAME).totalNumShards);
51+
ensureGreen(INDEX_NAME);
52+
return indexStats;
53+
}
54+
55+
public void testRemoteCleanupTaskUpdated() {
56+
int shardCount = randomIntBetween(1, 2);
57+
int replicaCount = 1;
58+
int dataNodeCount = shardCount * (replicaCount + 1);
59+
int clusterManagerNodeCount = 1;
60+
61+
initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
62+
RemoteClusterStateCleanupManager remoteClusterStateCleanupManager = internalCluster().getClusterManagerNodeInstance(
63+
RemoteClusterStateCleanupManager.class
64+
);
65+
66+
assertEquals(CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval());
67+
assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled());
68+
69+
// now disable
70+
client().admin()
71+
.cluster()
72+
.prepareUpdateSettings()
73+
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), -1))
74+
.get();
75+
76+
assertEquals(-1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMillis());
77+
assertFalse(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled());
78+
79+
// now set Clean up interval to 1 min
80+
client().admin()
81+
.cluster()
82+
.prepareUpdateSettings()
83+
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "1m"))
84+
.get();
85+
assertEquals(1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMinutes());
86+
}
87+
88+
public void testRemoteCleanupDeleteStale() throws Exception {
89+
int shardCount = randomIntBetween(1, 2);
90+
int replicaCount = 1;
91+
int dataNodeCount = shardCount * (replicaCount + 1);
92+
int clusterManagerNodeCount = 1;
93+
94+
initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
95+
96+
// update cluster state 21 times to ensure that clean up has run after this will upload 42 manifest files
97+
// to repository, if manifest files are less than that it means clean up has run
98+
updateClusterStateNTimes(RETAINED_MANIFESTS + SKIP_CLEANUP_STATE_CHANGES + 1);
99+
100+
RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
101+
BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
102+
BlobPath baseMetadataPath = repository.basePath()
103+
.add(
104+
Base64.getUrlEncoder()
105+
.withoutPadding()
106+
.encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
107+
)
108+
.add("cluster-state")
109+
.add(getClusterState().metadata().clusterUUID());
110+
BlobPath manifestContainerPath = baseMetadataPath.add("manifest");
111+
112+
// set cleanup interval to 100 ms to make the test faster
113+
ClusterUpdateSettingsResponse response = client().admin()
114+
.cluster()
115+
.prepareUpdateSettings()
116+
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "100ms"))
117+
.get();
118+
119+
assertTrue(response.isAcknowledged());
120+
121+
assertBusy(() -> {
122+
int manifestFiles = repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size();
123+
logger.info("number of current manifest file: {}", manifestFiles);
124+
// we can't guarantee that we have same number of manifest as Retained manifest in our repo as there can be other queued task
125+
// other than replica count change which can upload new manifest files, that's why we check that number of manifests is between
126+
// Retained manifests and Retained manifests + 2 * Skip cleanup state changes (each cluster state update uploads 2 manifests)
127+
assertTrue(
128+
"Current number of manifest files: " + manifestFiles,
129+
manifestFiles >= RETAINED_MANIFESTS && manifestFiles < RETAINED_MANIFESTS + 2 * SKIP_CLEANUP_STATE_CHANGES
130+
);
131+
}, 500, TimeUnit.MILLISECONDS);
132+
133+
// disable the clean up to avoid race condition during shutdown
134+
response = client().admin()
135+
.cluster()
136+
.prepareUpdateSettings()
137+
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "-1"))
138+
.get();
139+
140+
assertTrue(response.isAcknowledged());
141+
}
142+
143+
private void updateClusterStateNTimes(int n) {
144+
int newReplicaCount = randomIntBetween(0, 3);
145+
for (int i = n; i > 0; i--) {
146+
ClusterUpdateSettingsResponse response = client().admin()
147+
.cluster()
148+
.prepareUpdateSettings()
149+
.setPersistentSettings(Settings.builder().put(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.getKey(), i, TimeUnit.SECONDS))
150+
.get();
151+
assertTrue(response.isAcknowledged());
152+
}
153+
}
154+
}

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java

-63
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
1212
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
13-
import org.opensearch.cluster.metadata.IndexMetadata;
1413
import org.opensearch.common.blobstore.BlobPath;
1514
import org.opensearch.common.settings.Settings;
1615
import org.opensearch.discovery.DiscoveryStats;
@@ -27,7 +26,6 @@
2726
import java.util.function.Function;
2827
import java.util.stream.Collectors;
2928

30-
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
3129
import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA;
3230
import static org.opensearch.gateway.remote.RemoteClusterStateService.CUSTOM_METADATA;
3331
import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER;
@@ -51,16 +49,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
5149
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
5250
}
5351

54-
private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
55-
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
56-
internalCluster().startDataOnlyNodes(numDataOnlyNodes);
57-
for (String index : indices.split(",")) {
58-
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
59-
ensureYellowAndNoInitializingShards(index);
60-
ensureGreen(index);
61-
}
62-
}
63-
6452
private Map<String, Long> initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) {
6553
prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
6654
Map<String, Long> indexStats = indexData(1, false, INDEX_NAME);
@@ -69,49 +57,6 @@ private Map<String, Long> initialTestSetup(int shardCount, int replicaCount, int
6957
return indexStats;
7058
}
7159

72-
public void testFullClusterRestoreStaleDelete() throws Exception {
73-
int shardCount = randomIntBetween(1, 2);
74-
int replicaCount = 1;
75-
int dataNodeCount = shardCount * (replicaCount + 1);
76-
int clusterManagerNodeCount = 1;
77-
78-
initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
79-
setReplicaCount(0);
80-
setReplicaCount(2);
81-
setReplicaCount(0);
82-
setReplicaCount(1);
83-
setReplicaCount(0);
84-
setReplicaCount(1);
85-
setReplicaCount(0);
86-
setReplicaCount(2);
87-
setReplicaCount(0);
88-
89-
RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
90-
RemoteClusterStateService.class
91-
);
92-
93-
RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
94-
95-
BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
96-
BlobPath baseMetadataPath = repository.basePath()
97-
.add(
98-
Base64.getUrlEncoder()
99-
.withoutPadding()
100-
.encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
101-
)
102-
.add("cluster-state")
103-
.add(getClusterState().metadata().clusterUUID());
104-
105-
assertEquals(10, repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size());
106-
107-
Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestClusterState(
108-
cluster().getClusterName(),
109-
getClusterState().metadata().clusterUUID()
110-
).getMetadata().getIndices();
111-
assertEquals(0, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas());
112-
assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards());
113-
}
114-
11560
public void testRemoteStateStats() {
11661
int shardCount = randomIntBetween(1, 2);
11762
int replicaCount = 1;
@@ -241,12 +186,4 @@ private void validateNodesStatsResponse(NodesStatsResponse nodesStatsResponse) {
241186
assertNotNull(nodesStatsResponse.getNodes().get(0));
242187
assertNotNull(nodesStatsResponse.getNodes().get(0).getDiscoveryStats());
243188
}
244-
245-
private void setReplicaCount(int replicaCount) {
246-
client().admin()
247-
.indices()
248-
.prepareUpdateSettings(INDEX_NAME)
249-
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount))
250-
.get();
251-
}
252189
}

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java

+10
Original file line numberDiff line numberDiff line change
@@ -350,4 +350,14 @@ protected void restore(boolean restoreAllShards, String... indices) {
350350
PlainActionFuture.newFuture()
351351
);
352352
}
353+
354+
protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
355+
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
356+
internalCluster().startDataOnlyNodes(numDataOnlyNodes);
357+
for (String index : indices.split(",")) {
358+
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
359+
ensureYellowAndNoInitializingShards(index);
360+
ensureGreen(index);
361+
}
362+
}
353363
}

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+2
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
import org.opensearch.gateway.GatewayService;
105105
import org.opensearch.gateway.PersistedClusterStateService;
106106
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
107+
import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager;
107108
import org.opensearch.gateway.remote.RemoteClusterStateService;
108109
import org.opensearch.http.HttpTransportSettings;
109110
import org.opensearch.index.IndexModule;
@@ -711,6 +712,7 @@ public void apply(Settings value, Settings current, Settings previous) {
711712
SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL,
712713

713714
// Remote cluster state settings
715+
RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING,
714716
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
715717
RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
716718
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,

0 commit comments

Comments
 (0)