12
12
import java .util .ArrayList ;
13
13
import java .util .Arrays ;
14
14
import java .util .List ;
15
- import java .util .Map ;
16
15
import java .util .Optional ;
17
16
import java .util .Queue ;
18
17
import java .util .concurrent .ConcurrentLinkedQueue ;
31
30
import org .opensearch .core .common .Strings ;
32
31
import org .opensearch .index .IndexNotFoundException ;
33
32
import org .opensearch .index .query .TermsQueryBuilder ;
34
- import org .opensearch .ml .common .FunctionName ;
35
33
import org .opensearch .ml .common .MLModel ;
36
34
import org .opensearch .ml .common .model .MLModelState ;
37
35
import org .opensearch .ml .common .transport .deploy .MLDeployModelAction ;
@@ -186,6 +184,9 @@ private void triggerAutoDeployModels(List<String> addedNodes) {
186
184
modelAutoRedeployArrangements .add (modelAutoRedeployArrangement );
187
185
});
188
186
redeployAModel ();
187
+ } else {
188
+ log .info ("Could not find any models in the index, not performing auto reloading!" );
189
+ startCronjobAndClearListener ();
189
190
}
190
191
}, e -> {
191
192
if (e instanceof IndexNotFoundException ) {
@@ -241,9 +242,7 @@ private void queryRunningModels(ActionListener<SearchResponse> listener) {
241
242
String [] includes = new String [] {
242
243
MLModel .AUTO_REDEPLOY_RETRY_TIMES_FIELD ,
243
244
MLModel .PLANNING_WORKER_NODES_FIELD ,
244
- MLModel .DEPLOY_TO_ALL_NODES_FIELD ,
245
- MLModel .FUNCTION_NAME_FIELD ,
246
- MLModel .ALGORITHM_FIELD };
245
+ MLModel .DEPLOY_TO_ALL_NODES_FIELD };
247
246
248
247
String [] excludes = new String [] { MLModel .MODEL_CONTENT_FIELD , MLModel .OLD_MODEL_CONTENT_FIELD };
249
248
FetchSourceContext fetchContext = new FetchSourceContext (true , includes , excludes );
@@ -261,29 +260,22 @@ private void queryRunningModels(ActionListener<SearchResponse> listener) {
261
260
private void triggerModelRedeploy (ModelAutoRedeployArrangement modelAutoRedeployArrangement ) {
262
261
if (modelAutoRedeployArrangement == null ) {
263
262
log .info ("No more models in arrangement, skipping the redeployment" );
263
+ startCronjobAndClearListener ();
264
264
return ;
265
265
}
266
266
String modelId = modelAutoRedeployArrangement .getSearchResponse ().getId ();
267
267
List <String > addedNodes = modelAutoRedeployArrangement .getAddedNodes ();
268
- Map <String , Object > sourceAsMap = modelAutoRedeployArrangement .getSearchResponse ().getSourceAsMap ();
269
- String functionName = (String ) Optional
270
- .ofNullable (sourceAsMap .get (MLModel .FUNCTION_NAME_FIELD ))
271
- .orElse (sourceAsMap .get (MLModel .ALGORITHM_FIELD ));
272
- if (functionName == null ) {
273
- log
274
- .error (
275
- "Model function_name or algorithm is null, model is not in correct status, please check the model, model id is: {}" ,
276
- modelId
277
- );
278
- return ;
279
- }
280
- if (FunctionName .REMOTE == FunctionName .from (functionName )) {
281
- log .info ("Skipping redeploying remote model {} as remote model deployment can be done at prediction time." , modelId );
282
- return ;
283
- }
284
- List <String > planningWorkerNodes = (List <String >) sourceAsMap .get (MLModel .PLANNING_WORKER_NODES_FIELD );
285
- Integer autoRedeployRetryTimes = (Integer ) sourceAsMap .get (MLModel .AUTO_REDEPLOY_RETRY_TIMES_FIELD );
286
- Boolean deployToAllNodes = (Boolean ) Optional .ofNullable (sourceAsMap .get (MLModel .DEPLOY_TO_ALL_NODES_FIELD )).orElse (false );
268
+ List <String > planningWorkerNodes = (List <String >) modelAutoRedeployArrangement
269
+ .getSearchResponse ()
270
+ .getSourceAsMap ()
271
+ .get (MLModel .PLANNING_WORKER_NODES_FIELD );
272
+ Integer autoRedeployRetryTimes = (Integer ) modelAutoRedeployArrangement
273
+ .getSearchResponse ()
274
+ .getSourceAsMap ()
275
+ .get (MLModel .AUTO_REDEPLOY_RETRY_TIMES_FIELD );
276
+ Boolean deployToAllNodes = (Boolean ) Optional
277
+ .ofNullable (modelAutoRedeployArrangement .getSearchResponse ().getSourceAsMap ().get (MLModel .DEPLOY_TO_ALL_NODES_FIELD ))
278
+ .orElse (false );
287
279
// calculate node ids.
288
280
String [] nodeIds = null ;
289
281
if (deployToAllNodes || !allowCustomDeploymentPlan ) {
@@ -302,6 +294,7 @@ private void triggerModelRedeploy(ModelAutoRedeployArrangement modelAutoRedeploy
302
294
.info (
303
295
"Allow custom deployment plan is true and deploy to all nodes is false and added nodes are not in planning worker nodes list, not to auto redeploy the model to the new nodes!"
304
296
);
297
+ redeployAModel ();
305
298
return ;
306
299
}
307
300
0 commit comments