14
14
import org .opensearch .knn .index .KNNSettings ;
15
15
import org .opensearch .knn .index .codec .nativeindex .NativeIndexBuildStrategy ;
16
16
import org .opensearch .knn .index .codec .nativeindex .model .BuildIndexParams ;
17
+ import org .opensearch .knn .index .vectorvalues .KNNVectorValues ;
17
18
import org .opensearch .repositories .RepositoriesService ;
18
19
import org .opensearch .repositories .Repository ;
19
20
import org .opensearch .repositories .RepositoryMissingException ;
25
26
import static org .opensearch .knn .index .KNNSettings .KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING ;
26
27
import static org .opensearch .knn .index .KNNSettings .KNN_INDEX_REMOTE_VECTOR_BUILD_THRESHOLD_SETTING ;
27
28
import static org .opensearch .knn .index .KNNSettings .KNN_REMOTE_VECTOR_REPO_SETTING ;
29
+ import static org .opensearch .knn .index .codec .util .KNNCodecUtil .initializeVectorValues ;
30
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .BUILD_REQUEST_FAILURE_COUNT ;
31
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .BUILD_REQUEST_SUCCESS_COUNT ;
32
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .READ_FAILURE_COUNT ;
33
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .READ_SUCCESS_COUNT ;
34
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .READ_TIME ;
35
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .REMOTE_INDEX_BUILD_CURRENT_OPERATIONS ;
36
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .REMOTE_INDEX_BUILD_CURRENT_SIZE ;
37
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .REMOTE_INDEX_BUILD_TIME ;
38
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .WAITING_TIME ;
39
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .WRITE_FAILURE_COUNT ;
40
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .WRITE_SUCCESS_COUNT ;
41
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .WRITE_TIME ;
28
42
29
43
/**
30
44
* This class orchestrates building vector indices. It handles uploading data to a repository, submitting a remote
@@ -108,11 +122,17 @@ public static boolean shouldBuildIndexRemotely(IndexSettings indexSettings, long
108
122
*/
109
123
@ Override
110
124
public void buildAndWriteIndex (BuildIndexParams indexInfo ) throws IOException {
125
+ StopWatch remoteBuildTimeStopwatch = new StopWatch ();
126
+ KNNVectorValues <?> knnVectorValues = indexInfo .getKnnVectorValuesSupplier ().get ();
127
+ initializeVectorValues (knnVectorValues );
128
+ startRemoteIndexBuildStats ((long ) indexInfo .getTotalLiveDocs () * knnVectorValues .bytesPerVector (), remoteBuildTimeStopwatch );
111
129
StopWatch stopWatch ;
112
130
long time_in_millis ;
131
+ VectorRepositoryAccessor vectorRepositoryAccessor = new DefaultVectorRepositoryAccessor (getRepository (), indexSettings );
132
+
133
+ // 1. Write required data to repository
134
+ stopWatch = new StopWatch ().start ();
113
135
try {
114
- VectorRepositoryAccessor vectorRepositoryAccessor = new DefaultVectorRepositoryAccessor (getRepository (), indexSettings );
115
- stopWatch = new StopWatch ().start ();
116
136
// We create a new time based UUID per file in order to avoid conflicts across shards. It is also very difficult to get the
117
137
// shard id in this context.
118
138
String blobName = UUIDs .base64UUID () + "_" + indexInfo .getFieldName () + "_" + indexInfo .getSegmentWriteState ().segmentInfo .name ;
@@ -123,27 +143,61 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException {
123
143
indexInfo .getKnnVectorValuesSupplier ()
124
144
);
125
145
time_in_millis = stopWatch .stop ().totalTime ().millis ();
146
+ WRITE_SUCCESS_COUNT .increment ();
147
+ WRITE_TIME .incrementBy (time_in_millis );
126
148
log .debug ("Repository write took {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName ());
149
+ } catch (Exception e ) {
150
+ time_in_millis = stopWatch .stop ().totalTime ().millis ();
151
+ WRITE_FAILURE_COUNT .increment ();
152
+ log .error ("Repository write failed after {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName (), e );
153
+ handleFailure (indexInfo , knnVectorValues .bytesPerVector (), remoteBuildTimeStopwatch );
154
+ return ;
155
+ }
127
156
128
- stopWatch = new StopWatch ().start ();
157
+ // 2. Triggers index build
158
+ stopWatch = new StopWatch ().start ();
159
+ try {
129
160
submitVectorBuild ();
130
161
time_in_millis = stopWatch .stop ().totalTime ().millis ();
162
+ BUILD_REQUEST_SUCCESS_COUNT .increment ();
131
163
log .debug ("Submit vector build took {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName ());
164
+ } catch (Exception e ) {
165
+ BUILD_REQUEST_FAILURE_COUNT .increment ();
166
+ log .error ("Submit vector failed after {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName (), e );
167
+ handleFailure (indexInfo , knnVectorValues .bytesPerVector (), remoteBuildTimeStopwatch );
168
+ return ;
169
+ }
132
170
133
- stopWatch = new StopWatch ().start ();
171
+ // 3. Awaits on vector build to complete
172
+ stopWatch = new StopWatch ().start ();
173
+ try {
134
174
awaitVectorBuild ();
135
175
time_in_millis = stopWatch .stop ().totalTime ().millis ();
176
+ WAITING_TIME .incrementBy (time_in_millis );
136
177
log .debug ("Await vector build took {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName ());
178
+ } catch (Exception e ) {
179
+ log .debug ("Await vector build failed after {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName ());
180
+ handleFailure (indexInfo , knnVectorValues .bytesPerVector (), remoteBuildTimeStopwatch );
181
+ return ;
182
+ }
137
183
138
- stopWatch = new StopWatch ().start ();
184
+ // 4. Downloads index file and writes to indexOutput
185
+ stopWatch = new StopWatch ().start ();
186
+ try {
139
187
vectorRepositoryAccessor .readFromRepository ();
140
188
time_in_millis = stopWatch .stop ().totalTime ().millis ();
189
+ READ_SUCCESS_COUNT .increment ();
190
+ READ_TIME .incrementBy (time_in_millis );
141
191
log .debug ("Repository read took {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName ());
142
192
} catch (Exception e ) {
143
- // TODO: This needs more robust failure handling
144
- log .warn ("Failed to build index remotely" , e );
145
- fallbackStrategy .buildAndWriteIndex (indexInfo );
193
+ time_in_millis = stopWatch .stop ().totalTime ().millis ();
194
+ READ_FAILURE_COUNT .increment ();
195
+ log .error ("Repository read failed after {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName (), e );
196
+ handleFailure (indexInfo , knnVectorValues .bytesPerVector (), remoteBuildTimeStopwatch );
197
+ return ;
146
198
}
199
+
200
+ endRemoteIndexBuildStats ((long ) indexInfo .getTotalLiveDocs () * knnVectorValues .bytesPerVector (), stopWatch );
147
201
}
148
202
149
203
/**
@@ -178,4 +232,28 @@ private void submitVectorBuild() {
178
232
private void awaitVectorBuild () {
179
233
throw new NotImplementedException ();
180
234
}
235
+
236
+ private void startRemoteIndexBuildStats (long size , StopWatch stopWatch ) {
237
+ stopWatch .start ();
238
+ REMOTE_INDEX_BUILD_CURRENT_OPERATIONS .increment ();
239
+ REMOTE_INDEX_BUILD_CURRENT_SIZE .incrementBy (size );
240
+ }
241
+
242
+ private void endRemoteIndexBuildStats (long size , StopWatch stopWatch ) {
243
+ long time_in_millis = stopWatch .stop ().totalTime ().millis ();
244
+ REMOTE_INDEX_BUILD_CURRENT_OPERATIONS .decrement ();
245
+ REMOTE_INDEX_BUILD_CURRENT_SIZE .incrementBy (size );
246
+ REMOTE_INDEX_BUILD_TIME .incrementBy (time_in_millis );
247
+ }
248
+
249
+ /**
250
+ * Helper method to collect remote index build metrics on failure and invoke fallback strategy
251
+ * @param indexParams
252
+ * @param bytesPerVector
253
+ * @throws IOException
254
+ */
255
+ private void handleFailure (BuildIndexParams indexParams , long bytesPerVector , StopWatch stopWatch ) throws IOException {
256
+ endRemoteIndexBuildStats (indexParams .getTotalLiveDocs () * bytesPerVector , stopWatch );
257
+ fallbackStrategy .buildAndWriteIndex (indexParams );
258
+ }
181
259
}
0 commit comments