 
 package org.opensearch.knn.index.codec.nativeindex.remote;
 
+import com.google.common.annotations.VisibleForTesting;
 import lombok.AllArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.opensearch.action.LatchedActionListener;
 ...
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
 
 import static org.opensearch.knn.index.codec.util.KNNCodecUtil.initializeVectorValues;
 import static org.opensearch.knn.index.remote.KNNRemoteConstants.DOC_ID_FILE_EXTENSION;
@@ -74,50 +77,42 @@ public void writeToRepository(
         initializeVectorValues(knnVectorValues);
         long vectorBlobLength = (long) knnVectorValues.bytesPerVector() * totalLiveDocs;
 
-        if (blobContainer instanceof AsyncMultiStreamBlobContainer) {
+        if (blobContainer instanceof AsyncMultiStreamBlobContainer asyncBlobContainer) {
             // First initiate vectors upload
             log.debug("Repository {} Supports Parallel Blob Upload", repository);
             // WriteContext is the main entry point into asyncBlobUpload. It stores all of our upload configurations, analogous to
             // BuildIndexParams
-            WriteContext writeContext = new WriteContext.Builder().fileName(blobName + VECTOR_BLOB_FILE_EXTENSION)
-                .streamContextSupplier((partSize) -> getStreamContext(partSize, vectorBlobLength, knnVectorValuesSupplier, vectorDataType))
-                .fileSize(vectorBlobLength)
-                .failIfAlreadyExists(true)
-                .writePriority(WritePriority.NORMAL)
-                // TODO: Checksum implementations -- It is difficult to calculate a checksum on the knnVectorValues as
-                // there is no underlying file upon which we can create the checksum. We should be able to create a
-                // checksum still by iterating through once, however this will be an expensive operation.
-                .uploadFinalizer((bool) -> {})
-                .doRemoteDataIntegrityCheck(false)
-                .expectedChecksum(null)
-                .build();
+            WriteContext writeContext = createWriteContext(
+                blobName,
+                vectorBlobLength,
+                knnVectorValuesSupplier,
+                vectorDataType,
+                asyncBlobContainer.remoteIntegrityCheckSupported()
+            );
 
             AtomicReference<Exception> exception = new AtomicReference<>();
             final CountDownLatch latch = new CountDownLatch(1);
-            ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(
-                writeContext,
-                new LatchedActionListener<>(new ActionListener<>() {
-                    @Override
-                    public void onResponse(Void unused) {
-                        log.debug(
-                            "Parallel vector upload succeeded for blob {} with size {}",
-                            blobName + VECTOR_BLOB_FILE_EXTENSION,
-                            vectorBlobLength
-                        );
-                    }
-
-                    @Override
-                    public void onFailure(Exception e) {
-                        log.error(
-                            "Parallel vector upload failed for blob {} with size {}",
-                            blobName + VECTOR_BLOB_FILE_EXTENSION,
-                            vectorBlobLength,
-                            e
-                        );
-                        exception.set(e);
-                    }
-                }, latch)
-            );
+            asyncBlobContainer.asyncBlobUpload(writeContext, new LatchedActionListener<>(new ActionListener<>() {
+                @Override
+                public void onResponse(Void unused) {
+                    log.debug(
+                        "Parallel vector upload succeeded for blob {} with size {}",
+                        blobName + VECTOR_BLOB_FILE_EXTENSION,
+                        vectorBlobLength
+                    );
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    log.error(
+                        "Parallel vector upload failed for blob {} with size {}",
+                        blobName + VECTOR_BLOB_FILE_EXTENSION,
+                        vectorBlobLength,
+                        e
+                    );
+                    exception.set(e);
+                }
+            }, latch));
 
             // Then upload doc id blob before waiting on vector uploads
             // TODO: We wrap with a BufferedInputStream to support retries. We can tune this buffer size to optimize performance.
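
For readers skimming the diff, the control flow above is a fork/join: start the asynchronous vector upload, record any failure in the AtomicReference, upload the doc id blob on the calling thread in the meantime, and only then block on the latch. A minimal, self-contained sketch of that pattern using only the JDK; the upload helper below is a hypothetical stand-in, not an OpenSearch API:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicReference;

    public class ParallelUploadSketch {
        public static void main(String[] args) throws Exception {
            AtomicReference<Exception> exception = new AtomicReference<>();
            CountDownLatch latch = new CountDownLatch(1);

            // Stand-in for asyncBlobUpload: run the vector upload on another thread,
            // record any failure, and count the latch down when done, which is the
            // role LatchedActionListener plays in the code above
            CompletableFuture.runAsync(() -> upload("vector blob")).whenComplete((unused, throwable) -> {
                if (throwable != null) {
                    exception.set(new Exception(throwable));
                }
                latch.countDown();
            });

            // The doc id upload proceeds without waiting on the vector upload
            upload("doc id blob");

            // Block until the async upload finishes, then surface any captured failure
            latch.await();
            if (exception.get() != null) {
                throw exception.get();
            }
        }

        private static void upload(String blobName) {
            System.out.println("uploading " + blobName);
        }
    }
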
@@ -215,6 +210,61 @@ private CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOExceptio
         });
     }
 
+    /**
+     * Creates a {@link WriteContext} meant to be used by {@link AsyncMultiStreamBlobContainer#asyncBlobUpload}. If integrity
+     * checking is supported, calculates an expected checksum as well.
+     *
+     * @param blobName base name of the blob; {@code VECTOR_BLOB_FILE_EXTENSION} is appended to form the file name
+     * @param vectorBlobLength size in bytes of the vector blob to upload
+     * @param knnVectorValuesSupplier supplies a fresh {@link KNNVectorValues} iterator over the vector data
+     * @param vectorDataType data type of the vectors being uploaded
+     * @param supportsIntegrityCheck whether the blob container supports a remote data integrity check
+     * @return the configured {@link WriteContext}
+     * @throws IOException if calculating the expected checksum fails
+     */
+    private WriteContext createWriteContext(
+        String blobName,
+        long vectorBlobLength,
+        Supplier<KNNVectorValues<?>> knnVectorValuesSupplier,
+        VectorDataType vectorDataType,
+        boolean supportsIntegrityCheck
+    ) throws IOException {
+        return new WriteContext.Builder().fileName(blobName + VECTOR_BLOB_FILE_EXTENSION)
+            .streamContextSupplier((partSize) -> getStreamContext(partSize, vectorBlobLength, knnVectorValuesSupplier, vectorDataType))
+            .fileSize(vectorBlobLength)
+            .failIfAlreadyExists(true)
+            .writePriority(WritePriority.NORMAL)
+            .doRemoteDataIntegrityCheck(supportsIntegrityCheck)
+            .uploadFinalizer((bool) -> {})
+            .expectedChecksum(supportsIntegrityCheck ? getExpectedChecksum(knnVectorValuesSupplier.get(), vectorDataType) : null)
+            .build();
+    }
+
+    /**
+     * Calculates a checksum over the given {@link KNNVectorValues}, representing all the vector data for the index build
+     * operation. This is done by wrapping a {@link VectorValuesInputStream} in a {@link CheckedInputStream} and reading all
+     * the data through the stream. Note: this adds some overhead to the vector blob upload, as we read through the
+     * KNNVectorValues an additional time. If the WriteContext accepted an expectedChecksumSupplier instead of an up-front
+     * expected checksum, we could calculate the checksum while the stream is being uploaded and compare against that same
+     * value; however, that is pending a change in OpenSearch core.
+     *
+     * @param knnVectorValues the vector values over which to calculate the checksum
+     * @param vectorDataType data type of the vectors being uploaded
+     * @return the CRC32 checksum of all the vector data
+     * @throws IOException if reading the vector data fails
+     */
+    @VisibleForTesting
+    long getExpectedChecksum(KNNVectorValues<?> knnVectorValues, VectorDataType vectorDataType) throws IOException {
+        initializeVectorValues(knnVectorValues);
+        // VectorValuesInputStream#read reads at most 1 vector at a time, so there is no point making this buffer any larger
+        int bufferSize = knnVectorValues.bytesPerVector();
+        final byte[] buffer = new byte[bufferSize];
+        try (
+            CheckedInputStream checkedStream = new CheckedInputStream(
+                new VectorValuesInputStream(knnVectorValues, vectorDataType),
+                new CRC32()
+            )
+        ) {
+            // Drain the stream so every byte of vector data passes through the CRC32
+            while (checkedStream.read(buffer, 0, bufferSize) != -1) {
+            }
+            return checkedStream.getChecksum().getValue();
+        }
+    }
+
     @Override
     public void readFromRepository(String path, IndexOutputWithBuffer indexOutputWithBuffer) throws IOException {
         if (path == null || path.isEmpty()) {
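
The checksum mechanics in getExpectedChecksum are standard JDK plumbing: wrapping any InputStream in a java.util.zip.CheckedInputStream feeds every byte read through the supplied Checksum implementation. A standalone sketch of the same technique, with a plain byte array standing in for the data streamed by VectorValuesInputStream:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.util.zip.CRC32;
    import java.util.zip.CheckedInputStream;

    public class ChecksumSketch {
        public static void main(String[] args) throws IOException {
            byte[] vectorData = new byte[1024]; // stand-in for the vector bytes
            try (CheckedInputStream in = new CheckedInputStream(new ByteArrayInputStream(vectorData), new CRC32())) {
                byte[] buffer = new byte[128];
                // Draining the stream pushes every byte through the CRC32
                while (in.read(buffer, 0, buffer.length) != -1) {
                }
                System.out.println("CRC32: " + in.getChecksum().getValue());
            }
        }
    }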