14
14
import org .opensearch .knn .index .KNNSettings ;
15
15
import org .opensearch .knn .index .codec .nativeindex .NativeIndexBuildStrategy ;
16
16
import org .opensearch .knn .index .codec .nativeindex .model .BuildIndexParams ;
17
+ import org .opensearch .knn .index .vectorvalues .KNNVectorValues ;
17
18
import org .opensearch .repositories .RepositoriesService ;
18
19
import org .opensearch .repositories .Repository ;
19
20
import org .opensearch .repositories .RepositoryMissingException ;
25
26
import static org .opensearch .knn .index .KNNSettings .KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING ;
26
27
import static org .opensearch .knn .index .KNNSettings .KNN_INDEX_REMOTE_VECTOR_BUILD_THRESHOLD_SETTING ;
27
28
import static org .opensearch .knn .index .KNNSettings .KNN_REMOTE_VECTOR_REPO_SETTING ;
29
+ import static org .opensearch .knn .index .codec .util .KNNCodecUtil .initializeVectorValues ;
30
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .BUILD_REQUEST_FAILURE_COUNT ;
31
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .BUILD_REQUEST_SUCCESS_COUNT ;
32
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .READ_FAILURE_COUNT ;
33
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .READ_SUCCESS_COUNT ;
34
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .READ_TIME ;
35
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .REMOTE_INDEX_BUILD_CURRENT_OPERATIONS ;
36
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .REMOTE_INDEX_BUILD_CURRENT_SIZE ;
37
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .REMOTE_INDEX_BUILD_TIME ;
38
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .WAITING_TIME ;
39
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .WRITE_FAILURE_COUNT ;
40
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .WRITE_SUCCESS_COUNT ;
41
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .WRITE_TIME ;
28
42
29
43
/**
30
44
* This class orchestrates building vector indices. It handles uploading data to a repository, submitting a remote
@@ -110,9 +124,17 @@ public static boolean shouldBuildIndexRemotely(IndexSettings indexSettings, long
110
124
public void buildAndWriteIndex (BuildIndexParams indexInfo ) throws IOException {
111
125
StopWatch stopWatch ;
112
126
long time_in_millis ;
127
+ final VectorRepositoryAccessor vectorRepositoryAccessor ;
128
+
129
+ StopWatch remoteBuildTimeStopwatch = new StopWatch ();
130
+ KNNVectorValues <?> knnVectorValues = indexInfo .getKnnVectorValuesSupplier ().get ();
131
+ initializeVectorValues (knnVectorValues );
132
+ startRemoteIndexBuildStats ((long ) indexInfo .getTotalLiveDocs () * knnVectorValues .bytesPerVector (), remoteBuildTimeStopwatch );
133
+
134
+ // 1. Write required data to repository
135
+ stopWatch = new StopWatch ().start ();
113
136
try {
114
- VectorRepositoryAccessor vectorRepositoryAccessor = new DefaultVectorRepositoryAccessor (getRepository (), indexSettings );
115
- stopWatch = new StopWatch ().start ();
137
+ vectorRepositoryAccessor = new DefaultVectorRepositoryAccessor (getRepository (), indexSettings );
116
138
// We create a new time based UUID per file in order to avoid conflicts across shards. It is also very difficult to get the
117
139
// shard id in this context.
118
140
String blobName = UUIDs .base64UUID () + "_" + indexInfo .getFieldName () + "_" + indexInfo .getSegmentWriteState ().segmentInfo .name ;
@@ -123,27 +145,62 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException {
123
145
indexInfo .getKnnVectorValuesSupplier ()
124
146
);
125
147
time_in_millis = stopWatch .stop ().totalTime ().millis ();
148
+ WRITE_SUCCESS_COUNT .increment ();
149
+ WRITE_TIME .incrementBy (time_in_millis );
126
150
log .debug ("Repository write took {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName ());
151
+ } catch (Exception e ) {
152
+ time_in_millis = stopWatch .stop ().totalTime ().millis ();
153
+ WRITE_FAILURE_COUNT .increment ();
154
+ log .error ("Repository write failed after {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName (), e );
155
+ handleFailure (indexInfo , knnVectorValues .bytesPerVector (), remoteBuildTimeStopwatch );
156
+ return ;
157
+ }
127
158
128
- stopWatch = new StopWatch ().start ();
159
+ // 2. Triggers index build
160
+ stopWatch = new StopWatch ().start ();
161
+ try {
129
162
submitVectorBuild ();
130
163
time_in_millis = stopWatch .stop ().totalTime ().millis ();
164
+ BUILD_REQUEST_SUCCESS_COUNT .increment ();
131
165
log .debug ("Submit vector build took {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName ());
166
+ } catch (Exception e ) {
167
+ BUILD_REQUEST_FAILURE_COUNT .increment ();
168
+ log .error ("Submit vector failed after {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName (), e );
169
+ handleFailure (indexInfo , knnVectorValues .bytesPerVector (), remoteBuildTimeStopwatch );
170
+ return ;
171
+ }
132
172
133
- stopWatch = new StopWatch ().start ();
173
+ // 3. Awaits on vector build to complete
174
+ stopWatch = new StopWatch ().start ();
175
+ try {
134
176
awaitVectorBuild ();
135
177
time_in_millis = stopWatch .stop ().totalTime ().millis ();
178
+ WAITING_TIME .incrementBy (time_in_millis );
136
179
log .debug ("Await vector build took {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName ());
180
+ } catch (Exception e ) {
181
+ log .debug ("Await vector build failed after {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName ());
182
+ handleFailure (indexInfo , knnVectorValues .bytesPerVector (), remoteBuildTimeStopwatch );
183
+ return ;
184
+ }
137
185
138
- stopWatch = new StopWatch ().start ();
186
+ // 4. Downloads index file and writes to indexOutput
187
+ stopWatch = new StopWatch ().start ();
188
+ try {
189
+ assert vectorRepositoryAccessor != null ;
139
190
vectorRepositoryAccessor .readFromRepository ();
140
191
time_in_millis = stopWatch .stop ().totalTime ().millis ();
192
+ READ_SUCCESS_COUNT .increment ();
193
+ READ_TIME .incrementBy (time_in_millis );
141
194
log .debug ("Repository read took {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName ());
142
195
} catch (Exception e ) {
143
- // TODO: This needs more robust failure handling
144
- log .warn ("Failed to build index remotely" , e );
145
- fallbackStrategy .buildAndWriteIndex (indexInfo );
196
+ time_in_millis = stopWatch .stop ().totalTime ().millis ();
197
+ READ_FAILURE_COUNT .increment ();
198
+ log .error ("Repository read failed after {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName (), e );
199
+ handleFailure (indexInfo , knnVectorValues .bytesPerVector (), remoteBuildTimeStopwatch );
200
+ return ;
146
201
}
202
+
203
+ endRemoteIndexBuildStats ((long ) indexInfo .getTotalLiveDocs () * knnVectorValues .bytesPerVector (), stopWatch );
147
204
}
148
205
149
206
/**
@@ -178,4 +235,28 @@ private void submitVectorBuild() {
178
235
private void awaitVectorBuild () {
179
236
throw new NotImplementedException ();
180
237
}
238
+
239
+ private void startRemoteIndexBuildStats (long size , StopWatch stopWatch ) {
240
+ stopWatch .start ();
241
+ REMOTE_INDEX_BUILD_CURRENT_OPERATIONS .increment ();
242
+ REMOTE_INDEX_BUILD_CURRENT_SIZE .incrementBy (size );
243
+ }
244
+
245
+ private void endRemoteIndexBuildStats (long size , StopWatch stopWatch ) {
246
+ long time_in_millis = stopWatch .stop ().totalTime ().millis ();
247
+ REMOTE_INDEX_BUILD_CURRENT_OPERATIONS .decrement ();
248
+ REMOTE_INDEX_BUILD_CURRENT_SIZE .decrementBy (size );
249
+ REMOTE_INDEX_BUILD_TIME .incrementBy (time_in_millis );
250
+ }
251
+
252
+ /**
253
+ * Helper method to collect remote index build metrics on failure and invoke fallback strategy
254
+ * @param indexParams
255
+ * @param bytesPerVector
256
+ * @throws IOException
257
+ */
258
+ private void handleFailure (BuildIndexParams indexParams , long bytesPerVector , StopWatch stopWatch ) throws IOException {
259
+ endRemoteIndexBuildStats (indexParams .getTotalLiveDocs () * bytesPerVector , stopWatch );
260
+ fallbackStrategy .buildAndWriteIndex (indexParams );
261
+ }
181
262
}
0 commit comments