Skip to content

Commit 90aa335

Browse files
jed326Jay Deng
authored and
Jay Deng
committed
Add download + indexOuput#write implementation to RemoteIndexBuildStrategy
Signed-off-by: Jay Deng <jayd0104@gmail.com>
1 parent 7511f21 commit 90aa335

File tree

4 files changed

+116
-6
lines changed

4 files changed

+116
-6
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
88
### Features
99
* [Remote Vector Index Build] Introduce Remote Native Index Build feature flag, settings, and initial skeleton [#2525](https://github.com/opensearch-project/k-NN/pull/2525)
1010
* [Remote Vector Index Build] Implement vector data upload and vector data size threshold setting [#2550](https://github.com/opensearch-project/k-NN/pull/2550)
11+
* [Remote Vector Index Build] Implement data download and IndexOutput write functionality [#2554](https://github.com/opensearch-project/k-NN/pull/2554)
1112
### Enhancements
1213
### Bug Fixes
1314
### Infrastructure

src/main/java/org/opensearch/knn/index/codec/nativeindex/remote/RemoteIndexBuildStrategy.java

+14-5
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.knn.index.codec.nativeindex.remote;
77

8+
import com.google.common.annotations.VisibleForTesting;
89
import lombok.extern.log4j.Log4j2;
910
import org.apache.commons.lang.NotImplementedException;
1011
import org.opensearch.action.LatchedActionListener;
@@ -25,6 +26,7 @@
2526
import org.opensearch.knn.index.VectorDataType;
2627
import org.opensearch.knn.index.codec.nativeindex.NativeIndexBuildStrategy;
2728
import org.opensearch.knn.index.codec.nativeindex.model.BuildIndexParams;
29+
import org.opensearch.knn.index.store.IndexOutputWithBuffer;
2830
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
2931
import org.opensearch.repositories.RepositoriesService;
3032
import org.opensearch.repositories.Repository;
@@ -138,12 +140,12 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException {
138140
log.debug("Submit vector build took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
139141

140142
stopWatch = new StopWatch().start();
141-
awaitVectorBuild();
143+
BlobPath downloadPath = awaitVectorBuild();
142144
time_in_millis = stopWatch.stop().totalTime().millis();
143145
log.debug("Await vector build took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
144146

145147
stopWatch = new StopWatch().start();
146-
readFromRepository();
148+
readFromRepository(downloadPath, indexInfo.getIndexOutputWithBuffer());
147149
time_in_millis = stopWatch.stop().totalTime().millis();
148150
log.debug("Repository read took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
149151
} catch (Exception e) {
@@ -335,15 +337,22 @@ private void submitVectorBuild() {
335337

336338
/**
337339
* Wait on remote vector build to complete
340+
* @return BlobPath The path from which we should perform download
338341
*/
339-
private void awaitVectorBuild() {
342+
private BlobPath awaitVectorBuild() throws NotImplementedException {
340343
throw new NotImplementedException();
341344
}
342345

343346
/**
344347
* Read constructed vector file from remote repository and write to IndexOutput
345348
*/
346-
private void readFromRepository() {
347-
throw new NotImplementedException();
349+
@VisibleForTesting
350+
void readFromRepository(BlobPath downloadPath, IndexOutputWithBuffer indexOutputWithBuffer) throws IOException {
351+
BlobContainer blobContainer = getRepository().blobStore().blobContainer(downloadPath.parent());
352+
// TODO: We are using the sequential download API as multi-part parallel download is difficult for us to implement today and
353+
// requires some changes in core. For more details, see: https://github.com/opensearch-project/k-NN/issues/2464
354+
String fileName = downloadPath.toArray()[downloadPath.toArray().length - 1];
355+
InputStream graphStream = blobContainer.readBlob(fileName);
356+
indexOutputWithBuffer.writeFromStreamWithBuffer(graphStream);
348357
}
349358
}

src/main/java/org/opensearch/knn/index/store/IndexOutputWithBuffer.java

+45-1
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,25 @@
88
import org.apache.lucene.store.IndexOutput;
99

1010
import java.io.IOException;
11+
import java.io.InputStream;
1112

13+
/**
14+
* Wrapper around {@link IndexOutput} to perform writes in a buffered manner. This class is created per flush/merge, and may be used twice if
15+
* {@link org.opensearch.knn.index.codec.nativeindex.remote.RemoteIndexBuildStrategy} needs to fall back to a different build strategy.
16+
*/
1217
public class IndexOutputWithBuffer {
1318
// Underlying `IndexOutput` obtained from Lucene's Directory.
1419
private IndexOutput indexOutput;
1520
// Write buffer. Native engine will copy bytes into this buffer.
1621
// Allocating 64KB here since it shows better performance in NMSLIB at this size. (We also saw a slight improvement in FAISS compared to 4KB.)
1722
// NMSLIB writes an adjacency list size first, then serializes the list itself. Since we usually have many adjacency lists, having
1823
// 64KB lets us accumulate as many bytes as possible and reduces the number of `writeBytes` calls.
19-
private byte[] buffer = new byte[64 * 1024];
24+
private static final int CHUNK_SIZE = 64 * 1024;
25+
private final byte[] buffer;
2026

2127
public IndexOutputWithBuffer(IndexOutput indexOutput) {
2228
this.indexOutput = indexOutput;
29+
this.buffer = new byte[CHUNK_SIZE];
2330
}
2431

2532
// This method will be called in JNI layer which precisely knows
@@ -33,6 +40,43 @@ public void writeBytes(int length) {
3340
}
3441
}
3542

43+
/**
44+
* Writes to the {@link IndexOutput} by buffering bytes into the existing buffer in this class.
45+
*
46+
* @param inputStream The stream from which we are reading bytes to write
47+
* @throws IOException
48+
* @see IndexOutputWithBuffer#writeFromStreamWithBuffer(InputStream, byte[])
49+
*/
50+
public void writeFromStreamWithBuffer(InputStream inputStream) throws IOException {
51+
writeFromStreamWithBuffer(inputStream, this.buffer);
52+
}
53+
54+
/**
55+
* Writes to the {@link IndexOutput} by buffering bytes with @param outputBuffer. This method allows
56+
* {@link org.opensearch.knn.index.codec.nativeindex.remote.RemoteIndexBuildStrategy} to provide a separate, larger buffer as that buffer is for buffering
57+
* bytes downloaded from the repository, so it may be more performant to use a larger buffer.
58+
* We do not change the size of the existing buffer in case a fallback to the existing build strategy is needed.
59+
* TODO: Tune the size of the buffer used by RemoteIndexBuildStrategy based on benchmarking
60+
*
61+
* @param inputStream The stream from which we are reading bytes to write
62+
* @param outputBuffer The buffer used to buffer bytes
63+
* @throws IOException
64+
* @see IndexOutputWithBuffer#writeFromStreamWithBuffer(InputStream)
65+
*/
66+
public void writeFromStreamWithBuffer(InputStream inputStream, byte[] outputBuffer) throws IOException {
67+
int bytesRead = 0;
68+
// InputStream uses -1 indicates there are no more bytes to be read
69+
while (bytesRead != -1) {
70+
// Try to read CHUNK_SIZE into the buffer. The actual amount read may be less.
71+
bytesRead = inputStream.read(outputBuffer, 0, CHUNK_SIZE);
72+
assert bytesRead <= CHUNK_SIZE;
73+
// However many bytes we read, write it to the IndexOutput if != -1
74+
if (bytesRead != -1) {
75+
indexOutput.writeBytes(outputBuffer, 0, bytesRead);
76+
}
77+
}
78+
}
79+
3680
@Override
3781
public String toString() {
3882
return "{indexOutput=" + indexOutput + ", len(buffer)=" + buffer.length + "}";

src/test/java/org/opensearch/knn/index/codec/nativeindex/remote/RemoteIndexBuildStrategyTests.java

+56
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import org.apache.lucene.search.Sort;
1313
import org.apache.lucene.store.Directory;
1414
import org.apache.lucene.store.IOContext;
15+
import org.apache.lucene.store.IndexInput;
16+
import org.apache.lucene.store.IndexOutput;
1517
import org.apache.lucene.util.InfoStream;
1618
import org.apache.lucene.util.Version;
1719
import org.junit.Before;
@@ -42,11 +44,13 @@
4244
import org.opensearch.repositories.RepositoryMissingException;
4345
import org.opensearch.repositories.blobstore.BlobStoreRepository;
4446

47+
import java.io.ByteArrayInputStream;
4548
import java.io.IOException;
4649
import java.io.InputStream;
4750
import java.nio.file.Path;
4851
import java.util.List;
4952
import java.util.Map;
53+
import java.util.Random;
5054

5155
import static org.mockito.ArgumentMatchers.any;
5256
import static org.mockito.Mockito.mock;
@@ -208,4 +212,56 @@ public void testRepositoryInteraction() throws IOException {
208212
verify(mockBlobStore).blobContainer(any());
209213
verify(mockRepository).basePath();
210214
}
215+
216+
/**
217+
* Verify the buffered read method in {@link RemoteIndexBuildStrategy#readFromRepository} produces the correct result
218+
*/
219+
public void testRepositoryRead() throws IOException {
220+
// Create an InputStream with random values
221+
int TEST_ARRAY_SIZE = 64 * 1024 * 10;
222+
byte[] byteArray = new byte[TEST_ARRAY_SIZE];
223+
Random random = new Random();
224+
random.nextBytes(byteArray);
225+
InputStream randomStream = new ByteArrayInputStream(byteArray);
226+
227+
// Create a test segment that we will read/write from
228+
Directory directory;
229+
directory = newFSDirectory(createTempDir());
230+
String TEST_SEGMENT_NAME = "test-segment-name";
231+
IndexOutput testIndexOutput = directory.createOutput(TEST_SEGMENT_NAME, IOContext.DEFAULT);
232+
IndexOutputWithBuffer testIndexOutputWithBuffer = new IndexOutputWithBuffer(testIndexOutput);
233+
234+
// Set up RemoteIndexBuildStrategy and write to IndexOutput
235+
RepositoriesService repositoriesService = mock(RepositoriesService.class);
236+
BlobStoreRepository mockRepository = mock(BlobStoreRepository.class);
237+
BlobPath testBasePath = new BlobPath().add("testBasePath");
238+
BlobStore mockBlobStore = mock(BlobStore.class);
239+
AsyncMultiStreamBlobContainer mockBlobContainer = mock(AsyncMultiStreamBlobContainer.class);
240+
241+
when(repositoriesService.repository(any())).thenReturn(mockRepository);
242+
when(mockRepository.basePath()).thenReturn(testBasePath);
243+
when(mockRepository.blobStore()).thenReturn(mockBlobStore);
244+
when(mockBlobStore.blobContainer(any())).thenReturn(mockBlobContainer);
245+
when(mockBlobContainer.readBlob("testFile")).thenReturn(randomStream);
246+
247+
RemoteIndexBuildStrategy objectUnderTest = new RemoteIndexBuildStrategy(
248+
() -> repositoriesService,
249+
mock(NativeIndexBuildStrategy.class),
250+
mock(IndexSettings.class)
251+
);
252+
// This should read from randomStream into testIndexOutput
253+
BlobPath testPath = new BlobPath().add("testBasePath").add("testDirectory").add("testFile");
254+
objectUnderTest.readFromRepository(testPath, testIndexOutputWithBuffer);
255+
testIndexOutput.close();
256+
257+
// Now try to read from the IndexOutput
258+
IndexInput testIndexInput = directory.openInput(TEST_SEGMENT_NAME, IOContext.DEFAULT);
259+
byte[] resultByteArray = new byte[TEST_ARRAY_SIZE];
260+
testIndexInput.readBytes(resultByteArray, 0, TEST_ARRAY_SIZE);
261+
assertArrayEquals(byteArray, resultByteArray);
262+
263+
// Test Cleanup
264+
testIndexInput.close();
265+
directory.close();
266+
}
211267
}

0 commit comments

Comments
 (0)