Skip to content

Commit 29a7a03

Browse files
soosinhawangdongyu.danny
authored and
wangdongyu.danny
committed
Create new IndexInput for multi part upload (opensearch-project#14888)
* Create new IndexInput for multi part upload Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
1 parent 7c75162 commit 29a7a03

File tree

4 files changed

+86
-54
lines changed

4 files changed

+86
-54
lines changed

server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java

+17-18
Original file line numberDiff line numberDiff line change
@@ -131,20 +131,18 @@ public void uploadBlob(
131131
}
132132
final String resourceDescription = "BlobStoreTransferService.uploadBlob(blob=\"" + fileName + "\")";
133133
byte[] bytes = inputStream.readAllBytes();
134-
try (IndexInput input = new ByteArrayIndexInput(resourceDescription, bytes)) {
135-
long expectedChecksum = computeChecksum(input, resourceDescription);
136-
uploadBlobAsyncInternal(
137-
fileName,
138-
fileName,
139-
bytes.length,
140-
blobPath,
141-
writePriority,
142-
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
143-
expectedChecksum,
144-
listener,
145-
null
146-
);
147-
}
134+
long expectedChecksum = computeChecksum(bytes, resourceDescription);
135+
uploadBlobAsyncInternal(
136+
fileName,
137+
fileName,
138+
bytes.length,
139+
blobPath,
140+
writePriority,
141+
(size, position) -> new OffsetRangeIndexInputStream(new ByteArrayIndexInput(resourceDescription, bytes), size, position),
142+
expectedChecksum,
143+
listener,
144+
null
145+
);
148146
}
149147

150148
// Builds a metadata map containing the Base64-encoded checkpoint file data associated with a translog file.
@@ -220,7 +218,8 @@ private void uploadBlob(
220218

221219
}
222220

