Skip to content

Commit a154f4d

Browse files
author
Jay Deng
committed
Add basic upload, download, indexOuput write functionality
1 parent 4f8648c commit a154f4d

File tree

8 files changed

+412
-18
lines changed

8 files changed

+412
-18
lines changed

build.gradle

+25-1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ buildscript {
4545
opensearch_build += "-SNAPSHOT"
4646
}
4747
opensearch_no_snapshot = opensearch_build.replace("-SNAPSHOT","")
48+
repo_s3_resource_folder = "build/resource/repository-s3"
49+
repo_s3_download_url = "https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/3.0.0/latest/linux/arm64/tar/builds/opensearch/core-plugins/repository-s3-3.0.0.zip"
4850
}
4951

5052
// This isn't applying from repositories.gradle so repeating git diff it here
@@ -93,7 +95,7 @@ ext {
9395
projectSubstitutions = [:]
9496

9597
configureSecurityPlugin = { OpenSearchCluster cluster ->
96-
configurations.zipArchive.asFileTree.each {
98+
configurations.secureIntegTestPluginArchive.asFileTree.each {
9799
cluster.plugin(provider(new Callable<RegularFile>() {
98100
@Override
99101
RegularFile call() throws Exception {
@@ -214,6 +216,7 @@ allprojects {
214216

215217
configurations {
216218
zipArchive
219+
secureIntegTestPluginArchive
217220
}
218221

219222
publishing {
@@ -303,6 +306,7 @@ task release(type: Copy, group: 'build') {
303306
dependencies {
304307
api "org.opensearch:opensearch:${opensearch_version}"
305308
compileOnly "org.opensearch.plugin:opensearch-scripting-painless-spi:${versions.opensearch}"
309+
compileOnly "org.opensearch.plugin:repository-s3:${versions.opensearch}"
306310
api group: 'com.google.guava', name: 'failureaccess', version:'1.0.1'
307311
api group: 'com.google.guava', name: 'guava', version:'32.1.3-jre'
308312
api group: 'commons-lang', name: 'commons-lang', version: '2.6'
@@ -511,6 +515,26 @@ testClusters.integTest {
511515
configureSecurityPlugin(testClusters.integTest)
512516
}
513517

518+
// Install K-NN/ml-commons plugins on the integTest cluster nodes except security
519+
plugin(provider(new Callable<RegularFile>(){
520+
@Override
521+
RegularFile call() throws Exception {
522+
return new RegularFile() {
523+
@Override
524+
File getAsFile() {
525+
if (new File("$project.rootDir/$repo_s3_resource_folder").exists()) {
526+
project.delete(files("$project.rootDir/$repo_s3_resource_folder"))
527+
}
528+
project.mkdir repo_s3_resource_folder
529+
ant.get(src: repo_s3_download_url,
530+
dest: repo_s3_resource_folder,
531+
httpusecaches: false)
532+
return fileTree(repo_s3_resource_folder).getSingleFile()
533+
}
534+
}
535+
}
536+
}))
537+
514538
plugin(project.tasks.bundlePlugin.archiveFile)
515539
if (Os.isFamily(Os.FAMILY_WINDOWS)) {
516540
// Add the paths of built JNI libraries and its dependent libraries to PATH variable in System variables

src/main/java/org/opensearch/knn/common/featureflags/KNNFeatureFlags.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class KNNFeatureFlags {
3838

3939
public static final Setting<Boolean> KNN_REMOTE_VECTOR_BUILD_SETTING = Setting.boolSetting(
4040
KNN_REMOTE_VECTOR_BUILD,
41-
false,
41+
true,
4242
NodeScope,
4343
Dynamic
4444
);

src/main/java/org/opensearch/knn/index/KNNSettings.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -375,12 +375,14 @@ public class KNNSettings {
375375

376376
public static final Setting<Boolean> KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING = Setting.boolSetting(
377377
KNN_INDEX_REMOTE_VECTOR_BUILD,
378-
false,
378+
true,
379379
Dynamic,
380380
IndexScope
381381
);
382382

383-
public static final Setting<String> KNN_REMOTE_VECTOR_REPO_SETTING = Setting.simpleString(KNN_REMOTE_VECTOR_REPO, Dynamic, NodeScope);
383+
public static final Setting<String> KNN_REMOTE_VECTOR_REPO_SETTING = Setting.simpleString(
384+
KNN_REMOTE_VECTOR_REPO, "vector-repo", Dynamic, NodeScope
385+
);
384386

385387
/**
386388
* Dynamic settings

src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexWriter.java

-4
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
import org.opensearch.knn.index.remote.RemoteIndexBuilder;
1313
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
1414
import org.opensearch.knn.quantization.models.quantizationState.QuantizationState;
15-
import org.slf4j.Logger;
16-
import org.slf4j.LoggerFactory;
1715

1816
import java.io.IOException;
1917
import java.util.function.Supplier;
@@ -25,7 +23,6 @@
2523
* Interface for writing a KNN index field in a segment. This is intended to be used for native engines.
2624
*/
2725
public interface NativeIndexWriter {
28-
Logger log = LoggerFactory.getLogger(NativeIndexWriter.class);
2926

3027
/**
3128
* flushes the index
@@ -63,7 +60,6 @@ static NativeIndexWriter getWriter(
6360
// TODO: We will add threshold settings for using this featuer here as well, see:
6461
// https://github.com/opensearch-project/k-NN/issues/2391
6562
if (remoteIndexBuilder != null && remoteIndexBuilder.shouldBuildIndexRemotely()) {
66-
log.debug("Using RemoteNativeIndexWriter");
6763
return new RemoteNativeIndexWriter(
6864
fieldInfo,
6965
segmentWriteState,

src/main/java/org/opensearch/knn/index/codec/nativeindex/RemoteNativeIndexWriter.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.knn.index.codec.nativeindex;
77

8+
import lombok.extern.log4j.Log4j2;
89
import org.apache.lucene.index.FieldInfo;
910
import org.apache.lucene.index.SegmentWriteState;
1011
import org.opensearch.common.annotation.ExperimentalApi;
@@ -18,6 +19,7 @@
1819
* Writes KNN Index for a field in a segment. This is intended to be used for native engines. This class uses a remote index build service for building segments.
1920
* See {@link LocalNativeIndexWriter} for local vector index build path.
2021
*/
22+
@Log4j2
2123
@ExperimentalApi
2224
public class RemoteNativeIndexWriter implements NativeIndexWriter {
2325

@@ -46,7 +48,7 @@ public void flushIndex(KNNVectorValues<?> knnVectorValues, int totalLiveDocs) th
4648
try {
4749
remoteIndexBuilder.buildIndexRemotely(fieldInfo, knnVectorValuesSupplier, totalLiveDocs, segmentWriteState);
4850
} catch (Exception e) {
49-
log.warn("Failed to flush index remotely", e);
51+
log.info("Failed to flush index remotely", e);
5052
fallbackWriter.flushIndex(knnVectorValues, totalLiveDocs);
5153
}
5254
}
@@ -56,7 +58,7 @@ public void mergeIndex(KNNVectorValues<?> knnVectorValues, int totalLiveDocs) th
5658
try {
5759
remoteIndexBuilder.buildIndexRemotely(fieldInfo, knnVectorValuesSupplier, totalLiveDocs, segmentWriteState);
5860
} catch (Exception e) {
59-
log.warn("Failed to merge index remotely", e);
61+
log.info("Failed to merge index remotely", e);
6062
fallbackWriter.mergeIndex(knnVectorValues, totalLiveDocs);
6163
}
6264
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.knn.index.remote;
7+
8+
import lombok.extern.log4j.Log4j2;
9+
import org.apache.lucene.search.DocIdSetIterator;
10+
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
11+
12+
import java.io.IOException;
13+
import java.io.InputStream;
14+
import java.nio.ByteBuffer;
15+
import java.nio.ByteOrder;
16+
17+
// TODO: This class needs a revamp to make more efficient
18+
@Log4j2
19+
public class DocIdInputStream extends InputStream {
20+
private final KNNVectorValues<?> knnVectorValues;
21+
private ByteBuffer currentBuffer;
22+
private int position = 0;
23+
public DocIdInputStream(KNNVectorValues<?> knnVectorValues) {
24+
this.knnVectorValues = knnVectorValues;
25+
try {
26+
loadNextVector();
27+
} catch (IOException e) {
28+
log.error("Failed to load initial vector", e);
29+
}
30+
}
31+
@Override
32+
public int read() throws IOException {
33+
if (currentBuffer == null || position >= currentBuffer.capacity()) {
34+
loadNextVector();
35+
if (currentBuffer == null) {
36+
return -1;
37+
}
38+
}
39+
return currentBuffer.get(position++) & 0xFF;
40+
}
41+
@Override
42+
public int read(byte[] b, int off, int len) throws IOException {
43+
if (currentBuffer == null) {
44+
return -1;
45+
}
46+
int available = currentBuffer.capacity() - position;
47+
if (available <= 0) {
48+
loadNextVector();
49+
if (currentBuffer == null) {
50+
return -1;
51+
}
52+
available = currentBuffer.capacity() - position;
53+
}
54+
int bytesToRead = Math.min(available, len);
55+
currentBuffer.position(position);
56+
currentBuffer.get(b, off, bytesToRead);
57+
position += bytesToRead;
58+
return bytesToRead;
59+
}
60+
private void loadNextVector() throws IOException {
61+
int docId = knnVectorValues.nextDoc();
62+
if (docId != -1 && docId != DocIdSetIterator.NO_MORE_DOCS) {
63+
float[] vector = new float[1];
64+
vector[0] = (float) docId;
65+
currentBuffer = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
66+
currentBuffer.asFloatBuffer().put(vector);
67+
position = 0;
68+
} else {
69+
currentBuffer = null;
70+
}
71+
}
72+
}

0 commit comments

Comments
 (0)