Skip to content

Commit b06e298

Browse files
committed
add feature flag for offline batch ingestion
Signed-off-by: Xun Zhang <xunzh@amazon.com>
1 parent 6a6cac1 commit b06e298

File tree

6 files changed

+55
-3
lines changed

6 files changed

+55
-3
lines changed

plugin/src/main/java/org/opensearch/ml/action/batch/TransportBatchIngestionAction.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import static org.opensearch.ml.common.MLTaskState.FAILED;
1212
import static org.opensearch.ml.plugin.MachineLearningPlugin.INGEST_THREAD_POOL;
1313
import static org.opensearch.ml.task.MLTaskManager.TASK_SEMAPHORE_TIMEOUT;
14+
import static org.opensearch.ml.utils.MLExceptionUtils.OFFLINE_BATCH_INGESTION_DISABLED_ERR_MSG;
1415

1516
import java.time.Instant;
1617
import java.util.List;
@@ -35,6 +36,7 @@
3536
import org.opensearch.ml.common.transport.batch.MLBatchIngestionResponse;
3637
import org.opensearch.ml.engine.MLEngineClassLoader;
3738
import org.opensearch.ml.engine.ingest.Ingestable;
39+
import org.opensearch.ml.settings.MLFeatureEnabledSetting;
3840
import org.opensearch.ml.task.MLTaskManager;
3941
import org.opensearch.ml.utils.MLExceptionUtils;
4042
import org.opensearch.tasks.Task;
@@ -55,27 +57,33 @@ public class TransportBatchIngestionAction extends HandledTransportAction<Action
5557
MLTaskManager mlTaskManager;
5658
private final Client client;
5759
private ThreadPool threadPool;
60+
private MLFeatureEnabledSetting mlFeatureEnabledSetting;
5861

5962
@Inject
6063
public TransportBatchIngestionAction(
6164
TransportService transportService,
6265
ActionFilters actionFilters,
6366
Client client,
6467
MLTaskManager mlTaskManager,
65-
ThreadPool threadPool
68+
ThreadPool threadPool,
69+
MLFeatureEnabledSetting mlFeatureEnabledSetting
6670
) {
6771
super(MLBatchIngestionAction.NAME, transportService, actionFilters, MLBatchIngestionRequest::new);
6872
this.transportService = transportService;
6973
this.client = client;
7074
this.mlTaskManager = mlTaskManager;
7175
this.threadPool = threadPool;
76+
this.mlFeatureEnabledSetting = mlFeatureEnabledSetting;
7277
}
7378

