
Commit 9b0f578

Introduce interface changes to support read/write blob with object metadata (#13023)
* Introduce interface changes to read/write blob with object metadata

Signed-off-by: Sandeep Kumawat <skumwt@amazon.com>
Co-authored-by: Sandeep Kumawat <skumwt@amazon.com>
1 parent 8d9e12d commit 9b0f578

File tree

13 files changed: +406 −79 lines changed

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

+1 −1

@@ -285,7 +285,7 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
                     );
                 }
             }
-            listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum));
+            listener.onResponse(new ReadContext.Builder(blobSize, blobPartInputStreamFutures).blobChecksum(blobChecksum).build());
         } catch (Exception ex) {
             listener.onFailure(ex);
         }
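The ReadContext.Builder used above takes the required fields (blob size and part streams) as constructor arguments and exposes optional fields such as the checksum through setters. A minimal sketch of both call shapes, assuming the same local variables as the surrounding method:

    // Optional checksum supplied via the setter.
    ReadContext withChecksum = new ReadContext.Builder(blobSize, blobPartInputStreamFutures)
        .blobChecksum(blobChecksum)
        .build();

    // Required fields only; stores without a checksum simply skip the setter
    // (see MockFsAsyncBlobContainer later in this diff).
    ReadContext withoutChecksum = new ReadContext.Builder(blobSize, blobPartInputStreamFutures).build();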

plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java

+58 −42

@@ -30,7 +30,7 @@

 import org.apache.lucene.store.IndexInput;
 import org.opensearch.cluster.metadata.RepositoryMetadata;
-import org.opensearch.common.CheckedTriFunction;
+import org.opensearch.common.CheckedConsumer;
 import org.opensearch.common.StreamContext;
 import org.opensearch.common.blobstore.BlobPath;
 import org.opensearch.common.blobstore.stream.write.StreamContextSupplier;

@@ -49,6 +49,7 @@
 import org.opensearch.repositories.s3.async.AsyncTransferManager;
 import org.opensearch.test.OpenSearchTestCase;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;

 import java.io.IOException;
@@ -466,24 +467,30 @@ private void testWriteBlobByStreams(boolean expectException, boolean throwExcept
             exceptionRef.set(ex);
             countDownLatch.countDown();
         });
-        blobContainer.asyncBlobUpload(new WriteContext("write_blob_by_streams_max_retries", new StreamContextSupplier() {
-            @Override
-            public StreamContext supplyStreamContext(long partSize) {
-                return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
-                    @Override
-                    public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
-                        InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
-                        openInputStreams.add(inputStream);
-                        return new InputStreamContainer(inputStream, size, position);
-                    }
-                }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));
-            }
-        }, bytes.length, false, WritePriority.NORMAL, uploadSuccess -> {
+
+        StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> {
+            InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
+            openInputStreams.add(inputStream);
+            return new InputStreamContainer(inputStream, size, position);
+        }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));
+
+        CheckedConsumer<Boolean, IOException> uploadFinalizer = uploadSuccess -> {
             assertTrue(uploadSuccess);
             if (throwExceptionOnFinalizeUpload) {
                 throw new RuntimeException();
             }
-        }, false, null), completionListener);
+        };
+
+        WriteContext writeContext = new WriteContext.Builder().fileName("write_blob_by_streams_max_retries")
+            .streamContextSupplier(streamContextSupplier)
+            .fileSize(bytes.length)
+            .failIfAlreadyExists(false)
+            .writePriority(WritePriority.NORMAL)
+            .uploadFinalizer(uploadFinalizer)
+            .doRemoteDataIntegrityCheck(false)
+            .build();
+
+        blobContainer.asyncBlobUpload(writeContext, completionListener);

         assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
         // wait for completableFuture to finish
@@ -516,24 +523,30 @@ private void testWriteBlobByStreamsLargeBlob(boolean expectException, boolean th
             countDownLatch.countDown();
         });
         List<InputStream> openInputStreams = new ArrayList<>();
-        blobContainer.asyncBlobUpload(new WriteContext("write_large_blob", new StreamContextSupplier() {
-            @Override
-            public StreamContext supplyStreamContext(long partSize) {
-                return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
-                    @Override
-                    public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
-                        InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
-                        openInputStreams.add(inputStream);
-                        return new InputStreamContainer(inputStream, size, position);
-                    }
-                }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize));
-            }
-        }, blobSize, false, WritePriority.NORMAL, uploadSuccess -> {
+
+        StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> {
+            InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
+            openInputStreams.add(inputStream);
+            return new InputStreamContainer(inputStream, size, position);
+        }, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1));
+
+        CheckedConsumer<Boolean, IOException> uploadFinalizer = uploadSuccess -> {
             assertTrue(uploadSuccess);
             if (throwExceptionOnFinalizeUpload) {
                 throw new RuntimeException();
             }
-        }, false, null), completionListener);
+        };
+
+        WriteContext writeContext = new WriteContext.Builder().fileName("write_large_blob")
+            .streamContextSupplier(streamContextSupplier)
+            .fileSize(blobSize)
+            .failIfAlreadyExists(false)
+            .writePriority(WritePriority.NORMAL)
+            .uploadFinalizer(uploadFinalizer)
+            .doRemoteDataIntegrityCheck(false)
+            .build();
+
+        blobContainer.asyncBlobUpload(writeContext, completionListener);

         assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
         if (expectException || throwExceptionOnFinalizeUpload) {
@@ -632,20 +645,23 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t

         List<InputStream> openInputStreams = new ArrayList<>();
         final S3BlobContainer s3BlobContainer = Mockito.spy(new S3BlobContainer(blobPath, blobStore));
-        s3BlobContainer.asyncBlobUpload(new WriteContext("write_large_blob", new StreamContextSupplier() {
-            @Override
-            public StreamContext supplyStreamContext(long partSize) {
-                return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
-                    @Override
-                    public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
-                        InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
-                        openInputStreams.add(inputStream);
-                        return new InputStreamContainer(inputStream, size, position);
-                    }
-                }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize));
-            }
-        }, blobSize, false, WritePriority.HIGH, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null), completionListener);

+        StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> {
+            InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
+            openInputStreams.add(inputStream);
+            return new InputStreamContainer(inputStream, size, position);
+        }, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1));
+
+        WriteContext writeContext = new WriteContext.Builder().fileName("write_large_blob")
+            .streamContextSupplier(streamContextSupplier)
+            .fileSize(blobSize)
+            .failIfAlreadyExists(false)
+            .writePriority(WritePriority.HIGH)
+            .uploadFinalizer(Assert::assertTrue)
+            .doRemoteDataIntegrityCheck(false)
+            .build();
+
+        s3BlobContainer.asyncBlobUpload(writeContext, completionListener);
         assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
         if (expectException) {
             assertNotNull(exceptionRef.get());

plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java

+16 −15

@@ -37,7 +37,6 @@

 import org.apache.http.HttpStatus;
 import org.opensearch.cluster.metadata.RepositoryMetadata;
-import org.opensearch.common.CheckedTriFunction;
 import org.opensearch.common.Nullable;
 import org.opensearch.common.StreamContext;
 import org.opensearch.common.SuppressForbidden;

@@ -332,22 +331,24 @@ public void testWriteBlobByStreamsWithRetries() throws Exception {
             exceptionRef.set(ex);
             countDownLatch.countDown();
         });
-        blobContainer.asyncBlobUpload(new WriteContext("write_blob_by_streams_max_retries", new StreamContextSupplier() {
-            @Override
-            public StreamContext supplyStreamContext(long partSize) {
-                return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
-                    @Override
-                    public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
-                        InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
-                        openInputStreams.add(inputStream);
-                        return new InputStreamContainer(inputStream, size, position);
-                    }
-                }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));
-            }
-        }, bytes.length, false, WritePriority.NORMAL, Assert::assertTrue, false, null), completionListener);

+        StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> {
+            InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
+            openInputStreams.add(inputStream);
+            return new InputStreamContainer(inputStream, size, position);
+        }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));
+
+        WriteContext writeContext = new WriteContext.Builder().fileName("write_blob_by_streams_max_retries")
+            .streamContextSupplier(streamContextSupplier)
+            .fileSize(bytes.length)
+            .failIfAlreadyExists(false)
+            .writePriority(WritePriority.NORMAL)
+            .uploadFinalizer(Assert::assertTrue)
+            .doRemoteDataIntegrityCheck(false)
+            .build();
+
+        blobContainer.asyncBlobUpload(writeContext, completionListener);
         assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
-
         assertThat(countDown.isCountedDown(), is(true));

         openInputStreams.forEach(inputStream -> {

server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java

+1 −1

@@ -131,7 +131,7 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
                 InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset);
                 blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream));
             }
-            ReadContext blobReadContext = new ReadContext(contentLength, blobPartStreams, null);
+            ReadContext blobReadContext = new ReadContext.Builder(contentLength, blobPartStreams).build();
             listener.onResponse(blobReadContext);
         } catch (Exception e) {
             listener.onFailure(e);

server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java

+77

@@ -32,6 +32,7 @@

 package org.opensearch.common.blobstore;

+import org.opensearch.common.annotation.ExperimentalApi;
 import org.opensearch.core.action.ActionListener;

 import java.io.IOException;

@@ -77,6 +78,20 @@ public interface BlobContainer {
      */
     InputStream readBlob(String blobName) throws IOException;

+    /**
+     * Creates a new {@link BlobDownloadResponse} for the given blob name.
+     *
+     * @param blobName
+     *          The name of the blob to get an {@link InputStream} for.
+     * @return The {@link BlobDownloadResponse} of the blob.
+     * @throws NoSuchFileException if the blob does not exist
+     * @throws IOException if the blob cannot be read.
+     */
+    @ExperimentalApi
+    default BlobDownloadResponse readBlobWithMetadata(String blobName) throws IOException {
+        throw new UnsupportedOperationException("readBlobWithMetadata is not implemented yet");
+    };
+
     /**
      * Creates a new {@link InputStream} that can be used to read the given blob starting from
      * a specific {@code position} in the blob. The {@code length} is an indication of the
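For orientation, a caller-side sketch of the new read API, assuming a container whose implementation overrides this default; the blob name and the way the stream is consumed are illustrative only:

    // Hedged usage sketch, not code from this commit.
    BlobDownloadResponse response = blobContainer.readBlobWithMetadata("example-blob"); // hypothetical name
    Map<String, String> metadata = response.getMetadata();
    try (InputStream stream = response.getInputStream()) {
        // consume the blob bytes; the object metadata is available alongside the stream
    }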
@@ -128,6 +143,36 @@ default long readBlobPreferredLength() {
      */
     void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;

+    /**
+     * Reads blob content from the input stream and writes it to the container in a new blob with the given name and metadata.
+     * This method assumes the container does not already contain a blob of the same blobName. If a blob by the
+     * same name already exists, the operation will fail and an {@link IOException} will be thrown.
+     *
+     * @param blobName
+     *          The name of the blob to write the contents of the input stream to.
+     * @param inputStream
+     *          The input stream from which to retrieve the bytes to write to the blob.
+     * @param metadata
+     *          The metadata to be associated with the blob upload.
+     * @param blobSize
+     *          The size of the blob to be written, in bytes. It is implementation dependent whether
+     *          this value is used in writing the blob to the repository.
+     * @param failIfAlreadyExists
+     *          whether to throw a FileAlreadyExistsException if the given blob already exists
+     * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
+     * @throws IOException if the input stream could not be read, or the target blob could not be written to.
+     */
+    @ExperimentalApi
+    default void writeBlobWithMetadata(
+        String blobName,
+        InputStream inputStream,
+        Map<String, String> metadata,
+        long blobSize,
+        boolean failIfAlreadyExists
+    ) throws IOException {
+        throw new UnsupportedOperationException("writeBlobWithMetadata is not implemented yet");
+    };
+
     /**
      * Reads blob content from the input stream and writes it to the container in a new blob with the given name,
      * using an atomic write operation if the implementation supports it.
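A matching write-side sketch; blobContainer and bytes are assumed to be in scope, and the metadata key/value pair is purely illustrative:

    // Hedged usage sketch, not code from this commit.
    Map<String, String> metadata = new HashMap<>();
    metadata.put("source", "example"); // hypothetical key/value
    try (InputStream in = new ByteArrayInputStream(bytes)) {
        blobContainer.writeBlobWithMetadata("example-blob", in, metadata, bytes.length, true);
    }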
@@ -149,6 +194,38 @@
      */
     void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;

+    /**
+     * Reads blob content from the input stream and writes it to the container in a new blob with the given name and metadata,
+     * using an atomic write operation if the implementation supports it.
+     * <p>
+     * This method assumes the container does not already contain a blob of the same blobName. If a blob by the
+     * same name already exists, the operation will fail and an {@link IOException} will be thrown.
+     *
+     * @param blobName
+     *          The name of the blob to write the contents of the input stream to.
+     * @param inputStream
+     *          The input stream from which to retrieve the bytes to write to the blob.
+     * @param metadata
+     *          The metadata to be associated with the blob upload.
+     * @param blobSize
+     *          The size of the blob to be written, in bytes. It is implementation dependent whether
+     *          this value is used in writing the blob to the repository.
+     * @param failIfAlreadyExists
+     *          whether to throw a FileAlreadyExistsException if the given blob already exists
+     * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
+     * @throws IOException if the input stream could not be read, or the target blob could not be written to.
+     */
+    @ExperimentalApi
+    default void writeBlobAtomicWithMetadata(
+        String blobName,
+        InputStream inputStream,
+        Map<String, String> metadata,
+        long blobSize,
+        boolean failIfAlreadyExists
+    ) throws IOException {
+        throw new UnsupportedOperationException("writeBlobAtomicWithMetadata is not implemented yet");
+    };
+
     /**
      * Deletes this container and all its contents from the repository.
      *
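Because all three new methods are defaults that throw UnsupportedOperationException, existing BlobContainer implementations compile unchanged; only stores with native object-metadata support need to override them. One possible override shape (the helper at the end is hypothetical, not from this commit):

    @Override
    public void writeBlobAtomicWithMetadata(
        String blobName,
        InputStream inputStream,
        Map<String, String> metadata,
        long blobSize,
        boolean failIfAlreadyExists
    ) throws IOException {
        // Attach the metadata to the store's native upload request rather than
        // falling through to the default, which rejects the call.
        uploadAtomicallyWithMetadata(blobName, inputStream, blobSize, failIfAlreadyExists, metadata); // hypothetical helper
    }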
server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java

+45

@@ -0,0 +1,45 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.common.blobstore;
+
+import java.io.InputStream;
+import java.util.Map;
+
+/**
+ * Represents the response from a blob download operation, containing both the
+ * input stream of the blob content and the associated metadata.
+ *
+ * @opensearch.experimental
+ */
+public class BlobDownloadResponse {
+
+    /**
+     * Downloaded blob InputStream
+     */
+    private final InputStream inputStream;
+
+    /**
+     * Metadata of the downloaded blob
+     */
+    private final Map<String, String> metadata;
+
+    public InputStream getInputStream() {
+        return inputStream;
+    }
+
+    public Map<String, String> getMetadata() {
+        return metadata;
+    }
+
+    public BlobDownloadResponse(InputStream inputStream, Map<String, String> metadata) {
+        this.inputStream = inputStream;
+        this.metadata = metadata;
+    }
+
+}
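As one illustration of how this type might be populated: with the AWS SDK v2 used by the S3 repository plugin, a GetObject response already carries the user metadata map, so an S3-backed readBlobWithMetadata could plausibly look like the sketch below (the bucket and s3Client fields and the key derivation are assumptions; this is not the commit's S3 code):

    // Hypothetical method on an S3-backed container.
    public BlobDownloadResponse readBlobWithMetadata(String blobName) throws IOException {
        ResponseInputStream<GetObjectResponse> s3Object = s3Client.getObject(
            GetObjectRequest.builder().bucket(bucket).key(keyFor(blobName)).build() // keyFor is a hypothetical helper
        );
        // GetObjectResponse#metadata() returns the object's user-defined metadata map.
        return new BlobDownloadResponse(s3Object, s3Object.response().metadata());
    }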
