10
10
import static org .opensearch .ml .common .MLTaskState .COMPLETED ;
11
11
import static org .opensearch .ml .common .MLTaskState .FAILED ;
12
12
import static org .opensearch .ml .plugin .MachineLearningPlugin .INGEST_THREAD_POOL ;
13
+ import static org .opensearch .ml .settings .MLCommonsSettings .ML_COMMONS_BATCH_INGESTION_BULK_SIZE ;
13
14
import static org .opensearch .ml .task .MLTaskManager .TASK_SEMAPHORE_TIMEOUT ;
14
15
import static org .opensearch .ml .utils .MLExceptionUtils .OFFLINE_BATCH_INGESTION_DISABLED_ERR_MSG ;
15
16
24
25
import org .opensearch .action .support .ActionFilters ;
25
26
import org .opensearch .action .support .HandledTransportAction ;
26
27
import org .opensearch .client .Client ;
28
+ import org .opensearch .cluster .service .ClusterService ;
27
29
import org .opensearch .common .inject .Inject ;
30
+ import org .opensearch .common .settings .Settings ;
28
31
import org .opensearch .core .action .ActionListener ;
29
32
import org .opensearch .core .rest .RestStatus ;
30
33
import org .opensearch .ml .common .MLTask ;
@@ -60,16 +63,19 @@ public class TransportBatchIngestionAction extends HandledTransportAction<Action
60
63
private final Client client ;
61
64
private ThreadPool threadPool ;
62
65
private MLFeatureEnabledSetting mlFeatureEnabledSetting ;
66
+ private volatile Integer batchIngestionBulkSize ;
63
67
64
68
@ Inject
65
69
public TransportBatchIngestionAction (
70
+ ClusterService clusterService ,
66
71
TransportService transportService ,
67
72
ActionFilters actionFilters ,
68
73
Client client ,
69
74
MLTaskManager mlTaskManager ,
70
75
ThreadPool threadPool ,
71
76
MLModelManager mlModelManager ,
72
- MLFeatureEnabledSetting mlFeatureEnabledSetting
77
+ MLFeatureEnabledSetting mlFeatureEnabledSetting ,
78
+ Settings settings
73
79
) {
74
80
super (MLBatchIngestionAction .NAME , transportService , actionFilters , MLBatchIngestionRequest ::new );
75
81
this .transportService = transportService ;
@@ -78,6 +84,12 @@ public TransportBatchIngestionAction(
78
84
this .threadPool = threadPool ;
79
85
this .mlModelManager = mlModelManager ;
80
86
this .mlFeatureEnabledSetting = mlFeatureEnabledSetting ;
87
+
88
+ batchIngestionBulkSize = ML_COMMONS_BATCH_INGESTION_BULK_SIZE .get (settings );
89
+ clusterService
90
+ .getClusterSettings ()
91
+ .addSettingsUpdateConsumer (ML_COMMONS_BATCH_INGESTION_BULK_SIZE , it -> batchIngestionBulkSize = it );
92
+
81
93
}
82
94
83
95
@ Override
@@ -131,33 +143,45 @@ protected void createMLTaskandExecute(MLBatchIngestionInput mlBatchIngestionInpu
131
143
.state (MLTaskState .CREATED )
132
144
.build ();
133
145
134
- mlTaskManager .createMLTask (mlTask , ActionListener .wrap (response -> {
135
- String taskId = response .getId ();
136
- try {
137
- mlTask .setTaskId (taskId );
138
- mlTaskManager .add (mlTask );
139
- listener .onResponse (new MLBatchIngestionResponse (taskId , MLTaskType .BATCH_INGEST , MLTaskState .CREATED .name ()));
140
- String ingestType = (String ) mlBatchIngestionInput .getDataSources ().get (TYPE );
141
- Ingestable ingestable = MLEngineClassLoader .initInstance (ingestType .toLowerCase (), client , Client .class );
142
- threadPool .executor (INGEST_THREAD_POOL ).execute (() -> {
143
- executeWithErrorHandling (() -> {
144
- double successRate = ingestable .ingest (mlBatchIngestionInput );
145
- handleSuccessRate (successRate , taskId );
146
- }, taskId );
147
- });
148
- } catch (Exception ex ) {
149
- log .error ("Failed in batch ingestion" , ex );
150
- mlTaskManager
151
- .updateMLTask (
152
- taskId ,
153
- Map .of (STATE_FIELD , FAILED , ERROR_FIELD , MLExceptionUtils .getRootCauseMessage (ex )),
154
- TASK_SEMAPHORE_TIMEOUT ,
155
- true
156
- );
157
- listener .onFailure (ex );
146
+ mlModelManager .checkMaxBatchJobTask (mlTask , ActionListener .wrap (exceedLimits -> {
147
+ if (exceedLimits ) {
148
+ String error =
149
+ "Exceeded maximum limit for BATCH_INGEST tasks. To increase the limit, update the plugins.ml_commons.max_batch_ingestion_tasks setting." ;
150
+ log .warn (error + " in task " + mlTask .getTaskId ());
151
+ listener .onFailure (new OpenSearchStatusException (error , RestStatus .TOO_MANY_REQUESTS ));
152
+ } else {
153
+ mlTaskManager .createMLTask (mlTask , ActionListener .wrap (response -> {
154
+ String taskId = response .getId ();
155
+ try {
156
+ mlTask .setTaskId (taskId );
157
+ mlTaskManager .add (mlTask );
158
+ listener .onResponse (new MLBatchIngestionResponse (taskId , MLTaskType .BATCH_INGEST , MLTaskState .CREATED .name ()));
159
+ String ingestType = (String ) mlBatchIngestionInput .getDataSources ().get (TYPE );
160
+ Ingestable ingestable = MLEngineClassLoader .initInstance (ingestType .toLowerCase (), client , Client .class );
161
+ threadPool .executor (INGEST_THREAD_POOL ).execute (() -> {
162
+ executeWithErrorHandling (() -> {
163
+ double successRate = ingestable .ingest (mlBatchIngestionInput , batchIngestionBulkSize );
164
+ handleSuccessRate (successRate , taskId );
165
+ }, taskId );
166
+ });
167
+ } catch (Exception ex ) {
168
+ log .error ("Failed in batch ingestion" , ex );
169
+ mlTaskManager
170
+ .updateMLTask (
171
+ taskId ,
172
+ Map .of (STATE_FIELD , FAILED , ERROR_FIELD , MLExceptionUtils .getRootCauseMessage (ex )),
173
+ TASK_SEMAPHORE_TIMEOUT ,
174
+ true
175
+ );
176
+ listener .onFailure (ex );
177
+ }
178
+ }, exception -> {
179
+ log .error ("Failed to create batch ingestion task" , exception );
180
+ listener .onFailure (exception );
181
+ }));
158
182
}
159
183
}, exception -> {
160
- log .error ("Failed to create batch ingestion task " , exception );
184
+ log .error ("Failed to check the maximum BATCH_INGEST Task limits " , exception );
161
185
listener .onFailure (exception );
162
186
}));
163
187
}
0 commit comments