7479
@Override
7580
protected void doExecute(Task task, ActionRequest request, ActionListener<MLBatchIngestionResponse> listener) {
7681
MLBatchIngestionRequest mlBatchIngestionRequest = MLBatchIngestionRequest.fromActionRequest(request);
7782
MLBatchIngestionInput mlBatchIngestionInput = mlBatchIngestionRequest.getMlBatchIngestionInput();
7883
try {
84+
if (!mlFeatureEnabledSetting.isOfflineBatchIngestionEnabled()) {
85+
throw new IllegalStateException(OFFLINE_BATCH_INGESTION_DISABLED_ERR_MSG);
86+
}
7987
validateBatchIngestInput(mlBatchIngestionInput);
8088
MLTask mlTask = MLTask
8189
.builder()

plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -970,7 +970,8 @@ public List<Setting<?>> getSettings() {
970970
MLCommonsSettings.ML_COMMONS_REMOTE_JOB_STATUS_CANCELLED_REGEX,
971971
MLCommonsSettings.ML_COMMONS_REMOTE_JOB_STATUS_CANCELLING_REGEX,
972972
MLCommonsSettings.ML_COMMONS_REMOTE_JOB_STATUS_EXPIRED_REGEX,
973-
MLCommonsSettings.ML_COMMONS_CONTROLLER_ENABLED
973+
MLCommonsSettings.ML_COMMONS_CONTROLLER_ENABLED,
974+
MLCommonsSettings.ML_COMMONS_OFFLINE_BATCH_INGESTION_ENABLED
974975
);
975976
return settings;
976977
}

plugin/src/main/java/org/opensearch/ml/settings/MLCommonsSettings.java

+3
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@ private MLCommonsSettings() {}
136136
public static final Setting<Boolean> ML_COMMONS_CONNECTOR_ACCESS_CONTROL_ENABLED = Setting
137137
.boolSetting("plugins.ml_commons.connector_access_control_enabled", false, Setting.Property.NodeScope, Setting.Property.Dynamic);
138138

139+
public static final Setting<Boolean> ML_COMMONS_OFFLINE_BATCH_INGESTION_ENABLED = Setting
140+
.boolSetting("plugins.ml_commons.offline_batch_ingestion_enabled", true, Setting.Property.NodeScope, Setting.Property.Dynamic);
141+
139142
public static final Setting<List<String>> ML_COMMONS_TRUSTED_CONNECTOR_ENDPOINTS_REGEX = Setting
140143
.listSetting(
141144
"plugins.ml_commons.trusted_connector_endpoints_regex",

plugin/src/main/java/org/opensearch/ml/settings/MLFeatureEnabledSetting.java

+14
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_CONNECTOR_PRIVATE_IP_ENABLED;
1212
import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_CONTROLLER_ENABLED;
1313
import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_LOCAL_MODEL_ENABLED;
14+
import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_OFFLINE_BATCH_INGESTION_ENABLED;
1415
import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_REMOTE_INFERENCE_ENABLED;
1516

1617
import java.util.concurrent.atomic.AtomicBoolean;
@@ -27,13 +28,15 @@ public class MLFeatureEnabledSetting {
2728
private volatile AtomicBoolean isConnectorPrivateIpEnabled;
2829

2930
private volatile Boolean isControllerEnabled;
31+
private volatile Boolean isBatchIngestionEnabled;
3032

3133
public MLFeatureEnabledSetting(ClusterService clusterService, Settings settings) {
3234
isRemoteInferenceEnabled = ML_COMMONS_REMOTE_INFERENCE_ENABLED.get(settings);
3335
isAgentFrameworkEnabled = ML_COMMONS_AGENT_FRAMEWORK_ENABLED.get(settings);
3436
isLocalModelEnabled = ML_COMMONS_LOCAL_MODEL_ENABLED.get(settings);
3537
isConnectorPrivateIpEnabled = new AtomicBoolean(ML_COMMONS_CONNECTOR_PRIVATE_IP_ENABLED.get(settings));
3638
isControllerEnabled = ML_COMMONS_CONTROLLER_ENABLED.get(settings);
39+
isBatchIngestionEnabled = ML_COMMONS_OFFLINE_BATCH_INGESTION_ENABLED.get(settings);
3740

3841
clusterService
3942
.getClusterSettings()
@@ -46,6 +49,9 @@ public MLFeatureEnabledSetting(ClusterService clusterService, Settings settings)
4649
.getClusterSettings()
4750
.addSettingsUpdateConsumer(ML_COMMONS_CONNECTOR_PRIVATE_IP_ENABLED, it -> isConnectorPrivateIpEnabled.set(it));
4851
clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_CONTROLLER_ENABLED, it -> isControllerEnabled = it);
52+
clusterService
53+
.getClusterSettings()
54+
.addSettingsUpdateConsumer(ML_COMMONS_OFFLINE_BATCH_INGESTION_ENABLED, it -> isBatchIngestionEnabled = it);
4955
}
5056

5157
/**
@@ -84,4 +90,12 @@ public Boolean isControllerEnabled() {
8490
return isControllerEnabled;
8591
}
8692

93+
/**
94+
* Whether the offline batch ingestion is enabled. If disabled, APIs in ml-commons will block offline batch ingestion.
95+
* @return whether the feature is enabled.
96+
*/
97+
public Boolean isOfflineBatchIngestionEnabled() {
98+
return isBatchIngestionEnabled;
99+
}
100+
87101
}

plugin/src/main/java/org/opensearch/ml/utils/MLExceptionUtils.java

+2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ public class MLExceptionUtils {
2626
"Agent Framework is currently disabled. To enable it, update the setting \"plugins.ml_commons.agent_framework_enabled\" to true.";
2727
public static final String CONTROLLER_DISABLED_ERR_MSG =
2828
"Controller is currently disabled. To enable it, update the setting \"plugins.ml_commons.controller_enabled\" to true.";
29+
public static final String OFFLINE_BATCH_INGESTION_DISABLED_ERR_MSG =
30+
"Offline batch ingestion is currently disabled. To enable it, update the setting \"plugins.ml_commons.offline_batch_ingestion_enabled\" to true.";
2931

3032
public static String getRootCauseMessage(final Throwable throwable) {
3133
String message = ExceptionUtils.getRootCauseMessage(throwable);

plugin/src/test/java/org/opensearch/ml/action/batch/TransportBatchIngestionActionTests.java

+25-1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.opensearch.ml.common.transport.batch.MLBatchIngestionInput;
4747
import org.opensearch.ml.common.transport.batch.MLBatchIngestionRequest;
4848
import org.opensearch.ml.common.transport.batch.MLBatchIngestionResponse;
49+
import org.opensearch.ml.settings.MLFeatureEnabledSetting;
4950
import org.opensearch.ml.task.MLTaskManager;
5051
import org.opensearch.tasks.Task;
5152
import org.opensearch.test.OpenSearchTestCase;
@@ -73,6 +74,8 @@ public class TransportBatchIngestionActionTests extends OpenSearchTestCase {
7374
ThreadPool threadPool;
7475
@Mock
7576
ExecutorService executorService;
77+
@Mock
78+
private MLFeatureEnabledSetting mlFeatureEnabledSetting;
7679

7780
private TransportBatchIngestionAction batchAction;
7881
private MLBatchIngestionInput batchInput;
@@ -81,7 +84,14 @@ public class TransportBatchIngestionActionTests extends OpenSearchTestCase {
8184
@Before
8285
public void setup() {
8386
MockitoAnnotations.openMocks(this);
84-
batchAction = new TransportBatchIngestionAction(transportService, actionFilters, client, mlTaskManager, threadPool);
87+
batchAction = new TransportBatchIngestionAction(
88+
transportService,
89+
actionFilters,
90+
client,
91+
mlTaskManager,
92+
threadPool,
93+
mlFeatureEnabledSetting
94+
);
8595

8696
Map<String, Object> fieldMap = new HashMap<>();
8797
fieldMap.put("chapter", "$.content[0]");
@@ -106,6 +116,8 @@ public void setup() {
106116
.dataSources(dataSource)
107117
.build();
108118
when(mlBatchIngestionRequest.getMlBatchIngestionInput()).thenReturn(batchInput);
119+
120+
when(mlFeatureEnabledSetting.isOfflineBatchIngestionEnabled()).thenReturn(true);
109121
}
110122

111123
public void test_doExecute_success() {
@@ -181,6 +193,18 @@ public void test_doExecute_handleSuccessRate0() {
181193
);
182194
}
183195

196+
public void test_doExecute_batchIngestionDisabled() {
197+
when(mlFeatureEnabledSetting.isOfflineBatchIngestionEnabled()).thenReturn(false);
198+
batchAction.doExecute(task, mlBatchIngestionRequest, actionListener);
199+
200+
ArgumentCaptor<IllegalStateException> argumentCaptor = ArgumentCaptor.forClass(IllegalStateException.class);
201+
verify(actionListener).onFailure(argumentCaptor.capture());
202+
assertEquals(
203+
"Offline batch ingestion is currently disabled. To enable it, update the setting \"plugins.ml_commons.offline_batch_ingestion_enabled\" to true.",
204+
argumentCaptor.getValue().getMessage()
205+
);
206+
}
207+
184208
public void test_doExecute_noDataSource() {
185209
MLBatchIngestionInput batchInput = MLBatchIngestionInput
186210
.builder()

0 commit comments

Comments
 (0)