Skip to content

Commit 9f25c38

Browse files
authored
Implement interface changes for s3 plugin to read/write blob with object metadata (#13113) (#13765)
* Implement interface changes for s3 plugin to read/write blob with object metadata

Signed-off-by: Sandeep Kumawat <skumwt@amazon.com>
1 parent faa592a commit 9f25c38

File tree

14 files changed

+220
-41
lines changed

14 files changed

+220
-41
lines changed

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

+52-8
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
6363
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
6464
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
65+
import software.amazon.awssdk.utils.CollectionUtils;
6566

6667
import org.apache.logging.log4j.LogManager;
6768
import org.apache.logging.log4j.Logger;
@@ -77,6 +78,7 @@
7778
import org.opensearch.common.blobstore.BlobPath;
7879
import org.opensearch.common.blobstore.BlobStoreException;
7980
import org.opensearch.common.blobstore.DeleteResult;
81+
import org.opensearch.common.blobstore.FetchBlobResult;
8082
import org.opensearch.common.blobstore.stream.read.ReadContext;
8183
import org.opensearch.common.blobstore.stream.write.WriteContext;
8284
import org.opensearch.common.blobstore.stream.write.WritePriority;
@@ -139,6 +141,13 @@ public boolean blobExists(String blobName) {
139141
}
140142
}
141143

