|
5 | 5 |
|
6 | 6 | package org.opensearch.knn.index.codec.nativeindex.remote;
|
7 | 7 |
|
| 8 | +import org.apache.lucene.store.Directory; |
| 9 | +import org.apache.lucene.store.IOContext; |
| 10 | +import org.apache.lucene.store.IndexInput; |
| 11 | +import org.apache.lucene.store.IndexOutput; |
8 | 12 | import org.mockito.Mockito;
|
9 | 13 | import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
|
10 | 14 | import org.opensearch.common.blobstore.BlobContainer;
|
|
13 | 17 | import org.opensearch.common.blobstore.fs.FsBlobStore;
|
14 | 18 | import org.opensearch.index.IndexSettings;
|
15 | 19 | import org.opensearch.knn.index.VectorDataType;
|
| 20 | +import org.opensearch.knn.index.engine.KNNEngine; |
| 21 | +import org.opensearch.knn.index.store.IndexOutputWithBuffer; |
16 | 22 | import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
|
17 | 23 | import org.opensearch.repositories.RepositoriesService;
|
18 | 24 | import org.opensearch.repositories.blobstore.BlobStoreRepository;
|
19 | 25 |
|
| 26 | +import java.io.ByteArrayInputStream; |
20 | 27 | import java.io.IOException;
|
| 28 | +import java.io.InputStream; |
21 | 29 | import java.nio.file.Path;
|
| 30 | +import java.util.List; |
| 31 | +import java.util.Random; |
22 | 32 |
|
23 | 33 | import static org.mockito.ArgumentMatchers.any;
|
24 | 34 | import static org.mockito.ArgumentMatchers.eq;
|
@@ -129,4 +139,65 @@ public void testAsyncUploadThrowsException() throws InterruptedException, IOExce
|
129 | 139 | verify(mockBlobStore).blobContainer(any());
|
130 | 140 | verify(mockRepository).basePath();
|
131 | 141 | }
|
| 142 | + |
| 143 | + /** |
| 144 | + * Verify the buffered read method in {@link DefaultVectorRepositoryAccessor#readFromRepository} produces the correct result |
| 145 | + */ |
| 146 | + public void testRepositoryRead() throws IOException { |
| 147 | + String TEST_FILE_NAME = randomAlphaOfLength(8) + KNNEngine.FAISS.getExtension(); |
| 148 | + |
| 149 | + // Create an InputStream with random values |
| 150 | + int TEST_ARRAY_SIZE = 64 * 1024 * 10; |
| 151 | + byte[] byteArray = new byte[TEST_ARRAY_SIZE]; |
| 152 | + Random random = new Random(); |
| 153 | + random.nextBytes(byteArray); |
| 154 | + InputStream randomStream = new ByteArrayInputStream(byteArray); |
| 155 | + |
| 156 | + // Create a test segment that we will read/write from |
| 157 | + Directory directory; |
| 158 | + directory = newFSDirectory(createTempDir()); |
| 159 | + String TEST_SEGMENT_NAME = "test-segment-name"; |
| 160 | + IndexOutput testIndexOutput = directory.createOutput(TEST_SEGMENT_NAME, IOContext.DEFAULT); |
| 161 | + IndexOutputWithBuffer testIndexOutputWithBuffer = new IndexOutputWithBuffer(testIndexOutput); |
| 162 | + |
| 163 | + // Set up RemoteIndexBuildStrategy and write to IndexOutput |
| 164 | + RepositoriesService repositoriesService = mock(RepositoriesService.class); |
| 165 | + BlobStoreRepository mockRepository = mock(BlobStoreRepository.class); |
| 166 | + BlobPath testBasePath = new BlobPath().add("testBasePath"); |
| 167 | + BlobStore mockBlobStore = mock(BlobStore.class); |
| 168 | + AsyncMultiStreamBlobContainer mockBlobContainer = mock(AsyncMultiStreamBlobContainer.class); |
| 169 | + |
| 170 | + when(repositoriesService.repository(any())).thenReturn(mockRepository); |
| 171 | + when(mockRepository.basePath()).thenReturn(testBasePath); |
| 172 | + when(mockRepository.blobStore()).thenReturn(mockBlobStore); |
| 173 | + when(mockBlobStore.blobContainer(any())).thenReturn(mockBlobContainer); |
| 174 | + when(mockBlobContainer.readBlob(TEST_FILE_NAME)).thenReturn(randomStream); |
| 175 | + |
| 176 | + VectorRepositoryAccessor objectUnderTest = new DefaultVectorRepositoryAccessor(mockRepository, mock(IndexSettings.class)); |
| 177 | + |
| 178 | + // Verify file extension check |
| 179 | + assertThrows(IllegalArgumentException.class, () -> objectUnderTest.readFromRepository("test_file.txt", testIndexOutputWithBuffer)); |
| 180 | + |
| 181 | + // Now test with valid file extensions |
| 182 | + String testPath = randomFrom( |
| 183 | + List.of( |
| 184 | + "testBasePath/testDirectory/" + TEST_FILE_NAME, // Test with subdirectory |
| 185 | + "testBasePath/" + TEST_FILE_NAME, // Test with only base path |
| 186 | + TEST_FILE_NAME // test with no base path |
| 187 | + ) |
| 188 | + ); |
| 189 | + // This should read from randomStream into testIndexOutput |
| 190 | + objectUnderTest.readFromRepository(testPath, testIndexOutputWithBuffer); |
| 191 | + testIndexOutput.close(); |
| 192 | + |
| 193 | + // Now try to read from the IndexOutput |
| 194 | + IndexInput testIndexInput = directory.openInput(TEST_SEGMENT_NAME, IOContext.DEFAULT); |
| 195 | + byte[] resultByteArray = new byte[TEST_ARRAY_SIZE]; |
| 196 | + testIndexInput.readBytes(resultByteArray, 0, TEST_ARRAY_SIZE); |
| 197 | + assertArrayEquals(byteArray, resultByteArray); |
| 198 | + |
| 199 | + // Test Cleanup |
| 200 | + testIndexInput.close(); |
| 201 | + directory.close(); |
| 202 | + } |
132 | 203 | }
|
0 commit comments