16
16
import static org .opensearch .ml .utils .MLNodeUtils .createXContentParserFromRegistry ;
17
17
import static org .opensearch .ml .utils .RestActionUtils .getFetchSourceContext ;
18
18
19
+ import java .util .ArrayDeque ;
19
20
import java .util .ArrayList ;
20
21
import java .util .Arrays ;
22
+ import java .util .Deque ;
21
23
import java .util .List ;
22
24
import java .util .Map ;
23
25
import java .util .Objects ;
24
26
import java .util .concurrent .CountDownLatch ;
25
27
import java .util .concurrent .atomic .AtomicBoolean ;
26
28
import java .util .function .Function ;
27
29
30
+ import org .apache .commons .lang3 .tuple .Pair ;
28
31
import org .opensearch .OpenSearchStatusException ;
29
32
import org .opensearch .ResourceNotFoundException ;
30
33
import org .opensearch .action .ActionRequest ;
@@ -290,21 +293,21 @@ private void checkAgentBeforeDeleteModel(String modelId, ActionListener<Boolean>
290
293
private void checkIngestPipelineBeforeDeleteModel (String modelId , ActionListener <Boolean > actionListener ) {
291
294
GetPipelineRequest getPipelineRequest = new GetPipelineRequest ();
292
295
client .execute (GetPipelineAction .INSTANCE , getPipelineRequest , ActionListener .wrap (ingestPipelineResponse -> {
293
- List <String > allRelevantPipelineIds = findRelevantPipelines (
296
+ List <String > allDependentPipelineIds = findDependentPipelines (
294
297
ingestPipelineResponse .pipelines (),
295
298
modelId ,
296
299
org .opensearch .ingest .PipelineConfiguration ::getConfigAsMap ,
297
300
org .opensearch .ingest .PipelineConfiguration ::getId
298
301
);
299
- if (allRelevantPipelineIds .isEmpty ()) {
302
+ if (allDependentPipelineIds .isEmpty ()) {
300
303
actionListener .onResponse (true );
301
304
} else {
302
305
actionListener
303
306
.onFailure (
304
307
new OpenSearchStatusException (
305
- allRelevantPipelineIds .size ()
308
+ allDependentPipelineIds .size ()
306
309
+ " ingest pipelines are still using this model, please delete or update the pipelines first: "
307
- + Arrays .toString (allRelevantPipelineIds .toArray (new String [0 ])),
310
+ + Arrays .toString (allDependentPipelineIds .toArray (new String [0 ])),
308
311
RestStatus .CONFLICT
309
312
)
310
313
);
@@ -320,21 +323,21 @@ private void checkIngestPipelineBeforeDeleteModel(String modelId, ActionListener
320
323
private void checkSearchPipelineBeforeDeleteModel (String modelId , ActionListener <Boolean > actionListener ) {
321
324
GetSearchPipelineRequest getSearchPipelineRequest = new GetSearchPipelineRequest ();
322
325
client .execute (GetSearchPipelineAction .INSTANCE , getSearchPipelineRequest , ActionListener .wrap (searchPipelineResponse -> {
323
- List <String > allRelevantPipelineIds = findRelevantPipelines (
326
+ List <String > allDependentPipelineIds = findDependentPipelines (
324
327
searchPipelineResponse .pipelines (),
325
328
modelId ,
326
329
org .opensearch .search .pipeline .PipelineConfiguration ::getConfigAsMap ,
327
330
org .opensearch .search .pipeline .PipelineConfiguration ::getId
328
331
);
329
- if (allRelevantPipelineIds .isEmpty ()) {
332
+ if (allDependentPipelineIds .isEmpty ()) {
330
333
actionListener .onResponse (true );
331
334
} else {
332
335
actionListener
333
336
.onFailure (
334
337
new OpenSearchStatusException (
335
- allRelevantPipelineIds .size ()
338
+ allDependentPipelineIds .size ()
336
339
+ " search pipelines are still using this model, please delete or update the pipelines first: "
337
- + Arrays .toString (allRelevantPipelineIds .toArray (new String [0 ])),
340
+ + Arrays .toString (allDependentPipelineIds .toArray (new String [0 ])),
338
341
RestStatus .CONFLICT
339
342
)
340
343
);
@@ -475,40 +478,57 @@ private Boolean isModelNotDeployed(MLModelState mlModelState) {
475
478
&& !mlModelState .equals (MLModelState .PARTIALLY_DEPLOYED );
476
479
}
477
480
478
- private <T > List <String > findRelevantPipelines (
481
+ private <T > List <String > findDependentPipelines (
479
482
List <T > pipelineConfigurations ,
480
483
String candidateModelId ,
481
484
Function <T , Map <String , Object >> getConfigFunction ,
482
485
Function <T , String > getIdFunction
483
486
) {
484
- List <String > relevantPipelineConfigurations = new ArrayList <>();
487
+ List <String > dependentPipelineConfigurations = new ArrayList <>();
485
488
for (T pipelineConfiguration : pipelineConfigurations ) {
486
489
Map <String , Object > config = getConfigFunction .apply (pipelineConfiguration );
487
490
if (searchThroughConfig (config , candidateModelId , "" )) {
488
- relevantPipelineConfigurations .add (getIdFunction .apply (pipelineConfiguration ));
491
+ dependentPipelineConfigurations .add (getIdFunction .apply (pipelineConfiguration ));
489
492
}
490
493
}
491
- return relevantPipelineConfigurations ;
494
+ return dependentPipelineConfigurations ;
492
495
}
493
496
497
+ // This method is to go through the pipeline configs and only when the key is model id and value is
498
+ // 1. String and equal to candidate id 2. A list of String containing candidate id We will return True. Otherwise False
494
499
private Boolean searchThroughConfig (Object searchCandidate , String candidateId , String targetModelKey ) {
495
- boolean flag = false ;
496
- if (searchCandidate instanceof String
497
- && Objects .equals (targetModelKey , PIPELINE_TARGET_MODEL_KEY )
498
- && Objects .equals (candidateId , searchCandidate )) {
499
- return true ;
500
- } else if (searchCandidate instanceof List <?>) {
501
- for (Object v : (List <?>) searchCandidate ) {
502
- flag = flag || searchThroughConfig (v , candidateId , targetModelKey );
503
- }
504
- } else if (searchCandidate instanceof Map <?, ?>) {
505
- for (Map .Entry <String , Object > entry : ((Map <String , Object >) searchCandidate ).entrySet ()) {
506
- String key = entry .getKey ();
507
- Object value = entry .getValue ();
508
- flag = flag || searchThroughConfig (value , candidateId , key );
500
+ // Use a stack to store the elements to be processed
501
+ Deque <Pair <String , Object >> stack = new ArrayDeque <>();
502
+ stack .push (Pair .of (targetModelKey , searchCandidate ));
503
+
504
+ while (!stack .isEmpty ()) {
505
+ // Pop an item from the stack
506
+ Pair <String , Object > current = stack .pop ();
507
+ String currentKey = current .getLeft ();
508
+ Object currentCandidate = current .getRight ();
509
+
510
+ if (currentCandidate instanceof String ) {
511
+ // Check for a match
512
+ if (Objects .equals (currentKey , PIPELINE_TARGET_MODEL_KEY ) && Objects .equals (candidateId , currentCandidate )) {
513
+ return true ;
514
+ }
515
+ } else if (currentCandidate instanceof List <?>) {
516
+ // Push all elements in the list onto the stack
517
+ for (Object v : (List <?>) currentCandidate ) {
518
+ stack .push (Pair .of (currentKey , v ));
519
+ }
520
+ } else if (currentCandidate instanceof Map <?, ?>) {
521
+ // Push all values in the map onto the stack
522
+ for (Map .Entry <?, ?> entry : ((Map <?, ?>) currentCandidate ).entrySet ()) {
523
+ String key = (String ) entry .getKey ();
524
+ Object value = entry .getValue ();
525
+ stack .push (Pair .of (key , value ));
526
+ }
509
527
}
510
528
}
511
- return flag ;
529
+
530
+ // If no match is found
531
+ return false ;
512
532
}
513
533
514
534
// this method is only to stub static method.
0 commit comments