Skip to content

Commit 051d298

Browse files
Improve exception handling in S3BlobContainer synchronous operations (#17049) (#17056)
(cherry picked from commit 1b4a817) Signed-off-by: Ashish Singh <ssashish@amazon.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 8776e47 commit 051d298

File tree

2 files changed

+129
-4
lines changed

2 files changed

+129
-4
lines changed

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

+19-4
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
import java.util.List;
100100
import java.util.Map;
101101
import java.util.concurrent.CompletableFuture;
102+
import java.util.concurrent.ExecutionException;
102103
import java.util.concurrent.atomic.AtomicLong;
103104
import java.util.function.Function;
104105
import java.util.stream.Collectors;
@@ -373,17 +374,31 @@ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobS
373374
}
374375

375376
@Override
376-
public DeleteResult delete() {
377+
public DeleteResult delete() throws IOException {
377378
PlainActionFuture<DeleteResult> future = new PlainActionFuture<>();
378379
deleteAsync(future);
379-
return future.actionGet();
380+
return getFutureValue(future);
380381
}
381382

382383
@Override
383-
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) {
384+
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
384385
PlainActionFuture<Void> future = new PlainActionFuture<>();
385386
deleteBlobsAsyncIgnoringIfNotExists(blobNames, future);
386-
future.actionGet();
387+
getFutureValue(future);
388+
}
389+
390+
private <T> T getFutureValue(PlainActionFuture<T> future) throws IOException {
391+
try {
392+
return future.get();
393+
} catch (InterruptedException e) {
394+
Thread.currentThread().interrupt();
395+
throw new IllegalStateException("Future got interrupted", e);
396+
} catch (ExecutionException e) {
397+
if (e.getCause() instanceof IOException) {
398+
throw (IOException) e.getCause();
399+
}
400+
throw new RuntimeException(e.getCause());
401+
}
387402
}
388403

389404
@Override

plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java

+110
Original file line numberDiff line numberDiff line change
@@ -1947,6 +1947,116 @@ public void onFailure(Exception e) {
19471947
assertEquals(simulatedFailure, exceptionRef.get().getCause());
19481948
}
19491949

1950+
public void testDeleteWithInterruptedException() throws Exception {
1951+
final String bucketName = randomAlphaOfLengthBetween(1, 10);
1952+
final BlobPath blobPath = new BlobPath();
1953+
final S3BlobStore blobStore = mock(S3BlobStore.class);
1954+
when(blobStore.bucket()).thenReturn(bucketName);
1955+
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
1956+
1957+
final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
1958+
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
1959+
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
1960+
when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null));
1961+
1962+
// Mock the list operation to block indefinitely
1963+
final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class);
1964+
doAnswer(invocation -> {
1965+
Thread.currentThread().interrupt();
1966+
return null;
1967+
}).when(listPublisher).subscribe(ArgumentMatchers.<Subscriber<ListObjectsV2Response>>any());
1968+
1969+
when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher);
1970+
1971+
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
1972+
1973+
IllegalStateException e = expectThrows(IllegalStateException.class, blobContainer::delete);
1974+
assertEquals("Future got interrupted", e.getMessage());
1975+
assertTrue(Thread.interrupted()); // Clear interrupted state
1976+
}
1977+
1978+
public void testDeleteWithExecutionException() throws Exception {
1979+
final String bucketName = randomAlphaOfLengthBetween(1, 10);
1980+
final BlobPath blobPath = new BlobPath();
1981+
final S3BlobStore blobStore = mock(S3BlobStore.class);
1982+
when(blobStore.bucket()).thenReturn(bucketName);
1983+
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
1984+
1985+
final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
1986+
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
1987+
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
1988+
when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null));
1989+
1990+
RuntimeException simulatedError = new RuntimeException("Simulated error");
1991+
final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class);
1992+
doAnswer(invocation -> {
1993+
Subscriber<? super ListObjectsV2Response> subscriber = invocation.getArgument(0);
1994+
subscriber.onError(simulatedError);
1995+
return null;
1996+
}).when(listPublisher).subscribe(ArgumentMatchers.<Subscriber<ListObjectsV2Response>>any());
1997+
1998+
when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher);
1999+
2000+
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
2001+
2002+
IOException e = expectThrows(IOException.class, blobContainer::delete);
2003+
assertEquals("Failed to list objects for deletion", e.getMessage());
2004+
assertEquals(simulatedError, e.getCause());
2005+
}
2006+
2007+
public void testDeleteBlobsIgnoringIfNotExistsWithInterruptedException() throws Exception {
2008+
final String bucketName = randomAlphaOfLengthBetween(1, 10);
2009+
final BlobPath blobPath = new BlobPath();
2010+
final S3BlobStore blobStore = mock(S3BlobStore.class);
2011+
when(blobStore.bucket()).thenReturn(bucketName);
2012+
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
2013+
when(blobStore.getBulkDeletesSize()).thenReturn(5);
2014+
2015+
final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
2016+
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
2017+
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
2018+
when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null));
2019+
2020+
// Mock deleteObjects to block indefinitely
2021+
when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenAnswer(invocation -> {
2022+
Thread.currentThread().interrupt();
2023+
return null;
2024+
});
2025+
2026+
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
2027+
List<String> blobNames = Arrays.asList("test1", "test2");
2028+
2029+
IllegalStateException e = expectThrows(IllegalStateException.class, () -> blobContainer.deleteBlobsIgnoringIfNotExists(blobNames));
2030+
assertEquals("Future got interrupted", e.getMessage());
2031+
assertTrue(Thread.interrupted()); // Clear interrupted state
2032+
}
2033+
2034+
public void testDeleteBlobsIgnoringIfNotExistsWithExecutionException() throws Exception {
2035+
final String bucketName = randomAlphaOfLengthBetween(1, 10);
2036+
final BlobPath blobPath = new BlobPath();
2037+
final S3BlobStore blobStore = mock(S3BlobStore.class);
2038+
when(blobStore.bucket()).thenReturn(bucketName);
2039+
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
2040+
when(blobStore.getBulkDeletesSize()).thenReturn(5);
2041+
2042+
final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
2043+
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
2044+
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
2045+
when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null));
2046+
2047+
RuntimeException simulatedError = new RuntimeException("Simulated delete error");
2048+
CompletableFuture<DeleteObjectsResponse> failedFuture = new CompletableFuture<>();
2049+
failedFuture.completeExceptionally(simulatedError);
2050+
when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn(failedFuture);
2051+
2052+
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
2053+
List<String> blobNames = Arrays.asList("test1", "test2");
2054+
2055+
IOException e = expectThrows(IOException.class, () -> blobContainer.deleteBlobsIgnoringIfNotExists(blobNames));
2056+
assertEquals("Failed to delete blobs " + blobNames, e.getMessage());
2057+
assertEquals(simulatedError, e.getCause().getCause());
2058+
}
2059+
19502060
private void mockObjectResponse(S3AsyncClient s3AsyncClient, String bucketName, String blobName, int objectSize) {
19512061

19522062
final InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(objectSize));

0 commit comments

Comments
 (0)