5
5
6
6
package org .opensearch .knn .index .codec .nativeindex .remote ;
7
7
8
+ import org .apache .lucene .store .Directory ;
9
+ import org .apache .lucene .store .IOContext ;
10
+ import org .apache .lucene .store .IndexInput ;
11
+ import org .apache .lucene .store .IndexOutput ;
12
+ import org .junit .Before ;
8
13
import org .mockito .Mockito ;
14
+ import org .opensearch .common .blobstore .AsyncMultiStreamBlobContainer ;
15
+ import org .opensearch .common .blobstore .BlobPath ;
16
+ import org .opensearch .common .blobstore .BlobStore ;
17
+ import org .opensearch .common .settings .ClusterSettings ;
18
+ import org .opensearch .knn .KNNTestCase ;
19
+ import org .opensearch .knn .index .KNNSettings ;
9
20
import org .opensearch .knn .index .VectorDataType ;
10
21
import org .opensearch .knn .index .codec .nativeindex .NativeIndexBuildStrategy ;
11
22
import org .opensearch .knn .index .codec .nativeindex .model .BuildIndexParams ;
16
27
import org .opensearch .knn .index .vectorvalues .TestVectorValues ;
17
28
import org .opensearch .repositories .RepositoriesService ;
18
29
import org .opensearch .repositories .RepositoryMissingException ;
19
- import org .opensearch .test . OpenSearchTestCase ;
30
+ import org .opensearch .repositories . blobstore . BlobStoreRepository ;
20
31
32
+ import java .io .ByteArrayInputStream ;
21
33
import java .io .IOException ;
34
+ import java .io .InputStream ;
22
35
import java .util .List ;
23
36
import java .util .Map ;
37
+ import java .util .Random ;
24
38
25
39
import static org .mockito .ArgumentMatchers .any ;
26
40
import static org .mockito .Mockito .mock ;
27
41
import static org .mockito .Mockito .when ;
42
+ import static org .opensearch .knn .index .KNNSettings .KNN_REMOTE_VECTOR_REPO_SETTING ;
28
43
29
- public class RemoteIndexBuildStrategyTests extends OpenSearchTestCase {
44
+ public class RemoteIndexBuildStrategyTests extends KNNTestCase {
30
45
31
46
static int fallbackCounter = 0 ;
32
47
@@ -38,6 +53,16 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException {
38
53
}
39
54
}
40
55
56
+ @ Before
57
+ @ Override
58
+ public void setUp () throws Exception {
59
+ super .setUp ();
60
+ ClusterSettings clusterSettings = mock (ClusterSettings .class );
61
+ when (clusterSettings .get (KNN_REMOTE_VECTOR_REPO_SETTING )).thenReturn ("test-repo-name" );
62
+ when (clusterService .getClusterSettings ()).thenReturn (clusterSettings );
63
+ KNNSettings .state ().setClusterService (clusterService );
64
+ }
65
+
41
66
public void testFallback () throws IOException {
42
67
List <float []> vectorValues = List .of (new float [] { 1 , 2 }, new float [] { 2 , 3 }, new float [] { 3 , 4 });
43
68
final TestVectorValues .PreDefinedFloatVectorValues randomVectorValues = new TestVectorValues .PreDefinedFloatVectorValues (
@@ -64,4 +89,55 @@ public void testFallback() throws IOException {
64
89
objectUnderTest .buildAndWriteIndex (buildIndexParams );
65
90
assertEquals (1 , fallbackCounter );
66
91
}
92
+
93
+ /**
94
+ * Verify the buffered read method in {@link RemoteIndexBuildStrategy#readFromRepository} produces the correct result
95
+ */
96
+ public void testRepositoryRead () throws IOException {
97
+ // Create an InputStream with random values
98
+ int TEST_ARRAY_SIZE = 64 * 1024 * 10 ;
99
+ byte [] byteArray = new byte [TEST_ARRAY_SIZE ];
100
+ Random random = new Random ();
101
+ random .nextBytes (byteArray );
102
+ InputStream randomStream = new ByteArrayInputStream (byteArray );
103
+
104
+ // Create a test segment that we will read/write from
105
+ Directory directory ;
106
+ directory = newFSDirectory (createTempDir ());
107
+ String TEST_SEGMENT_NAME = "test-segment-name" ;
108
+ IndexOutput testIndexOutput = directory .createOutput (TEST_SEGMENT_NAME , IOContext .DEFAULT );
109
+ IndexOutputWithBuffer testIndexOutputWithBuffer = new IndexOutputWithBuffer (testIndexOutput );
110
+
111
+ // Set up RemoteIndexBuildStrategy and write to IndexOutput
112
+ RepositoriesService repositoriesService = mock (RepositoriesService .class );
113
+ BlobStoreRepository mockRepository = mock (BlobStoreRepository .class );
114
+ BlobPath testBasePath = new BlobPath ().add ("testBasePath" );
115
+ BlobStore mockBlobStore = mock (BlobStore .class );
116
+ AsyncMultiStreamBlobContainer mockBlobContainer = mock (AsyncMultiStreamBlobContainer .class );
117
+
118
+ when (repositoriesService .repository (any ())).thenReturn (mockRepository );
119
+ when (mockRepository .basePath ()).thenReturn (testBasePath );
120
+ when (mockRepository .blobStore ()).thenReturn (mockBlobStore );
121
+ when (mockBlobStore .blobContainer (any ())).thenReturn (mockBlobContainer );
122
+ when (mockBlobContainer .readBlob ("testFile" )).thenReturn (randomStream );
123
+
124
+ RemoteIndexBuildStrategy objectUnderTest = new RemoteIndexBuildStrategy (
125
+ () -> repositoriesService ,
126
+ mock (NativeIndexBuildStrategy .class )
127
+ );
128
+ // This should read from randomStream into testIndexOutput
129
+ BlobPath testPath = new BlobPath ().add ("testBasePath" ).add ("testDirectory" ).add ("testFile" );
130
+ objectUnderTest .readFromRepository (testPath , testIndexOutputWithBuffer );
131
+ testIndexOutput .close ();
132
+
133
+ // Now try to read from the IndexOutput
134
+ IndexInput testIndexInput = directory .openInput (TEST_SEGMENT_NAME , IOContext .DEFAULT );
135
+ byte [] resultByteArray = new byte [TEST_ARRAY_SIZE ];
136
+ testIndexInput .readBytes (resultByteArray , 0 , TEST_ARRAY_SIZE );
137
+ assertArrayEquals (byteArray , resultByteArray );
138
+
139
+ // Test Cleanup
140
+ testIndexInput .close ();
141
+ directory .close ();
142
+ }
67
143
}
0 commit comments