Skip to content

Commit c38dfef

Browse files
shailendra0811 (Shailendra Singh) and Shailendra Singh authored
Delete stale index routing table files resolves opensearch-project#14162 (opensearch-project#13909). (opensearch-project#14195)
Signed-off-by: Shailendra Singh <singhlhs@amazon.com> Co-authored-by: Shailendra Singh <singhlhs@amazon.com>
1 parent bc39354 commit c38dfef

8 files changed

+293
-9
lines changed

server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java

+12
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
13+
import org.apache.logging.log4j.message.ParameterizedMessage;
1314
import org.apache.lucene.store.IndexInput;
1415
import org.opensearch.action.LatchedActionListener;
1516
import org.opensearch.cluster.ClusterState;
@@ -297,4 +298,15 @@ protected void doStart() {
297298
@Override
298299
protected void doStop() {}
299300

301+
@Override
302+
public void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOException {
303+
try {
304+
logger.debug(() -> "Deleting stale index routing files from remote - " + stalePaths);
305+
blobStoreRepository.blobStore().blobContainer(BlobPath.cleanPath()).deleteBlobsIgnoringIfNotExists(stalePaths);
306+
} catch (IOException e) {
307+
logger.error(() -> new ParameterizedMessage("Failed to delete some stale index routing paths from {}", stalePaths), e);
308+
throw e;
309+
}
310+
}
311+
300312
}

server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java

+5
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,9 @@ protected void doStop() {
7474
protected void doClose() throws IOException {
7575
// noop
7676
}
77+
78+
@Override
79+
public void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOException {
80+
// noop
81+
}
7782
}

server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java

+2
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,6 @@ List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndicesRouting
6161
List<String> indicesRoutingToDelete
6262
);
6363

64+
/**
 * Deletes the given stale index routing files.
 *
 * @param stalePaths paths of the stale index routing files to delete
 * @throws IOException if an implementation fails to delete the given paths
 */
void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOException;
65+
6466
}

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java

+35-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.logging.log4j.message.ParameterizedMessage;
1414
import org.apache.logging.log4j.util.Strings;
1515
import org.opensearch.cluster.ClusterState;
16+
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
1617
import org.opensearch.cluster.service.ClusterApplierService;
1718
import org.opensearch.cluster.service.ClusterService;
1819
import org.opensearch.common.blobstore.BlobMetadata;
@@ -72,8 +73,13 @@ public class RemoteClusterStateCleanupManager implements Closeable {
7273
private final ThreadPool threadpool;
7374
private final ClusterApplierService clusterApplierService;
7475
private RemoteManifestManager remoteManifestManager;
76+
private final RemoteRoutingTableService remoteRoutingTableService;
7577

76-
public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterStateService, ClusterService clusterService) {
78+
public RemoteClusterStateCleanupManager(
79+
RemoteClusterStateService remoteClusterStateService,
80+
ClusterService clusterService,
81+
RemoteRoutingTableService remoteRoutingTableService
82+
) {
7783
this.remoteClusterStateService = remoteClusterStateService;
7884
this.remoteStateStats = remoteClusterStateService.getStats();
7985
ClusterSettings clusterSettings = clusterService.getClusterSettings();
@@ -83,6 +89,7 @@ public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterS
8389
// initialize with 0, a cleanup will be done when this node is elected master node and version is incremented more than threshold
8490
this.lastCleanupAttemptStateVersion = 0;
8591
clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, this::updateCleanupInterval);
92+
this.remoteRoutingTableService = remoteRoutingTableService;
8693
}
8794

