
Commit d0467b3

Harish Bhakuni (harishbhakuni) and Harish Bhakuni authored Mar 15, 2024
Simplify remote directory cleanup after snapshot delete to avoid concurrent cleanup task runs for same shard (#12672)

* Simplify remote directory cleanup after snapshot delete to avoid concurrent cleanup task runs for same shard.
  Signed-off-by: Harish Bhakuni <hbhakuni@amazon.com>
* Address PR Comments.
  Signed-off-by: Harish Bhakuni <hbhakuni@amazon.com>

Signed-off-by: Harish Bhakuni <hbhakuni@amazon.com>
Co-authored-by: Harish Bhakuni <hbhakuni@amazon.com>
1 parent fcecd00 commit d0467b3

File tree: 3 files changed, +24 -42 lines changed


server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java (+6 -5)

@@ -1153,11 +1153,12 @@ protected void releaseRemoteStoreLockAndCleanup(
             logger.debug("Successfully released lock for shard {} of index with uuid {}", shardId, indexUUID);
             if (!isIndexPresent(clusterService, indexUUID)) {
                 // Note: this is a temporary solution where snapshot deletion triggers remote store side cleanup if
-                // index is already deleted. shard cleanup will still happen asynchronously using REMOTE_PURGE
-                // threadpool. if it fails, it could leave some stale files in remote directory. this issue could
-                // even happen in cases of shard level remote store data cleanup which also happens asynchronously.
-                // in long term, we have plans to implement remote store GC poller mechanism which will take care of
-                // such stale data. related issue: https://github.com/opensearch-project/OpenSearch/issues/8469
+                // index is already deleted. this is the best effort at the moment since shard cleanup will still happen
+                // asynchronously using REMOTE_PURGE thread pool. if it fails, it could leave some stale files in remote
+                // directory. this issue could even happen in cases of shard level remote store data cleanup which also
+                // happens asynchronously. in long term, we have plans to implement remote store GC poller mechanism which
+                // will take care of such stale data.
+                // related issue: https://github.com/opensearch-project/OpenSearch/issues/8469
                 RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(
                     remoteStoreLockManagerFactory.getRepositoriesService(),
                     threadPool
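
For context, the comment above describes the post-snapshot-delete cleanup as best effort and asynchronous: the deletion work is handed to a background thread pool, and a failed run may leave stale files until a future GC poller reclaims them. Below is a minimal, self-contained sketch of that fire-and-forget hand-off, not the repository's actual code: a plain ExecutorService stands in for the REMOTE_PURGE thread pool, and scheduleShardCleanup / deleteShardData are hypothetical names used only for illustration.

// Sketch only: best-effort, fire-and-forget shard cleanup hand-off.
// "remotePurgeExecutor" and "deleteShardData" are illustrative stand-ins for the
// REMOTE_PURGE thread pool and the remote directory cleanup described above.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BestEffortCleanupSketch {
    private final ExecutorService remotePurgeExecutor = Executors.newFixedThreadPool(2);

    void scheduleShardCleanup(String indexUUID, int shardId) {
        remotePurgeExecutor.submit(() -> {
            try {
                deleteShardData(indexUUID, shardId); // may fail and leave stale files
            } catch (Exception e) {
                // best effort: a failure is only logged; the planned GC poller
                // (issue #8469) is expected to reclaim any stale data later.
                System.err.println("cleanup failed for " + indexUUID + "/" + shardId + ": " + e);
            }
        });
    }

    private void deleteShardData(String indexUUID, int shardId) {
        // placeholder for remote segment and lock file deletion
    }
}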

server/src/main/java/org/opensearch/repositories/blobstore/RemoteStoreShardCleanupTask.java (+7 -19)

@@ -12,10 +12,8 @@
 import org.apache.logging.log4j.Logger;
 import org.opensearch.core.index.shard.ShardId;
 
-import java.util.Map;
 import java.util.Set;
 
-import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
 import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
 
 /**
@@ -25,7 +23,6 @@ public class RemoteStoreShardCleanupTask implements Runnable {
     private final Runnable task;
     private final String shardIdentifier;
     final static Set<String> ongoingRemoteDirectoryCleanups = newConcurrentSet();
-    final static Map<String, Runnable> pendingRemoteDirectoryCleanups = newConcurrentMap();
     private static final Logger staticLogger = LogManager.getLogger(RemoteStoreShardCleanupTask.class);
 
     public RemoteStoreShardCleanupTask(Runnable task, String indexUUID, ShardId shardId) {
@@ -39,25 +36,16 @@ private static String indexShardIdentifier(String indexUUID, ShardId shardId) {
 
     @Override
     public void run() {
-        // TODO: this is the best effort at the moment since there is still a known race condition scenario in this
-        // method which needs to be handled where one of the thread just came out of while loop and removed the
-        // entry from ongoingRemoteDirectoryCleanup, and another thread added new pending task in the map.
-        // we need to introduce semaphores/locks to avoid that situation which introduces the overhead of lock object
-        // cleanups. however, there will be no scenario where two threads run cleanup for same shard at same time.
-        // <issue-link>
-        if (pendingRemoteDirectoryCleanups.put(shardIdentifier, task) == null) {
-            if (ongoingRemoteDirectoryCleanups.add(shardIdentifier)) {
-                while (pendingRemoteDirectoryCleanups.containsKey(shardIdentifier)) {
-                    Runnable newTask = pendingRemoteDirectoryCleanups.get(shardIdentifier);
-                    pendingRemoteDirectoryCleanups.remove(shardIdentifier);
-                    newTask.run();
-                }
+        // If there is already a same task ongoing for a shard, we need to skip the new task to avoid multiple
+        // concurrent cleanup of same shard.
+        if (ongoingRemoteDirectoryCleanups.add(shardIdentifier)) {
+            try {
+                task.run();
+            } finally {
                 ongoingRemoteDirectoryCleanups.remove(shardIdentifier);
-            } else {
-                staticLogger.debug("one task is already ongoing for shard {}, we can leave entry in pending", shardIdentifier);
             }
         } else {
-            staticLogger.debug("one cleanup task for shard {} is already in pending, we can skip this task", shardIdentifier);
+            staticLogger.warn("one cleanup task for shard {} is already ongoing, need to skip this task", shardIdentifier);
        }
    }
}
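
The new run() above replaces the pending-task map with a single concurrent set: the first task to register a shard identifier runs, and any task arriving while it is still registered is skipped outright. Below is a self-contained sketch of that skip-if-ongoing pattern, assuming plain JDK collections; DedupedTask and its key parameter are illustrative names, not OpenSearch APIs.

// Sketch of the deduplication pattern: a concurrent set records keys with an
// ongoing task; a second task for the same key is skipped rather than queued.
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class DedupedTask implements Runnable {
    static final Set<String> ongoing = ConcurrentHashMap.newKeySet();

    private final String key;
    private final Runnable task;

    DedupedTask(String key, Runnable task) {
        this.key = key;
        this.task = task;
    }

    @Override
    public void run() {
        // add() returns false if another thread already registered this key,
        // so at most one task per key executes at any moment.
        if (ongoing.add(key)) {
            try {
                task.run();
            } finally {
                ongoing.remove(key); // always release, even if the task throws
            }
        }
        // else: skip; the duplicate is dropped, not queued (best-effort semantics)
    }
}

The trade-off is that duplicate cleanups are dropped instead of coalesced into a pending slot, which avoids the race condition the old TODO described; this is acceptable here because cleanup is best effort and the planned GC poller (issue #8469) is expected to reclaim anything a skipped or failed run leaves behind.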

server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java (+11 -18)

@@ -321,38 +321,31 @@ private RepositoryData addRandomSnapshotsToRepoData(RepositoryData repoData, boo
         return repoData;
     }
 
+    private String getShardIdentifier(String indexUUID, String shardId) {
+        return String.join("/", indexUUID, shardId);
+    }
+
     public void testRemoteStoreShardCleanupTask() {
-        // todo: move it to separate class and add more scenarios.
         AtomicBoolean executed1 = new AtomicBoolean(false);
         Runnable task1 = () -> executed1.set(true);
         String indexName = "test-idx";
         String testIndexUUID = "test-idx-uuid";
         ShardId shardId = new ShardId(new Index(indexName, testIndexUUID), 0);
 
-        // Scenario 1: pending = empty, ongoing = false => executed
+        // just adding random shards in ongoing cleanups.
+        RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(getShardIdentifier(testIndexUUID, "1"));
+        RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(getShardIdentifier(testIndexUUID, "2"));
+
+        // Scenario 1: ongoing = false => executed
         RemoteStoreShardCleanupTask remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task1, testIndexUUID, shardId);
         remoteStoreShardCleanupTask.run();
         assertTrue(executed1.get());
 
-        // Scenario 2: pending = empty, ongoing = true => pending = currentTask
+        // Scenario 2: ongoing = true => currentTask skipped.
         executed1.set(false);
-        String shardIdentifier = String.join("/", testIndexUUID, String.valueOf(shardId.id()));
-        RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(shardIdentifier);
-
+        RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(getShardIdentifier(testIndexUUID, "0"));
         remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task1, testIndexUUID, shardId);
         remoteStoreShardCleanupTask.run();
         assertFalse(executed1.get());
-        assertSame(RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.get(shardIdentifier), task1);
-
-        // Scenario3: pending = anotherTask, ongoing = true => pending = currentTask
-        AtomicBoolean executed2 = new AtomicBoolean(false);
-        Runnable task2 = () -> executed2.set(true);
-        RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.put(shardIdentifier, task1);
-        RemoteStoreShardCleanupTask.ongoingRemoteDirectoryCleanups.add(shardIdentifier);
-
-        remoteStoreShardCleanupTask = new RemoteStoreShardCleanupTask(task2, testIndexUUID, shardId);
-        remoteStoreShardCleanupTask.run();
-        assertFalse(executed1.get());
-        assertSame(RemoteStoreShardCleanupTask.pendingRemoteDirectoryCleanups.get(shardIdentifier), task2);
     }
 }
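
The updated test exercises the skip behavior by pre-populating the ongoing set before running a second task for the same shard. As a rough illustration only, assuming the hypothetical DedupedTask sketch shown earlier (not the OpenSearch test harness), the same behavior can also be observed with two genuinely concurrent submissions:

// Illustrative usage of the DedupedTask sketch: two tasks for the same shard key,
// overlapping in time; only the first body executes, the second is skipped.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class DedupedTaskDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        Runnable slowCleanup = () -> {
            runs.incrementAndGet();
            started.countDown();
            try {
                release.await(); // hold the "ongoing" slot so the second task overlaps
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        Thread first = new Thread(new DedupedTask("index-uuid/0", slowCleanup));
        first.start();
        started.await();

        new DedupedTask("index-uuid/0", runs::incrementAndGet).run(); // skipped: key still registered

        release.countDown();
        first.join();
        System.out.println(runs.get()); // prints 1
    }
}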
