@@ -242,6 +242,34 @@ private void returnFailure(BulkByScrollResponse response, String modelId, Action
242
242
actionListener .onFailure (new OpenSearchStatusException (errorMessage , RestStatus .INTERNAL_SERVER_ERROR ));
243
243
}
244
244
245
+ private void checkDownstreamTaskBeforeDeleteModel (String modelId , Boolean isHidden , ActionListener <DeleteResponse > actionListener ) {
246
+ // Now checks 3 resources associated with the model id 1. Agent 2. Search pipeline 3. ingest pipeline
247
+ CountDownLatch countDownLatch = new CountDownLatch (3 );
248
+ AtomicBoolean noneBlocked = new AtomicBoolean (true );
249
+ ConcurrentLinkedQueue <String > errorMessages = new ConcurrentLinkedQueue <>();
250
+ ActionListener <Boolean > countDownActionListener = ActionListener .wrap (b -> {
251
+ countDownLatch .countDown ();
252
+ noneBlocked .compareAndSet (true , b );
253
+ if (countDownLatch .getCount () == 0 ) {
254
+ if (noneBlocked .get ()) {
255
+ deleteModel (modelId , isHidden , actionListener );
256
+ } else {
257
+ actionListener .onFailure (new OpenSearchStatusException (String .join (". " , errorMessages ), RestStatus .CONFLICT ));
258
+ }
259
+ }
260
+ }, e -> {
261
+ countDownLatch .countDown ();
262
+ noneBlocked .set (false );
263
+ errorMessages .add (e .getMessage ());
264
+ actionListener .onFailure (new OpenSearchStatusException (e .getMessage (), RestStatus .CONFLICT ));
265
+
266
+ });
267
+ checkAgentBeforeDeleteModel (modelId , countDownActionListener );
268
+ checkIngestPipelineBeforeDeleteModel (modelId , countDownActionListener );
269
+ checkSearchPipelineBeforeDeleteModel (modelId , countDownActionListener );
270
+ }
271
+
272
+
245
273
private void deleteModel (String modelId , Boolean isHidden , ActionListener <DeleteResponse > actionListener ) {
246
274
DeleteRequest deleteRequest = new DeleteRequest (ML_MODEL_INDEX , modelId ).setRefreshPolicy (WriteRequest .RefreshPolicy .IMMEDIATE );
247
275
client .delete (deleteRequest , new ActionListener <>() {
@@ -271,7 +299,6 @@ private void checkAgentBeforeDeleteModel(String modelId, ActionListener<Boolean>
271
299
actionListener .onResponse (true );
272
300
} else {
273
301
String errorMessage = formatAgentErrorMessage (searchHits );
274
-
275
302
actionListener .onFailure (new OpenSearchStatusException (errorMessage , RestStatus .CONFLICT ));
276
303
}
277
304
@@ -333,34 +360,6 @@ private void checkPipelineBeforeDeleteModel(
333
360
334
361
}
335
362
336
- private void checkDownstreamTaskBeforeDeleteModel (String modelId , Boolean isHidden , ActionListener <DeleteResponse > actionListener ) {
337
- // Now checks 3 resources associated with with the model id 1. Agent 2. Search pipeline 3. ingest pipeline
338
- CountDownLatch countDownLatch = new CountDownLatch (3 );
339
- AtomicBoolean noneBlocked = new AtomicBoolean (true );
340
- ConcurrentLinkedQueue <String > errorMessages = new ConcurrentLinkedQueue <>();
341
- ActionListener <Boolean > countDownActionListener = ActionListener .wrap (b -> {
342
- countDownLatch .countDown ();
343
- noneBlocked .compareAndSet (true , b );
344
- if (countDownLatch .getCount () == 0 ) {
345
- if (noneBlocked .get ()) {
346
- deleteModel (modelId , isHidden , actionListener );
347
- } else {
348
- actionListener .onFailure (new OpenSearchStatusException (String .join (". " , errorMessages ), RestStatus .CONFLICT ));
349
- }
350
- }
351
- }, e -> {
352
- countDownLatch .countDown ();
353
- noneBlocked .set (false );
354
- errorMessages .add (e .getMessage ());
355
- if (countDownLatch .getCount () == 0 ) {
356
- actionListener .onFailure (new OpenSearchStatusException (String .join (". " , errorMessages ), RestStatus .CONFLICT ));
357
- }
358
-
359
- });
360
- checkAgentBeforeDeleteModel (modelId , countDownActionListener );
361
- checkIngestPipelineBeforeDeleteModel (modelId , countDownActionListener );
362
- checkSearchPipelineBeforeDeleteModel (modelId , countDownActionListener );
363
- }
364
363
365
364
private void deleteModelChunksAndController (
366
365
ActionListener <DeleteResponse > actionListener ,
@@ -473,21 +472,6 @@ private List<String> findDependentPipelinesEasy(Map<String, Object> allConfigMap
473
472
return dependentPipelineConfigurations ;
474
473
}
475
474
476
- private <T > List <String > findDependentPipelines (
477
- List <T > pipelineConfigurations ,
478
- String candidateModelId ,
479
- Function <T , Map <String , Object >> getConfigFunction ,
480
- Function <T , String > getIdFunction
481
- ) {
482
- List <String > dependentPipelineConfigurations = new ArrayList <>();
483
- for (T pipelineConfiguration : pipelineConfigurations ) {
484
- Map <String , Object > config = getConfigFunction .apply (pipelineConfiguration );
485
- if (searchThroughConfig (config , candidateModelId )) {
486
- dependentPipelineConfigurations .add (getIdFunction .apply (pipelineConfiguration ));
487
- }
488
- }
489
- return dependentPipelineConfigurations ;
490
- }
491
475
492
476
// This method is to go through the pipeline configs and the configuration is a map of string to objects.
493
477
// Objects can be a list or a map. we will search exhaustively through the configuration for any match of the candidateId.
0 commit comments