144+
@ExperimentalApi
145+
@Override
146+
public FetchBlobResult readBlobWithMetadata(String blobName) throws IOException {
147+
S3RetryingInputStream s3RetryingInputStream = new S3RetryingInputStream(blobStore, buildKey(blobName));
148+
return new FetchBlobResult(s3RetryingInputStream, s3RetryingInputStream.getMetadata());
149+
}
150+
142151
@Override
143152
public InputStream readBlob(String blobName) throws IOException {
144153
return new S3RetryingInputStream(blobStore, buildKey(blobName));
@@ -170,12 +179,27 @@ public long readBlobPreferredLength() {
170179
*/
171180
@Override
172181
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
182+
writeBlobWithMetadata(blobName, inputStream, blobSize, failIfAlreadyExists, null);
183+
}
184+
185+
/**
186+
* Write blob with its object metadata.
187+
*/
188+
@ExperimentalApi
189+
@Override
190+
public void writeBlobWithMetadata(
191+
String blobName,
192+
InputStream inputStream,
193+
long blobSize,
194+
boolean failIfAlreadyExists,
195+
@Nullable Map<String, String> metadata
196+
) throws IOException {
173197
assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
174198
SocketAccess.doPrivilegedIOException(() -> {
175199
if (blobSize <= getLargeBlobThresholdInBytes()) {
176-
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize);
200+
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
177201
} else {
178-
executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize);
202+
executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
179203
}
180204
return null;
181205
});
@@ -191,7 +215,8 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
191215
writeContext.getUploadFinalizer(),
192216
writeContext.doRemoteDataIntegrityCheck(),
193217
writeContext.getExpectedChecksum(),
194-
blobStore.isUploadRetryEnabled()
218+
blobStore.isUploadRetryEnabled(),
219+
writeContext.getMetadata()
195220
);
196221
try {
197222
// If file size is greater than the queue capacity than SizeBasedBlockingQ will always reject the upload.
@@ -211,7 +236,8 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
211236
blobStore,
212237
uploadRequest.getKey(),
213238
inputStream.getInputStream(),
214-
uploadRequest.getContentLength()
239+
uploadRequest.getContentLength(),
240+
uploadRequest.getMetadata()
215241
);
216242
completionListener.onResponse(null);
217243
} catch (Exception ex) {
@@ -582,8 +608,13 @@ private String buildKey(String blobName) {
582608
/**
583609
* Uploads a blob using a single upload request
584610
*/
585-
void executeSingleUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize)
586-
throws IOException {
611+
void executeSingleUpload(
612+
final S3BlobStore blobStore,
613+
final String blobName,
614+
final InputStream input,
615+
final long blobSize,
616+
final Map<String, String> metadata
617+
) throws IOException {
587618

588619
// Extra safety checks
589620
if (blobSize > MAX_FILE_SIZE.getBytes()) {
@@ -600,6 +631,10 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, fin
600631
.storageClass(blobStore.getStorageClass())
601632
.acl(blobStore.getCannedACL())
602633
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().putObjectMetricPublisher));
634+
635+
if (CollectionUtils.isNotEmpty(metadata)) {
636+
putObjectRequestBuilder = putObjectRequestBuilder.metadata(metadata);
637+
}
603638
if (blobStore.serverSideEncryption()) {
604639
putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
605640
}
@@ -623,8 +658,13 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, fin
623658
/**
624659
* Uploads a blob using multipart upload requests.
625660
*/
626-
void executeMultipartUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize)
627-
throws IOException {
661+
void executeMultipartUpload(
662+
final S3BlobStore blobStore,
663+
final String blobName,
664+
final InputStream input,
665+
final long blobSize,
666+
final Map<String, String> metadata
667+
) throws IOException {
628668

629669
ensureMultiPartUploadSize(blobSize);
630670
final long partSize = blobStore.bufferSizeInBytes();
@@ -649,6 +689,10 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName,
649689
.acl(blobStore.getCannedACL())
650690
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector));
651691

692+
if (CollectionUtils.isNotEmpty(metadata)) {
693+
createMultipartUploadRequestBuilder.metadata(metadata);
694+
}
695+
652696
if (blobStore.serverSideEncryption()) {
653697
createMultipartUploadRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
654698
}

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RetryingInputStream.java

+7
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.nio.file.NoSuchFileException;
4949
import java.util.ArrayList;
5050
import java.util.List;
51+
import java.util.Map;
5152
import java.util.concurrent.atomic.AtomicBoolean;
5253

5354
/**
@@ -77,6 +78,7 @@ class S3RetryingInputStream extends InputStream {
7778
private long currentOffset;
7879
private boolean closed;
7980
private boolean eof;
81+
private Map<String, String> metadata;
8082

8183
S3RetryingInputStream(S3BlobStore blobStore, String blobKey) throws IOException {
8284
this(blobStore, blobKey, 0, Long.MAX_VALUE - 1);
@@ -122,6 +124,7 @@ private void openStream() throws IOException {
122124
getObjectResponseInputStream.response().contentLength()
123125
);
124126
this.currentStream = getObjectResponseInputStream;
127+
this.metadata = getObjectResponseInputStream.response().metadata();
125128
this.isStreamAborted.set(false);
126129
} catch (final SdkException e) {
127130
if (e instanceof S3Exception) {
@@ -265,4 +268,8 @@ boolean isEof() {
265268
boolean isAborted() {
266269
return isStreamAborted.get();
267270
}
271+
272+
Map<String, String> getMetadata() {
273+
return this.metadata;
274+
}
268275
}

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java

+9
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
2424
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
2525
import software.amazon.awssdk.services.s3.model.S3Exception;
26+
import software.amazon.awssdk.utils.CollectionUtils;
2627
import software.amazon.awssdk.utils.CompletableFutureUtils;
2728

2829
import org.apache.logging.log4j.LogManager;
@@ -154,6 +155,10 @@ private void uploadInParts(
154155
.bucket(uploadRequest.getBucket())
155156
.key(uploadRequest.getKey())
156157
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector));
158+
159+
if (CollectionUtils.isNotEmpty(uploadRequest.getMetadata())) {
160+
createMultipartUploadRequestBuilder.metadata(uploadRequest.getMetadata());
161+
}
157162
if (uploadRequest.doRemoteDataIntegrityCheck()) {
158163
createMultipartUploadRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
159164
}
@@ -352,6 +357,10 @@ private void uploadInOneChunk(
352357
.key(uploadRequest.getKey())
353358
.contentLength(uploadRequest.getContentLength())
354359
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.putObjectMetricPublisher));
360+
361+
if (CollectionUtils.isNotEmpty(uploadRequest.getMetadata())) {
362+
putObjectRequestBuilder.metadata(uploadRequest.getMetadata());
363+
}
355364
if (uploadRequest.doRemoteDataIntegrityCheck()) {
356365
putObjectRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
357366
putObjectRequestBuilder.checksumCRC32(base64StringFromLong(uploadRequest.getExpectedChecksum()));

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@
99
package org.opensearch.repositories.s3.async;
1010

1111
import org.opensearch.common.CheckedConsumer;
12+
import org.opensearch.common.Nullable;
1213
import org.opensearch.common.blobstore.stream.write.WritePriority;
1314

1415
import java.io.IOException;
16+
import java.util.Map;
1517

1618
/**
1719
* A model encapsulating all details for an upload to S3
@@ -25,6 +27,7 @@ public class UploadRequest {
2527
private final boolean doRemoteDataIntegrityCheck;
2628
private final Long expectedChecksum;
2729
private final boolean uploadRetryEnabled;
30+
private final Map<String, String> metadata;
2831

2932
/**
3033
* Construct a new UploadRequest object
@@ -36,6 +39,7 @@ public class UploadRequest {
3639
* @param uploadFinalizer An upload finalizer to call once all parts are uploaded
3740
* @param doRemoteDataIntegrityCheck A boolean to inform vendor plugins whether remote data integrity checks need to be done
3841
* @param expectedChecksum Checksum of the file being uploaded for remote data integrity check
42+
* @param metadata Metadata of the file being uploaded
3943
*/
4044
public UploadRequest(
4145
String bucket,
@@ -45,7 +49,8 @@ public UploadRequest(
4549
CheckedConsumer<Boolean, IOException> uploadFinalizer,
4650
boolean doRemoteDataIntegrityCheck,
4751
Long expectedChecksum,
48-
boolean uploadRetryEnabled
52+
boolean uploadRetryEnabled,
53+
@Nullable Map<String, String> metadata
4954
) {
5055
this.bucket = bucket;
5156
this.key = key;
@@ -55,6 +60,7 @@ public UploadRequest(
5560
this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck;
5661
this.expectedChecksum = expectedChecksum;
5762
this.uploadRetryEnabled = uploadRetryEnabled;
63+
this.metadata = metadata;
5864
}
5965

6066
public String getBucket() {
@@ -88,4 +94,11 @@ public Long getExpectedChecksum() {
8894
public boolean isUploadRetryEnabled() {
8995
return uploadRetryEnabled;
9096
}
97+
98+
/**
99+
* @return metadata of the blob to be uploaded
100+
*/
101+
public Map<String, String> getMetadata() {
102+
return metadata;
103+
}
91104
}

plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import java.nio.file.Path;
6161
import java.util.ArrayList;
6262
import java.util.Arrays;
63+
import java.util.HashMap;
6364
import java.util.List;
6465
import java.util.Locale;
6566
import java.util.concurrent.CompletableFuture;
@@ -79,6 +80,7 @@
7980

8081
import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
8182
import static org.mockito.ArgumentMatchers.anyLong;
83+
import static org.mockito.ArgumentMatchers.anyMap;
8284
import static org.mockito.ArgumentMatchers.anyString;
8385
import static org.mockito.Mockito.any;
8486
import static org.mockito.Mockito.doAnswer;
@@ -722,6 +724,7 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException, W
722724
.writePriority(writePriority)
723725
.uploadFinalizer(Assert::assertTrue)
724726
.doRemoteDataIntegrityCheck(false)
727+
.metadata(new HashMap<>())
725728
.build();
726729

727730
s3BlobContainer.asyncBlobUpload(writeContext, completionListener);
@@ -731,7 +734,13 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException, W
731734
} else {
732735
assertNull(exceptionRef.get());
733736
}
734-
verify(s3BlobContainer, times(1)).executeMultipartUpload(any(S3BlobStore.class), anyString(), any(InputStream.class), anyLong());
737+
verify(s3BlobContainer, times(1)).executeMultipartUpload(
738+
any(S3BlobStore.class),
739+
anyString(),
740+
any(InputStream.class),
741+
anyLong(),
742+
anyMap()
743+
);
735744

736745
if (expectException) {
737746
verify(client, times(1)).abortMultipartUpload(any(AbortMultipartUploadRequest.class));

0 commit comments

Comments (0)