Skip to content

Commit 689adc1

Browse files
authored
Add support for encrypted async blob read (opensearch-project#10131) (opensearch-project#10346)
* Add support for encrypted async blob read Signed-off-by: Kunal Kotwani <kkotwani@amazon.com> * Add async blob read support for encrypted containers Signed-off-by: Kunal Kotwani <kkotwani@amazon.com> --------- Signed-off-by: Kunal Kotwani <kkotwani@amazon.com> (cherry picked from commit c4c4ad8)
1 parent 912b60a commit 689adc1

File tree

6 files changed

+199
-10
lines changed

6 files changed

+199
-10
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
99
- Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681))
1010
- Async blob read support for S3 plugin ([#9694](https://github.com/opensearch-project/OpenSearch/pull/9694))
1111
- [Telemetry-Otel] Added support for OtlpGrpcSpanExporter exporter ([#9666](https://github.com/opensearch-project/OpenSearch/pull/9666))
12+
- Async blob read support for encrypted containers ([#10131](https://github.com/opensearch-project/OpenSearch/pull/10131))
1213
- Implement Visitor Design pattern in QueryBuilder to enable the capability to traverse through the complex QueryBuilder tree. ([#10110](https://github.com/opensearch-project/OpenSearch/pull/10110))
1314
- Add capability to restrict async durability mode for remote indexes ([#10189](https://github.com/opensearch-project/OpenSearch/pull/10189))
1415
- Add Doc Status Counter for Indexing Engine ([#4562](https://github.com/opensearch-project/OpenSearch/issues/4562))

server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java

+69-8
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@
1212
import org.opensearch.common.blobstore.stream.read.ReadContext;
1313
import org.opensearch.common.blobstore.stream.write.WriteContext;
1414
import org.opensearch.common.crypto.CryptoHandler;
15+
import org.opensearch.common.crypto.DecryptedRangedStreamProvider;
1516
import org.opensearch.common.io.InputStreamContainer;
1617
import org.opensearch.core.action.ActionListener;
17-
import org.opensearch.threadpool.ThreadPool;
1818

1919
import java.io.IOException;
20-
import java.nio.file.Path;
20+
import java.io.InputStream;
21+
import java.util.List;
22+
import java.util.stream.Collectors;
2123

2224
/**
2325
* EncryptedBlobContainer is an encrypted BlobContainer that is backed by a
@@ -44,12 +46,17 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
4446

4547
@Override
4648
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
47-
throw new UnsupportedOperationException();
48-
}
49-
50-
@Override
51-
public void asyncBlobDownload(String blobName, Path fileLocation, ThreadPool threadPool, ActionListener<String> completionListener) {
52-
throw new UnsupportedOperationException();
49+
try {
50+
final U cryptoContext = cryptoHandler.loadEncryptionMetadata(getEncryptedHeaderContentSupplier(blobName));
51+
ActionListener<ReadContext> decryptingCompletionListener = ActionListener.map(
52+
listener,
53+
readContext -> new DecryptedReadContext<>(readContext, cryptoHandler, cryptoContext)
54+
);
55+
56+
blobContainer.readBlobAsync(blobName, decryptingCompletionListener);
57+
} catch (Exception e) {
58+
listener.onFailure(e);
59+
}
5360
}
5461

5562
@Override
@@ -108,4 +115,58 @@ public InputStreamContainer provideStream(int partNumber) throws IOException {
108115
}
109116

110117
}
118+
119+
/**
120+
* DecryptedReadContext decrypts the encrypted {@link ReadContext} by acting as a transformation wrapper around
121+
* the encrypted object
122+
* @param <T> Encryption Metadata / CryptoContext for the {@link CryptoHandler} instance
123+
* @param <U> Parsed Encryption Metadata / CryptoContext for the {@link CryptoHandler} instance
124+
*/
125+
static class DecryptedReadContext<T, U> extends ReadContext {
126+
127+
private final CryptoHandler<T, U> cryptoHandler;
128+
private final U cryptoContext;
129+
private Long blobSize;
130+
131+
public DecryptedReadContext(ReadContext readContext, CryptoHandler<T, U> cryptoHandler, U cryptoContext) {
132+
super(readContext);
133+
this.cryptoHandler = cryptoHandler;
134+
this.cryptoContext = cryptoContext;
135+
}
136+
137+
@Override
138+
public long getBlobSize() {
139+
// initializes the value lazily
140+
if (blobSize == null) {
141+
this.blobSize = this.cryptoHandler.estimateDecryptedLength(cryptoContext, super.getBlobSize());
142+
}
143+
return this.blobSize;
144+
}
145+
146+
@Override
147+
public List<InputStreamContainer> getPartStreams() {
148+
return super.getPartStreams().stream().map(this::decryptInputStreamContainer).collect(Collectors.toList());
149+
}
150+
151+
/**
152+
* Transforms an encrypted {@link InputStreamContainer} to a decrypted instance
153+
* @param inputStreamContainer encrypted input stream container instance
154+
* @return decrypted input stream container instance
155+
*/
156+
private InputStreamContainer decryptInputStreamContainer(InputStreamContainer inputStreamContainer) {
157+
long startOfStream = inputStreamContainer.getOffset();
158+
long endOfStream = startOfStream + inputStreamContainer.getContentLength() - 1;
159+
DecryptedRangedStreamProvider decryptedStreamProvider = cryptoHandler.createDecryptingStreamOfRange(
160+
cryptoContext,
161+
startOfStream,
162+
endOfStream
163+
);
164+
165+
long adjustedPos = decryptedStreamProvider.getAdjustedRange()[0];
166+
long adjustedLength = decryptedStreamProvider.getAdjustedRange()[1] - adjustedPos + 1;
167+
final InputStream decryptedStream = decryptedStreamProvider.getDecryptedStreamProvider()
168+
.apply(inputStreamContainer.getInputStream());
169+
return new InputStreamContainer(decryptedStream, adjustedLength, adjustedPos);
170+
}
171+
}
111172
}

server/src/main/java/org/opensearch/common/blobstore/EncryptedBlobContainer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public InputStream readBlob(String blobName) throws IOException {
5050
return cryptoHandler.createDecryptingStream(inputStream);
5151
}
5252

53-
private EncryptedHeaderContentSupplier getEncryptedHeaderContentSupplier(String blobName) {
53+
EncryptedHeaderContentSupplier getEncryptedHeaderContentSupplier(String blobName) {
5454
return (start, end) -> {
5555
byte[] buffer;
5656
int length = (int) (end - start + 1);

server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java

+6
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ public ReadContext(long blobSize, List<InputStreamContainer> partStreams, String
2828
this.blobChecksum = blobChecksum;
2929
}
3030

31+
public ReadContext(ReadContext readContext) {
32+
this.blobSize = readContext.blobSize;
33+
this.partStreams = readContext.partStreams;
34+
this.blobChecksum = readContext.blobChecksum;
35+
}
36+
3137
public String getBlobChecksum() {
3238
return blobChecksum;
3339
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.blobstore;
10+
11+
import org.opensearch.common.Randomness;
12+
import org.opensearch.common.blobstore.stream.read.ReadContext;
13+
import org.opensearch.common.blobstore.stream.read.listener.ListenerTestUtils;
14+
import org.opensearch.common.crypto.CryptoHandler;
15+
import org.opensearch.common.crypto.DecryptedRangedStreamProvider;
16+
import org.opensearch.common.io.InputStreamContainer;
17+
import org.opensearch.core.action.ActionListener;
18+
import org.opensearch.test.OpenSearchTestCase;
19+
20+
import java.io.ByteArrayInputStream;
21+
import java.io.IOException;
22+
import java.util.List;
23+
import java.util.function.UnaryOperator;
24+
25+
import org.mockito.Mockito;
26+
27+
import static org.mockito.ArgumentMatchers.any;
28+
import static org.mockito.ArgumentMatchers.anyLong;
29+
import static org.mockito.ArgumentMatchers.eq;
30+
import static org.mockito.Mockito.mock;
31+
import static org.mockito.Mockito.when;
32+
33+
public class AsyncMultiStreamEncryptedBlobContainerTests extends OpenSearchTestCase {
34+
35+
// Tests the happy path scenario for decrypting a read context
36+
@SuppressWarnings("unchecked")
37+
public void testReadBlobAsync() throws Exception {
38+
String testBlobName = "testBlobName";
39+
int size = 100;
40+
41+
// Mock objects needed for the test
42+
AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class);
43+
CryptoHandler<Object, Object> cryptoHandler = mock(CryptoHandler.class);
44+
Object cryptoContext = mock(Object.class);
45+
when(cryptoHandler.loadEncryptionMetadata(any())).thenReturn(cryptoContext);
46+
when(cryptoHandler.estimateDecryptedLength(any(), anyLong())).thenReturn((long) size);
47+
long[] adjustedRanges = { 0, size - 1 };
48+
DecryptedRangedStreamProvider rangedStreamProvider = new DecryptedRangedStreamProvider(adjustedRanges, UnaryOperator.identity());
49+
when(cryptoHandler.createDecryptingStreamOfRange(eq(cryptoContext), anyLong(), anyLong())).thenReturn(rangedStreamProvider);
50+
51+
// Objects needed for API call
52+
final byte[] data = new byte[size];
53+
Randomness.get().nextBytes(data);
54+
final InputStreamContainer inputStreamContainer = new InputStreamContainer(new ByteArrayInputStream(data), data.length, 0);
55+
final ListenerTestUtils.CountingCompletionListener<ReadContext> completionListener =
56+
new ListenerTestUtils.CountingCompletionListener<>();
57+
final ReadContext readContext = new ReadContext(size, List.of(inputStreamContainer), null);
58+
59+
Mockito.doAnswer(invocation -> {
60+
ActionListener<ReadContext> readContextActionListener = invocation.getArgument(1);
61+
readContextActionListener.onResponse(readContext);
62+
return null;
63+
}).when(blobContainer).readBlobAsync(eq(testBlobName), any());
64+
65+
AsyncMultiStreamEncryptedBlobContainer<Object, Object> asyncMultiStreamEncryptedBlobContainer =
66+
new AsyncMultiStreamEncryptedBlobContainer<>(blobContainer, cryptoHandler);
67+
asyncMultiStreamEncryptedBlobContainer.readBlobAsync(testBlobName, completionListener);
68+
69+
// Assert results
70+
ReadContext response = completionListener.getResponse();
71+
assertEquals(0, completionListener.getFailureCount());
72+
assertEquals(1, completionListener.getResponseCount());
73+
assertNull(completionListener.getException());
74+
75+
assertTrue(response instanceof AsyncMultiStreamEncryptedBlobContainer.DecryptedReadContext);
76+
assertEquals(1, response.getNumberOfParts());
77+
assertEquals(size, response.getBlobSize());
78+
79+
InputStreamContainer responseContainer = response.getPartStreams().get(0);
80+
assertEquals(0, responseContainer.getOffset());
81+
assertEquals(size, responseContainer.getContentLength());
82+
assertEquals(100, responseContainer.getInputStream().available());
83+
}
84+
85+
// Tests the exception scenario for decrypting a read context
86+
@SuppressWarnings("unchecked")
87+
public void testReadBlobAsyncException() throws Exception {
88+
String testBlobName = "testBlobName";
89+
int size = 100;
90+
91+
// Mock objects needed for the test
92+
AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class);
93+
CryptoHandler<Object, Object> cryptoHandler = mock(CryptoHandler.class);
94+
when(cryptoHandler.loadEncryptionMetadata(any())).thenThrow(new IOException());
95+
96+
// Objects needed for API call
97+
final byte[] data = new byte[size];
98+
Randomness.get().nextBytes(data);
99+
final InputStreamContainer inputStreamContainer = new InputStreamContainer(new ByteArrayInputStream(data), data.length, 0);
100+
final ListenerTestUtils.CountingCompletionListener<ReadContext> completionListener =
101+
new ListenerTestUtils.CountingCompletionListener<>();
102+
final ReadContext readContext = new ReadContext(size, List.of(inputStreamContainer), null);
103+
104+
Mockito.doAnswer(invocation -> {
105+
ActionListener<ReadContext> readContextActionListener = invocation.getArgument(1);
106+
readContextActionListener.onResponse(readContext);
107+
return null;
108+
}).when(blobContainer).readBlobAsync(eq(testBlobName), any());
109+
110+
AsyncMultiStreamEncryptedBlobContainer<Object, Object> asyncMultiStreamEncryptedBlobContainer =
111+
new AsyncMultiStreamEncryptedBlobContainer<>(blobContainer, cryptoHandler);
112+
asyncMultiStreamEncryptedBlobContainer.readBlobAsync(testBlobName, completionListener);
113+
114+
// Assert results
115+
assertEquals(1, completionListener.getFailureCount());
116+
assertEquals(0, completionListener.getResponseCount());
117+
assertNull(completionListener.getResponse());
118+
assertTrue(completionListener.getException() instanceof IOException);
119+
}
120+
121+
}

server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ListenerTestUtils.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public class ListenerTestUtils {
1919
* CountingCompletionListener acts as a verification instance for wrapping listener based calls.
2020
* Keeps track of the last response, failure and count of response and failure invocations.
2121
*/
22-
static class CountingCompletionListener<T> implements ActionListener<T> {
22+
public static class CountingCompletionListener<T> implements ActionListener<T> {
2323
private int responseCount;
2424
private int failureCount;
2525
private T response;

0 commit comments

Comments
 (0)