Skip to content

Commit cbede86

Browse files
add IMMEDIATE refresh policy (opensearch-project#2541) (opensearch-project#2552)
* add IMMEDIATE refresh policy Signed-off-by: Yaliang Wu <ylwu@amazon.com> * add refresh policy to bulk request Signed-off-by: Yaliang Wu <ylwu@amazon.com> * run spotlessApply Signed-off-by: Yaliang Wu <ylwu@amazon.com> --------- Signed-off-by: Yaliang Wu <ylwu@amazon.com> (cherry picked from commit 4c04854) Co-authored-by: Yaliang Wu <ylwu@amazon.com>
1 parent 2775942 commit cbede86

File tree

11 files changed

+21
-7
lines changed

11 files changed

+21
-7
lines changed

ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/metrics_correlation/MetricsCorrelation.java

+2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.ml.engine.algorithms.metrics_correlation;
77

8+
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
89
import static org.opensearch.index.query.QueryBuilders.termQuery;
910
import static org.opensearch.ml.common.CommonValue.ML_MODEL_GROUP_INDEX;
1011
import static org.opensearch.ml.common.CommonValue.ML_MODEL_GROUP_INDEX_MAPPING;
@@ -268,6 +269,7 @@ void registerModel(ActionListener<MLRegisterModelResponse> listener) throws Inte
268269
XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent());
269270
modelGroup.toXContent(builder, ToXContent.EMPTY_PARAMS);
270271
createModelGroupRequest.source(builder);
272+
createModelGroupRequest.setRefreshPolicy(IMMEDIATE);
271273
client.index(createModelGroupRequest, ActionListener.runBefore(ActionListener.wrap(r -> {
272274
client.execute(MLRegisterModelAction.INSTANCE, registerRequest, ActionListener.wrap(listener::onResponse, e -> {
273275
log.error("Failed to Register Model", e);

ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/ConversationIndexMemory.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.ml.engine.memory;
77

8+
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
89
import static org.opensearch.ml.common.CommonValue.ML_MEMORY_MESSAGE_INDEX;
910
import static org.opensearch.ml.common.CommonValue.ML_MEMORY_META_INDEX;
1011

@@ -84,7 +85,7 @@ public void save(String id, Message message) {
8485
public void save(String id, Message message, ActionListener listener) {
8586
mlIndicesHandler.initMemoryMessageIndex(ActionListener.wrap(created -> {
8687
if (created) {
87-
IndexRequest indexRequest = new IndexRequest(memoryMessageIndexName);
88+
IndexRequest indexRequest = new IndexRequest(memoryMessageIndexName).setRefreshPolicy(IMMEDIATE);
8889
ConversationIndexMessage conversationIndexMessage = (ConversationIndexMessage) message;
8990
XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent());
9091
conversationIndexMessage.toXContent(builder, ToXContent.EMPTY_PARAMS);

plugin/src/main/java/org/opensearch/ml/action/agents/DeleteAgentTransportAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.ml.action.agents;
77

8+
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
89
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
910
import static org.opensearch.ml.common.CommonValue.ML_AGENT_INDEX;
1011
import static org.opensearch.ml.utils.MLNodeUtils.createXContentParserFromRegistry;
@@ -80,7 +81,7 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Delete
8081
);
8182
} else {
8283
// If the agent is not hidden or if the user is a super admin, proceed with deletion
83-
DeleteRequest deleteRequest = new DeleteRequest(ML_AGENT_INDEX, agentId);
84+
DeleteRequest deleteRequest = new DeleteRequest(ML_AGENT_INDEX, agentId).setRefreshPolicy(IMMEDIATE);
8485
client.delete(deleteRequest, ActionListener.wrap(deleteResponse -> {
8586
log.debug("Completed Delete Agent Request, agent id:{} deleted", agentId);
8687
actionListener.onResponse(deleteResponse);

plugin/src/main/java/org/opensearch/ml/action/agents/TransportRegisterAgentAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.ml.action.agents;
77

8+
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
89
import static org.opensearch.ml.common.CommonValue.ML_AGENT_INDEX;
910

1011
import java.time.Instant;
@@ -70,7 +71,7 @@ private void registerAgent(MLAgent agent, ActionListener<MLRegisterAgentResponse
7071
mlIndicesHandler.initMLAgentIndex(ActionListener.wrap(result -> {
7172
if (result) {
7273
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
73-
IndexRequest indexRequest = new IndexRequest(ML_AGENT_INDEX);
74+
IndexRequest indexRequest = new IndexRequest(ML_AGENT_INDEX).setRefreshPolicy(IMMEDIATE);
7475
XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent());
7576
mlAgent.toXContent(builder, ToXContent.EMPTY_PARAMS);
7677
indexRequest.source(builder);

plugin/src/main/java/org/opensearch/ml/action/connector/DeleteConnectorTransportAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.ml.action.connector;
77

8+
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
89
import static org.opensearch.ml.common.CommonValue.ML_CONNECTOR_INDEX;
910
import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX;
1011

@@ -66,7 +67,7 @@ public DeleteConnectorTransportAction(
6667
protected void doExecute(Task task, ActionRequest request, ActionListener<DeleteResponse> actionListener) {
6768
MLConnectorDeleteRequest mlConnectorDeleteRequest = MLConnectorDeleteRequest.fromActionRequest(request);
6869
String connectorId = mlConnectorDeleteRequest.getConnectorId();
69-
DeleteRequest deleteRequest = new DeleteRequest(ML_CONNECTOR_INDEX, connectorId);
70+
DeleteRequest deleteRequest = new DeleteRequest(ML_CONNECTOR_INDEX, connectorId).setRefreshPolicy(IMMEDIATE);
7071
connectorAccessControlHelper.validateConnectorAccess(client, connectorId, ActionListener.wrap(x -> {
7172
if (Boolean.TRUE.equals(x)) {
7273
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {

plugin/src/main/java/org/opensearch/ml/action/connector/UpdateConnectorTransportAction.java

+2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.opensearch.action.search.SearchRequest;
2020
import org.opensearch.action.support.ActionFilters;
2121
import org.opensearch.action.support.HandledTransportAction;
22+
import org.opensearch.action.support.WriteRequest;
2223
import org.opensearch.action.update.UpdateRequest;
2324
import org.opensearch.action.update.UpdateResponse;
2425
import org.opensearch.client.Client;
@@ -93,6 +94,7 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Update
9394
connector.update(mlUpdateConnectorAction.getUpdateContent(), mlEngine::encrypt);
9495
connector.validateConnectorURL(trustedConnectorEndpointsRegex);
9596
UpdateRequest updateRequest = new UpdateRequest(ML_CONNECTOR_INDEX, connectorId);
97+
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
9698
updateRequest.doc(connector.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS));
9799
updateUndeployedConnector(connectorId, updateRequest, listener, context);
98100
} else {

plugin/src/main/java/org/opensearch/ml/action/controller/DeleteControllerTransportAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.ml.action.controller;
77

8+
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
89
import static org.opensearch.ml.common.CommonValue.ML_CONTROLLER_INDEX;
910
import static org.opensearch.ml.common.utils.StringUtils.getErrorMessage;
1011

@@ -216,7 +217,7 @@ private void deleteControllerWithDeployedModel(String modelId, Boolean isHidden,
216217
}
217218

218219
private void deleteController(String modelId, Boolean isHidden, ActionListener<DeleteResponse> actionListener) {
219-
DeleteRequest deleteRequest = new DeleteRequest(ML_CONTROLLER_INDEX, modelId);
220+
DeleteRequest deleteRequest = new DeleteRequest(ML_CONTROLLER_INDEX, modelId).setRefreshPolicy(IMMEDIATE);
220221
client.delete(deleteRequest, new ActionListener<>() {
221222
@Override
222223
public void onResponse(DeleteResponse deleteResponse) {

plugin/src/main/java/org/opensearch/ml/action/model_group/TransportUpdateModelGroupAction.java

+2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.opensearch.action.get.GetRequest;
2020
import org.opensearch.action.support.ActionFilters;
2121
import org.opensearch.action.support.HandledTransportAction;
22+
import org.opensearch.action.support.WriteRequest;
2223
import org.opensearch.action.update.UpdateRequest;
2324
import org.opensearch.client.Client;
2425
import org.opensearch.cluster.service.ClusterService;
@@ -186,6 +187,7 @@ private void updateModelGroup(
186187

187188
private void updateModelGroup(String modelGroupId, Map<String, Object> source, ActionListener<MLUpdateModelGroupResponse> listener) {
188189
UpdateRequest updateModelGroupRequest = new UpdateRequest();
190+
updateModelGroupRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
189191
updateModelGroupRequest.index(ML_MODEL_GROUP_INDEX).id(modelGroupId).doc(source);
190192
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
191193
ActionListener<MLUpdateModelGroupResponse> wrappedListener = ActionListener.runBefore(listener, () -> context.restore());

plugin/src/main/java/org/opensearch/ml/action/models/DeleteModelTransportAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.ml.action.models;
77

8+
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
89
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
910
import static org.opensearch.ml.common.CommonValue.ML_CONTROLLER_INDEX;
1011
import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX;
@@ -287,7 +288,7 @@ private void deleteModelChunksAndController(
287288
* @param modelId model ID
288289
*/
289290
private void deleteController(String modelId, Boolean isHidden, ActionListener<Boolean> actionListener) {
290-
DeleteRequest deleteRequest = new DeleteRequest(ML_CONTROLLER_INDEX, modelId);
291+
DeleteRequest deleteRequest = new DeleteRequest(ML_CONTROLLER_INDEX, modelId).setRefreshPolicy(IMMEDIATE);
291292
client.delete(deleteRequest, new ActionListener<>() {
292293
@Override
293294
public void onResponse(DeleteResponse deleteResponse) {

plugin/src/main/java/org/opensearch/ml/action/tasks/DeleteTaskTransportAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.ml.action.tasks;
77

8+
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
89
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
910
import static org.opensearch.ml.common.CommonValue.ML_TASK_INDEX;
1011
import static org.opensearch.ml.utils.MLNodeUtils.createXContentParserFromRegistry;
@@ -68,7 +69,7 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Delete
6869
if (mlTaskState.equals(MLTaskState.RUNNING)) {
6970
actionListener.onFailure(new Exception("Task cannot be deleted in running state. Try after sometime"));
7071
} else {
71-
DeleteRequest deleteRequest = new DeleteRequest(ML_TASK_INDEX, taskId);
72+
DeleteRequest deleteRequest = new DeleteRequest(ML_TASK_INDEX, taskId).setRefreshPolicy(IMMEDIATE);
7273
client.delete(deleteRequest, new ActionListener<DeleteResponse>() {
7374
@Override
7475
public void onResponse(DeleteResponse deleteResponse) {

plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java

+1
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,7 @@ private void bulkUpdateModelState(
426426
updateRequest.index(ML_MODEL_INDEX).id(modelId).doc(builder.build());
427427
bulkUpdateRequest.add(updateRequest);
428428
}
429+
bulkUpdateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
429430
log.info("Refresh model state: {}", newModelStates);
430431
client.bulk(bulkUpdateRequest, ActionListener.wrap(br -> {
431432
updateModelStateSemaphore.release();

0 commit comments

Comments
 (0)