28
28
import java .util .concurrent .CountDownLatch ;
29
29
import java .util .concurrent .atomic .AtomicBoolean ;
30
30
import java .util .function .Function ;
31
+ import java .util .function .Supplier ;
31
32
32
33
import org .apache .commons .lang3 .tuple .Pair ;
33
34
import org .opensearch .OpenSearchStatusException ;
34
35
import org .opensearch .ResourceNotFoundException ;
35
36
import org .opensearch .action .ActionRequest ;
37
+ import org .opensearch .action .ActionType ;
36
38
import org .opensearch .action .delete .DeleteRequest ;
37
39
import org .opensearch .action .delete .DeleteResponse ;
38
40
import org .opensearch .action .get .GetRequest ;
66
68
import org .opensearch .ml .common .transport .model .MLModelDeleteAction ;
67
69
import org .opensearch .ml .common .transport .model .MLModelDeleteRequest ;
68
70
import org .opensearch .ml .common .transport .model .MLModelGetRequest ;
69
- import org .opensearch .ml .engine .tools .AgentModelsSearcher ;
71
+ import org .opensearch .ml .common .utils .StringUtils ;
72
+ import org .opensearch .ml .engine .utils .AgentModelsSearcher ;
70
73
import org .opensearch .ml .helper .ModelAccessControlHelper ;
71
74
import org .opensearch .ml .utils .RestActionUtils ;
72
75
import org .opensearch .search .SearchHit ;
@@ -284,48 +287,27 @@ private void checkAgentBeforeDeleteModel(String modelId, ActionListener<Boolean>
284
287
}
285
288
286
289
private void checkIngestPipelineBeforeDeleteModel (String modelId , ActionListener <Boolean > actionListener ) {
287
- GetPipelineRequest getPipelineRequest = new GetPipelineRequest ();
288
- client .execute (GetPipelineAction .INSTANCE , getPipelineRequest , ActionListener .wrap (ingestPipelineResponse -> {
289
- List <String > allDependentPipelineIds = findDependentPipelines (
290
- ingestPipelineResponse .pipelines (),
291
- modelId ,
292
- org .opensearch .ingest .PipelineConfiguration ::getConfigAsMap ,
293
- org .opensearch .ingest .PipelineConfiguration ::getId
294
- );
295
- if (allDependentPipelineIds .isEmpty ()) {
296
- actionListener .onResponse (true );
297
- } else {
298
- actionListener
299
- .onFailure (
300
- new OpenSearchStatusException (
301
- String
302
- .format (
303
- Locale .ROOT ,
304
- "%d ingest pipelines are still using this model, please delete or update the pipelines first: %s" ,
305
- allDependentPipelineIds .size (),
306
- Arrays .toString (allDependentPipelineIds .toArray (new String [0 ]))
307
- ),
308
- RestStatus .CONFLICT
309
- )
310
- );
311
- }
312
- }, e -> {
313
- log .error ("Failed to delete ML Model: " + modelId , e );
314
- actionListener .onFailure (e );
315
-
316
- }));
290
+ checkPipelineBeforeDeleteModel (modelId , actionListener , "ingest" , GetPipelineRequest ::new , GetPipelineAction .INSTANCE );
317
291
318
292
}
319
293
320
294
private void checkSearchPipelineBeforeDeleteModel (String modelId , ActionListener <Boolean > actionListener ) {
321
- GetSearchPipelineRequest getSearchPipelineRequest = new GetSearchPipelineRequest ();
322
- client .execute (GetSearchPipelineAction .INSTANCE , getSearchPipelineRequest , ActionListener .wrap (searchPipelineResponse -> {
323
- List <String > allDependentPipelineIds = findDependentPipelines (
324
- searchPipelineResponse .pipelines (),
325
- modelId ,
326
- org .opensearch .search .pipeline .PipelineConfiguration ::getConfigAsMap ,
327
- org .opensearch .search .pipeline .PipelineConfiguration ::getId
328
- );
295
+ checkPipelineBeforeDeleteModel (modelId , actionListener , "search" , GetSearchPipelineRequest ::new , GetSearchPipelineAction .INSTANCE );
296
+
297
+ }
298
+
299
+ private void checkPipelineBeforeDeleteModel (
300
+ String modelId ,
301
+ ActionListener <Boolean > actionListener ,
302
+ String pipelineType ,
303
+ Supplier <ActionRequest > requestSupplier ,
304
+ ActionType actionType
305
+ ) {
306
+ ActionRequest request = requestSupplier .get ();
307
+ client .execute (actionType , request , ActionListener .wrap (pipelineResponse -> {
308
+ String responseString = pipelineResponse .toString ();
309
+ Map <String , Object > allConfigMap = StringUtils .fromJson (pipelineResponse .toString (), "" );
310
+ List <String > allDependentPipelineIds = findDependentPipelinesEasy (allConfigMap , modelId );
329
311
if (allDependentPipelineIds .isEmpty ()) {
330
312
actionListener .onResponse (true );
331
313
} else {
@@ -335,8 +317,9 @@ private void checkSearchPipelineBeforeDeleteModel(String modelId, ActionListener
335
317
String
336
318
.format (
337
319
Locale .ROOT ,
338
- "%d search pipelines are still using this model, please delete or update the pipelines first: %s" ,
320
+ "%d %s pipelines are still using this model, please delete or update the pipelines first: %s" ,
339
321
allDependentPipelineIds .size (),
322
+ pipelineType ,
340
323
Arrays .toString (allDependentPipelineIds .toArray (new String [0 ]))
341
324
),
342
325
RestStatus .CONFLICT
@@ -479,6 +462,18 @@ private Boolean isModelNotDeployed(MLModelState mlModelState) {
479
462
&& !mlModelState .equals (MLModelState .PARTIALLY_DEPLOYED );
480
463
}
481
464
465
+ private List <String > findDependentPipelinesEasy (Map <String , Object > allConfigMap , String candidateModelId ) {
466
+ List <String > dependentPipelineConfigurations = new ArrayList <>();
467
+ for (Map .Entry <String , Object > entry : allConfigMap .entrySet ()) {
468
+ String id = entry .getKey ();
469
+ Map <String , Object > config = (Map <String , Object >) entry .getValue ();
470
+ if (searchThroughConfig (config , candidateModelId )) {
471
+ dependentPipelineConfigurations .add (id );
472
+ }
473
+ }
474
+ return dependentPipelineConfigurations ;
475
+ }
476
+
482
477
private <T > List <String > findDependentPipelines (
483
478
List <T > pipelineConfigurations ,
484
479
String candidateModelId ,
@@ -533,24 +528,22 @@ private Boolean searchThroughConfig(Object searchCandidate, String candidateId)
533
528
}
534
529
535
530
private String formatAgentErrorMessage (SearchHit [] hits ) {
536
- boolean isHidden = false ;
537
531
List <String > agentIds = new ArrayList <>();
538
532
for (SearchHit hit : hits ) {
539
533
Map <String , Object > sourceAsMap = hit .getSourceAsMap ();
540
- isHidden = isHidden || Boolean .parseBoolean ((String ) sourceAsMap .getOrDefault (MLAgent .IS_HIDDEN_FIELD , false ));
541
- agentIds .add (hit .getId ());
542
- }
543
- if (isHidden ) {
544
- return String
545
- .format (Locale .ROOT , "%d agents are still using this model, please delete or update the agents first" , hits .length );
534
+ Boolean isHidden = (Boolean ) sourceAsMap .getOrDefault (MLAgent .IS_HIDDEN_FIELD , false );
535
+ if (!isHidden ) {
536
+ agentIds .add (hit .getId ());
537
+ }
546
538
}
547
539
return String
548
540
.format (
549
541
Locale .ROOT ,
550
- "%d agents are still using this model, please delete or update the agents first: %s" ,
542
+ "%d agents are still using this model, please delete or update the agents first, all visible agents are : %s" ,
551
543
hits .length ,
552
544
Arrays .toString (agentIds .toArray (new String [0 ]))
553
545
);
546
+
554
547
}
555
548
556
549
// this method is only to stub static method.
0 commit comments