Skip to content

Commit 88d5fd5

Browse files
committed
refactor to parallel check
Signed-off-by: xinyual <xinyual@amazon.com>
1 parent 56ec29f commit 88d5fd5

File tree

3 files changed

+129
-113
lines changed

3 files changed

+129
-113
lines changed

plugin/src/main/java/org/opensearch/ml/action/models/DeleteModelTransportAction.java

+126-110
Original file line numberDiff line numberDiff line change
@@ -116,123 +116,15 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Delete
116116
MLModelDeleteRequest mlModelDeleteRequest = MLModelDeleteRequest.fromActionRequest(request);
117117
String modelId = mlModelDeleteRequest.getModelId();
118118
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
119-
// check whether agent are using them
120-
SearchRequest searchAgentRequest = agentModelsSearcher.constructQueryRequest(modelId);
121-
client.search(searchAgentRequest, ActionListener.runBefore(ActionListener.wrap(searchResponse -> {
122-
SearchHit[] searchHits = searchResponse.getHits().getHits();
123-
if (searchHits.length == 0) {
124-
checkPipelineAndDelete(modelId, actionListener);
125-
} else {
126-
List<String> relatedAgents = new ArrayList<>();
127-
for (SearchHit hit : searchHits) {
128-
relatedAgents.add(hit.getId());
129-
}
130-
actionListener
131-
.onFailure(
132-
new OpenSearchStatusException(
133-
searchHits.length
134-
+ " agents are still using this model, please delete or update the agents first: "
135-
+ Arrays.toString(relatedAgents.toArray(new String[0])),
136-
RestStatus.CONFLICT
137-
)
138-
);
139-
}
140-
141-
}, e -> {
142-
if (e instanceof IndexNotFoundException) {
143-
checkPipelineAndDelete(modelId, actionListener);
144-
return;
145-
}
146-
log.error("Failed to delete ML Model: " + modelId, e);
147-
actionListener.onFailure(e);
148-
149-
}), () -> context.restore()));
119+
ActionListener<DeleteResponse> wrappedListener = ActionListener.runBefore(actionListener, () -> context.restore());
120+
checkDownstreamTaskBeforeDeleteModel(modelId, wrappedListener);
150121
} catch (Exception e) {
151122
log.error(e.getMessage(), e);
152123
actionListener.onFailure(e);
153124
}
154125

155126
}
156127

157-
private void checkPipelineAndDelete(String modelId, ActionListener<DeleteResponse> actionListener) {
158-
GetSearchPipelineRequest getSearchPipelineRequest = new GetSearchPipelineRequest();
159-
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
160-
// Search for whether search pipeline uses the model
161-
client
162-
.execute(
163-
GetSearchPipelineAction.INSTANCE,
164-
getSearchPipelineRequest,
165-
ActionListener.runBefore(ActionListener.wrap(searchPipelineResponse -> {
166-
if (!isPipelineContainsModel(
167-
searchPipelineResponse.pipelines(),
168-
modelId,
169-
org.opensearch.search.pipeline.PipelineConfiguration::getConfigAsMap
170-
)) {
171-
GetPipelineRequest getPipelineRequest = new GetPipelineRequest();
172-
// // Search for whether ingest pipeline uses the model
173-
client
174-
.execute(
175-
GetPipelineAction.INSTANCE,
176-
getPipelineRequest,
177-
ActionListener.runBefore(ActionListener.wrap(ingestPipelineResponse -> {
178-
if (!isPipelineContainsModel(
179-
ingestPipelineResponse.pipelines(),
180-
modelId,
181-
org.opensearch.ingest.PipelineConfiguration::getConfigAsMap
182-
)) {
183-
doDeleteModel(modelId, actionListener);
184-
} else {
185-
List<String> searchPipelineIds = getAllPipelineIds(
186-
ingestPipelineResponse.pipelines(),
187-
org.opensearch.ingest.PipelineConfiguration::getId
188-
);
189-
actionListener
190-
.onFailure(
191-
new OpenSearchStatusException(
192-
searchPipelineIds.size()
193-
+ " ingest pipelines are still using this model, please delete or update the pipelines first: "
194-
+ Arrays.toString(searchPipelineIds.toArray(new String[0])),
195-
RestStatus.CONFLICT
196-
)
197-
);
198-
199-
}
200-
201-
}, e -> {
202-
log.error("Failed to delete ML Model: " + modelId, e);
203-
actionListener.onFailure(e);
204-
205-
}), () -> context.restore())
206-
);
207-
} else {
208-
List<String> ingestPipelineIds = getAllPipelineIds(
209-
searchPipelineResponse.pipelines(),
210-
org.opensearch.search.pipeline.PipelineConfiguration::getId
211-
);
212-
actionListener
213-
.onFailure(
214-
new OpenSearchStatusException(
215-
ingestPipelineIds.size()
216-
+ " search pipelines are still using this model, please delete or update the pipelines first: "
217-
+ Arrays.toString(ingestPipelineIds.toArray(new String[0])),
218-
RestStatus.CONFLICT
219-
)
220-
);
221-
222-
}
223-
224-
}, e -> {
225-
log.error("Failed to delete ML Model: " + modelId, e);
226-
actionListener.onFailure(e);
227-
228-
}), () -> context.restore())
229-
);
230-
} catch (Exception e) {
231-
log.error(e.getMessage(), e);
232-
actionListener.onFailure(e);
233-
}
234-
}
235-
236128
private void doDeleteModel(String modelId, ActionListener<DeleteResponse> actionListener) {
237129
MLModelGetRequest mlModelGetRequest = new MLModelGetRequest(modelId, false, false);
238130
FetchSourceContext fetchSourceContext = getFetchSourceContext(mlModelGetRequest.isReturnContent());
@@ -370,6 +262,130 @@ public void onFailure(Exception e) {
370262
});
371263
}
372264

