Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 08c9432

Browse files
author
Sandeep Kumawat
committedApr 2, 2024·
Introduce interface changes to read/write blob with object metadata
Signed-off-by: Sandeep Kumawat <skumwt@amazon.com>
1 parent a103b84 commit 08c9432

File tree

4 files changed

+129
-0
lines changed

4 files changed

+129
-0
lines changed
 

‎server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java

+69
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,19 @@ public interface BlobContainer {
7777
*/
7878
InputStream readBlob(String blobName) throws IOException;
7979

80+
/**
81+
* Creates a new {@link BlobDownloadResponse} for the given blob name.
82+
*
83+
* @param blobName
84+
* The name of the blob to get an {@link InputStream} for.
85+
* @return The {@code InputStream} to read the blob.
86+
* @throws NoSuchFileException if the blob does not exist
87+
* @throws IOException if the blob can not be read.
88+
*/
89+
default BlobDownloadResponse readBlobWithMetadata(String blobName) throws IOException {
90+
return null;
91+
};
92+
8093
/**
8194
* Creates a new {@link InputStream} that can be used to read the given blob starting from
8295
* a specific {@code position} in the blob. The {@code length} is an indication of the
@@ -128,6 +141,33 @@ default long readBlobPreferredLength() {
128141
*/
129142
void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;
130143

144+
/**
145+
* Reads blob content from the input stream and writes it to the container in a new blob with the given name, and metadata.
146+
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the
147+
* same name already exists, the operation will fail and an {@link IOException} will be thrown.
148+
*
149+
* @param blobName
150+
* The name of the blob to write the contents of the input stream to.
151+
* @param inputStream
152+
* The input stream from which to retrieve the bytes to write to the blob.
153+
* @param metadata
154+
* The metadata to be associate with the blob upload.
155+
* @param blobSize
156+
* The size of the blob to be written, in bytes. It is implementation dependent whether
157+
* this value is used in writing the blob to the repository.
158+
* @param failIfAlreadyExists
159+
* whether to throw a FileAlreadyExistsException if the given blob already exists
160+
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
161+
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
162+
*/
163+
default void writeBlobWithMetadata(
164+
String blobName,
165+
InputStream inputStream,
166+
Map<String, String> metadata,
167+
long blobSize,
168+
boolean failIfAlreadyExists
169+
) throws IOException {};
170+
131171
/**
132172
* Reads blob content from the input stream and writes it to the container in a new blob with the given name,
133173
* using an atomic write operation if the implementation supports it.
@@ -149,6 +189,35 @@ default long readBlobPreferredLength() {
149189
*/
150190
void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;
151191

192+
/**
193+
* Reads blob content from the input stream and writes it to the container in a new blob with the given name,and metadata
194+
* using an atomic write operation if the implementation supports it.
195+
* <p>
196+
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the
197+
* same name already exists, the operation will fail and an {@link IOException} will be thrown.
198+
*
199+
* @param blobName
200+
* The name of the blob to write the contents of the input stream to.
201+
* @param inputStream
202+
* The input stream from which to retrieve the bytes to write to the blob.
203+
* @param metadata
204+
* The metadata to be associate with the blob upload.
205+
* @param blobSize
206+
* The size of the blob to be written, in bytes. It is implementation dependent whether
207+
* this value is used in writing the blob to the repository.
208+
* @param failIfAlreadyExists
209+
* whether to throw a FileAlreadyExistsException if the given blob already exists
210+
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
211+
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
212+
*/
213+
default void writeBlobAtomicWithMetadata(
214+
String blobName,
215+
InputStream inputStream,
216+
Map<String, String> metadata,
217+
long blobSize,
218+
boolean failIfAlreadyExists
219+
) throws IOException {};
220+
152221
/**
153222
* Deletes this container and all its contents from the repository.
154223
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.blobstore;
10+
11+
import java.io.InputStream;
12+
import java.util.Map;
13+
14+
/**
15+
* A class for blob download response
16+
*
17+
* @opensearch.internal
18+
*/
19+
public class BlobDownloadResponse {
20+
21+
/**
22+
* Downloaded blob InputStream
23+
*/
24+
private InputStream inputStream;
25+
26+
/**
27+
* Metadata of the downloaded blob
28+
*/
29+
private Map<String, String> metadata;
30+
31+
public InputStream getInputStream() {
32+
return inputStream;
33+
}
34+
35+
public Map<String, String> getMetadata() {
36+
return metadata;
37+
}
38+
39+
public BlobDownloadResponse(InputStream inputStream, Map<String, String> metadata) {
40+
this.inputStream = inputStream;
41+
this.metadata = metadata;
42+
}
43+
44+
}

‎server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java

+6
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.opensearch.action.ActionRunnable;
1515
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
1616
import org.opensearch.common.blobstore.BlobContainer;
17+
import org.opensearch.common.blobstore.BlobDownloadResponse;
1718
import org.opensearch.common.blobstore.BlobMetadata;
1819
import org.opensearch.common.blobstore.BlobPath;
1920
import org.opensearch.common.blobstore.BlobStore;
@@ -164,6 +165,11 @@ public InputStream downloadBlob(Iterable<String> path, String fileName) throws I
164165
return blobStore.blobContainer((BlobPath) path).readBlob(fileName);
165166
}
166167

168+
@Override
169+
public BlobDownloadResponse downloadBlobWithMetadata(Iterable<String> path, String fileName) throws IOException {
170+
return blobStore.blobContainer((BlobPath) path).readBlobWithMetadata(fileName);
171+
}
172+
167173
@Override
168174
public void deleteBlobs(Iterable<String> path, List<String> fileNames) throws IOException {
169175
blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames);

‎server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java

+10
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.index.translog.transfer;
1010

11+
import org.opensearch.common.blobstore.BlobDownloadResponse;
1112
import org.opensearch.common.blobstore.BlobMetadata;
1213
import org.opensearch.common.blobstore.BlobPath;
1314
import org.opensearch.common.blobstore.stream.write.WritePriority;
@@ -125,6 +126,15 @@ void uploadBlobs(
125126
*/
126127
InputStream downloadBlob(Iterable<String> path, String fileName) throws IOException;
127128

129+
/**
130+
*
131+
* @param path the remote path from where download should be made
132+
* @param fileName the name of the file
133+
* @return {@link BlobDownloadResponse} of the remote file
134+
* @throws IOException the exception while reading the data
135+
*/
136+
BlobDownloadResponse downloadBlobWithMetadata(Iterable<String> path, String fileName) throws IOException;
137+
128138
void listAllInSortedOrder(Iterable<String> path, String filenamePrefix, int limit, ActionListener<List<BlobMetadata>> listener);
129139

130140
void listAllInSortedOrderAsync(

0 commit comments

Comments
 (0)
Please sign in to comment.