Skip to content

Commit a8063b5

Browse files
jed326Jay Deng
authored and
Jay Deng
committed
Add vector data upload implementation to RemoteIndexBuildStrategy
Signed-off-by: Jay Deng <jayd0104@gmail.com>
1 parent c7ac05c commit a8063b5

File tree

12 files changed

+1124
-56
lines changed

12 files changed

+1124
-56
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66

77
## [Unreleased 3.0](https://github.com/opensearch-project/k-NN/compare/2.x...HEAD)
88
### Features
9+
* [Remote Vector Index Build] Introduce Remote Native Index Build feature flag, settings, and initial skeleton [#2525](https://github.com/opensearch-project/k-NN/pull/2525)
10+
* [Remote Vector Index Build] Implement vector data upload and vector data size threshold setting [#2550](https://github.com/opensearch-project/k-NN/pull/2550)
911
### Enhancements
1012
### Bug Fixes
1113
### Infrastructure

src/main/java/org/opensearch/knn/index/KNNSettings.java

+23-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.opensearch.OpenSearchParseException;
1212
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
1313
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
14-
import org.opensearch.transport.client.Client;
1514
import org.opensearch.cluster.metadata.IndexMetadata;
1615
import org.opensearch.cluster.service.ClusterService;
1716
import org.opensearch.common.Booleans;
@@ -28,6 +27,7 @@
2827
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
2928
import org.opensearch.monitor.jvm.JvmInfo;
3029
import org.opensearch.monitor.os.OsProbe;
30+
import org.opensearch.transport.client.Client;
3131

3232
import java.security.InvalidParameterException;
3333
import java.util.Arrays;
@@ -96,6 +96,7 @@ public class KNNSettings {
9696
public static final String KNN_DERIVED_SOURCE_ENABLED = "index.knn.derived_source.enabled";
9797
public static final String KNN_INDEX_REMOTE_VECTOR_BUILD = "index.knn.remote_index_build.enabled";
9898
public static final String KNN_REMOTE_VECTOR_REPO = "knn.remote_index_build.vector_repo";
99+
public static final String KNN_REMOTE_VECTOR_BUILD_THRESHOLD = "knn.remote_index_build.size_threshold";
99100

100101
/**
101102
* Default setting values
@@ -126,6 +127,8 @@ public class KNNSettings {
126127
// 10% of the JVM heap
127128
public static final Integer KNN_DEFAULT_QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES = 60;
128129
public static final boolean KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_VALUE = false;
130+
// TODO: Tune this default value based on benchmarking
131+
public static final ByteSizeValue KNN_REMOTE_VECTOR_BUILD_THRESHOLD_DEFAULT_VALUE = new ByteSizeValue(50, ByteSizeUnit.MB);
129132

130133
/**
131134
* Settings Definition
@@ -388,6 +391,15 @@ public class KNNSettings {
388391
*/
389392
public static final Setting<String> KNN_REMOTE_VECTOR_REPO_SETTING = Setting.simpleString(KNN_REMOTE_VECTOR_REPO, Dynamic, NodeScope);
390393

394+
/**
395+
* Cluster level setting which indicates the size threshold above which remote vector builds will be enabled.
396+
*/
397+
public static final Setting<ByteSizeValue> KNN_REMOTE_VECTOR_BUILD_THRESHOLD_SETTING = Setting.byteSizeSetting(
398+
KNN_REMOTE_VECTOR_BUILD_THRESHOLD,
399+
KNN_REMOTE_VECTOR_BUILD_THRESHOLD_DEFAULT_VALUE,
400+
Dynamic,
401+
NodeScope
402+
);
391403
/**
392404
* Dynamic settings
393405
*/
@@ -550,6 +562,10 @@ private Setting<?> getSetting(String key) {
550562
return KNN_REMOTE_VECTOR_REPO_SETTING;
551563
}
552564

565+
if (KNN_REMOTE_VECTOR_BUILD_THRESHOLD.equals(key)) {
566+
return KNN_REMOTE_VECTOR_BUILD_THRESHOLD_SETTING;
567+
}
568+
553569
throw new IllegalArgumentException("Cannot find setting by key [" + key + "]");
554570
}
555571

@@ -577,7 +593,8 @@ public List<Setting<?>> getSettings() {
577593
KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_SETTING,
578594
KNN_DERIVED_SOURCE_ENABLED_SETTING,
579595
KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING,
580-
KNN_REMOTE_VECTOR_REPO_SETTING
596+
KNN_REMOTE_VECTOR_REPO_SETTING,
597+
KNN_REMOTE_VECTOR_BUILD_THRESHOLD_SETTING
581598
);
582599
return Stream.concat(settings.stream(), Stream.concat(getFeatureFlags().stream(), dynamicCacheSettings.values().stream()))
583600
.collect(Collectors.toList());
@@ -657,6 +674,10 @@ public static boolean isShardLevelRescoringDisabledForDiskBasedVector(String ind
657674
.getAsBoolean(KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED, false);
658675
}
659676

677+
public static ByteSizeValue getKnnRemoteVectorBuildThreshold() {
678+
return KNNSettings.state().getSettingValue(KNNSettings.KNN_REMOTE_VECTOR_BUILD_THRESHOLD);
679+
}
680+
660681
public void initialize(Client client, ClusterService clusterService) {
661682
this.client = client;
662683
this.clusterService = clusterService;

src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexBuildStrategyFactory.java

+22-7
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,18 @@
77

88
import org.apache.lucene.index.FieldInfo;
99
import org.opensearch.index.IndexSettings;
10+
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
1011
import org.opensearch.knn.index.codec.nativeindex.remote.RemoteIndexBuildStrategy;
1112
import org.opensearch.knn.index.engine.KNNEngine;
13+
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
1214
import org.opensearch.repositories.RepositoriesService;
1315

16+
import java.io.IOException;
1417
import java.util.function.Supplier;
1518

1619
import static org.opensearch.knn.common.FieldInfoExtractor.extractKNNEngine;
1720
import static org.opensearch.knn.common.KNNConstants.MODEL_ID;
21+
import static org.opensearch.knn.index.codec.util.KNNCodecUtil.initializeVectorValues;
1822

1923
/**
2024
* Creates the {@link NativeIndexBuildStrategy}
@@ -34,11 +38,18 @@ public NativeIndexBuildStrategyFactory(Supplier<RepositoriesService> repositorie
3438
}
3539

3640
/**
37-
* Creates or returns the desired {@link NativeIndexBuildStrategy} implementation. Intended to be used by {@link NativeIndexWriter}
38-
* @param fieldInfo
39-
* @return
41+
* @param fieldInfo Field related attributes/info
42+
* @param totalLiveDocs Number of documents with the vector field. This value comes from {@link org.opensearch.knn.index.codec.KNN990Codec.NativeEngines990KnnVectorsWriter#flush}
43+
* and {@link org.opensearch.knn.index.codec.KNN990Codec.NativeEngines990KnnVectorsWriter#mergeOneField}
44+
* @param knnVectorValues An instance of {@link KNNVectorValues} which is used to evaluate the size threshold KNN_REMOTE_VECTOR_BUILD_THRESHOLD
45+
* @return The {@link NativeIndexBuildStrategy} to be used. Intended to be used by {@link NativeIndexWriter}
46+
* @throws IOException
4047
*/
41-
public NativeIndexBuildStrategy getBuildStrategy(final FieldInfo fieldInfo) {
48+
public NativeIndexBuildStrategy getBuildStrategy(
49+
final FieldInfo fieldInfo,
50+
final int totalLiveDocs,
51+
final KNNVectorValues<?> knnVectorValues
52+
) throws IOException {
4253
final KNNEngine knnEngine = extractKNNEngine(fieldInfo);
4354
boolean isTemplate = fieldInfo.attributes().containsKey(MODEL_ID);
4455
boolean iterative = !isTemplate && KNNEngine.FAISS == knnEngine;
@@ -47,11 +58,15 @@ public NativeIndexBuildStrategy getBuildStrategy(final FieldInfo fieldInfo) {
4758
? MemOptimizedNativeIndexBuildStrategy.getInstance()
4859
: DefaultIndexBuildStrategy.getInstance();
4960

50-
if (repositoriesServiceSupplier != null
61+
initializeVectorValues(knnVectorValues);
62+
long vectorBlobLength = ((long) knnVectorValues.bytesPerVector()) * totalLiveDocs;
63+
64+
if (KNNFeatureFlags.isKNNRemoteVectorBuildEnabled()
65+
&& repositoriesServiceSupplier != null
5166
&& indexSettings != null
5267
&& knnEngine.supportsRemoteIndexBuild()
53-
&& RemoteIndexBuildStrategy.shouldBuildIndexRemotely(indexSettings)) {
54-
return new RemoteIndexBuildStrategy(repositoriesServiceSupplier, strategy);
68+
&& RemoteIndexBuildStrategy.shouldBuildIndexRemotely(indexSettings, vectorBlobLength)) {
69+
return new RemoteIndexBuildStrategy(repositoriesServiceSupplier, strategy, indexSettings);
5570
} else {
5671
return strategy;
5772
}

src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexWriter.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public class NativeIndexWriter {
5858

5959
private final SegmentWriteState state;
6060
private final FieldInfo fieldInfo;
61-
private final NativeIndexBuildStrategy indexBuilder;
61+
private final NativeIndexBuildStrategyFactory indexBuilderFactory;
6262
@Nullable
6363
private final QuantizationState quantizationState;
6464

@@ -148,6 +148,11 @@ private void buildAndWriteIndex(final Supplier<KNNVectorValues<?>> knnVectorValu
148148
knnVectorValuesSupplier,
149149
totalLiveDocs
150150
);
151+
NativeIndexBuildStrategy indexBuilder = indexBuilderFactory.getBuildStrategy(
152+
fieldInfo,
153+
totalLiveDocs,
154+
knnVectorValuesSupplier.get()
155+
);
151156
indexBuilder.buildAndWriteIndex(nativeIndexParams);
152157
CodecUtil.writeFooter(output);
153158
}
@@ -316,6 +321,6 @@ private static NativeIndexWriter createWriter(
316321
@Nullable final QuantizationState quantizationState,
317322
NativeIndexBuildStrategyFactory nativeIndexBuildStrategyFactory
318323
) {
319-
return new NativeIndexWriter(state, fieldInfo, nativeIndexBuildStrategyFactory.getBuildStrategy(fieldInfo), quantizationState);
324+
return new NativeIndexWriter(state, fieldInfo, nativeIndexBuildStrategyFactory, quantizationState);
320325
}
321326
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.knn.index.codec.nativeindex.remote;
7+
8+
import lombok.extern.log4j.Log4j2;
9+
import org.apache.lucene.search.DocIdSetIterator;
10+
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
11+
12+
import java.io.IOException;
13+
import java.io.InputStream;
14+
import java.nio.ByteBuffer;
15+
import java.nio.ByteOrder;
16+
17+
import static org.opensearch.knn.index.codec.util.KNNCodecUtil.initializeVectorValues;
18+
19+
/**
20+
* {@link InputStream} implementation of doc ids backed by {@link KNNVectorValues} rather than any file. Intended for use by {@link RemoteIndexBuildStrategy}
21+
*/
22+
@Log4j2
23+
class DocIdInputStream extends InputStream {
24+
private final KNNVectorValues<?> knnVectorValues;
25+
// Doc ids are 4 byte integers, byte read() only returns a single byte, so we will need to track the byte position within a doc id.
26+
// For simplicity, and to maintain the byte ordering, we use a buffer with size of 1 int.
27+
private ByteBuffer currentBuffer;
28+
29+
/**
30+
* Use to represent the doc ids of a {@link KNNVectorValues} as an {@link InputStream}. Expected to be used only with {@link org.opensearch.common.blobstore.BlobContainer#writeBlob}.
31+
* @param knnVectorValues
32+
* @throws IOException
33+
* @see VectorValuesInputStream
34+
*/
35+
public DocIdInputStream(KNNVectorValues<?> knnVectorValues) throws IOException {
36+
this.currentBuffer = ByteBuffer.allocate(Integer.BYTES).order(ByteOrder.LITTLE_ENDIAN);
37+
this.knnVectorValues = knnVectorValues;
38+
initializeVectorValues(this.knnVectorValues);
39+
reloadBuffer();
40+
}
41+
42+
@Override
43+
public int read() throws IOException {
44+
if (currentBuffer == null) {
45+
return -1;
46+
}
47+
48+
if (!currentBuffer.hasRemaining()) {
49+
advanceAndReloadBuffer();
50+
if (currentBuffer == null) {
51+
return -1;
52+
}
53+
}
54+
55+
// Unsigned byte conversion is not technically needed as we are using a ByteBuffer, however we perform this operation still just in
56+
// case.
57+
return currentBuffer.get() & 0xFF;
58+
}
59+
60+
@Override
61+
public int read(byte[] b, int off, int len) throws IOException {
62+
if (currentBuffer == null) {
63+
return -1;
64+
}
65+
66+
int available = currentBuffer.remaining();
67+
if (available <= 0) {
68+
advanceAndReloadBuffer();
69+
if (currentBuffer == null) {
70+
return -1;
71+
}
72+
available = currentBuffer.remaining();
73+
}
74+
75+
int bytesToRead = Math.min(available, len);
76+
currentBuffer.get(b, off, bytesToRead);
77+
return bytesToRead;
78+
}
79+
80+
/**
81+
* Advances to the next doc, and then refills the buffer with the new doc.
82+
* @throws IOException
83+
*/
84+
private void advanceAndReloadBuffer() throws IOException {
85+
int docId = knnVectorValues.nextDoc();
86+
if (docId != -1 && docId != DocIdSetIterator.NO_MORE_DOCS) {
87+
reloadBuffer();
88+
} else {
89+
// Reset buffer to null to indicate that there are no more docs to be read
90+
currentBuffer = null;
91+
}
92+
}
93+
94+
/**
95+
* Reload {@link currentBuffer} with the current doc id that {@link knnVectorValues} is pointing to
96+
* @throws IOException
97+
*/
98+
private void reloadBuffer() throws IOException {
99+
currentBuffer.clear();
100+
currentBuffer.putInt(knnVectorValues.docId());
101+
currentBuffer.position(0);
102+
}
103+
}

0 commit comments

Comments
 (0)