
Commit f2c4efa

Introduce interface changes to support read/write blob with object metadata (opensearch-project#13023)

* Introduce interface changes to read/write blob with object metadata

Signed-off-by: Sandeep Kumawat <skumwt@amazon.com>
Co-authored-by: Sandeep Kumawat <skumwt@amazon.com>
(cherry picked from commit 9b0f578)
Signed-off-by: Sandeep Kumawat <2025sandeepkumawat@gmail.com>

1 parent cf20a61 commit f2c4efa

File tree

13 files changed: +406 -79 lines changed


plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java (+1 -1)

@@ -325,7 +325,7 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
                     );
                 }
             }
-            listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum));
+            listener.onResponse(new ReadContext.Builder(blobSize, blobPartInputStreamFutures).blobChecksum(blobChecksum).build());
         } catch (Exception ex) {
             listener.onFailure(ex);
         }
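
This one-line change is the call-site half of a broader refactor: ReadContext now exposes a builder, so optional attributes such as the blob checksum no longer have to be passed positionally (or as null). Both shapes that appear in this commit, condensed into one hedged sketch (variable names taken from the call sites in this diff):

    // Required values go to the Builder constructor; optional ones are chained.
    ReadContext withChecksum = new ReadContext.Builder(blobSize, blobPartInputStreamFutures)
        .blobChecksum(blobChecksum)
        .build();

    // Callers without a checksum simply omit the setter; previously this meant
    // passing an explicit null trailing argument (see MockFsAsyncBlobContainer below).
    ReadContext plain = new ReadContext.Builder(contentLength, blobPartStreams).build();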

plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java (+58 -42)

@@ -30,7 +30,7 @@
 
 import org.apache.lucene.store.IndexInput;
 import org.opensearch.cluster.metadata.RepositoryMetadata;
-import org.opensearch.common.CheckedTriFunction;
+import org.opensearch.common.CheckedConsumer;
 import org.opensearch.common.StreamContext;
 import org.opensearch.common.blobstore.BlobPath;
 import org.opensearch.common.blobstore.stream.write.StreamContextSupplier;

@@ -52,6 +52,7 @@
 import org.opensearch.test.OpenSearchTestCase;
 import org.opensearch.threadpool.Scheduler;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 
 import java.io.IOException;

@@ -513,24 +514,30 @@ private void testWriteBlobByStreams(boolean expectException, boolean throwExcept
             exceptionRef.set(ex);
             countDownLatch.countDown();
         });
-        blobContainer.asyncBlobUpload(new WriteContext("write_blob_by_streams_max_retries", new StreamContextSupplier() {
-            @Override
-            public StreamContext supplyStreamContext(long partSize) {
-                return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
-                    @Override
-                    public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
-                        InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
-                        openInputStreams.add(inputStream);
-                        return new InputStreamContainer(inputStream, size, position);
-                    }
-                }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));
-            }
-        }, bytes.length, false, WritePriority.NORMAL, uploadSuccess -> {
+
+        StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> {
+            InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
+            openInputStreams.add(inputStream);
+            return new InputStreamContainer(inputStream, size, position);
+        }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));
+
+        CheckedConsumer<Boolean, IOException> uploadFinalizer = uploadSuccess -> {
             assertTrue(uploadSuccess);
             if (throwExceptionOnFinalizeUpload) {
                 throw new RuntimeException();
             }
-        }, false, null), completionListener);
+        };
+
+        WriteContext writeContext = new WriteContext.Builder().fileName("write_blob_by_streams_max_retries")
+            .streamContextSupplier(streamContextSupplier)
+            .fileSize(bytes.length)
+            .failIfAlreadyExists(false)
+            .writePriority(WritePriority.NORMAL)
+            .uploadFinalizer(uploadFinalizer)
+            .doRemoteDataIntegrityCheck(false)
+            .build();
+
+        blobContainer.asyncBlobUpload(writeContext, completionListener);
 
         assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
         // wait for completableFuture to finish

@@ -563,24 +570,30 @@ private void testWriteBlobByStreamsLargeBlob(boolean expectException, boolean th
             countDownLatch.countDown();
         });
         List<InputStream> openInputStreams = new ArrayList<>();
-        blobContainer.asyncBlobUpload(new WriteContext("write_large_blob", new StreamContextSupplier() {
-            @Override
-            public StreamContext supplyStreamContext(long partSize) {
-                return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
-                    @Override
-                    public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
-                        InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
-                        openInputStreams.add(inputStream);
-                        return new InputStreamContainer(inputStream, size, position);
-                    }
-                }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize));
-            }
-        }, blobSize, false, WritePriority.NORMAL, uploadSuccess -> {
+
+        StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> {
+            InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
+            openInputStreams.add(inputStream);
+            return new InputStreamContainer(inputStream, size, position);
+        }, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1));
+
+        CheckedConsumer<Boolean, IOException> uploadFinalizer = uploadSuccess -> {
             assertTrue(uploadSuccess);
             if (throwExceptionOnFinalizeUpload) {
                 throw new RuntimeException();
             }
-        }, false, null), completionListener);
+        };
+
+        WriteContext writeContext = new WriteContext.Builder().fileName("write_large_blob")
+            .streamContextSupplier(streamContextSupplier)
+            .fileSize(blobSize)
+            .failIfAlreadyExists(false)
+            .writePriority(WritePriority.NORMAL)
+            .uploadFinalizer(uploadFinalizer)
+            .doRemoteDataIntegrityCheck(false)
+            .build();
+
+        blobContainer.asyncBlobUpload(writeContext, completionListener);
 
         assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
         if (expectException || throwExceptionOnFinalizeUpload) {

@@ -695,20 +708,23 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException, W
 
         List<InputStream> openInputStreams = new ArrayList<>();
         final S3BlobContainer s3BlobContainer = Mockito.spy(new S3BlobContainer(blobPath, blobStore));
-        s3BlobContainer.asyncBlobUpload(new WriteContext("write_large_blob", new StreamContextSupplier() {
-            @Override
-            public StreamContext supplyStreamContext(long partSize) {
-                return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
-                    @Override
-                    public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
-                        InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
-                        openInputStreams.add(inputStream);
-                        return new InputStreamContainer(inputStream, size, position);
-                    }
-                }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize));
-            }
-        }, blobSize, false, writePriority, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null), completionListener);
 
+        StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> {
+            InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
+            openInputStreams.add(inputStream);
+            return new InputStreamContainer(inputStream, size, position);
+        }, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1));
+
+        WriteContext writeContext = new WriteContext.Builder().fileName("write_large_blob")
+            .streamContextSupplier(streamContextSupplier)
+            .fileSize(blobSize)
+            .failIfAlreadyExists(false)
+            .writePriority(writePriority)
+            .uploadFinalizer(Assert::assertTrue)
+            .doRemoteDataIntegrityCheck(false)
+            .build();
+
+        s3BlobContainer.asyncBlobUpload(writeContext, completionListener);
         assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
         if (expectException) {
             assertNotNull(exceptionRef.get());
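
The pattern repeated across these test rewrites is the same: the anonymous StreamContextSupplier and CheckedTriFunction classes collapse into lambdas, and the positional WriteContext constructor gives way to WriteContext.Builder. Reading the deleted call sites, the old arguments map onto the named setters roughly as follows; treating the trailing null as an optional checksum is an inference from this diff, not a documented signature:

    // Old (removed): new WriteContext(fileName, streamContextSupplier, fileSize,
    //                                 failIfAlreadyExists, writePriority,
    //                                 uploadFinalizer, doRemoteDataIntegrityCheck,
    //                                 null /* optional checksum, by this reading */)
    // New (added):
    WriteContext writeContext = new WriteContext.Builder()
        .fileName("write_large_blob")                 // 1st positional argument
        .streamContextSupplier(streamContextSupplier) // 2nd
        .fileSize(blobSize)                           // 3rd
        .failIfAlreadyExists(false)                   // 4th
        .writePriority(WritePriority.NORMAL)          // 5th
        .uploadFinalizer(uploadFinalizer)             // 6th
        .doRemoteDataIntegrityCheck(false)            // 7th
        .build();                                     // the trailing null is now simply omitted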

plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java (+16 -15)

@@ -37,7 +37,6 @@
 
 import org.apache.http.HttpStatus;
 import org.opensearch.cluster.metadata.RepositoryMetadata;
-import org.opensearch.common.CheckedTriFunction;
 import org.opensearch.common.Nullable;
 import org.opensearch.common.StreamContext;
 import org.opensearch.common.SuppressForbidden;

@@ -375,22 +374,24 @@ public void testWriteBlobByStreamsWithRetries() throws Exception {
             exceptionRef.set(ex);
             countDownLatch.countDown();
         });
-        blobContainer.asyncBlobUpload(new WriteContext("write_blob_by_streams_max_retries", new StreamContextSupplier() {
-            @Override
-            public StreamContext supplyStreamContext(long partSize) {
-                return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
-                    @Override
-                    public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
-                        InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
-                        openInputStreams.add(inputStream);
-                        return new InputStreamContainer(inputStream, size, position);
-                    }
-                }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));
-            }
-        }, bytes.length, false, WritePriority.NORMAL, Assert::assertTrue, false, null), completionListener);
 
+        StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> {
+            InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
+            openInputStreams.add(inputStream);
+            return new InputStreamContainer(inputStream, size, position);
+        }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));
+
+        WriteContext writeContext = new WriteContext.Builder().fileName("write_blob_by_streams_max_retries")
+            .streamContextSupplier(streamContextSupplier)
+            .fileSize(bytes.length)
+            .failIfAlreadyExists(false)
+            .writePriority(WritePriority.NORMAL)
+            .uploadFinalizer(Assert::assertTrue)
+            .doRemoteDataIntegrityCheck(false)
+            .build();
+
+        blobContainer.asyncBlobUpload(writeContext, completionListener);
         assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
-
         assertThat(countDown.isCountedDown(), is(true));
 
         openInputStreams.forEach(inputStream -> {

server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java (+1 -1)

@@ -131,7 +131,7 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
                 InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset);
                 blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream));
             }
-            ReadContext blobReadContext = new ReadContext(contentLength, blobPartStreams, null);
+            ReadContext blobReadContext = new ReadContext.Builder(contentLength, blobPartStreams).build();
             listener.onResponse(blobReadContext);
         } catch (Exception e) {
             listener.onFailure(e);

server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java (+77 -0)

@@ -32,6 +32,7 @@
 
 package org.opensearch.common.blobstore;
 
+import org.opensearch.common.annotation.ExperimentalApi;
 import org.opensearch.core.action.ActionListener;
 
 import java.io.IOException;

@@ -77,6 +78,20 @@ public interface BlobContainer {
      */
     InputStream readBlob(String blobName) throws IOException;
 
+    /**
+     * Creates a new {@link BlobDownloadResponse} for the given blob name.
+     *
+     * @param blobName
+     *          The name of the blob to get an {@link InputStream} for.
+     * @return The {@link BlobDownloadResponse} of the blob.
+     * @throws NoSuchFileException if the blob does not exist
+     * @throws IOException if the blob cannot be read.
+     */
+    @ExperimentalApi
+    default BlobDownloadResponse readBlobWithMetadata(String blobName) throws IOException {
+        throw new UnsupportedOperationException("readBlobWithMetadata is not implemented yet");
+    };
+
     /**
      * Creates a new {@link InputStream} that can be used to read the given blob starting from
      * a specific {@code position} in the blob. The {@code length} is an indication of the

@@ -128,6 +143,36 @@ default long readBlobPreferredLength() {
      */
     void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;
 
+    /**
+     * Reads blob content from the input stream and writes it to the container in a new blob with the given name, and metadata.
+     * This method assumes the container does not already contain a blob of the same blobName. If a blob by the
+     * same name already exists, the operation will fail and an {@link IOException} will be thrown.
+     *
+     * @param blobName
+     *          The name of the blob to write the contents of the input stream to.
+     * @param inputStream
+     *          The input stream from which to retrieve the bytes to write to the blob.
+     * @param metadata
+     *          The metadata to be associated with the blob upload.
+     * @param blobSize
+     *          The size of the blob to be written, in bytes. It is implementation dependent whether
+     *          this value is used in writing the blob to the repository.
+     * @param failIfAlreadyExists
+     *          whether to throw a FileAlreadyExistsException if the given blob already exists
+     * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
+     * @throws IOException if the input stream could not be read, or the target blob could not be written to.
+     */
+    @ExperimentalApi
+    default void writeBlobWithMetadata(
+        String blobName,
+        InputStream inputStream,
+        Map<String, String> metadata,
+        long blobSize,
+        boolean failIfAlreadyExists
+    ) throws IOException {
+        throw new UnsupportedOperationException("writeBlobWithMetadata is not implemented yet");
+    };
+
     /**
      * Reads blob content from the input stream and writes it to the container in a new blob with the given name,
      * using an atomic write operation if the implementation supports it.

@@ -149,6 +194,38 @@ default long readBlobPreferredLength() {
      */
     void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;
 
+    /**
+     * Reads blob content from the input stream and writes it to the container in a new blob with the given name, and metadata,
+     * using an atomic write operation if the implementation supports it.
+     * <p>
+     * This method assumes the container does not already contain a blob of the same blobName. If a blob by the
+     * same name already exists, the operation will fail and an {@link IOException} will be thrown.
+     *
+     * @param blobName
+     *          The name of the blob to write the contents of the input stream to.
+     * @param inputStream
+     *          The input stream from which to retrieve the bytes to write to the blob.
+     * @param metadata
+     *          The metadata to be associated with the blob upload.
+     * @param blobSize
+     *          The size of the blob to be written, in bytes. It is implementation dependent whether
+     *          this value is used in writing the blob to the repository.
+     * @param failIfAlreadyExists
+     *          whether to throw a FileAlreadyExistsException if the given blob already exists
+     * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
+     * @throws IOException if the input stream could not be read, or the target blob could not be written to.
+     */
+    @ExperimentalApi
+    default void writeBlobAtomicWithMetadata(
+        String blobName,
+        InputStream inputStream,
+        Map<String, String> metadata,
+        long blobSize,
+        boolean failIfAlreadyExists
+    ) throws IOException {
+        throw new UnsupportedOperationException("writeBlobAtomicWithMetadata is not implemented yet");
+    };
+
     /**
      * Deletes this container and all its contents from the repository.
     *
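
These default methods make metadata support opt-in: every existing BlobContainer implementation keeps compiling, and a store that understands object metadata overrides them while the rest throw UnsupportedOperationException. A minimal caller-side sketch, assuming container is backed by such a store; the class name, blob name, and metadata keys are illustrative only:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    import org.opensearch.common.blobstore.BlobContainer;
    import org.opensearch.common.blobstore.BlobDownloadResponse;

    class MetadataRoundTripExample {
        // 'container' must be backed by a store that overrides the metadata-aware
        // defaults; otherwise both calls throw UnsupportedOperationException.
        static Map<String, String> roundTrip(BlobContainer container) throws IOException {
            byte[] data = "payload".getBytes(StandardCharsets.UTF_8);
            Map<String, String> metadata = Map.of("source", "remote-store"); // illustrative keys

            container.writeBlobWithMetadata("my-blob", new ByteArrayInputStream(data), metadata, data.length, true);

            BlobDownloadResponse response = container.readBlobWithMetadata("my-blob");
            try (InputStream in = response.getInputStream()) {
                in.readAllBytes(); // consume the content
                return response.getMetadata(); // the metadata persisted above
            }
        }
    }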
server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java (new file, +45)

@@ -0,0 +1,45 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.common.blobstore;
+
+import java.io.InputStream;
+import java.util.Map;
+
+/**
+ * Represents the response from a blob download operation, containing both the
+ * input stream of the blob content and the associated metadata.
+ *
+ * @opensearch.experimental
+ */
+public class BlobDownloadResponse {
+
+    /**
+     * Downloaded blob InputStream
+     */
+    private final InputStream inputStream;
+
+    /**
+     * Metadata of the downloaded blob
+     */
+    private final Map<String, String> metadata;
+
+    public InputStream getInputStream() {
+        return inputStream;
+    }
+
+    public Map<String, String> getMetadata() {
+        return metadata;
+    }
+
+    public BlobDownloadResponse(InputStream inputStream, Map<String, String> metadata) {
+        this.inputStream = inputStream;
+        this.metadata = metadata;
+    }
+
+}
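
The class itself is just an immutable pairing of content stream and metadata map; the interesting part is on the implementation side. A sketch of how a concrete container might populate it, with the caveat that this commit ships only the interface, and readMetadata below is a hypothetical helper standing in for whatever the underlying SDK exposes:

    // Hypothetical override inside a concrete BlobContainer; 'readMetadata'
    // stands in for the underlying store's metadata lookup and is not a real API.
    @Override
    public BlobDownloadResponse readBlobWithMetadata(String blobName) throws IOException {
        InputStream stream = readBlob(blobName);               // existing content path
        Map<String, String> metadata = readMetadata(blobName); // hypothetical helper
        return new BlobDownloadResponse(stream, metadata);
    }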
