Skip to content

Commit 7345371

Browse files
authored
Implement interface changes for s3 plugin to read/write blob with object metadata (opensearch-project#13113)
* Implement interface changes for s3 plugin to read/write blob with object metadata --------- Signed-off-by: Sandeep Kumawat <skumwt@amazon.com> Signed-off-by: Sandeep Kumawat <2025sandeepkumawat@gmail.com> Co-authored-by: Sandeep Kumawat <skumwt@amazon.com>
1 parent 02e3c56 commit 7345371

File tree

14 files changed

+220
-42
lines changed

14 files changed

+220
-42
lines changed

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

+52-8
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
6363
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
6464
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
65+
import software.amazon.awssdk.utils.CollectionUtils;
6566

6667
import org.apache.logging.log4j.LogManager;
6768
import org.apache.logging.log4j.Logger;
@@ -77,6 +78,7 @@
7778
import org.opensearch.common.blobstore.BlobPath;
7879
import org.opensearch.common.blobstore.BlobStoreException;
7980
import org.opensearch.common.blobstore.DeleteResult;
81+
import org.opensearch.common.blobstore.FetchBlobResult;
8082
import org.opensearch.common.blobstore.stream.read.ReadContext;
8183
import org.opensearch.common.blobstore.stream.write.WriteContext;
8284
import org.opensearch.common.blobstore.stream.write.WritePriority;
@@ -138,6 +140,13 @@ public boolean blobExists(String blobName) {
138140
}
139141
}
140142

