Skip to content

Commit 525926f

Browse files
committed
fix bug to fetch all pipelines
Signed-off-by: xinyual <xinyual@amazon.com>
1 parent ef68097 commit 525926f

File tree

2 files changed

+82
-52
lines changed

2 files changed

+82
-52
lines changed

plugin/src/main/java/org/opensearch/ml/action/models/DeleteModelTransportAction.java

+38-50
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,6 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Delete
171171
);
172172
} else if (isModelNotDeployed(mlModelState)) {
173173
checkDownstreamTaskBeforeDeleteModel(modelId, isHidden, actionListener);
174-
;
175174
} else {
176175
wrappedListener
177176
.onFailure(
@@ -291,29 +290,26 @@ private void checkAgentBeforeDeleteModel(String modelId, ActionListener<Boolean>
291290
private void checkIngestPipelineBeforeDeleteModel(String modelId, ActionListener<Boolean> actionListener) {
292291
GetPipelineRequest getPipelineRequest = new GetPipelineRequest();
293292
client.execute(GetPipelineAction.INSTANCE, getPipelineRequest, ActionListener.wrap(ingestPipelineResponse -> {
294-
if (!isPipelineContainsModel(
295-
ingestPipelineResponse.pipelines(),
296-
modelId,
297-
org.opensearch.ingest.PipelineConfiguration::getConfigAsMap
298-
)) {
299-
actionListener.onResponse(true);
300-
} else {
301-
List<String> searchPipelineIds = getAllPipelineIds(
293+
List<String> allRelevantPipelineIds = findRelevantPipelines(
302294
ingestPipelineResponse.pipelines(),
295+
modelId,
296+
org.opensearch.ingest.PipelineConfiguration::getConfigAsMap,
303297
org.opensearch.ingest.PipelineConfiguration::getId
304-
);
298+
);
299+
if (allRelevantPipelineIds.isEmpty()) {
300+
actionListener.onResponse(true);
301+
}
302+
else {
305303
actionListener
306-
.onFailure(
307-
new OpenSearchStatusException(
308-
searchPipelineIds.size()
309-
+ " ingest pipelines are still using this model, please delete or update the pipelines first: "
310-
+ Arrays.toString(searchPipelineIds.toArray(new String[0])),
311-
RestStatus.CONFLICT
312-
)
313-
);
314-
304+
.onFailure(
305+
new OpenSearchStatusException(
306+
allRelevantPipelineIds.size()
307+
+ " ingest pipelines are still using this model, please delete or update the pipelines first: "
308+
+ Arrays.toString(allRelevantPipelineIds.toArray(new String[0])),
309+
RestStatus.CONFLICT
310+
)
311+
);
315312
}
316-
317313
}, e -> {
318314
log.error("Failed to delete ML Model: " + modelId, e);
319315
actionListener.onFailure(e);
@@ -325,29 +321,26 @@ private void checkIngestPipelineBeforeDeleteModel(String modelId, ActionListener
325321
private void checkSearchPipelineBeforeDeleteModel(String modelId, ActionListener<Boolean> actionListener) {
326322
GetSearchPipelineRequest getSearchPipelineRequest = new GetSearchPipelineRequest();
327323
client.execute(GetSearchPipelineAction.INSTANCE, getSearchPipelineRequest, ActionListener.wrap(searchPipelineResponse -> {
328-
if (!isPipelineContainsModel(
329-
searchPipelineResponse.pipelines(),
330-
modelId,
331-
org.opensearch.search.pipeline.PipelineConfiguration::getConfigAsMap
332-
)) {
333-
actionListener.onResponse(true);
334-
} else {
335-
List<String> searchPipelineIds = getAllPipelineIds(
324+
List<String> allRelevantPipelineIds = findRelevantPipelines(
336325
searchPipelineResponse.pipelines(),
326+
modelId,
327+
org.opensearch.search.pipeline.PipelineConfiguration::getConfigAsMap,
337328
org.opensearch.search.pipeline.PipelineConfiguration::getId
338-
);
329+
);
330+
if (allRelevantPipelineIds.isEmpty()) {
331+
actionListener.onResponse(true);
332+
}
333+
else {
339334
actionListener
340-
.onFailure(
341-
new OpenSearchStatusException(
342-
searchPipelineIds.size()
343-
+ " search pipelines are still using this model, please delete or update the pipelines first: "
344-
+ Arrays.toString(searchPipelineIds.toArray(new String[0])),
345-
RestStatus.CONFLICT
346-
)
347-
);
348-
335+
.onFailure(
336+
new OpenSearchStatusException(
337+
allRelevantPipelineIds.size()
338+
+ " search pipelines are still using this model, please delete or update the pipelines first: "
339+
+ Arrays.toString(allRelevantPipelineIds.toArray(new String[0])),
340+
RestStatus.CONFLICT
341+
)
342+
);
349343
}
350-
351344
}, e -> {
352345
log.error("Failed to delete ML Model: " + modelId, e);
353346
actionListener.onFailure(e);
@@ -484,18 +477,20 @@ private Boolean isModelNotDeployed(MLModelState mlModelState) {
484477
&& !mlModelState.equals(MLModelState.PARTIALLY_DEPLOYED);
485478
}
486479

487-
private <T> Boolean isPipelineContainsModel(
480+
private <T> List<String> findRelevantPipelines(
488481
List<T> pipelineConfigurations,
489482
String candidateModelId,
490-
Function<T, Map<String, Object>> getConfigFunction
483+
Function<T, Map<String, Object>> getConfigFunction,
484+
Function<T, String> getIdFunction
491485
) {
486+
List<String> relevantPipelineConfigurations = new ArrayList<>();
492487
for (T pipelineConfiguration : pipelineConfigurations) {
493488
Map<String, Object> config = getConfigFunction.apply(pipelineConfiguration);
494489
if (searchThroughConfig(config, candidateModelId, "")) {
495-
return true;
490+
relevantPipelineConfigurations.add(getIdFunction.apply(pipelineConfiguration));
496491
}
497492
}
498-
return false;
493+
return relevantPipelineConfigurations;
499494
}
500495

501496
private Boolean searchThroughConfig(Object searchCandidate, String candidateId, String targetModelKey) {
@@ -518,13 +513,6 @@ private Boolean searchThroughConfig(Object searchCandidate, String candidateId,
518513
return flag;
519514
}
520515

521-
private <T> List<String> getAllPipelineIds(List<T> pipelineConfigurations, Function<T, String> getIdFunction) {
522-
List<String> pipelineIds = new ArrayList<>();
523-
for (T pipelineConfiguration : pipelineConfigurations) {
524-
pipelineIds.add(getIdFunction.apply(pipelineConfiguration));
525-
}
526-
return pipelineIds;
527-
}
528516

529517
// this method is only to stub static method.
530518
@VisibleForTesting

plugin/src/test/java/org/opensearch/ml/action/models/DeleteModelTransportActionTests.java

+44-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.nio.charset.StandardCharsets;
2525
import java.util.ArrayList;
2626
import java.util.Arrays;
27+
import java.util.HashMap;
2728
import java.util.List;
2829
import java.util.Map;
2930

@@ -188,11 +189,45 @@ public void testDeleteModel_Success() throws IOException {
188189
verify(actionListener).onResponse(deleteResponse);
189190
}
190191

192+
public void testDeleteModel_BlockedBySearchPipelineAndIngestionPipeline() throws IOException {
193+
when(searchPipelineConfiguration.getId()).thenReturn("1");
194+
when(searchPipelineConfiguration.getConfigAsMap()).thenReturn(configDataMap);
195+
when(getSearchPipelineResponse.pipelines()).thenReturn(List.of(searchPipelineConfiguration));
196+
doAnswer(invocation -> {
197+
ActionListener<GetSearchPipelineResponse> listener = invocation.getArgument(2);
198+
listener.onResponse(getSearchPipelineResponse);
199+
return null;
200+
}).when(client).execute(eq(GetSearchPipelineAction.INSTANCE), any(), any());
201+
202+
org.opensearch.ingest.PipelineConfiguration ingestPipelineConfiguration = new org.opensearch.ingest.PipelineConfiguration(
203+
"1",
204+
new BytesArray("{\"model_id\": \"test_id\"}".getBytes(StandardCharsets.UTF_8)),
205+
MediaTypeRegistry.JSON
206+
);
207+
when(getIngestionPipelineResponse.pipelines()).thenReturn(List.of(ingestPipelineConfiguration));
208+
doAnswer(invocation -> {
209+
ActionListener<GetPipelineResponse> listener = invocation.getArgument(2);
210+
listener.onResponse(getIngestionPipelineResponse);
211+
return null;
212+
}).when(client).execute(eq(GetPipelineAction.INSTANCE), any(), any());
213+
214+
deleteModelTransportAction.doExecute(null, mlModelDeleteRequest, actionListener);
215+
ArgumentCaptor<Exception> argumentCaptor = ArgumentCaptor.forClass(Exception.class);
216+
verify(actionListener).onFailure(argumentCaptor.capture());
217+
assertEquals("1 ingest pipelines are still using this model, please delete or update the pipelines first: [1],1 search pipelines are still using this model, please delete or update the pipelines first: [1]", argumentCaptor.getValue().getMessage());
218+
}
219+
191220
public void testDeleteModel_BlockedBySearchPipeline() throws IOException {
192221
//org.opensearch.search.pipeline.PipelineConfiguration pipelineConfiguration = new PipelineConfiguration();
193222
when(searchPipelineConfiguration.getId()).thenReturn("1");
194223
when(searchPipelineConfiguration.getConfigAsMap()).thenReturn(configDataMap);
195-
when(getSearchPipelineResponse.pipelines()).thenReturn(List.of(searchPipelineConfiguration));
224+
225+
org.opensearch.search.pipeline.PipelineConfiguration irrelevantSearchPipelineConfiguration = mock(org.opensearch.search.pipeline.PipelineConfiguration.class);
226+
Map<String, Object> irrelevantConfigMap = new HashMap<>();
227+
irrelevantConfigMap.put("nothing", "nothing");
228+
when(irrelevantSearchPipelineConfiguration.getConfigAsMap()).thenReturn(irrelevantConfigMap);
229+
when(irrelevantSearchPipelineConfiguration.getId()).thenReturn("2");
230+
when(getSearchPipelineResponse.pipelines()).thenReturn(List.of(searchPipelineConfiguration, irrelevantSearchPipelineConfiguration));
196231
doAnswer(invocation -> {
197232
ActionListener<GetSearchPipelineResponse> listener = invocation.getArgument(2);
198233
listener.onResponse(getSearchPipelineResponse);
@@ -211,7 +246,14 @@ public void testDeleteModel_BlockedByIngestPipeline() throws IOException {
211246
new BytesArray("{\"model_id\": \"test_id\"}".getBytes(StandardCharsets.UTF_8)),
212247
MediaTypeRegistry.JSON
213248
);
214-
when(getIngestionPipelineResponse.pipelines()).thenReturn(List.of(ingestPipelineConfiguration));
249+
250+
org.opensearch.ingest.PipelineConfiguration irrelevantIngestPipelineConfiguration = new org.opensearch.ingest.PipelineConfiguration(
251+
"2",
252+
new BytesArray("{\"nothing\": \"test_id\"}".getBytes(StandardCharsets.UTF_8)),
253+
MediaTypeRegistry.JSON
254+
);
255+
256+
when(getIngestionPipelineResponse.pipelines()).thenReturn(List.of(ingestPipelineConfiguration, irrelevantIngestPipelineConfiguration));
215257
doAnswer(invocation -> {
216258
ActionListener<GetPipelineResponse> listener = invocation.getArgument(2);
217259
listener.onResponse(getIngestionPipelineResponse);

0 commit comments

Comments
 (0)