8895
void start() {
@@ -170,6 +177,7 @@ void deleteClusterMetadata(
170177
Set<String> staleManifestPaths = new HashSet<>();
171178
Set<String> staleIndexMetadataPaths = new HashSet<>();
172179
Set<String> staleGlobalMetadataPaths = new HashSet<>();
180+
Set<String> staleIndexRoutingPaths = new HashSet<>();
173181
activeManifestBlobMetadata.forEach(blobMetadata -> {
174182
ClusterMetadataManifest clusterMetadataManifest = remoteManifestManager.fetchRemoteClusterMetadataManifest(
175183
clusterName,
@@ -192,6 +200,10 @@ void deleteClusterMetadata(
192200
.values()
193201
.forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename()));
194202
}
203+
if (clusterMetadataManifest.getIndicesRouting() != null) {
204+
clusterMetadataManifest.getIndicesRouting()
205+
.forEach(uploadedIndicesRouting -> filesToKeep.add(uploadedIndicesRouting.getUploadedFilename()));
206+
}
195207
});
196208
staleManifestBlobMetadata.forEach(blobMetadata -> {
197209
ClusterMetadataManifest clusterMetadataManifest = remoteManifestManager.fetchRemoteClusterMetadataManifest(
@@ -221,6 +233,19 @@ void deleteClusterMetadata(
221233
.filter(file -> filesToKeep.contains(file) == false)
222234
.forEach(staleGlobalMetadataPaths::add);
223235
}
236+
if (clusterMetadataManifest.getIndicesRouting() != null) {
237+
clusterMetadataManifest.getIndicesRouting().forEach(uploadedIndicesRouting -> {
238+
if (!filesToKeep.contains(uploadedIndicesRouting.getUploadedFilename())) {
239+
staleIndexRoutingPaths.add(uploadedIndicesRouting.getUploadedFilename());
240+
logger.debug(
241+
() -> new ParameterizedMessage(
242+
"Indices routing paths in stale manifest: {}",
243+
uploadedIndicesRouting.getUploadedFilename()
244+
)
245+
);
246+
}
247+
});
248+
}
224249

225250
clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
226251
String fileName = RemoteClusterStateUtils.getFormattedIndexFileName(uploadedIndexMetadata.getUploadedFilename());
@@ -238,6 +263,15 @@ void deleteClusterMetadata(
238263
deleteStalePaths(new ArrayList<>(staleGlobalMetadataPaths));
239264
deleteStalePaths(new ArrayList<>(staleIndexMetadataPaths));
240265
deleteStalePaths(new ArrayList<>(staleManifestPaths));
266+
try {
267+
remoteRoutingTableService.deleteStaleIndexRoutingPaths(new ArrayList<>(staleIndexRoutingPaths));
268+
} catch (IOException e) {
269+
logger.error(
270+
() -> new ParameterizedMessage("Error while deleting stale index routing files {}", staleIndexRoutingPaths),
271+
e
272+
);
273+
remoteStateStats.indexRoutingFilesCleanupAttemptFailed();
274+
}
241275
} catch (IllegalStateException e) {
242276
logger.error("Error while fetching Remote Cluster Metadata manifests", e);
243277
} catch (IOException e) {

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,9 @@ public RemoteClusterStateService(
150150
this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD);
151151
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
152152
this.remoteStateStats = new RemotePersistenceStats();
153-
this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService);
154153
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
155154
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(repositoriesService, settings, clusterSettings);
155+
this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService);
156156
}
157157

158158
/**

server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java

+12
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,16 @@
1919
*/
2020
public class RemotePersistenceStats extends PersistedStateStats {
2121
static final String CLEANUP_ATTEMPT_FAILED_COUNT = "cleanup_attempt_failed_count";
22+
static final String INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT = "index_routing_files_cleanup_attempt_failed_count";
2223
static final String REMOTE_UPLOAD = "remote_upload";
2324
private AtomicLong cleanupAttemptFailedCount = new AtomicLong(0);
2425

26+
private AtomicLong indexRoutingFilesCleanupAttemptFailedCount = new AtomicLong(0);
27+
2528
/**
 * Creates the stats object under the {@code REMOTE_UPLOAD} name and registers both
 * cleanup-failure counters as extended fields.
 */
public RemotePersistenceStats() {
    super(REMOTE_UPLOAD);
    // Registration order is kept as-is: general cleanup counter first, index routing counter second.
    addToExtendedFields(CLEANUP_ATTEMPT_FAILED_COUNT, cleanupAttemptFailedCount);
    addToExtendedFields(INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT, indexRoutingFilesCleanupAttemptFailedCount);
}
2933

3034
public void cleanUpAttemptFailed() {
@@ -34,4 +38,12 @@ public void cleanUpAttemptFailed() {
3438
public long getCleanupAttemptFailedCount() {
3539
return cleanupAttemptFailedCount.get();
3640
}
41+
42+
public void indexRoutingFilesCleanupAttemptFailed() {
43+
indexRoutingFilesCleanupAttemptFailedCount.incrementAndGet();
44+
}
45+
46+
public long getIndexRoutingFilesCleanupAttemptFailedCount() {
47+
return indexRoutingFilesCleanupAttemptFailedCount.get();
48+
}
3749
}

server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java

+27-2
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,14 @@
4747
import org.junit.Before;
4848

4949
import java.io.IOException;
50+
import java.util.Arrays;
5051
import java.util.List;
5152
import java.util.Map;
5253
import java.util.concurrent.ConcurrentHashMap;
5354
import java.util.function.Supplier;
5455

5556
import org.mockito.ArgumentCaptor;
57+
import org.mockito.Mockito;
5658

5759
import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_FILE_PREFIX;
5860
import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_PATH_TOKEN;
@@ -65,6 +67,7 @@
6567
import static org.mockito.ArgumentMatchers.eq;
6668
import static org.mockito.ArgumentMatchers.startsWith;
6769
import static org.mockito.Mockito.doAnswer;
70+
import static org.mockito.Mockito.doNothing;
6871
import static org.mockito.Mockito.doThrow;
6972
import static org.mockito.Mockito.mock;
7073
import static org.mockito.Mockito.times;
@@ -91,14 +94,12 @@ public void setup() {
9194
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository")
9295
.put(FsRepository.REPOSITORIES_COMPRESS_SETTING.getKey(), false)
9396
.build();
94-
9597
blobStoreRepository = mock(BlobStoreRepository.class);
9698
when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor());
9799
blobStore = mock(BlobStore.class);
98100
blobContainer = mock(BlobContainer.class);
99101
when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository);
100102
when(blobStoreRepository.blobStore()).thenReturn(blobStore);
101-
102103
Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build();
103104
FeatureFlags.initializeFeatureFlags(nodeSettings);
104105

@@ -552,4 +553,28 @@ private BlobPath getPath() {
552553
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64
553554
);
554555
}
556+
557+
public void testDeleteStaleIndexRoutingPaths() throws IOException {
558+
doNothing().when(blobContainer).deleteBlobsIgnoringIfNotExists(any());
559+
when(blobStore.blobContainer(any())).thenReturn(blobContainer);
560+
List<String> stalePaths = Arrays.asList("path1", "path2");
561+
remoteRoutingTableService.doStart();
562+
remoteRoutingTableService.deleteStaleIndexRoutingPaths(stalePaths);
563+
verify(blobContainer).deleteBlobsIgnoringIfNotExists(stalePaths);
564+
}
565+
566+
public void testDeleteStaleIndexRoutingPathsThrowsIOException() throws IOException {
567+
when(blobStore.blobContainer(any())).thenReturn(blobContainer);
568+
List<String> stalePaths = Arrays.asList("path1", "path2");
569+
// Simulate an IOException
570+
doThrow(new IOException("test exception")).when(blobContainer).deleteBlobsIgnoringIfNotExists(Mockito.anyList());
571+
572+
remoteRoutingTableService.doStart();
573+
IOException thrown = assertThrows(IOException.class, () -> {
574+
remoteRoutingTableService.deleteStaleIndexRoutingPaths(stalePaths);
575+
});
576+
assertEquals("test exception", thrown.getMessage());
577+
verify(blobContainer).deleteBlobsIgnoringIfNotExists(stalePaths);
578+
}
579+
555580
}

0 commit comments

Comments
 (0)