Skip to content

Commit 81fd088

Browse files
authored
Create Remote Object managers and use them in orchestration from RemoteClusterStateService (opensearch-project#13924)
* Create Remote Object managers and use them in orchestration from RemoteClusterStateService Signed-off-by: Shivansh Arora <hishiv@amazon.com>
1 parent d17e092 commit 81fd088

37 files changed

+1487
-1195
lines changed

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@
2626
import java.util.function.Function;
2727
import java.util.stream.Collectors;
2828

29-
import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA;
30-
import static org.opensearch.gateway.remote.RemoteClusterStateService.CUSTOM_METADATA;
31-
import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER;
32-
import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX;
3329
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
34-
import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA;
35-
import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA;
30+
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
31+
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_FILE_PREFIX;
32+
import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA;
33+
import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_METADATA;
34+
import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA;
35+
import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA;
3636

3737
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
3838
public class RemoteClusterStateServiceIT extends RemoteStoreBaseIntegTestCase {

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java

+3-7
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import static org.opensearch.cluster.metadata.Metadata.CLUSTER_READ_ONLY_BLOCK;
5858
import static org.opensearch.cluster.metadata.Metadata.SETTING_READ_ONLY_SETTING;
5959
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
60+
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.encodeString;
6061
import static org.opensearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
6162
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;
6263
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
@@ -326,9 +327,7 @@ public void testFullClusterRestoreManifestFilePointsToInvalidIndexMetadataPathTh
326327
// Step - 3 Delete index metadata file in remote
327328
try {
328329
Files.move(
329-
segmentRepoPath.resolve(
330-
RemoteClusterStateService.encodeString(clusterName) + "/cluster-state/" + prevClusterUUID + "/index"
331-
),
330+
segmentRepoPath.resolve(encodeString(clusterName) + "/cluster-state/" + prevClusterUUID + "/index"),
332331
segmentRepoPath.resolve("cluster-state/")
333332
);
334333
} catch (IOException e) {
@@ -354,10 +353,7 @@ public void testRemoteStateFullRestart() throws Exception {
354353
try {
355354
Files.move(
356355
segmentRepoPath.resolve(
357-
RemoteClusterStateService.encodeString(clusterService().state().getClusterName().value())
358-
+ "/cluster-state/"
359-
+ prevClusterUUID
360-
+ "/manifest"
356+
encodeString(clusterService().state().getClusterName().value()) + "/cluster-state/" + prevClusterUUID + "/manifest"
361357
),
362358
segmentRepoPath.resolve("cluster-state/")
363359
);

server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.opensearch.core.action.ActionListener;
3434
import org.opensearch.core.common.bytes.BytesReference;
3535
import org.opensearch.gateway.remote.ClusterMetadataManifest;
36-
import org.opensearch.gateway.remote.RemoteClusterStateService;
36+
import org.opensearch.gateway.remote.RemoteStateTransferException;
3737
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
3838
import org.opensearch.index.remote.RemoteStoreEnums;
3939
import org.opensearch.index.remote.RemoteStorePathStrategy;
@@ -52,6 +52,7 @@
5252
import java.util.function.Supplier;
5353
import java.util.stream.Collectors;
5454

55+
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
5556
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
5657

5758
/**
@@ -87,7 +88,6 @@ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponen
8788

8889
public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing";
8990
public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing";
90-
public static final String DELIMITER = "__";
9191
public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--";
9292

9393
private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class);
@@ -175,10 +175,7 @@ public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
175175
)
176176
),
177177
ex -> latchedActionListener.onFailure(
178-
new RemoteClusterStateService.RemoteStateTransferException(
179-
"Exception in writing index to remote store: " + indexRouting.getIndex().toString(),
180-
ex
181-
)
178+
new RemoteStateTransferException("Exception in writing index to remote store: " + indexRouting.getIndex().toString(), ex)
182179
)
183180
);
184181

server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java

+2
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public AbstractRemoteWritableBlobEntity(
4242

4343
public abstract BlobPathParameters getBlobPathParameters();
4444

45+
public abstract String getType();
46+
4547
public String getFullBlobName() {
4648
return blobName;
4749
}

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,10 @@
181181
import java.util.Set;
182182
import java.util.function.Predicate;
183183

184+
import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING;
185+
import static org.opensearch.gateway.remote.RemoteIndexMetadataManager.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING;
186+
import static org.opensearch.gateway.remote.RemoteManifestManager.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING;
187+
184188
/**
185189
* Encapsulates all valid cluster level settings.
186190
*
@@ -717,9 +721,9 @@ public void apply(Settings value, Settings current, Settings previous) {
717721
// Remote cluster state settings
718722
RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING,
719723
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
720-
RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
721-
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
722-
RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
724+
INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
725+
GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
726+
METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
723727
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
724728
RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING,
725729
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,

server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -1158,8 +1158,7 @@ public String getComponent() {
11581158
}
11591159

11601160
public String getUploadedFilename() {
1161-
String[] splitPath = uploadedFilename.split("/");
1162-
return splitPath[splitPath.length - 1];
1161+
return uploadedFilename;
11631162
}
11641163

11651164
public String getIndexName() {

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java

+43-44
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,9 @@
3434
import java.util.Set;
3535
import java.util.concurrent.atomic.AtomicBoolean;
3636

37-
import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_FORMAT;
38-
import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_PATH_TOKEN;
39-
import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_METADATA_FORMAT;
40-
import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_PATH_TOKEN;
41-
import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX;
42-
import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_PATH_TOKEN;
37+
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN;
38+
import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST;
39+
import static org.opensearch.gateway.remote.model.RemoteGlobalMetadata.GLOBAL_METADATA_FORMAT;
4340

4441
/**
4542
* A Manager which provides APIs to clean up stale cluster state files and runs an async stale cleanup task
@@ -74,6 +71,7 @@ public class RemoteClusterStateCleanupManager implements Closeable {
7471
private long lastCleanupAttemptStateVersion;
7572
private final ThreadPool threadpool;
7673
private final ClusterApplierService clusterApplierService;
74+
private RemoteManifestManager remoteManifestManager;
7775

7876
public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterStateService, ClusterService clusterService) {
7977
this.remoteClusterStateService = remoteClusterStateService;
@@ -89,6 +87,7 @@ public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterS
8987

9088
void start() {
9189
staleFileDeletionTask = new AsyncStaleFileDeletion(this);
90+
remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
9291
}
9392

9493
@Override
@@ -172,13 +171,17 @@ void deleteClusterMetadata(
172171
Set<String> staleIndexMetadataPaths = new HashSet<>();
173172
Set<String> staleGlobalMetadataPaths = new HashSet<>();
174173
activeManifestBlobMetadata.forEach(blobMetadata -> {
175-
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
174+
ClusterMetadataManifest clusterMetadataManifest = remoteManifestManager.fetchRemoteClusterMetadataManifest(
176175
clusterName,
177176
clusterUUID,
178177
blobMetadata.name()
179178
);
180179
clusterMetadataManifest.getIndices()
181-
.forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename()));
180+
.forEach(
181+
uploadedIndexMetadata -> filesToKeep.add(
182+
RemoteClusterStateUtils.getFormattedIndexFileName(uploadedIndexMetadata.getUploadedFilename())
183+
)
184+
);
182185
if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) {
183186
filesToKeep.add(clusterMetadataManifest.getGlobalMetadataFileName());
184187
} else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) {
@@ -191,43 +194,38 @@ void deleteClusterMetadata(
191194
}
192195
});
193196
staleManifestBlobMetadata.forEach(blobMetadata -> {
194-
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
197+
ClusterMetadataManifest clusterMetadataManifest = remoteManifestManager.fetchRemoteClusterMetadataManifest(
195198
clusterName,
196199
clusterUUID,
197200
blobMetadata.name()
198201
);
199-
staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name());
202+
staleManifestPaths.add(
203+
remoteManifestManager.getManifestFolderPath(clusterName, clusterUUID).buildAsString() + blobMetadata.name()
204+
);
200205
if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) {
201206
addStaleGlobalMetadataPath(clusterMetadataManifest.getGlobalMetadataFileName(), filesToKeep, staleGlobalMetadataPaths);
202207
} else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) {
203-
addStaleGlobalMetadataPath(
204-
clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename(),
205-
filesToKeep,
206-
staleGlobalMetadataPaths
207-
);
208-
addStaleGlobalMetadataPath(
209-
clusterMetadataManifest.getSettingsMetadata().getUploadedFilename(),
210-
filesToKeep,
211-
staleGlobalMetadataPaths
212-
);
213-
addStaleGlobalMetadataPath(
214-
clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename(),
215-
filesToKeep,
216-
staleGlobalMetadataPaths
217-
);
208+
if (filesToKeep.contains(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename()) == false) {
209+
staleGlobalMetadataPaths.add(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename());
210+
}
211+
if (filesToKeep.contains(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename()) == false) {
212+
staleGlobalMetadataPaths.add(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename());
213+
}
214+
if (filesToKeep.contains(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename()) == false) {
215+
staleGlobalMetadataPaths.add(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename());
216+
}
218217
clusterMetadataManifest.getCustomMetadataMap()
219218
.values()
220-
.forEach(
221-
attribute -> addStaleGlobalMetadataPath(attribute.getUploadedFilename(), filesToKeep, staleGlobalMetadataPaths)
222-
);
219+
.stream()
220+
.map(ClusterMetadataManifest.UploadedMetadataAttribute::getUploadedFilename)
221+
.filter(file -> filesToKeep.contains(file) == false)
222+
.forEach(staleGlobalMetadataPaths::add);
223223
}
224224

225225
clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
226-
if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
227-
staleIndexMetadataPaths.add(
228-
new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString()
229-
+ INDEX_METADATA_FORMAT.blobName(uploadedIndexMetadata.getUploadedFilename())
230-
);
226+
String fileName = RemoteClusterStateUtils.getFormattedIndexFileName(uploadedIndexMetadata.getUploadedFilename());
227+
if (filesToKeep.contains(fileName) == false) {
228+
staleIndexMetadataPaths.add(fileName);
231229
}
232230
});
233231
});
@@ -237,9 +235,9 @@ void deleteClusterMetadata(
237235
return;
238236
}
239237

240-
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths));
241-
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths));
242-
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths));
238+
deleteStalePaths(new ArrayList<>(staleGlobalMetadataPaths));
239+
deleteStalePaths(new ArrayList<>(staleIndexMetadataPaths));
240+
deleteStalePaths(new ArrayList<>(staleManifestPaths));
243241
} catch (IllegalStateException e) {
244242
logger.error("Error while fetching Remote Cluster Metadata manifests", e);
245243
} catch (IOException e) {
@@ -267,8 +265,8 @@ void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int mani
267265
try {
268266
getBlobStoreTransferService().listAllInSortedOrderAsync(
269267
ThreadPool.Names.REMOTE_PURGE,
270-
remoteClusterStateService.getManifestFolderPath(clusterName, clusterUUID),
271-
MANIFEST_FILE_PREFIX,
268+
remoteManifestManager.getManifestFolderPath(clusterName, clusterUUID),
269+
MANIFEST,
272270
Integer.MAX_VALUE,
273271
new ActionListener<>() {
274272
@Override
@@ -312,7 +310,11 @@ void deleteStaleUUIDsClusterMetadata(String clusterName, List<String> clusterUUI
312310
clusterUUIDs.forEach(
313311
clusterUUID -> getBlobStoreTransferService().deleteAsync(
314312
ThreadPool.Names.REMOTE_PURGE,
315-
remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID),
313+
RemoteClusterStateUtils.getClusterMetadataBasePath(
314+
remoteClusterStateService.getBlobStoreRepository(),
315+
clusterName,
316+
clusterUUID
317+
),
316318
new ActionListener<>() {
317319
@Override
318320
public void onResponse(Void unused) {
@@ -336,12 +338,9 @@ public void onFailure(Exception e) {
336338
}
337339

338340
// package private for testing
339-
void deleteStalePaths(String clusterName, String clusterUUID, List<String> stalePaths) throws IOException {
341+
void deleteStalePaths(List<String> stalePaths) throws IOException {
340342
logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths));
341-
getBlobStoreTransferService().deleteBlobs(
342-
remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID),
343-
stalePaths
344-
);
343+
getBlobStoreTransferService().deleteBlobs(BlobPath.cleanPath(), stalePaths);
345344
}
346345

347346
/**

0 commit comments

Comments
 (0)