Skip to content

Commit 6187dfb

Browse files
jed326Jay Deng
authored and
Jay Deng
committed
Add download + indexOuput#write implementation to RemoteIndexBuildStrategy
Signed-off-by: Jay Deng <jayd0104@gmail.com>
1 parent 5873add commit 6187dfb

File tree

6 files changed

+157
-9
lines changed

6 files changed

+157
-9
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
88
### Features
99
* [Remote Vector Index Build] Introduce Remote Native Index Build feature flag, settings, and initial skeleton [#2525](https://github.com/opensearch-project/k-NN/pull/2525)
1010
* [Remote Vector Index Build] Implement vector data upload and vector data size threshold setting [#2550](https://github.com/opensearch-project/k-NN/pull/2550)
11+
* [Remote Vector Index Build] Implement data download and IndexOutput write functionality [#2554](https://github.com/opensearch-project/k-NN/pull/2554)
1112
### Enhancements
1213
* Introduce node level circuit breakers for k-NN [#2509](https://github.com/opensearch-project/k-NN/pull/2509)
1314
### Bug Fixes

src/main/java/org/opensearch/knn/index/codec/nativeindex/remote/DefaultVectorRepositoryAccessor.java

+30
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,16 @@
1919
import org.opensearch.core.action.ActionListener;
2020
import org.opensearch.index.IndexSettings;
2121
import org.opensearch.knn.index.VectorDataType;
22+
import org.opensearch.knn.index.engine.KNNEngine;
23+
import org.opensearch.knn.index.store.IndexOutputWithBuffer;
2224
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
2325
import org.opensearch.repositories.blobstore.BlobStoreRepository;
2426

2527
import java.io.BufferedInputStream;
2628
import java.io.IOException;
2729
import java.io.InputStream;
30+
import java.nio.file.Path;
31+
import java.nio.file.Paths;
2832
import java.util.concurrent.CountDownLatch;
2933
import java.util.concurrent.atomic.AtomicReference;
3034
import java.util.function.Supplier;
@@ -210,4 +214,30 @@ private CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOExceptio
210214
return new InputStreamContainer(vectorValuesInputStream, size, position);
211215
});
212216
}
217+
218+
@Override
219+
public void readFromRepository(String path, IndexOutputWithBuffer indexOutputWithBuffer) throws IOException {
220+
if (path == null || path.isEmpty()) {
221+
throw new IllegalArgumentException("download path is null or empty");
222+
}
223+
Path downloadPath = Paths.get(path);
224+
String fileName = downloadPath.getFileName().toString();
225+
if (!fileName.endsWith(KNNEngine.FAISS.getExtension())) {
226+
log.error("download path [{}] does not end with extension [{}}", downloadPath, KNNEngine.FAISS.getExtension());
227+
throw new IllegalArgumentException("download path has incorrect file extension");
228+
}
229+
230+
BlobPath blobContainerPath = new BlobPath();
231+
if (downloadPath.getParent() != null) {
232+
for (Path p : downloadPath.getParent()) {
233+
blobContainerPath = blobContainerPath.add(p.getFileName().toString());
234+
}
235+
}
236+
237+
BlobContainer blobContainer = repository.blobStore().blobContainer(blobContainerPath);
238+
// TODO: We are using the sequential download API as multi-part parallel download is difficult for us to implement today and
239+
// requires some changes in core. For more details, see: https://github.com/opensearch-project/k-NN/issues/2464
240+
InputStream graphStream = blobContainer.readBlob(fileName);
241+
indexOutputWithBuffer.writeFromStreamWithBuffer(graphStream);
242+
}
213243
}

