Skip to content

Commit c50fdbf

Browse files
committed
Introduce RemoteIndexClient
Add RemoteIndexClient initial implementation, its accompanying dependencies, and Build Request, Retry Strategy, and test files Signed-off-by: owenhalpert <ohalpert@gmail.com>
1 parent edcbe31 commit c50fdbf

File tree

9 files changed

+571
-10
lines changed

9 files changed

+571
-10
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66

77
## [Unreleased 3.0](https://github.com/opensearch-project/k-NN/compare/2.x...HEAD)
88
### Features
9+
* [Remote Vector Index Build] Introduce Client Skeleton + Build Request implementation [#2548](https://github.com/opensearch-project/k-NN/pull/2548)
910
### Enhancements
1011
### Bug Fixes
1112
### Infrastructure

build.gradle

+5-1
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,11 @@ dependencies {
321321
api "net.java.dev.jna:jna-platform:5.13.0"
322322
// OpenSearch core is using slf4j 1.7.36. Therefore, we cannot change the version here.
323323
implementation 'org.slf4j:slf4j-api:1.7.36'
324-
324+
api "org.apache.httpcomponents.client5:httpclient5:${versions.httpclient5}"
325+
api "org.apache.httpcomponents.core5:httpcore5:${versions.httpcore5}"
326+
api "org.apache.httpcomponents.core5:httpcore5-h2:${versions.httpcore5}"
327+
api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
328+
api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
325329
zipArchive group: 'org.opensearch.plugin', name:'opensearch-security', version: "${opensearch_build}"
326330
}
327331

src/main/java/org/opensearch/knn/index/KNNSettings.java

+77-1
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
import org.opensearch.cluster.metadata.IndexMetadata;
1616
import org.opensearch.cluster.service.ClusterService;
1717
import org.opensearch.common.Booleans;
18+
import org.opensearch.common.settings.SecureSetting;
1819
import org.opensearch.common.settings.Setting;
1920
import org.opensearch.common.settings.Settings;
2021
import org.opensearch.common.unit.TimeValue;
2122
import org.opensearch.core.action.ActionListener;
23+
import org.opensearch.core.common.settings.SecureString;
2224
import org.opensearch.core.common.unit.ByteSizeUnit;
2325
import org.opensearch.core.common.unit.ByteSizeValue;
2426
import org.opensearch.index.IndexModule;
@@ -96,6 +98,11 @@ public class KNNSettings {
9698
public static final String KNN_DERIVED_SOURCE_ENABLED = "index.knn.derived_source.enabled";
9799
public static final String KNN_INDEX_REMOTE_VECTOR_BUILD = "index.knn.remote_index_build.enabled";
98100
public static final String KNN_REMOTE_VECTOR_REPO = "knn.remote_index_build.vector_repo";
101+
// Setting keys for the remote build service client: service endpoint, status poll interval,
// overall build timeout, and the username/password entries that are defined as secure settings below.
public static final String KNN_REMOTE_BUILD_SERVICE_ENDPOINT = "knn.remote_build_service.endpoint";
102+
public static final String KNN_REMOTE_BUILD_SERVICE_POLL_INTERVAL = "knn.remote_build_service.poll_interval";
103+
public static final String KNN_REMOTE_BUILD_SERVICE_TIMEOUT = "knn.remote_build_service.timeout";
104+
public static final String KNN_REMOTE_BUILD_SERVICE_USERNAME = "knn.remote_build_service.username";
105+
public static final String KNN_REMOTE_BUILD_SERVICE_PASSWORD = "knn.remote_build_service.password";
99106

100107
/**
101108
* Default setting values
@@ -127,6 +134,9 @@ public class KNNSettings {
127134
public static final Integer KNN_DEFAULT_QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES = 60;
128135
public static final boolean KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_VALUE = false;
129136

137+
// Default remote build timeout, in minutes; used as the default of KNN_REMOTE_BUILD_SERVICE_TIMEOUT_SETTING.
public static final Integer KNN_DEFAULT_REMOTE_BUILD_SERVICE_TIMEOUT_MINUTES = 60;
138+
// Default status poll interval, in seconds; used as the default of KNN_REMOTE_BUILD_SERVICE_POLL_INTERVAL_SETTING.
public static final Integer KNN_DEFAULT_REMOTE_BUILD_SERVICE_POLL_INTERVAL_SECONDS = 30;
139+
130140
/**
131141
* Settings Definition
132142
*/
@@ -388,6 +398,47 @@ public class KNNSettings {
388398
*/
389399
public static final Setting<String> KNN_REMOTE_VECTOR_REPO_SETTING = Setting.simpleString(KNN_REMOTE_VECTOR_REPO, Dynamic, NodeScope);
390400

401+
/**
402+
* Remote build service endpoint to be used for remote index build. Registered as a node-scoped,
* dynamic string setting; no validator is attached to the value yet.
* TODO: we can add String validators on these endpoint settings.
403+
*/
404+
public static final Setting<String> KNN_REMOTE_BUILD_SERVICE_ENDPOINT_SETTING = Setting.simpleString(
405+
KNN_REMOTE_BUILD_SERVICE_ENDPOINT,
406+
NodeScope,
407+
Dynamic
408+
);
409+
410+
/**
411+
* Time the remote build service client will wait before falling back to CPU index build.
* Node-scoped and dynamic; defaults to {@link #KNN_DEFAULT_REMOTE_BUILD_SERVICE_TIMEOUT_MINUTES} minutes.
412+
*/
413+
public static final Setting<TimeValue> KNN_REMOTE_BUILD_SERVICE_TIMEOUT_SETTING = Setting.timeSetting(
414+
KNN_REMOTE_BUILD_SERVICE_TIMEOUT,
415+
TimeValue.timeValueMinutes(KNN_DEFAULT_REMOTE_BUILD_SERVICE_TIMEOUT_MINUTES),
416+
NodeScope,
417+
Dynamic
418+
);
419+
420+
/**
421+
* Setting to control how often the remote build service client polls the build service for the status of the job.
* Node-scoped and dynamic; defaults to {@link #KNN_DEFAULT_REMOTE_BUILD_SERVICE_POLL_INTERVAL_SECONDS} seconds.
* No minimum value is configured here.
422+
*/
423+
public static final Setting<TimeValue> KNN_REMOTE_BUILD_SERVICE_POLL_INTERVAL_SETTING = Setting.timeSetting(
424+
KNN_REMOTE_BUILD_SERVICE_POLL_INTERVAL,
425+
TimeValue.timeValueSeconds(KNN_DEFAULT_REMOTE_BUILD_SERVICE_POLL_INTERVAL_SECONDS),
426+
NodeScope,
427+
Dynamic
428+
);
429+
430+
/**
431+
* Keystore setting holding the username for build service HTTP authorization.
* Defined as a secure string; {@code null} is passed as the second argument
* (presumably meaning "no fallback setting" - confirm against SecureSetting.secureString).
432+
*/
433+
public static final Setting<SecureString> KNN_REMOTE_BUILD_SERVICE_USERNAME_SETTING = SecureSetting.secureString(
434+
KNN_REMOTE_BUILD_SERVICE_USERNAME,
435+
null
436+
);
437+
/**
* Keystore setting holding the password for build service HTTP authorization.
* Defined as a secure string; {@code null} is passed as the second argument
* (presumably meaning "no fallback setting" - confirm against SecureSetting.secureString).
*/
public static final Setting<SecureString> KNN_REMOTE_BUILD_SERVICE_PASSWORD_SETTING = SecureSetting.secureString(
438+
KNN_REMOTE_BUILD_SERVICE_PASSWORD,
439+
null
440+
);
441+
391442
/**
392443
* Dynamic settings
393444
*/
@@ -550,6 +601,26 @@ private Setting<?> getSetting(String key) {
550601
return KNN_REMOTE_VECTOR_REPO_SETTING;
551602
}
552603

604+
if (KNN_REMOTE_BUILD_SERVICE_ENDPOINT.equals(key)) {
605+
return KNN_REMOTE_BUILD_SERVICE_ENDPOINT_SETTING;
606+
}
607+
608+
if (KNN_REMOTE_BUILD_SERVICE_TIMEOUT.equals(key)) {
609+
return KNN_REMOTE_BUILD_SERVICE_TIMEOUT_SETTING;
610+
}
611+
612+
if (KNN_REMOTE_BUILD_SERVICE_POLL_INTERVAL.equals(key)) {
613+
return KNN_REMOTE_BUILD_SERVICE_POLL_INTERVAL_SETTING;
614+
}
615+
616+
if (KNN_REMOTE_BUILD_SERVICE_USERNAME.equals(key)) {
617+
return KNN_REMOTE_BUILD_SERVICE_USERNAME_SETTING;
618+
}
619+
620+
if (KNN_REMOTE_BUILD_SERVICE_PASSWORD.equals(key)) {
621+
return KNN_REMOTE_BUILD_SERVICE_PASSWORD_SETTING;
622+
}
623+
553624
throw new IllegalArgumentException("Cannot find setting by key [" + key + "]");
554625
}
555626

@@ -577,7 +648,12 @@ public List<Setting<?>> getSettings() {
577648
KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_SETTING,
578649
KNN_DERIVED_SOURCE_ENABLED_SETTING,
579650
KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING,
580-
KNN_REMOTE_VECTOR_REPO_SETTING
651+
KNN_REMOTE_VECTOR_REPO_SETTING,
652+
KNN_REMOTE_BUILD_SERVICE_ENDPOINT_SETTING,
653+
KNN_REMOTE_BUILD_SERVICE_TIMEOUT_SETTING,
654+
KNN_REMOTE_BUILD_SERVICE_POLL_INTERVAL_SETTING,
655+
KNN_REMOTE_BUILD_SERVICE_USERNAME_SETTING,
656+
KNN_REMOTE_BUILD_SERVICE_PASSWORD_SETTING
581657
);
582658
return Stream.concat(settings.stream(), Stream.concat(getFeatureFlags().stream(), dynamicCacheSettings.values().stream()))
583659
.collect(Collectors.toList());

src/main/java/org/opensearch/knn/index/codec/nativeindex/remote/RemoteIndexBuildStrategy.java

+56-6
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,27 @@
1111
import org.opensearch.common.StopWatch;
1212
import org.opensearch.common.annotation.ExperimentalApi;
1313
import org.opensearch.index.IndexSettings;
14+
import org.opensearch.knn.common.KNNConstants;
1415
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
1516
import org.opensearch.knn.index.KNNSettings;
1617
import org.opensearch.knn.index.codec.nativeindex.NativeIndexBuildStrategy;
1718
import org.opensearch.knn.index.codec.nativeindex.model.BuildIndexParams;
19+
import org.opensearch.knn.index.remote.RemoteBuildRequest;
20+
import org.opensearch.knn.index.remote.RemoteIndexClient;
1821
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
1922
import org.opensearch.repositories.RepositoriesService;
2023
import org.opensearch.repositories.Repository;
2124
import org.opensearch.repositories.RepositoryMissingException;
2225
import org.opensearch.repositories.blobstore.BlobStoreRepository;
2326

2427
import java.io.IOException;
28+
import java.util.HashMap;
29+
import java.util.Map;
2530
import java.util.function.Supplier;
2631

2732
import static org.opensearch.knn.index.KNNSettings.KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING;
2833
import static org.opensearch.knn.index.KNNSettings.KNN_REMOTE_VECTOR_REPO_SETTING;
34+
import static org.opensearch.knn.index.KNNSettings.state;
2935

3036
/**
3137
* This class orchestrates building vector indices. It handles uploading data to a repository, submitting a remote
@@ -54,7 +60,7 @@ public RemoteIndexBuildStrategy(Supplier<RepositoriesService> repositoriesServic
5460
* @return whether to use the remote build feature
5561
*/
5662
public static boolean shouldBuildIndexRemotely(IndexSettings indexSettings) {
57-
String vectorRepo = KNNSettings.state().getSettingValue(KNN_REMOTE_VECTOR_REPO_SETTING.getKey());
63+
String vectorRepo = state().getSettingValue(KNN_REMOTE_VECTOR_REPO_SETTING.getKey());
5864
return KNNFeatureFlags.isKNNRemoteVectorBuildEnabled()
5965
&& indexSettings.getValue(KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING)
6066
&& vectorRepo != null
@@ -88,17 +94,18 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException {
8894
log.debug("Repository write took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
8995

9096
stopWatch = new StopWatch().start();
91-
submitVectorBuild();
97+
RemoteBuildRequest buildRequest = constructBuildRequest(indexInfo);
98+
String jobId = RemoteIndexClient.getInstance().submitVectorBuild(buildRequest);
9299
time_in_millis = stopWatch.stop().totalTime().millis();
93100
log.debug("Submit vector build took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
94101

95102
stopWatch = new StopWatch().start();
96-
awaitVectorBuild();
103+
String indexPath = awaitVectorBuild(jobId);
97104
time_in_millis = stopWatch.stop().totalTime().millis();
98105
log.debug("Await vector build took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
99106

100107
stopWatch = new StopWatch().start();
101-
readFromRepository();
108+
readFromRepository(indexPath);
102109
time_in_millis = stopWatch.stop().totalTime().millis();
103110
log.debug("Repository read took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
104111
} catch (Exception e) {
@@ -145,6 +152,49 @@ private void writeToRepository(
145152
throw new NotImplementedException();
146153
}
147154

155+
/**
156+
* Construct the RemoteBuildRequest object for the index build request
157+
* @return RemoteBuildRequest with parameters set
158+
*/
159+
public RemoteBuildRequest constructBuildRequest(BuildIndexParams indexInfo) throws IOException {
160+
String repositoryType = getRepository().getMetadata().type();
161+
String containerName = switch (repositoryType) {
162+
case "s3" -> getRepository().getMetadata().settings().get("bucket");
163+
case "fs" -> getRepository().getMetadata().settings().get("location");
164+
default -> throw new IllegalStateException("Unexpected value: " + repositoryType);
165+
};
166+
String vectorPath = null; // blobName + VECTOR_BLOB_FILE_EXTENSION
167+
String docIdPath = null; // blobName + DOC_ID_FILE_EXTENSION
168+
String tenantId = null; // indexSettings.getSettings().get(ClusterName.CLUSTER_NAME_SETTING.getKey());
169+
int dimension = 0; // TODO
170+
int docCount = indexInfo.getTotalLiveDocs();
171+
String dataType = indexInfo.getVectorDataType().getValue(); // TODO need to fetch encoder param to get fp16 vs fp32
172+
String engine = indexInfo.getKnnEngine().getName();
173+
174+
String spaceType = indexInfo.getParameters().get(KNNConstants.SPACE_TYPE).toString();
175+
176+
Map<String, Object> algorithmParams = new HashMap<>();
177+
algorithmParams.put("ef_construction", 100);
178+
algorithmParams.put("m", 16);
179+
180+
Map<String, Object> indexParameters = new HashMap<>();
181+
indexParameters.put("algorithm", "hnsw");
182+
indexParameters.put("algorithm_parameters", algorithmParams);
183+
184+
return RemoteBuildRequest.builder()
185+
.repositoryType(repositoryType)
186+
.containerName(containerName)
187+
.vectorPath(vectorPath)
188+
.docIdPath(docIdPath)
189+
.tenantId(tenantId)
190+
.dimension(dimension)
191+
.docCount(docCount)
192+
.dataType(dataType)
193+
.engine(engine)
194+
.indexParameters(indexParameters)
195+
.build();
196+
}
197+
148198
/**
149199
* Submit vector build request to remote vector build service
150200
*
@@ -156,14 +206,14 @@ private void submitVectorBuild() {
156206
/**
157207
* Wait on remote vector build to complete
158208
*/
159-
private void awaitVectorBuild() {
209+
private String awaitVectorBuild(String jobId) {
// Placeholder: waiting on the build is not implemented yet, so every call throws NotImplementedException.
// jobId is the value returned by RemoteIndexClient#submitVectorBuild in buildAndWriteIndex; the String
// returned here is passed on to readFromRepository as the index path.
160210
throw new NotImplementedException();
161211
}
162212

163213
/**
164214
* Read constructed vector file from remote repository and write to IndexOutput
165215
*/
166-
private void readFromRepository() {
216+
private void readFromRepository(String indexPath) {
// Placeholder: reading the built index is not implemented yet, so every call throws NotImplementedException.
// indexPath is the value returned by awaitVectorBuild in buildAndWriteIndex.
167217
throw new NotImplementedException();
168218
}
169219
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.knn.index.remote;
7+
8+
import org.opensearch.common.xcontent.json.JsonXContent;
9+
import lombok.Builder;
10+
import lombok.Getter;
11+
import org.opensearch.core.xcontent.XContentBuilder;
12+
13+
import java.io.IOException;
14+
import java.util.HashMap;
15+
import java.util.Map;
16+
17+
@Builder
18+
@Getter
19+
public class RemoteBuildRequest {
20+
private final String repositoryType;
21+
private final String containerName;
22+
private final String vectorPath;
23+
private final String docIdPath;
24+
private final String tenantId;
25+
private final int dimension;
26+
private final int docCount;
27+
private final String dataType;
28+
private final String engine;
29+
@Builder.Default
30+
private final Map<String, Object> indexParameters = new HashMap<>();
31+
32+
public String toJson() throws IOException {
33+
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
34+
builder.startObject();
35+
builder.field("repository_type", repositoryType);
36+
builder.field("container_name", containerName);
37+
builder.field("vector_path", vectorPath);
38+
builder.field("doc_id_path", docIdPath);
39+
builder.field("tenant_id", tenantId);
40+
builder.field("dimension", dimension);
41+
builder.field("doc_count", docCount);
42+
builder.field("data_type", dataType);
43+
builder.field("engine", engine);
44+
builder.field("index_parameters", indexParameters);
45+
builder.endObject();
46+
return builder.toString();
47+
}
48+
}
49+
50+
}

0 commit comments

Comments
 (0)