Skip to content

Commit 3674aa2

Browse files
jed326Jay Deng
authored and
Jay Deng
committed
Add download + indexOuput#write implementation to RemoteIndexBuildStrategy
Signed-off-by: Jay Deng <jayd0104@gmail.com>
1 parent c7ac05c commit 3674aa2

File tree

5 files changed

+136
-8
lines changed

5 files changed

+136
-8
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66

77
## [Unreleased 3.0](https://github.com/opensearch-project/k-NN/compare/2.x...HEAD)
88
### Features
9+
* [Remote Vector Index Build] Implement data download and IndexOutput write functionality [#2554](https://github.com/opensearch-project/k-NN/pull/2554)
910
### Enhancements
1011
### Bug Fixes
1112
### Infrastructure

src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexBuildStrategyFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public NativeIndexBuildStrategy getBuildStrategy(final FieldInfo fieldInfo) {
5151
&& indexSettings != null
5252
&& knnEngine.supportsRemoteIndexBuild()
5353
&& RemoteIndexBuildStrategy.shouldBuildIndexRemotely(indexSettings)) {
54-
return new RemoteIndexBuildStrategy(repositoriesServiceSupplier, strategy);
54+
return new RemoteIndexBuildStrategy(repositoriesServiceSupplier, strategy, indexSettings);
5555
} else {
5656
return strategy;
5757
}

src/main/java/org/opensearch/knn/index/codec/nativeindex/remote/RemoteIndexBuildStrategy.java

+49-4
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,30 @@
55

66
package org.opensearch.knn.index.codec.nativeindex.remote;
77

8+
import com.google.common.annotations.VisibleForTesting;
89
import lombok.extern.log4j.Log4j2;
910
import org.apache.commons.lang.NotImplementedException;
1011
import org.apache.lucene.index.SegmentWriteState;
12+
import org.apache.lucene.store.IndexOutput;
1113
import org.opensearch.common.StopWatch;
14+
import org.opensearch.common.UUIDs;
1215
import org.opensearch.common.annotation.ExperimentalApi;
16+
import org.opensearch.common.blobstore.BlobContainer;
17+
import org.opensearch.common.blobstore.BlobPath;
1318
import org.opensearch.index.IndexSettings;
1419
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
1520
import org.opensearch.knn.index.KNNSettings;
1621
import org.opensearch.knn.index.codec.nativeindex.NativeIndexBuildStrategy;
1722
import org.opensearch.knn.index.codec.nativeindex.model.BuildIndexParams;
23+
import org.opensearch.knn.index.engine.KNNEngine;
1824
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
1925
import org.opensearch.repositories.RepositoriesService;
2026
import org.opensearch.repositories.Repository;
2127
import org.opensearch.repositories.RepositoryMissingException;
2228
import org.opensearch.repositories.blobstore.BlobStoreRepository;
2329

2430
import java.io.IOException;
31+
import java.io.InputStream;
2532
import java.util.function.Supplier;
2633

2734
import static org.opensearch.knn.index.KNNSettings.KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING;
@@ -37,17 +44,25 @@ public class RemoteIndexBuildStrategy implements NativeIndexBuildStrategy {
3744

3845
private final Supplier<RepositoriesService> repositoriesServiceSupplier;
3946
private final NativeIndexBuildStrategy fallbackStrategy;
47+
private final IndexSettings indexSettings;
48+
4049
private static final String VECTOR_BLOB_FILE_EXTENSION = ".knnvec";
4150
private static final String DOC_ID_FILE_EXTENSION = ".knndid";
51+
private static final String VECTORS_PATH = "_vectors";
4252

4353
/**
4454
* Public constructor
4555
*
4656
* @param repositoriesServiceSupplier A supplier for {@link RepositoriesService} used for interacting with repository
4757
*/
48-
public RemoteIndexBuildStrategy(Supplier<RepositoriesService> repositoriesServiceSupplier, NativeIndexBuildStrategy fallbackStrategy) {
58+
public RemoteIndexBuildStrategy(
59+
Supplier<RepositoriesService> repositoriesServiceSupplier,
60+
NativeIndexBuildStrategy fallbackStrategy,
61+
IndexSettings indexSettings
62+
) {
4963
this.repositoriesServiceSupplier = repositoriesServiceSupplier;
5064
this.fallbackStrategy = fallbackStrategy;
65+
this.indexSettings = indexSettings;
5166
}
5267

5368
/**
@@ -98,7 +113,9 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException {
98113
log.debug("Await vector build took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
99114

100115
stopWatch = new StopWatch().start();
101-
readFromRepository();
116+
// TODO: This blob will be retrieved from the remote vector build service status response
117+
String blobName = UUIDs.base64UUID() + "_" + indexInfo.getFieldName() + "_" + indexInfo.getSegmentWriteState().segmentInfo.name;
118+
readFromRepository(blobName + KNNEngine.FAISS.getExtension(), indexInfo.getIndexOutputWithBuffer().getIndexOutput());
102119
time_in_millis = stopWatch.stop().totalTime().millis();
103120
log.debug("Repository read took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
104121
} catch (Exception e) {
@@ -126,6 +143,14 @@ private BlobStoreRepository getRepository() throws RepositoryMissingException {
126143
return (BlobStoreRepository) repository;
127144
}
128145

146+
/**
147+
* @return The blob container to read/write from, determined from the repository base path and index settings. This container is where all blobs will be written to.
148+
*/
149+
private BlobContainer getBlobContainer() {
150+
BlobPath path = getRepository().basePath().add(indexSettings.getUUID() + VECTORS_PATH);
151+
return getRepository().blobStore().blobContainer(path);
152+
}
153+
129154
/**
130155
* Write relevant vector data to repository
131156
*
@@ -163,7 +188,27 @@ private void awaitVectorBuild() {
163188
/**
164189
* Read constructed vector file from remote repository and write to IndexOutput
165190
*/
166-
private void readFromRepository() {
167-
throw new NotImplementedException();
191+
@VisibleForTesting
192+
void readFromRepository(String blobName, IndexOutput indexOutput) throws IOException {
193+
BlobContainer blobContainer = getBlobContainer();
194+
// TODO: We are using the sequential download API as multi-part parallel download is difficult for us to implement today and
195+
// requires some changes in core. For more details, see: https://github.com/opensearch-project/k-NN/issues/2464
196+
InputStream graphStream = blobContainer.readBlob(blobName);
197+
198+
// Allocate buffer of 64KB, same as used for CPU builds, see: IndexOutputWithBuffer
199+
int CHUNK_SIZE = 64 * 1024;
200+
byte[] buffer = new byte[CHUNK_SIZE];
201+
202+
int bytesRead = 0;
203+
// InputStream uses -1 indicates there are no more bytes to be read
204+
while (bytesRead != -1) {
205+
// Try to read CHUNK_SIZE into the buffer. The actual amount read may be less.
206+
bytesRead = graphStream.read(buffer, 0, CHUNK_SIZE);
207+
assert bytesRead <= CHUNK_SIZE;
208+
// However many bytes we read, write it to the IndexOutput if != -1
209+
if (bytesRead != -1) {
210+
indexOutput.writeBytes(buffer, 0, bytesRead);
211+
}
212+
}
168213
}
169214
}

src/main/java/org/opensearch/knn/index/store/IndexOutputWithBuffer.java

+2
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55

66
package org.opensearch.knn.index.store;
77

8+
import lombok.Getter;
89
import org.apache.lucene.store.IndexOutput;
910

1011
import java.io.IOException;
1112

1213
public class IndexOutputWithBuffer {
1314
// Underlying `IndexOutput` obtained from Lucene's Directory.
15+
@Getter
1416
private IndexOutput indexOutput;
1517
// Write buffer. Native engine will copy bytes into this buffer.
1618
// Allocating 64KB here since it show better performance in NMSLIB with the size. (We had slightly improvement in FAISS than having 4KB)

src/test/java/org/opensearch/knn/index/codec/nativeindex/remote/RemoteIndexBuildStrategyTests.java

+83-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,19 @@
55

66
package org.opensearch.knn.index.codec.nativeindex.remote;
77

8+
import org.apache.lucene.store.Directory;
9+
import org.apache.lucene.store.IOContext;
10+
import org.apache.lucene.store.IndexInput;
11+
import org.apache.lucene.store.IndexOutput;
12+
import org.junit.Before;
813
import org.mockito.Mockito;
14+
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
15+
import org.opensearch.common.blobstore.BlobPath;
16+
import org.opensearch.common.blobstore.BlobStore;
17+
import org.opensearch.common.settings.ClusterSettings;
18+
import org.opensearch.index.IndexSettings;
19+
import org.opensearch.knn.KNNTestCase;
20+
import org.opensearch.knn.index.KNNSettings;
921
import org.opensearch.knn.index.VectorDataType;
1022
import org.opensearch.knn.index.codec.nativeindex.NativeIndexBuildStrategy;
1123
import org.opensearch.knn.index.codec.nativeindex.model.BuildIndexParams;
@@ -16,17 +28,21 @@
1628
import org.opensearch.knn.index.vectorvalues.TestVectorValues;
1729
import org.opensearch.repositories.RepositoriesService;
1830
import org.opensearch.repositories.RepositoryMissingException;
19-
import org.opensearch.test.OpenSearchTestCase;
31+
import org.opensearch.repositories.blobstore.BlobStoreRepository;
2032

33+
import java.io.ByteArrayInputStream;
2134
import java.io.IOException;
35+
import java.io.InputStream;
2236
import java.util.List;
2337
import java.util.Map;
38+
import java.util.Random;
2439

2540
import static org.mockito.ArgumentMatchers.any;
2641
import static org.mockito.Mockito.mock;
2742
import static org.mockito.Mockito.when;
43+
import static org.opensearch.knn.index.KNNSettings.KNN_REMOTE_VECTOR_REPO_SETTING;
2844

29-
public class RemoteIndexBuildStrategyTests extends OpenSearchTestCase {
45+
public class RemoteIndexBuildStrategyTests extends KNNTestCase {
3046

3147
static int fallbackCounter = 0;
3248

@@ -38,6 +54,16 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException {
3854
}
3955
}
4056

57+
@Before
58+
@Override
59+
public void setUp() throws Exception {
60+
super.setUp();
61+
ClusterSettings clusterSettings = mock(ClusterSettings.class);
62+
when(clusterSettings.get(KNN_REMOTE_VECTOR_REPO_SETTING)).thenReturn("test-repo-name");
63+
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
64+
KNNSettings.state().setClusterService(clusterService);
65+
}
66+
4167
public void testFallback() throws IOException {
4268
List<float[]> vectorValues = List.of(new float[] { 1, 2 }, new float[] { 2, 3 }, new float[] { 3, 4 });
4369
final TestVectorValues.PreDefinedFloatVectorValues randomVectorValues = new TestVectorValues.PreDefinedFloatVectorValues(
@@ -48,7 +74,11 @@ public void testFallback() throws IOException {
4874
RepositoriesService repositoriesService = mock(RepositoriesService.class);
4975
when(repositoriesService.repository(any())).thenThrow(new RepositoryMissingException("Fallback"));
5076

51-
RemoteIndexBuildStrategy objectUnderTest = new RemoteIndexBuildStrategy(() -> repositoriesService, new TestIndexBuildStrategy());
77+
RemoteIndexBuildStrategy objectUnderTest = new RemoteIndexBuildStrategy(
78+
() -> repositoriesService,
79+
new TestIndexBuildStrategy(),
80+
mock(IndexSettings.class)
81+
);
5282

5383
IndexOutputWithBuffer indexOutputWithBuffer = Mockito.mock(IndexOutputWithBuffer.class);
5484

@@ -64,4 +94,54 @@ public void testFallback() throws IOException {
6494
objectUnderTest.buildAndWriteIndex(buildIndexParams);
6595
assertEquals(1, fallbackCounter);
6696
}
97+
98+
/**
99+
* Verify the buffered read method in {@link RemoteIndexBuildStrategy#readFromRepository} produces the correct result
100+
*/
101+
public void testRepositoryRead() throws IOException {
102+
// Create an InputStream with random values
103+
int TEST_ARRAY_SIZE = 64 * 1024 * 10;
104+
byte[] byteArray = new byte[TEST_ARRAY_SIZE];
105+
Random random = new Random();
106+
random.nextBytes(byteArray);
107+
InputStream randomStream = new ByteArrayInputStream(byteArray);
108+
109+
// Create a test segment that we will read/write from
110+
Directory directory;
111+
directory = newFSDirectory(createTempDir());
112+
String TEST_SEGMENT_NAME = "test-segment-name";
113+
IndexOutput testIndexOutput = directory.createOutput(TEST_SEGMENT_NAME, IOContext.DEFAULT);
114+
115+
// Set up RemoteIndexBuildStrategy and write to IndexOutput
116+
RepositoriesService repositoriesService = mock(RepositoriesService.class);
117+
BlobStoreRepository mockRepository = mock(BlobStoreRepository.class);
118+
BlobPath testBasePath = new BlobPath().add("testBasePath");
119+
BlobStore mockBlobStore = mock(BlobStore.class);
120+
AsyncMultiStreamBlobContainer mockBlobContainer = mock(AsyncMultiStreamBlobContainer.class);
121+
122+
when(repositoriesService.repository(any())).thenReturn(mockRepository);
123+
when(mockRepository.basePath()).thenReturn(testBasePath);
124+
when(mockRepository.blobStore()).thenReturn(mockBlobStore);
125+
when(mockBlobStore.blobContainer(any())).thenReturn(mockBlobContainer);
126+
when(mockBlobContainer.readBlob("test-blob")).thenReturn(randomStream);
127+
128+
RemoteIndexBuildStrategy objectUnderTest = new RemoteIndexBuildStrategy(
129+
() -> repositoriesService,
130+
mock(NativeIndexBuildStrategy.class),
131+
mock(IndexSettings.class)
132+
);
133+
// This should read from randomStream into testIndexOutput
134+
objectUnderTest.readFromRepository("test-blob", testIndexOutput);
135+
testIndexOutput.close();
136+
137+
// Now try to read from the IndexOutput
138+
IndexInput testIndexInput = directory.openInput(TEST_SEGMENT_NAME, IOContext.DEFAULT);
139+
byte[] resultByteArray = new byte[TEST_ARRAY_SIZE];
140+
testIndexInput.readBytes(resultByteArray, 0, TEST_ARRAY_SIZE);
141+
assertArrayEquals(byteArray, resultByteArray);
142+
143+
// Test Cleanup
144+
testIndexInput.close();
145+
directory.close();
146+
}
67147
}

0 commit comments

Comments
 (0)