src/main/java/org/opensearch/knn/index/codec/nativeindex/remote/RemoteIndexBuildStrategy.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -131,12 +131,12 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException {
131131
log.debug("Submit vector build took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
132132

133133
stopWatch = new StopWatch().start();
134-
awaitVectorBuild();
134+
String downloadPath = awaitVectorBuild();
135135
time_in_millis = stopWatch.stop().totalTime().millis();
136136
log.debug("Await vector build took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
137137

138138
stopWatch = new StopWatch().start();
139-
vectorRepositoryAccessor.readFromRepository();
139+
vectorRepositoryAccessor.readFromRepository(downloadPath, indexInfo.getIndexOutputWithBuffer());
140140
time_in_millis = stopWatch.stop().totalTime().millis();
141141
log.debug("Repository read took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
142142
} catch (Exception e) {
@@ -174,8 +174,9 @@ private void submitVectorBuild() {
174174

175175
/**
176176
* Wait on remote vector build to complete
177+
* @return String The path from which we should perform download, delimited by "/"
177178
*/
178-
private void awaitVectorBuild() {
179+
private String awaitVectorBuild() throws NotImplementedException {
179180
throw new NotImplementedException();
180181
}
181182
}

src/main/java/org/opensearch/knn/index/codec/nativeindex/remote/VectorRepositoryAccessor.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55

66
package org.opensearch.knn.index.codec.nativeindex.remote;
77

8-
import org.apache.commons.lang.NotImplementedException;
98
import org.opensearch.knn.index.VectorDataType;
9+
import org.opensearch.knn.index.store.IndexOutputWithBuffer;
1010
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
1111

1212
import java.io.IOException;
@@ -15,7 +15,7 @@
1515
/**
1616
* Interface which dictates how we use we interact with a {@link org.opensearch.repositories.blobstore.BlobStoreRepository} from {@link RemoteIndexBuildStrategy}
1717
*/
18-
public interface VectorRepositoryAccessor {
18+
interface VectorRepositoryAccessor {
1919
/**
2020
* This method is responsible for writing both the vector blobs and doc ids provided by {@param knnVectorValuesSupplier} to the configured repository
2121
*
@@ -35,8 +35,9 @@ void writeToRepository(
3535

3636
/**
3737
* Read constructed vector file from remote repository and write to IndexOutput
38+
* @param path File path as String
39+
* @param indexOutputWithBuffer {@link IndexOutputWithBuffer} which will be used to write to the underlying {@link org.apache.lucene.store.IndexOutput}
40+
* @throws IOException
3841
*/
39-
default void readFromRepository() {
40-
throw new NotImplementedException();
41-
}
42+
void readFromRepository(String path, IndexOutputWithBuffer indexOutputWithBuffer) throws IOException;
4243
}

src/main/java/org/opensearch/knn/index/store/IndexOutputWithBuffer.java

+45-1
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,25 @@
88
import org.apache.lucene.store.IndexOutput;
99

1010
import java.io.IOException;
11+
import java.io.InputStream;
1112

13+
/**
14+
* Wrapper around {@link IndexOutput} to perform writes in a buffered manner. This class is created per flush/merge, and may be used twice if
15+
* {@link org.opensearch.knn.index.codec.nativeindex.remote.RemoteIndexBuildStrategy} needs to fall back to a different build strategy.
16+
*/
1217
public class IndexOutputWithBuffer {
1318
// Underlying `IndexOutput` obtained from Lucene's Directory.
1419
private IndexOutput indexOutput;
1520
// Write buffer. Native engine will copy bytes into this buffer.
1621
// Allocating 64KB here since it show better performance in NMSLIB with the size. (We had slightly improvement in FAISS than having 4KB)
1722
// NMSLIB writes an adjacent list size first, then followed by serializing the list. Since we usually have more adjacent lists, having
1823
// 64KB to accumulate bytes as possible to reduce the times of calling `writeBytes`.
19-
private byte[] buffer = new byte[64 * 1024];
24+
private static final int CHUNK_SIZE = 64 * 1024;
25+
private final byte[] buffer;
2026

2127
public IndexOutputWithBuffer(IndexOutput indexOutput) {
2228
this.indexOutput = indexOutput;
29+
this.buffer = new byte[CHUNK_SIZE];
2330
}
2431

2532
// This method will be called in JNI layer which precisely knows
@@ -33,6 +40,43 @@ public void writeBytes(int length) {
3340
}
3441
}
3542

43+
/**
44+
* Writes to the {@link IndexOutput} by buffering bytes into the existing buffer in this class.
45+
*
46+
* @param inputStream The stream from which we are reading bytes to write
47+
* @throws IOException
48+
* @see IndexOutputWithBuffer#writeFromStreamWithBuffer(InputStream, byte[])
49+
*/
50+
public void writeFromStreamWithBuffer(InputStream inputStream) throws IOException {
51+
writeFromStreamWithBuffer(inputStream, this.buffer);
52+
}
53+
54+
/**
55+
* Writes to the {@link IndexOutput} by buffering bytes with @param outputBuffer. This method allows
56+
* {@link org.opensearch.knn.index.codec.nativeindex.remote.RemoteIndexBuildStrategy} to provide a separate, larger buffer as that buffer is for buffering
57+
* bytes downloaded from the repository, so it may be more performant to use a larger buffer.
58+
* We do not change the size of the existing buffer in case a fallback to the existing build strategy is needed.
59+
* TODO: Tune the size of the buffer used by RemoteIndexBuildStrategy based on benchmarking
60+
*
61+
* @param inputStream The stream from which we are reading bytes to write
62+
* @param outputBuffer The buffer used to buffer bytes
63+
* @throws IOException
64+
* @see IndexOutputWithBuffer#writeFromStreamWithBuffer(InputStream)
65+
*/
66+
private void writeFromStreamWithBuffer(InputStream inputStream, byte[] outputBuffer) throws IOException {
67+
int bytesRead = 0;
68+
// InputStream uses -1 indicates there are no more bytes to be read
69+
while (bytesRead != -1) {
70+
// Try to read CHUNK_SIZE into the buffer. The actual amount read may be less.
71+
bytesRead = inputStream.read(outputBuffer, 0, CHUNK_SIZE);
72+
assert bytesRead <= CHUNK_SIZE;
73+
// However many bytes we read, write it to the IndexOutput if != -1
74+
if (bytesRead != -1) {
75+
indexOutput.writeBytes(outputBuffer, 0, bytesRead);
76+
}
77+
}
78+
}
79+
3680
@Override
3781
public String toString() {
3882
return "{indexOutput=" + indexOutput + ", len(buffer)=" + buffer.length + "}";

src/test/java/org/opensearch/knn/index/codec/nativeindex/remote/DefaultVectorRepositoryAccessorTests.java

+71
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55

66
package org.opensearch.knn.index.codec.nativeindex.remote;
77

8+
import org.apache.lucene.store.Directory;
9+
import org.apache.lucene.store.IOContext;
10+
import org.apache.lucene.store.IndexInput;
11+
import org.apache.lucene.store.IndexOutput;
812
import org.mockito.Mockito;
913
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
1014
import org.opensearch.common.blobstore.BlobContainer;
@@ -13,12 +17,18 @@
1317
import org.opensearch.common.blobstore.fs.FsBlobStore;
1418
import org.opensearch.index.IndexSettings;
1519
import org.opensearch.knn.index.VectorDataType;
20+
import org.opensearch.knn.index.engine.KNNEngine;
21+
import org.opensearch.knn.index.store.IndexOutputWithBuffer;
1622
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
1723
import org.opensearch.repositories.RepositoriesService;
1824
import org.opensearch.repositories.blobstore.BlobStoreRepository;
1925

26+
import java.io.ByteArrayInputStream;
2027
import java.io.IOException;
28+
import java.io.InputStream;
2129
import java.nio.file.Path;
30+
import java.util.List;
31+
import java.util.Random;
2232

2333
import static org.mockito.ArgumentMatchers.any;
2434
import static org.mockito.ArgumentMatchers.eq;
@@ -129,4 +139,65 @@ public void testAsyncUploadThrowsException() throws InterruptedException, IOExce
129139
verify(mockBlobStore).blobContainer(any());
130140
verify(mockRepository).basePath();
131141
}
142+
143+
/**
144+
* Verify the buffered read method in {@link DefaultVectorRepositoryAccessor#readFromRepository} produces the correct result
145+
*/
146+
public void testRepositoryRead() throws IOException {
147+
String TEST_FILE_NAME = randomAlphaOfLength(8) + KNNEngine.FAISS.getExtension();
148+
149+
// Create an InputStream with random values
150+
int TEST_ARRAY_SIZE = 64 * 1024 * 10;
151+
byte[] byteArray = new byte[TEST_ARRAY_SIZE];
152+
Random random = new Random();
153+
random.nextBytes(byteArray);
154+
InputStream randomStream = new ByteArrayInputStream(byteArray);
155+
156+
// Create a test segment that we will read/write from
157+
Directory directory;
158+
directory = newFSDirectory(createTempDir());
159+
String TEST_SEGMENT_NAME = "test-segment-name";
160+
IndexOutput testIndexOutput = directory.createOutput(TEST_SEGMENT_NAME, IOContext.DEFAULT);
161+
IndexOutputWithBuffer testIndexOutputWithBuffer = new IndexOutputWithBuffer(testIndexOutput);
162+
163+
// Set up RemoteIndexBuildStrategy and write to IndexOutput
164+
RepositoriesService repositoriesService = mock(RepositoriesService.class);
165+
BlobStoreRepository mockRepository = mock(BlobStoreRepository.class);
166+
BlobPath testBasePath = new BlobPath().add("testBasePath");
167+
BlobStore mockBlobStore = mock(BlobStore.class);
168+
AsyncMultiStreamBlobContainer mockBlobContainer = mock(AsyncMultiStreamBlobContainer.class);
169+
170+
when(repositoriesService.repository(any())).thenReturn(mockRepository);
171+
when(mockRepository.basePath()).thenReturn(testBasePath);
172+
when(mockRepository.blobStore()).thenReturn(mockBlobStore);
173+
when(mockBlobStore.blobContainer(any())).thenReturn(mockBlobContainer);
174+
when(mockBlobContainer.readBlob(TEST_FILE_NAME)).thenReturn(randomStream);
175+
176+
VectorRepositoryAccessor objectUnderTest = new DefaultVectorRepositoryAccessor(mockRepository, mock(IndexSettings.class));
177+
178+
// Verify file extension check
179+
assertThrows(IllegalArgumentException.class, () -> objectUnderTest.readFromRepository("test_file.txt", testIndexOutputWithBuffer));
180+
181+
// Now test with valid file extensions
182+
String testPath = randomFrom(
183+
List.of(
184+
"testBasePath/testDirectory/" + TEST_FILE_NAME, // Test with subdirectory
185+
"testBasePath/" + TEST_FILE_NAME, // Test with only base path
186+
TEST_FILE_NAME // test with no base path
187+
)
188+
);
189+
// This should read from randomStream into testIndexOutput
190+
objectUnderTest.readFromRepository(testPath, testIndexOutputWithBuffer);
191+
testIndexOutput.close();
192+
193+
// Now try to read from the IndexOutput
194+
IndexInput testIndexInput = directory.openInput(TEST_SEGMENT_NAME, IOContext.DEFAULT);
195+
byte[] resultByteArray = new byte[TEST_ARRAY_SIZE];
196+
testIndexInput.readBytes(resultByteArray, 0, TEST_ARRAY_SIZE);
197+
assertArrayEquals(byteArray, resultByteArray);
198+
199+
// Test Cleanup
200+
testIndexInput.close();
201+
directory.close();
202+
}
132203
}

0 commit comments

Comments
 (0)