265+
private void checkAgentBeforeDeleteModel(String modelId, ActionListener<Boolean> actionListener) {
266+
// check whether agent are using them
267+
SearchRequest searchAgentRequest = agentModelsSearcher.constructQueryRequest(modelId);
268+
client.search(searchAgentRequest, ActionListener.wrap(searchResponse -> {
269+
SearchHit[] searchHits = searchResponse.getHits().getHits();
270+
if (searchHits.length == 0) {
271+
actionListener.onResponse(true);
272+
} else {
273+
List<String> relatedAgents = new ArrayList<>();
274+
for (SearchHit hit : searchHits) {
275+
relatedAgents.add(hit.getId());
276+
}
277+
actionListener
278+
.onFailure(
279+
new OpenSearchStatusException(
280+
searchHits.length
281+
+ " agents are still using this model, please delete or update the agents first: "
282+
+ Arrays.toString(relatedAgents.toArray(new String[0])),
283+
RestStatus.CONFLICT
284+
)
285+
);
286+
}
287+
288+
}, e -> {
289+
if (e instanceof IndexNotFoundException) {
290+
actionListener.onResponse(true);
291+
return;
292+
}
293+
log.error("Failed to delete ML Model: " + modelId, e);
294+
actionListener.onFailure(e);
295+
296+
}));
297+
}
298+
299+
private void checkIngestPipelineBeforeDeleteModel(String modelId, ActionListener<Boolean> actionListener) {
300+
GetPipelineRequest getPipelineRequest = new GetPipelineRequest();
301+
client.execute(GetPipelineAction.INSTANCE, getPipelineRequest, ActionListener.wrap(ingestPipelineResponse -> {
302+
if (!isPipelineContainsModel(
303+
ingestPipelineResponse.pipelines(),
304+
modelId,
305+
org.opensearch.ingest.PipelineConfiguration::getConfigAsMap
306+
)) {
307+
actionListener.onResponse(true);
308+
} else {
309+
List<String> searchPipelineIds = getAllPipelineIds(
310+
ingestPipelineResponse.pipelines(),
311+
org.opensearch.ingest.PipelineConfiguration::getId
312+
);
313+
actionListener
314+
.onFailure(
315+
new OpenSearchStatusException(
316+
searchPipelineIds.size()
317+
+ " ingest pipelines are still using this model, please delete or update the pipelines first: "
318+
+ Arrays.toString(searchPipelineIds.toArray(new String[0])),
319+
RestStatus.CONFLICT
320+
)
321+
);
322+
323+
}
324+
325+
}, e -> {
326+
log.error("Failed to delete ML Model: " + modelId, e);
327+
actionListener.onFailure(e);
328+
329+
}));
330+
331+
}
332+
333+
private void checkSearchPipelineBeforeDeleteModel(String modelId, ActionListener<Boolean> actionListener) {
334+
GetSearchPipelineRequest getSearchPipelineRequest = new GetSearchPipelineRequest();
335+
client.execute(GetSearchPipelineAction.INSTANCE, getSearchPipelineRequest, ActionListener.wrap(searchPipelineResponse -> {
336+
if (!isPipelineContainsModel(
337+
searchPipelineResponse.pipelines(),
338+
modelId,
339+
org.opensearch.search.pipeline.PipelineConfiguration::getConfigAsMap
340+
)) {
341+
actionListener.onResponse(true);
342+
} else {
343+
List<String> searchPipelineIds = getAllPipelineIds(
344+
searchPipelineResponse.pipelines(),
345+
org.opensearch.search.pipeline.PipelineConfiguration::getId
346+
);
347+
actionListener
348+
.onFailure(
349+
new OpenSearchStatusException(
350+
searchPipelineIds.size()
351+
+ " search pipelines are still using this model, please delete or update the pipelines first: "
352+
+ Arrays.toString(searchPipelineIds.toArray(new String[0])),
353+
RestStatus.CONFLICT
354+
)
355+
);
356+
357+
}
358+
359+
}, e -> {
360+
log.error("Failed to delete ML Model: " + modelId, e);
361+
actionListener.onFailure(e);
362+
363+
}));
364+
365+
}
366+
367+
private void checkDownstreamTaskBeforeDeleteModel(String modelId, ActionListener<DeleteResponse> actionListener) {
368+
CountDownLatch countDownLatch = new CountDownLatch(3);
369+
AtomicBoolean noneBlocked = new AtomicBoolean(true);
370+
ActionListener<Boolean> countDownActionListener = ActionListener.wrap(b -> {
371+
countDownLatch.countDown();
372+
noneBlocked.compareAndSet(true, b);
373+
if (countDownLatch.getCount() == 0) {
374+
if (noneBlocked.get()) {
375+
doDeleteModel(modelId, actionListener);
376+
}
377+
}
378+
}, e -> {
379+
countDownLatch.countDown();
380+
noneBlocked.compareAndSet(true, false);
381+
actionListener.onFailure(e);
382+
383+
});
384+
checkAgentBeforeDeleteModel(modelId, countDownActionListener);
385+
checkIngestPipelineBeforeDeleteModel(modelId, countDownActionListener);
386+
checkSearchPipelineBeforeDeleteModel(modelId, countDownActionListener);
387+
}
388+
373389
private void deleteModelChunksAndController(
374390
ActionListener<DeleteResponse> actionListener,
375391
String modelId,

plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -178,12 +178,12 @@
178178
import org.opensearch.ml.engine.indices.MLInputDatasetHandler;
179179
import org.opensearch.ml.engine.memory.ConversationIndexMemory;
180180
import org.opensearch.ml.engine.memory.MLMemoryManager;
181+
import org.opensearch.ml.engine.tools.AgentModelsSearcher;
181182
import org.opensearch.ml.engine.tools.AgentTool;
182183
import org.opensearch.ml.engine.tools.CatIndexTool;
183184
import org.opensearch.ml.engine.tools.ConnectorTool;
184185
import org.opensearch.ml.engine.tools.IndexMappingTool;
185186
import org.opensearch.ml.engine.tools.MLModelTool;
186-
import org.opensearch.ml.engine.tools.AgentModelsSearcher;
187187
import org.opensearch.ml.engine.tools.SearchIndexTool;
188188
import org.opensearch.ml.engine.tools.VisualizationsTool;
189189
import org.opensearch.ml.helper.ConnectorAccessControlHelper;
@@ -686,7 +686,7 @@ public Collection<Object> createComponents(
686686
mlStats,
687687
mlTaskManager,
688688
mlModelManager,
689-
agentModelsSearcher,
689+
agentModelsSearcher,
690690
mlIndicesHandler,
691691
mlInputDatasetHandler,
692692
mlTrainingTaskRunner,

plugin/src/test/java/org/opensearch/ml/action/models/DeleteModelTransportActionTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public void setup() throws IOException {
153153
xContentRegistry,
154154
clusterService,
155155
modelAccessControlHelper,
156-
agentModelsSearcher
156+
agentModelsSearcher
157157
)
158158
);
159159

0 commit comments

Comments
 (0)