Skip to content

Commit b963587

Browse files
committed
rename and add more UTs
Signed-off-by: xinyual <xinyual@amazon.com>
1 parent b4adbc7 commit b963587

File tree

2 files changed

+164
-46
lines changed

2 files changed

+164
-46
lines changed

plugin/src/main/java/org/opensearch/ml/action/models/DeleteModelTransportAction.java

+50-28
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
import java.util.Arrays;
2222
import java.util.Deque;
2323
import java.util.List;
24+
import java.util.Locale;
2425
import java.util.Map;
2526
import java.util.Objects;
27+
import java.util.concurrent.CopyOnWriteArrayList;
2628
import java.util.concurrent.CountDownLatch;
2729
import java.util.concurrent.atomic.AtomicBoolean;
2830
import java.util.function.Function;
@@ -264,19 +266,9 @@ private void checkAgentBeforeDeleteModel(String modelId, ActionListener<Boolean>
264266
if (searchHits.length == 0) {
265267
actionListener.onResponse(true);
266268
} else {
267-
List<String> relatedAgents = new ArrayList<>();
268-
for (SearchHit hit : searchHits) {
269-
relatedAgents.add(hit.getId());
270-
}
271-
actionListener
272-
.onFailure(
273-
new OpenSearchStatusException(
274-
searchHits.length
275-
+ " agents are still using this model, please delete or update the agents first: "
276-
+ Arrays.toString(relatedAgents.toArray(new String[0])),
277-
RestStatus.CONFLICT
278-
)
279-
);
269+
String errorMessage = formatAgentErrorMessage(searchHits);
270+
271+
actionListener.onFailure(new OpenSearchStatusException(errorMessage, RestStatus.CONFLICT));
280272
}
281273

282274
}, e -> {
@@ -305,9 +297,13 @@ private void checkIngestPipelineBeforeDeleteModel(String modelId, ActionListener
305297
actionListener
306298
.onFailure(
307299
new OpenSearchStatusException(
308-
allDependentPipelineIds.size()
309-
+ " ingest pipelines are still using this model, please delete or update the pipelines first: "
310-
+ Arrays.toString(allDependentPipelineIds.toArray(new String[0])),
300+
String
301+
.format(
302+
Locale.ROOT,
303+
"%d ingest pipelines are still using this model, please delete or update the pipelines first: %s",
304+
allDependentPipelineIds.size(),
305+
Arrays.toString(allDependentPipelineIds.toArray(new String[0]))
306+
),
311307
RestStatus.CONFLICT
312308
)
313309
);
@@ -335,9 +331,13 @@ private void checkSearchPipelineBeforeDeleteModel(String modelId, ActionListener
335331
actionListener
336332
.onFailure(
337333
new OpenSearchStatusException(
338-
allDependentPipelineIds.size()
339-
+ " search pipelines are still using this model, please delete or update the pipelines first: "
340-
+ Arrays.toString(allDependentPipelineIds.toArray(new String[0])),
334+
String
335+
.format(
336+
Locale.ROOT,
337+
"%d search pipelines are still using this model, please delete or update the pipelines first: %s",
338+
allDependentPipelineIds.size(),
339+
Arrays.toString(allDependentPipelineIds.toArray(new String[0]))
340+
),
341341
RestStatus.CONFLICT
342342
)
343343
);
@@ -353,24 +353,23 @@ private void checkSearchPipelineBeforeDeleteModel(String modelId, ActionListener
353353
private void checkDownstreamTaskBeforeDeleteModel(String modelId, Boolean isHidden, ActionListener<DeleteResponse> actionListener) {
354354
CountDownLatch countDownLatch = new CountDownLatch(3);
355355
AtomicBoolean noneBlocked = new AtomicBoolean(true);
356-
List<String> errorMessages = new ArrayList<>();
356+
CopyOnWriteArrayList<String> errorMessages = new CopyOnWriteArrayList<>();
357357
ActionListener<Boolean> countDownActionListener = ActionListener.wrap(b -> {
358358
countDownLatch.countDown();
359359
noneBlocked.compareAndSet(true, b);
360360
if (countDownLatch.getCount() == 0) {
361361
if (noneBlocked.get()) {
362362
deleteModel(modelId, isHidden, actionListener);
363363
} else {
364-
actionListener.onFailure(new OpenSearchStatusException(String.join(",", errorMessages), RestStatus.CONFLICT));
364+
actionListener.onFailure(new OpenSearchStatusException(String.join(". ", errorMessages), RestStatus.CONFLICT));
365365
}
366366
}
367367
}, e -> {
368368
countDownLatch.countDown();
369369
noneBlocked.compareAndSet(true, false);
370-
// actionListener.onFailure(e);
371370
errorMessages.add(e.getMessage());
372371
if (countDownLatch.getCount() == 0) {
373-
actionListener.onFailure(new OpenSearchStatusException(String.join(",", errorMessages), RestStatus.CONFLICT));
372+
actionListener.onFailure(new OpenSearchStatusException(String.join(". ", errorMessages), RestStatus.CONFLICT));
374373
}
375374

376375
});
@@ -487,19 +486,19 @@ private <T> List<String> findDependentPipelines(
487486
List<String> dependentPipelineConfigurations = new ArrayList<>();
488487
for (T pipelineConfiguration : pipelineConfigurations) {
489488
Map<String, Object> config = getConfigFunction.apply(pipelineConfiguration);
490-
if (searchThroughConfig(config, candidateModelId, "")) {
489+
if (searchThroughConfig(config, candidateModelId)) {
491490
dependentPipelineConfigurations.add(getIdFunction.apply(pipelineConfiguration));
492491
}
493492
}
494493
return dependentPipelineConfigurations;
495494
}
496495

497-
// This method is to go through the pipeline configs and only when the key is model id and value is
498-
// 1. String and equal to candidate id 2. A list of String containing candidate id We will return True. Otherwise False
499-
private Boolean searchThroughConfig(Object searchCandidate, String candidateId, String targetModelKey) {
496+
// This method is to go through the pipeline configs and he configuration is a map of string to objects.
497+
// Objects can be a list or a map. we will search exhaustively through the configuration for any match of the candidateId.
498+
private Boolean searchThroughConfig(Object searchCandidate, String candidateId) {
500499
// Use a stack to store the elements to be processed
501500
Deque<Pair<String, Object>> stack = new ArrayDeque<>();
502-
stack.push(Pair.of(targetModelKey, searchCandidate));
501+
stack.push(Pair.of("", searchCandidate));
503502

504503
while (!stack.isEmpty()) {
505504
// Pop an item from the stack
@@ -531,6 +530,29 @@ private Boolean searchThroughConfig(Object searchCandidate, String candidateId,
531530
return false;
532531
}
533532

533+
private String formatAgentErrorMessage(SearchHit[] hits) {
534+
boolean isHidden = false;
535+
for (SearchHit hit : hits) {
536+
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
537+
isHidden = isHidden || Boolean.parseBoolean((String) sourceAsMap.getOrDefault(IS_HIDDEN_FIELD, false));
538+
}
539+
if (isHidden) {
540+
return String
541+
.format(Locale.ROOT, "%d agents are still using this model, please delete or update the agents first", hits.length);
542+
}
543+
List<String> agentIds = new ArrayList<>();
544+
for (SearchHit hit : hits) {
545+
agentIds.add(hit.getId());
546+
}
547+
return String
548+
.format(
549+
Locale.ROOT,
550+
"%d agents are still using this model, please delete or update the agents first: %s",
551+
hits.length,
552+
Arrays.toString(agentIds.toArray(new String[0]))
553+
);
554+
}
555+
534556
// this method is only to stub static method.
535557
@VisibleForTesting
536558
boolean isSuperAdminUserWrapper(ClusterService clusterService, Client client) {

0 commit comments

Comments
 (0)