143+
@ExperimentalApi
144+
@Override
145+
public FetchBlobResult readBlobWithMetadata(String blobName) throws IOException {
146+
S3RetryingInputStream s3RetryingInputStream = new S3RetryingInputStream(blobStore, buildKey(blobName));
147+
return new FetchBlobResult(s3RetryingInputStream, s3RetryingInputStream.getMetadata());
148+
}
149+
141150
@Override
142151
public InputStream readBlob(String blobName) throws IOException {
143152
return new S3RetryingInputStream(blobStore, buildKey(blobName));
@@ -169,12 +178,27 @@ public long readBlobPreferredLength() {
169178
*/
170179
@Override
171180
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
181+
writeBlobWithMetadata(blobName, inputStream, blobSize, failIfAlreadyExists, null);
182+
}
183+
184+
/**
185+
* Write blob with its object metadata.
186+
*/
187+
@ExperimentalApi
188+
@Override
189+
public void writeBlobWithMetadata(
190+
String blobName,
191+
InputStream inputStream,
192+
long blobSize,
193+
boolean failIfAlreadyExists,
194+
@Nullable Map<String, String> metadata
195+
) throws IOException {
172196
assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
173197
SocketAccess.doPrivilegedIOException(() -> {
174198
if (blobSize <= getLargeBlobThresholdInBytes()) {
175-
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize);
199+
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
176200
} else {
177-
executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize);
201+
executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
178202
}
179203
return null;
180204
});
@@ -190,7 +214,8 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
190214
writeContext.getUploadFinalizer(),
191215
writeContext.doRemoteDataIntegrityCheck(),
192216
writeContext.getExpectedChecksum(),
193-
blobStore.isUploadRetryEnabled()
217+
blobStore.isUploadRetryEnabled(),
218+
writeContext.getMetadata()
194219
);
195220
try {
196221
if (uploadRequest.getContentLength() > ByteSizeUnit.GB.toBytes(10) && blobStore.isRedirectLargeUploads()) {
@@ -203,7 +228,8 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
203228
blobStore,
204229
uploadRequest.getKey(),
205230
inputStream.getInputStream(),
206-
uploadRequest.getContentLength()
231+
uploadRequest.getContentLength(),
232+
uploadRequest.getMetadata()
207233
);
208234
completionListener.onResponse(null);
209235
} catch (Exception ex) {
@@ -542,8 +568,13 @@ private String buildKey(String blobName) {
542568
/**
543569
* Uploads a blob using a single upload request
544570
*/
545-
void executeSingleUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize)
546-
throws IOException {
571+
void executeSingleUpload(
572+
final S3BlobStore blobStore,
573+
final String blobName,
574+
final InputStream input,
575+
final long blobSize,
576+
final Map<String, String> metadata
577+
) throws IOException {
547578

548579
// Extra safety checks
549580
if (blobSize > MAX_FILE_SIZE.getBytes()) {
@@ -560,6 +591,10 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, fin
560591
.storageClass(blobStore.getStorageClass())
561592
.acl(blobStore.getCannedACL())
562593
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().putObjectMetricPublisher));
594+
595+
if (CollectionUtils.isNotEmpty(metadata)) {
596+
putObjectRequestBuilder = putObjectRequestBuilder.metadata(metadata);
597+
}
563598
if (blobStore.serverSideEncryption()) {
564599
putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
565600
}
@@ -583,8 +618,13 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, fin
583618
/**
584619
* Uploads a blob using multipart upload requests.
585620
*/
586-
void executeMultipartUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize)
587-
throws IOException {
621+
void executeMultipartUpload(
622+
final S3BlobStore blobStore,
623+
final String blobName,
624+
final InputStream input,
625+
final long blobSize,
626+
final Map<String, String> metadata
627+
) throws IOException {
588628

589629
ensureMultiPartUploadSize(blobSize);
590630
final long partSize = blobStore.bufferSizeInBytes();
@@ -609,6 +649,10 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName,
609649
.acl(blobStore.getCannedACL())
610650
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector));
611651

652+
if (CollectionUtils.isNotEmpty(metadata)) {
653+
createMultipartUploadRequestBuilder.metadata(metadata);
654+
}
655+
612656
if (blobStore.serverSideEncryption()) {
613657
createMultipartUploadRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
614658
}

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RetryingInputStream.java

+7
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.nio.file.NoSuchFileException;
4949
import java.util.ArrayList;
5050
import java.util.List;
51+
import java.util.Map;
5152
import java.util.concurrent.atomic.AtomicBoolean;
5253

5354
/**
@@ -77,6 +78,7 @@ class S3RetryingInputStream extends InputStream {
7778
private long currentOffset;
7879
private boolean closed;
7980
private boolean eof;
81+
private Map<String, String> metadata;
8082

8183
S3RetryingInputStream(S3BlobStore blobStore, String blobKey) throws IOException {
8284
this(blobStore, blobKey, 0, Long.MAX_VALUE - 1);
@@ -122,6 +124,7 @@ private void openStream() throws IOException {
122124
getObjectResponseInputStream.response().contentLength()
123125
);
124126
this.currentStream = getObjectResponseInputStream;
127+
this.metadata = getObjectResponseInputStream.response().metadata();
125128
this.isStreamAborted.set(false);
126129
} catch (final SdkException e) {
127130
if (e instanceof S3Exception) {
@@ -265,4 +268,8 @@ boolean isEof() {
265268
boolean isAborted() {
266269
return isStreamAborted.get();
267270
}
271+
272+
Map<String, String> getMetadata() {
273+
return this.metadata;
274+
}
268275
}

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java

+9
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
2323
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
2424
import software.amazon.awssdk.services.s3.model.S3Exception;
25+
import software.amazon.awssdk.utils.CollectionUtils;
2526
import software.amazon.awssdk.utils.CompletableFutureUtils;
2627

2728
import org.apache.logging.log4j.LogManager;
@@ -131,6 +132,10 @@ private void uploadInParts(
131132
.bucket(uploadRequest.getBucket())
132133
.key(uploadRequest.getKey())
133134
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector));
135+
136+
if (CollectionUtils.isNotEmpty(uploadRequest.getMetadata())) {
137+
createMultipartUploadRequestBuilder.metadata(uploadRequest.getMetadata());
138+
}
134139
if (uploadRequest.doRemoteDataIntegrityCheck()) {
135140
createMultipartUploadRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
136141
}
@@ -327,6 +332,10 @@ private void uploadInOneChunk(
327332
.key(uploadRequest.getKey())
328333
.contentLength(uploadRequest.getContentLength())
329334
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.putObjectMetricPublisher));
335+
336+
if (CollectionUtils.isNotEmpty(uploadRequest.getMetadata())) {
337+
putObjectRequestBuilder.metadata(uploadRequest.getMetadata());
338+
}
330339
if (uploadRequest.doRemoteDataIntegrityCheck()) {
331340
putObjectRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
332341
putObjectRequestBuilder.checksumCRC32(base64StringFromLong(uploadRequest.getExpectedChecksum()));

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@
99
package org.opensearch.repositories.s3.async;
1010

1111
import org.opensearch.common.CheckedConsumer;
12+
import org.opensearch.common.Nullable;
1213
import org.opensearch.common.blobstore.stream.write.WritePriority;
1314

1415
import java.io.IOException;
16+
import java.util.Map;
1517

1618
/**
1719
* A model encapsulating all details for an upload to S3
@@ -24,8 +26,8 @@ public class UploadRequest {
2426
private final CheckedConsumer<Boolean, IOException> uploadFinalizer;
2527
private final boolean doRemoteDataIntegrityCheck;
2628
private final Long expectedChecksum;
27-
2829
private boolean uploadRetryEnabled;
30+
private final Map<String, String> metadata;
2931

3032
/**
3133
* Construct a new UploadRequest object
@@ -37,6 +39,7 @@ public class UploadRequest {
3739
* @param uploadFinalizer An upload finalizer to call once all parts are uploaded
3840
* @param doRemoteDataIntegrityCheck A boolean to inform vendor plugins whether remote data integrity checks need to be done
3941
* @param expectedChecksum Checksum of the file being uploaded for remote data integrity check
42+
* @param metadata Metadata of the file being uploaded
4043
*/
4144
public UploadRequest(
4245
String bucket,
@@ -46,7 +49,8 @@ public UploadRequest(
4649
CheckedConsumer<Boolean, IOException> uploadFinalizer,
4750
boolean doRemoteDataIntegrityCheck,
4851
Long expectedChecksum,
49-
boolean uploadRetryEnabled
52+
boolean uploadRetryEnabled,
53+
@Nullable Map<String, String> metadata
5054
) {
5155
this.bucket = bucket;
5256
this.key = key;
@@ -56,6 +60,7 @@ public UploadRequest(
5660
this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck;
5761
this.expectedChecksum = expectedChecksum;
5862
this.uploadRetryEnabled = uploadRetryEnabled;
63+
this.metadata = metadata;
5964
}
6065

6166
public String getBucket() {
@@ -89,4 +94,11 @@ public Long getExpectedChecksum() {
8994
public boolean isUploadRetryEnabled() {
9095
return uploadRetryEnabled;
9196
}
97+
98+
/**
99+
* @return metadata of the blob to be uploaded
100+
*/
101+
public Map<String, String> getMetadata() {
102+
return metadata;
103+
}
92104
}

plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.nio.file.Path;
5858
import java.util.ArrayList;
5959
import java.util.Arrays;
60+
import java.util.HashMap;
6061
import java.util.List;
6162
import java.util.Locale;
6263
import java.util.concurrent.CompletableFuture;
@@ -75,6 +76,7 @@
7576

7677
import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
7778
import static org.mockito.ArgumentMatchers.anyLong;
79+
import static org.mockito.ArgumentMatchers.anyMap;
7880
import static org.mockito.ArgumentMatchers.anyString;
7981
import static org.mockito.Mockito.any;
8082
import static org.mockito.Mockito.doAnswer;
@@ -659,6 +661,7 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t
659661
.writePriority(WritePriority.HIGH)
660662
.uploadFinalizer(Assert::assertTrue)
661663
.doRemoteDataIntegrityCheck(false)
664+
.metadata(new HashMap<>())
662665
.build();
663666

664667
s3BlobContainer.asyncBlobUpload(writeContext, completionListener);
@@ -668,7 +671,13 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t
668671
} else {
669672
assertNull(exceptionRef.get());
670673
}
671-
verify(s3BlobContainer, times(1)).executeMultipartUpload(any(S3BlobStore.class), anyString(), any(InputStream.class), anyLong());
674+
verify(s3BlobContainer, times(1)).executeMultipartUpload(
675+
any(S3BlobStore.class),
676+
anyString(),
677+
any(InputStream.class),
678+
anyLong(),
679+
anyMap()
680+
);
672681

673682
if (expectException) {
674683
verify(client, times(1)).abortMultipartUpload(any(AbortMultipartUploadRequest.class));

0 commit comments

Comments (0)