Skip to content

Commit 2b670cc

Browse files
authored
Add support for async deletion in S3BlobContainer (opensearch-project#15621)
* Add support for async deletion in S3BlobContainer
  Signed-off-by: Ashish Singh <ssashish@amazon.com>
* Move helper methods to helper class
  Signed-off-by: Ashish Singh <ssashish@amazon.com>
* Minor refactor
  Signed-off-by: Ashish Singh <ssashish@amazon.com>
* Add UTs
  Signed-off-by: Ashish Singh <ssashish@amazon.com>
* Add more tests
  Signed-off-by: Ashish Singh <ssashish@amazon.com>
* Integrate async deletion in the snapshot interactions
  Signed-off-by: Ashish Singh <ssashish@amazon.com>
---------
Signed-off-by: Ashish Singh <ssashish@amazon.com>
1 parent 9537d39 commit 2b670cc

File tree

16 files changed

+1086
-6
lines changed

16 files changed

+1086
-6
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55

66
## [Unreleased 2.x]
77
### Added
8+
- Add support for async deletion in S3BlobContainer ([#15621](https://github.com/opensearch-project/OpenSearch/pull/15621))
89
- MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637))
910
- [Workload Management] QueryGroup resource cancellation framework changes ([#15651](https://github.com/opensearch-project/OpenSearch/pull/15651))
1011
- Fallback to Remote cluster-state on Term-Version check mismatch - ([#15424](https://github.com/opensearch-project/OpenSearch/pull/15424))

plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
153153
// Disable request throttling because some random values in tests might generate too many failures for the S3 client
154154
.put(S3ClientSettings.USE_THROTTLE_RETRIES_SETTING.getConcreteSettingForNamespace("test").getKey(), false)
155155
.put(S3ClientSettings.PROXY_TYPE_SETTING.getConcreteSettingForNamespace("test").getKey(), ProxySettings.ProxyType.DIRECT)
156+
.put(BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.getKey(), false)
156157
.put(super.nodeSettings(nodeOrdinal))
157158
.setSecureSettings(secureSettings);
158159

plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3RepositoryThirdPartyTests.java

+8
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,14 @@
5555

5656
public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTestCase {
5757

58+
@Override
59+
protected Settings nodeSettings() {
60+
return Settings.builder()
61+
.put(super.nodeSettings())
62+
.put(BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.getKey(), false)
63+
.build();
64+
}
65+
5866
@Override
5967
@Before
6068
@SuppressForbidden(reason = "Need to set system property here for AWS SDK v2")

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

+124
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
6363
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
6464
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
65+
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;
6566
import software.amazon.awssdk.utils.CollectionUtils;
6667

6768
import org.apache.logging.log4j.LogManager;
@@ -90,6 +91,7 @@
9091
import org.opensearch.core.common.Strings;
9192
import org.opensearch.core.common.unit.ByteSizeUnit;
9293
import org.opensearch.core.common.unit.ByteSizeValue;
94+
import org.opensearch.repositories.s3.async.S3AsyncDeleteHelper;
9395
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
9496
import org.opensearch.repositories.s3.async.UploadRequest;
9597
import org.opensearch.repositories.s3.utils.HttpRangeUtils;
@@ -109,6 +111,9 @@
109111
import java.util.function.Function;
110112
import java.util.stream.Collectors;
111113

114+
import org.reactivestreams.Subscriber;
115+
import org.reactivestreams.Subscription;
116+
112117
import static org.opensearch.repositories.s3.S3Repository.MAX_FILE_SIZE;
113118
import static org.opensearch.repositories.s3.S3Repository.MAX_FILE_SIZE_USING_MULTIPART;
114119
import static org.opensearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING_MULTIPART;
@@ -875,4 +880,123 @@ CompletableFuture<GetObjectAttributesResponse> getBlobMetadata(S3AsyncClient s3A
875880

876881
return SocketAccess.doPrivileged(() -> s3AsyncClient.getObjectAttributes(getObjectAttributesRequest));
877882
}
883+
884+
@Override
885+
public void deleteAsync(ActionListener<DeleteResult> completionListener) {
886+
try (AmazonAsyncS3Reference asyncClientReference = blobStore.asyncClientReference()) {
887+
S3AsyncClient s3AsyncClient = asyncClientReference.get().client();
888+
889+
ListObjectsV2Request listRequest = ListObjectsV2Request.builder().bucket(blobStore.bucket()).prefix(keyPath).build();
890+
ListObjectsV2Publisher listPublisher = s3AsyncClient.listObjectsV2Paginator(listRequest);
891+
892+
AtomicLong deletedBlobs = new AtomicLong();
893+
AtomicLong deletedBytes = new AtomicLong();
894+
895+
CompletableFuture<Void> listingFuture = new CompletableFuture<>();
896+
897+
listPublisher.subscribe(new Subscriber<>() {
898+
private Subscription subscription;
899+
private final List<String> objectsToDelete = new ArrayList<>();
900+
private CompletableFuture<Void> deletionChain = CompletableFuture.completedFuture(null);
901+
902+
@Override
903+
public void onSubscribe(Subscription s) {
904+
this.subscription = s;
905+
subscription.request(1);
906+
}
907+
908+
@Override
909+
public void onNext(ListObjectsV2Response response) {
910+
response.contents().forEach(s3Object -> {
911+
deletedBlobs.incrementAndGet();
912+
deletedBytes.addAndGet(s3Object.size());
913+
objectsToDelete.add(s3Object.key());
914+
});
915+
916+
int bulkDeleteSize = blobStore.getBulkDeletesSize();
917+
if (objectsToDelete.size() >= bulkDeleteSize) {
918+
int fullBatchesCount = objectsToDelete.size() / bulkDeleteSize;
919+
int itemsToDelete = fullBatchesCount * bulkDeleteSize;
920+
921+
List<String> batchToDelete = new ArrayList<>(objectsToDelete.subList(0, itemsToDelete));
922+
objectsToDelete.subList(0, itemsToDelete).clear();
923+
924+
deletionChain = S3AsyncDeleteHelper.executeDeleteChain(
925+
s3AsyncClient,
926+
blobStore,
927+
batchToDelete,
928+
deletionChain,
929+
() -> subscription.request(1)
930+
);
931+
} else {
932+
subscription.request(1);
933+
}
934+
}
935+
936+
@Override
937+
public void onError(Throwable t) {
938+
listingFuture.completeExceptionally(new IOException("Failed to list objects for deletion", t));
939+
}
940+
941+
@Override
942+
public void onComplete() {
943+
if (!objectsToDelete.isEmpty()) {
944+
deletionChain = S3AsyncDeleteHelper.executeDeleteChain(
945+
s3AsyncClient,
946+
blobStore,
947+
objectsToDelete,
948+
deletionChain,
949+
null
950+
);
951+
}
952+
deletionChain.whenComplete((v, throwable) -> {
953+
if (throwable != null) {
954+
listingFuture.completeExceptionally(throwable);
955+
} else {
956+
listingFuture.complete(null);
957+
}
958+
});
959+
}
960+
});
961+
962+
listingFuture.whenComplete((v, throwable) -> {
963+
if (throwable != null) {
964+
completionListener.onFailure(
965+
throwable instanceof Exception
966+
? (Exception) throwable
967+
: new IOException("Unexpected error during async deletion", throwable)
968+
);
969+
} else {
970+
completionListener.onResponse(new DeleteResult(deletedBlobs.get(), deletedBytes.get()));
971+
}
972+
});
973+
} catch (Exception e) {
974+
completionListener.onFailure(new IOException("Failed to initiate async deletion", e));
975+
}
976+
}
977+
978+
@Override
979+
public void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> completionListener) {
980+
if (blobNames.isEmpty()) {
981+
completionListener.onResponse(null);
982+
return;
983+
}
984+
985+
try (AmazonAsyncS3Reference asyncClientReference = blobStore.asyncClientReference()) {
986+
S3AsyncClient s3AsyncClient = asyncClientReference.get().client();
987+
988+
List<String> keysToDelete = blobNames.stream().map(this::buildKey).collect(Collectors.toList());
989+
990+
S3AsyncDeleteHelper.executeDeleteChain(s3AsyncClient, blobStore, keysToDelete, CompletableFuture.completedFuture(null), null)
991+
.whenComplete((v, throwable) -> {
992+
if (throwable != null) {
993+
completionListener.onFailure(new IOException("Failed to delete blobs " + blobNames, throwable));
994+
} else {
995+
completionListener.onResponse(null);
996+
}
997+
});
998+
} catch (Exception e) {
999+
completionListener.onFailure(new IOException("Failed to initiate async blob deletion", e));
1000+
}
1001+
}
8781002
}

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING;
6464
import static org.opensearch.repositories.s3.S3Repository.UPLOAD_RETRY_ENABLED;
6565

66-
class S3BlobStore implements BlobStore {
66+
public class S3BlobStore implements BlobStore {
6767

6868
private static final Logger logger = LogManager.getLogger(S3BlobStore.class);
6969

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java

+4
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ public void publish(MetricCollection metricCollection) {
9595
public void close() {}
9696
};
9797

98+
public MetricPublisher getDeleteObjectsMetricPublisher() {
99+
return deleteObjectsMetricPublisher;
100+
}
101+
98102
public MetricPublisher getObjectMetricPublisher = new MetricPublisher() {
99103
@Override
100104
public void publish(MetricCollection metricCollection) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.repositories.s3.async;
10+
11+
import software.amazon.awssdk.services.s3.S3AsyncClient;
12+
import software.amazon.awssdk.services.s3.model.Delete;
13+
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
14+
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
15+
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
16+
17+
import org.apache.logging.log4j.LogManager;
18+
import org.apache.logging.log4j.Logger;
19+
import org.apache.logging.log4j.message.ParameterizedMessage;
20+
import org.opensearch.repositories.s3.S3BlobStore;
21+
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
import java.util.concurrent.CompletableFuture;
25+
import java.util.stream.Collectors;
26+
27+
public class S3AsyncDeleteHelper {
28+
private static final Logger logger = LogManager.getLogger(S3AsyncDeleteHelper.class);
29+
30+
public static CompletableFuture<Void> executeDeleteChain(
31+
S3AsyncClient s3AsyncClient,
32+
S3BlobStore blobStore,
33+
List<String> objectsToDelete,
34+
CompletableFuture<Void> currentChain,
35+
Runnable afterDeleteAction
36+
) {
37+
List<List<String>> batches = createDeleteBatches(objectsToDelete, blobStore.getBulkDeletesSize());
38+
CompletableFuture<Void> newChain = currentChain.thenCompose(v -> executeDeleteBatches(s3AsyncClient, blobStore, batches));
39+
if (afterDeleteAction != null) {
40+
newChain = newChain.thenRun(afterDeleteAction);
41+
}
42+
return newChain;
43+
}
44+
45+
static List<List<String>> createDeleteBatches(List<String> keys, int bulkDeleteSize) {
46+
List<List<String>> batches = new ArrayList<>();
47+
for (int i = 0; i < keys.size(); i += bulkDeleteSize) {
48+
batches.add(keys.subList(i, Math.min(keys.size(), i + bulkDeleteSize)));
49+
}
50+
return batches;
51+
}
52+
53+
static CompletableFuture<Void> executeDeleteBatches(S3AsyncClient s3AsyncClient, S3BlobStore blobStore, List<List<String>> batches) {
54+
CompletableFuture<Void> allDeletesFuture = CompletableFuture.completedFuture(null);
55+
56+
for (List<String> batch : batches) {
57+
allDeletesFuture = allDeletesFuture.thenCompose(v -> executeSingleDeleteBatch(s3AsyncClient, blobStore, batch));
58+
}
59+
60+
return allDeletesFuture;
61+
}
62+
63+
static CompletableFuture<Void> executeSingleDeleteBatch(S3AsyncClient s3AsyncClient, S3BlobStore blobStore, List<String> batch) {
64+
DeleteObjectsRequest deleteRequest = bulkDelete(blobStore.bucket(), batch, blobStore);
65+
return s3AsyncClient.deleteObjects(deleteRequest).thenApply(S3AsyncDeleteHelper::processDeleteResponse);
66+
}
67+
68+
static Void processDeleteResponse(DeleteObjectsResponse deleteObjectsResponse) {
69+
if (!deleteObjectsResponse.errors().isEmpty()) {
70+
logger.warn(
71+
() -> new ParameterizedMessage(
72+
"Failed to delete some blobs {}",
73+
deleteObjectsResponse.errors()
74+
.stream()
75+
.map(s3Error -> "[" + s3Error.key() + "][" + s3Error.code() + "][" + s3Error.message() + "]")
76+
.collect(Collectors.toList())
77+
)
78+
);
79+
}
80+
return null;
81+
}
82+
83+
static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs, S3BlobStore blobStore) {
84+
return DeleteObjectsRequest.builder()
85+
.bucket(bucket)
86+
.delete(
87+
Delete.builder()
88+
.objects(blobs.stream().map(blob -> ObjectIdentifier.builder().key(blob).build()).collect(Collectors.toList()))
89+
.quiet(true)
90+
.build()
91+
)
92+
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().getDeleteObjectsMetricPublisher()))
93+
.build();
94+
}
95+
}

0 commit comments

Comments
 (0)