34
34
import java .util .Set ;
35
35
import java .util .concurrent .atomic .AtomicBoolean ;
36
36
37
- import static org .opensearch .gateway .remote .RemoteClusterStateService .GLOBAL_METADATA_FORMAT ;
38
- import static org .opensearch .gateway .remote .RemoteClusterStateService .GLOBAL_METADATA_PATH_TOKEN ;
39
- import static org .opensearch .gateway .remote .RemoteClusterStateService .INDEX_METADATA_FORMAT ;
40
- import static org .opensearch .gateway .remote .RemoteClusterStateService .INDEX_PATH_TOKEN ;
41
- import static org .opensearch .gateway .remote .RemoteClusterStateService .MANIFEST_FILE_PREFIX ;
42
- import static org .opensearch .gateway .remote .RemoteClusterStateService .MANIFEST_PATH_TOKEN ;
37
+ import static org .opensearch .gateway .remote .RemoteClusterStateUtils .GLOBAL_METADATA_PATH_TOKEN ;
38
+ import static org .opensearch .gateway .remote .model .RemoteClusterMetadataManifest .MANIFEST ;
39
+ import static org .opensearch .gateway .remote .model .RemoteGlobalMetadata .GLOBAL_METADATA_FORMAT ;
43
40
44
41
/**
45
42
* A Manager which provides APIs to clean up stale cluster state files and runs an async stale cleanup task
@@ -74,6 +71,7 @@ public class RemoteClusterStateCleanupManager implements Closeable {
74
71
private long lastCleanupAttemptStateVersion ;
75
72
private final ThreadPool threadpool ;
76
73
private final ClusterApplierService clusterApplierService ;
74
+ private RemoteManifestManager remoteManifestManager ;
77
75
78
76
public RemoteClusterStateCleanupManager (RemoteClusterStateService remoteClusterStateService , ClusterService clusterService ) {
79
77
this .remoteClusterStateService = remoteClusterStateService ;
@@ -89,6 +87,7 @@ public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterS
89
87
90
88
void start () {
91
89
staleFileDeletionTask = new AsyncStaleFileDeletion (this );
90
+ remoteManifestManager = remoteClusterStateService .getRemoteManifestManager ();
92
91
}
93
92
94
93
@ Override
@@ -172,13 +171,17 @@ void deleteClusterMetadata(
172
171
Set <String > staleIndexMetadataPaths = new HashSet <>();
173
172
Set <String > staleGlobalMetadataPaths = new HashSet <>();
174
173
activeManifestBlobMetadata .forEach (blobMetadata -> {
175
- ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService .fetchRemoteClusterMetadataManifest (
174
+ ClusterMetadataManifest clusterMetadataManifest = remoteManifestManager .fetchRemoteClusterMetadataManifest (
176
175
clusterName ,
177
176
clusterUUID ,
178
177
blobMetadata .name ()
179
178
);
180
179
clusterMetadataManifest .getIndices ()
181
- .forEach (uploadedIndexMetadata -> filesToKeep .add (uploadedIndexMetadata .getUploadedFilename ()));
180
+ .forEach (
181
+ uploadedIndexMetadata -> filesToKeep .add (
182
+ RemoteClusterStateUtils .getFormattedIndexFileName (uploadedIndexMetadata .getUploadedFilename ())
183
+ )
184
+ );
182
185
if (clusterMetadataManifest .getCodecVersion () == ClusterMetadataManifest .CODEC_V1 ) {
183
186
filesToKeep .add (clusterMetadataManifest .getGlobalMetadataFileName ());
184
187
} else if (clusterMetadataManifest .getCodecVersion () >= ClusterMetadataManifest .CODEC_V2 ) {
@@ -191,43 +194,38 @@ void deleteClusterMetadata(
191
194
}
192
195
});
193
196
staleManifestBlobMetadata .forEach (blobMetadata -> {
194
- ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService .fetchRemoteClusterMetadataManifest (
197
+ ClusterMetadataManifest clusterMetadataManifest = remoteManifestManager .fetchRemoteClusterMetadataManifest (
195
198
clusterName ,
196
199
clusterUUID ,
197
200
blobMetadata .name ()
198
201
);
199
- staleManifestPaths .add (new BlobPath ().add (MANIFEST_PATH_TOKEN ).buildAsString () + blobMetadata .name ());
202
+ staleManifestPaths .add (
203
+ remoteManifestManager .getManifestFolderPath (clusterName , clusterUUID ).buildAsString () + blobMetadata .name ()
204
+ );
200
205
if (clusterMetadataManifest .getCodecVersion () == ClusterMetadataManifest .CODEC_V1 ) {
201
206
addStaleGlobalMetadataPath (clusterMetadataManifest .getGlobalMetadataFileName (), filesToKeep , staleGlobalMetadataPaths );
202
207
} else if (clusterMetadataManifest .getCodecVersion () >= ClusterMetadataManifest .CODEC_V2 ) {
203
- addStaleGlobalMetadataPath (
204
- clusterMetadataManifest .getCoordinationMetadata ().getUploadedFilename (),
205
- filesToKeep ,
206
- staleGlobalMetadataPaths
207
- );
208
- addStaleGlobalMetadataPath (
209
- clusterMetadataManifest .getSettingsMetadata ().getUploadedFilename (),
210
- filesToKeep ,
211
- staleGlobalMetadataPaths
212
- );
213
- addStaleGlobalMetadataPath (
214
- clusterMetadataManifest .getTemplatesMetadata ().getUploadedFilename (),
215
- filesToKeep ,
216
- staleGlobalMetadataPaths
217
- );
208
+ if (filesToKeep .contains (clusterMetadataManifest .getCoordinationMetadata ().getUploadedFilename ()) == false ) {
209
+ staleGlobalMetadataPaths .add (clusterMetadataManifest .getCoordinationMetadata ().getUploadedFilename ());
210
+ }
211
+ if (filesToKeep .contains (clusterMetadataManifest .getSettingsMetadata ().getUploadedFilename ()) == false ) {
212
+ staleGlobalMetadataPaths .add (clusterMetadataManifest .getSettingsMetadata ().getUploadedFilename ());
213
+ }
214
+ if (filesToKeep .contains (clusterMetadataManifest .getTemplatesMetadata ().getUploadedFilename ()) == false ) {
215
+ staleGlobalMetadataPaths .add (clusterMetadataManifest .getTemplatesMetadata ().getUploadedFilename ());
216
+ }
218
217
clusterMetadataManifest .getCustomMetadataMap ()
219
218
.values ()
220
- .forEach (
221
- attribute -> addStaleGlobalMetadataPath (attribute .getUploadedFilename (), filesToKeep , staleGlobalMetadataPaths )
222
- );
219
+ .stream ()
220
+ .map (ClusterMetadataManifest .UploadedMetadataAttribute ::getUploadedFilename )
221
+ .filter (file -> filesToKeep .contains (file ) == false )
222
+ .forEach (staleGlobalMetadataPaths ::add );
223
223
}
224
224
225
225
clusterMetadataManifest .getIndices ().forEach (uploadedIndexMetadata -> {
226
- if (filesToKeep .contains (uploadedIndexMetadata .getUploadedFilename ()) == false ) {
227
- staleIndexMetadataPaths .add (
228
- new BlobPath ().add (INDEX_PATH_TOKEN ).add (uploadedIndexMetadata .getIndexUUID ()).buildAsString ()
229
- + INDEX_METADATA_FORMAT .blobName (uploadedIndexMetadata .getUploadedFilename ())
230
- );
226
+ String fileName = RemoteClusterStateUtils .getFormattedIndexFileName (uploadedIndexMetadata .getUploadedFilename ());
227
+ if (filesToKeep .contains (fileName ) == false ) {
228
+ staleIndexMetadataPaths .add (fileName );
231
229
}
232
230
});
233
231
});
@@ -237,9 +235,9 @@ void deleteClusterMetadata(
237
235
return ;
238
236
}
239
237
240
- deleteStalePaths (clusterName , clusterUUID , new ArrayList <>(staleGlobalMetadataPaths ));
241
- deleteStalePaths (clusterName , clusterUUID , new ArrayList <>(staleIndexMetadataPaths ));
242
- deleteStalePaths (clusterName , clusterUUID , new ArrayList <>(staleManifestPaths ));
238
+ deleteStalePaths (new ArrayList <>(staleGlobalMetadataPaths ));
239
+ deleteStalePaths (new ArrayList <>(staleIndexMetadataPaths ));
240
+ deleteStalePaths (new ArrayList <>(staleManifestPaths ));
243
241
} catch (IllegalStateException e ) {
244
242
logger .error ("Error while fetching Remote Cluster Metadata manifests" , e );
245
243
} catch (IOException e ) {
@@ -267,8 +265,8 @@ void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int mani
267
265
try {
268
266
getBlobStoreTransferService ().listAllInSortedOrderAsync (
269
267
ThreadPool .Names .REMOTE_PURGE ,
270
- remoteClusterStateService .getManifestFolderPath (clusterName , clusterUUID ),
271
- MANIFEST_FILE_PREFIX ,
268
+ remoteManifestManager .getManifestFolderPath (clusterName , clusterUUID ),
269
+ MANIFEST ,
272
270
Integer .MAX_VALUE ,
273
271
new ActionListener <>() {
274
272
@ Override
@@ -312,7 +310,11 @@ void deleteStaleUUIDsClusterMetadata(String clusterName, List<String> clusterUUI
312
310
clusterUUIDs .forEach (
313
311
clusterUUID -> getBlobStoreTransferService ().deleteAsync (
314
312
ThreadPool .Names .REMOTE_PURGE ,
315
- remoteClusterStateService .getCusterMetadataBasePath (clusterName , clusterUUID ),
313
+ RemoteClusterStateUtils .getClusterMetadataBasePath (
314
+ remoteClusterStateService .getBlobStoreRepository (),
315
+ clusterName ,
316
+ clusterUUID
317
+ ),
316
318
new ActionListener <>() {
317
319
@ Override
318
320
public void onResponse (Void unused ) {
@@ -336,12 +338,9 @@ public void onFailure(Exception e) {
336
338
}
337
339
338
340
// package private for testing
339
- void deleteStalePaths (String clusterName , String clusterUUID , List <String > stalePaths ) throws IOException {
341
+ void deleteStalePaths (List <String > stalePaths ) throws IOException {
340
342
logger .debug (String .format (Locale .ROOT , "Deleting stale files from remote - %s" , stalePaths ));
341
- getBlobStoreTransferService ().deleteBlobs (
342
- remoteClusterStateService .getCusterMetadataBasePath (clusterName , clusterUUID ),
343
- stalePaths
344
- );
343
+ getBlobStoreTransferService ().deleteBlobs (BlobPath .cleanPath (), stalePaths );
345
344
}
346
345
347
346
/**
0 commit comments