223-
private void uploadBlobAsyncInternal(
221+
// package private for testing
222+
void uploadBlobAsyncInternal(
224223
String fileName,
225224
String remoteFileName,
226225
long contentLength,
@@ -335,10 +334,10 @@ public void listAllInSortedOrderAsync(
335334
threadPool.executor(threadpoolName).execute(() -> { listAllInSortedOrder(path, filenamePrefix, limit, listener); });
336335
}
337336

338-
private static long computeChecksum(IndexInput indexInput, String resourceDescription) throws ChecksumCombinationException {
337+
private static long computeChecksum(byte[] bytes, String resourceDescription) throws ChecksumCombinationException {
339338
long expectedChecksum;
340-
try {
341-
expectedChecksum = checksumOfChecksum(indexInput.clone(), CHECKSUM_BYTES_LENGTH);
339+
try (IndexInput indexInput = new ByteArrayIndexInput(resourceDescription, bytes)) {
340+
expectedChecksum = checksumOfChecksum(indexInput, CHECKSUM_BYTES_LENGTH);
342341
} catch (Exception e) {
343342
throw new ChecksumCombinationException(
344343
"Potentially corrupted file: Checksum combination failed while combining stored checksum "

server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java

+18-17
Original file line numberDiff line numberDiff line change
@@ -223,10 +223,11 @@ private void writeAsyncWithPriority(
223223
return;
224224
}
225225
final String blobName = blobName(name);
226-
final BytesReference bytes = serialize(obj, blobName, compressor, params);
226+
final BytesReference bytesReference = serialize(obj, blobName, compressor, params);
227227
final String resourceDescription = "ChecksumBlobStoreFormat.writeAsyncWithPriority(blob=\"" + blobName + "\")";
228-
try (IndexInput input = new ByteArrayIndexInput(resourceDescription, BytesReference.toBytes(bytes))) {
229-
long expectedChecksum;
228+
byte[] bytes = BytesReference.toBytes(bytesReference);
229+
long expectedChecksum;
230+
try (IndexInput input = new ByteArrayIndexInput(resourceDescription, bytes)) {
230231
try {
231232
expectedChecksum = checksumOfChecksum(input.clone(), 8);
232233
} catch (Exception e) {
@@ -237,21 +238,21 @@ private void writeAsyncWithPriority(
237238
e
238239
);
239240
}
241+
}
240242

241-
try (
242-
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
243-
blobName,
244-
blobName,
245-
bytes.length(),
246-
true,
247-
priority,
248-
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
249-
expectedChecksum,
250-
((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported()
251-
)
252-
) {
253-
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener);
254-
}
243+
try (
244+
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
245+
blobName,
246+
blobName,
247+
bytes.length,
248+
true,
249+
priority,
250+
(size, position) -> new OffsetRangeIndexInputStream(new ByteArrayIndexInput(resourceDescription, bytes), size, position),
251+
expectedChecksum,
252+
((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported()
253+
)
254+
) {
255+
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener);
255256
}
256257
}
257258

server/src/main/java/org/opensearch/repositories/blobstore/ConfigBlobStoreFormat.java

+23-17
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
package org.opensearch.repositories.blobstore;
1010

11-
import org.apache.lucene.store.IndexInput;
1211
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
1312
import org.opensearch.common.blobstore.BlobContainer;
1413
import org.opensearch.common.blobstore.stream.write.WritePriority;
@@ -51,23 +50,30 @@ public void writeAsyncWithUrgentPriority(T obj, BlobContainer blobContainer, Str
5150
return;
5251
}
5352
String blobName = blobName(name);
54-
BytesReference bytes = serialize(obj, blobName, new NoneCompressor(), ToXContent.EMPTY_PARAMS, XContentType.JSON, null, null);
53+
BytesReference bytesReference = serialize(
54+
obj,
55+
blobName,
56+
new NoneCompressor(),
57+
ToXContent.EMPTY_PARAMS,
58+
XContentType.JSON,
59+
null,
60+
null
61+
);
5562
String resourceDescription = "BlobStoreFormat.writeAsyncWithPriority(blob=\"" + blobName + "\")";
56-
try (IndexInput input = new ByteArrayIndexInput(resourceDescription, BytesReference.toBytes(bytes))) {
57-
try (
58-
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
59-
blobName,
60-
blobName,
61-
bytes.length(),
62-
true,
63-
WritePriority.URGENT,
64-
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
65-
null,
66-
false
67-
)
68-
) {
69-
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener);
70-
}
63+
byte[] bytes = BytesReference.toBytes(bytesReference);
64+
try (
65+
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
66+
blobName,
67+
blobName,
68+
bytes.length,
69+
true,
70+
WritePriority.URGENT,
71+
(size, position) -> new OffsetRangeIndexInputStream(new ByteArrayIndexInput(resourceDescription, bytes), size, position),
72+
null,
73+
false
74+
)
75+
) {
76+
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener);
7177
}
7278
}
7379
}

server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java

+28-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.opensearch.common.blobstore.stream.read.ReadContext;
2323
import org.opensearch.common.blobstore.stream.write.WriteContext;
2424
import org.opensearch.common.blobstore.stream.write.WritePriority;
25+
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
26+
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream;
2527
import org.opensearch.common.settings.ClusterSettings;
2628
import org.opensearch.common.settings.Settings;
2729
import org.opensearch.core.action.ActionListener;
@@ -54,9 +56,13 @@
5456
import java.util.concurrent.TimeUnit;
5557
import java.util.concurrent.atomic.AtomicBoolean;
5658

59+
import org.mockito.ArgumentCaptor;
60+
import org.mockito.Mockito;
61+
5762
import static org.opensearch.index.translog.transfer.TranslogTransferManager.CHECKPOINT_FILE_DATA_KEY;
5863
import static org.mockito.ArgumentMatchers.any;
5964
import static org.mockito.Mockito.mock;
65+
import static org.mockito.Mockito.verify;
6066
import static org.mockito.Mockito.when;
6167

6268
public class BlobStoreTransferServiceTests extends OpenSearchTestCase {
@@ -139,8 +145,28 @@ public void testUploadBlobFromInputStreamAsyncFSRepo() throws IOException, Inter
139145
FsBlobStore fsBlobStore = mock(FsBlobStore.class);
140146
when(fsBlobStore.blobContainer(any())).thenReturn(mockAsyncFsContainer);
141147

142-
TransferService transferService = new BlobStoreTransferService(fsBlobStore, threadPool);
143-
uploadBlobFromInputStream(transferService);
148+
BlobStoreTransferService transferServiceSpy = Mockito.spy(new BlobStoreTransferService(fsBlobStore, threadPool));
149+
uploadBlobFromInputStream(transferServiceSpy);
150+
151+
ArgumentCaptor<RemoteTransferContainer.OffsetRangeInputStreamSupplier> inputStreamCaptor = ArgumentCaptor.forClass(
152+
RemoteTransferContainer.OffsetRangeInputStreamSupplier.class
153+
);
154+
verify(transferServiceSpy).uploadBlobAsyncInternal(
155+
Mockito.anyString(),
156+
Mockito.anyString(),
157+
Mockito.anyLong(),
158+
Mockito.any(),
159+
Mockito.any(),
160+
inputStreamCaptor.capture(),
161+
Mockito.anyLong(),
162+
Mockito.any(),
163+
Mockito.any()
164+
);
165+
RemoteTransferContainer.OffsetRangeInputStreamSupplier inputStreamSupplier = inputStreamCaptor.getValue();
166+
OffsetRangeInputStream inputStream1 = inputStreamSupplier.get(1, 0);
167+
OffsetRangeInputStream inputStream2 = inputStreamSupplier.get(1, 2);
168+
assertNotEquals(inputStream1, inputStream2);
169+
assertNotEquals(inputStream1.getFilePointer(), inputStream2.getFilePointer());
144170
}
145171

146172
private IndexMetadata getIndexMetadata() {

0 commit comments

Comments
 (0)