Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.19] update put route when flattening result index #1411

Merged
merged 1 commit into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.io.IOException;
import java.time.Clock;
import java.util.*;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -467,18 +468,10 @@ private void handlePutRequest(boolean indexingDryRun, ActionListener<T> listener
private void handlePostRequest(boolean indexingDryRun, ActionListener<T> listener) {
createConfig(indexingDryRun, ActionListener.wrap(createConfigResponse -> {
if (shouldHandleFlattening(indexingDryRun)) {
String configId = RestHandlerUtils.getConfigIdFromIndexResponse(createConfigResponse);
String flattenedResultIndexAlias = config.getFlattenResultIndexAlias();

timeSeriesIndices
.initFlattenedResultIndex(
flattenedResultIndexAlias,
ActionListener
.wrap(
initResponse -> setupIngestPipeline(flattenedResultIndexAlias, configId, listener, createConfigResponse),
listener::onFailure
)
);
initAndSetupPipeline(flattenedResultIndexAlias, listener, l -> l.onResponse(createConfigResponse));

} else {
listener.onResponse(createConfigResponse);
}
Expand All @@ -491,23 +484,25 @@ private boolean shouldHandleFlattening(boolean indexingDryRun) {
return !indexingDryRun && config.getCustomResultIndexOrAlias() != null && Boolean.TRUE.equals(flattenResultIndexMapping);
}

protected void setupIngestPipeline(
String flattenedResultIndexAlias,
String configId,
ActionListener<T> listener,
T createConfigResponse
) {
private void initAndSetupPipeline(String flattenedResultIndexAlias, ActionListener<T> listener, Consumer<ActionListener<T>> onSuccess) {
timeSeriesIndices
.initFlattenedResultIndex(
flattenedResultIndexAlias,
ActionListener
.wrap(initResponse -> setupIngestPipeline(flattenedResultIndexAlias, listener, onSuccess), listener::onFailure)
);
}

private void setupIngestPipeline(String flattenedResultIndexAlias, ActionListener<T> listener, Consumer<ActionListener<T>> onSuccess) {
String pipelineId = config.getFlattenResultIndexIngestPipelineName();

try {
BytesReference pipelineSource = createPipelineDefinition(flattenedResultIndexAlias);

PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, pipelineSource, XContentType.JSON);

client.admin().cluster().putPipeline(putPipelineRequest, ActionListener.wrap(putPipelineResponse -> {
logger.info("Ingest pipeline created successfully for pipelineId: {}", pipelineId);
bindIngestPipelineWithFlattenedResultIndex(pipelineId, configId, flattenedResultIndexAlias, listener, createConfigResponse);

bindIngestPipelineWithFlattenedResultIndex(pipelineId, flattenedResultIndexAlias, listener, onSuccess);
}, exception -> {
logger.error("Error while creating ingest pipeline for pipelineId: {}", pipelineId, exception);
listener.onFailure(exception);
Expand All @@ -519,6 +514,23 @@ protected void setupIngestPipeline(
}
}

private void bindIngestPipelineWithFlattenedResultIndex(
String pipelineId,
String flattenedResultIndexAlias,
ActionListener<T> listener,
Consumer<ActionListener<T>> onSuccess
) {
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(flattenedResultIndexAlias, pipelineId);

client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(updateSettingsResponse -> {
logger.info("Successfully updated settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId);
onSuccess.accept(listener);
}, exception -> {
logger.error("Failed to update settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId, exception);
listener.onFailure(exception);
}));
}

private BytesReference createPipelineDefinition(String indexName) throws IOException {
XContentBuilder pipelineBuilder = XContentFactory.jsonBuilder();
pipelineBuilder.startObject();
Expand All @@ -544,11 +556,7 @@ private BytesReference createPipelineDefinition(String indexName) throws IOExcep
return BytesReference.bytes(pipelineBuilder);
}

private UpdateSettingsRequest buildUpdateSettingsRequest(
String flattenedResultIndexAlias,
String defaultPipelineName,
String configId
) {
private UpdateSettingsRequest buildUpdateSettingsRequest(String flattenedResultIndexAlias, String defaultPipelineName) {
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest();
updateSettingsRequest.indices(flattenedResultIndexAlias);

Expand All @@ -560,24 +568,6 @@ private UpdateSettingsRequest buildUpdateSettingsRequest(
return updateSettingsRequest;
}

protected void bindIngestPipelineWithFlattenedResultIndex(
String pipelineId,
String configId,
String flattenedResultIndexAlias,
ActionListener<T> listener,
T createConfigResponse
) {
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(flattenedResultIndexAlias, pipelineId, configId);

client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(updateSettingsResponse -> {
logger.info("Successfully updated settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId);
listener.onResponse(createConfigResponse);
}, exception -> {
logger.error("Failed to update settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId, exception);
listener.onFailure(exception);
}));
}

protected void updateConfig(String id, boolean indexingDryRun, ActionListener<T> listener) {
GetRequest request = new GetRequest(CommonName.CONFIG_INDEX, id);
client
Expand Down Expand Up @@ -619,18 +609,6 @@ private void onGetConfigResponse(GetResponse response, boolean indexingDryRun, S
);
return;
}
if (!existingConfig.getFlattenResultIndexMapping()
&& config.getFlattenResultIndexMapping()
&& existingConfig.getCustomResultIndexOrAlias() != null) {
// customers can choose to use a flattened result index for newly created detectors and disable it for those detectors.
// however, since enabling the flattened result index creates additional resources and due to bwc concerns,
// we do not allow customers to enable this feature for existing running detectors.
listener
.onFailure(
new OpenSearchStatusException(CommonMessages.CAN_NOT_CHANGE_FLATTEN_RESULT_INDEX, RestStatus.BAD_REQUEST)
);
return;
}
} else {
if (!ParseUtils.listEqualsWithoutConsideringOrder(existingConfig.getCategoryFields(), config.getCategoryFields())
|| !Objects.equals(existingConfig.getCustomResultIndexOrAlias(), config.getCustomResultIndexOrAlias())) {
Expand All @@ -650,6 +628,19 @@ private void onGetConfigResponse(GetResponse response, boolean indexingDryRun, S
listener::onFailure
);

} else if (!existingConfig.getFlattenResultIndexMapping()
&& config.getFlattenResultIndexMapping()
&& existingConfig.getCustomResultIndexOrAlias() != null) {
confirmBatchRunningListener = ActionListener
.wrap(
r -> initAndSetupPipeline(
config.getFlattenResultIndexAlias(),
listener,
l -> searchConfigInputIndices(id, indexingDryRun, l)
),
listener::onFailure
);

} else {
confirmBatchRunningListener = ActionListener
.wrap(
Expand All @@ -673,11 +664,7 @@ private void unbindIngestPipelineWithFlattenedResultIndex(
boolean indexingDryRun
) {
// The pipeline name _none specifies that the index does not have an ingest pipeline.
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(
existingConfig.getFlattenResultIndexAlias(),
"_none",
existingConfig.getId()
);
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(existingConfig.getFlattenResultIndexAlias(), "_none");
client
.admin()
.indices()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,17 @@
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.search.SearchPhaseExecutionException;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.ad.transport.IndexAnomalyDetectorResponse;
import org.opensearch.common.Nullable;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.forecast.transport.IndexForecasterResponse;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.indices.InvalidIndexNameException;
import org.opensearch.rest.RestChannel;
Expand Down Expand Up @@ -293,21 +290,4 @@ public static Entity buildEntity(RestRequest request, String detectorId) throws
// not a valid profile request with correct entity information
return null;
}

public static String getConfigIdFromIndexResponse(ActionResponse actionResponse) {
String configId;
if (actionResponse instanceof IndexAnomalyDetectorResponse) {
IndexAnomalyDetectorResponse response = (IndexAnomalyDetectorResponse) actionResponse;
configId = response.getId();
logger.info("Handling IndexAnomalyDetectorResponse for configId: {}", configId);
} else if (actionResponse instanceof IndexForecasterResponse) {
IndexForecasterResponse response = (IndexForecasterResponse) actionResponse;
configId = response.getId();
logger.info("Handling IndexForecasterResponse for configId: {}", configId);
} else {
throw new IllegalStateException("Unexpected response type: " + actionResponse.getClass().getName());
}
return configId;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -445,19 +445,25 @@ public void testUpdateAnomalyDetectorFlattenResultIndexField() throws Exception
detector.getLastBreakingUIChangeTime()
);

Exception ex = expectThrows(
ResponseException.class,
() -> TestHelpers
.makeRequest(
client(),
"PUT",
TestHelpers.AD_BASE_DETECTORS_URI + "/" + id + "?refresh=true",
ImmutableMap.of(),
TestHelpers.toHttpEntity(newDetector),
null
)
Response updateResponse = TestHelpers
.makeRequest(
client(),
"PUT",
TestHelpers.AD_BASE_DETECTORS_URI + "/" + id + "?refresh=true",
ImmutableMap.of(),
TestHelpers.toHttpEntity(newDetector),
null
);

assertEquals("Update anomaly detector failed", RestStatus.OK, TestHelpers.restStatus(updateResponse));
String expectedPipelineId = "flatten_result_index_ingest_pipeline_" + detector.getName().toLowerCase(Locale.ROOT);
String getIngestPipelineEndpoint = String.format(Locale.ROOT, "_ingest/pipeline/%s", expectedPipelineId);
Response getPipelineResponse = TestHelpers.makeRequest(client(), "GET", getIngestPipelineEndpoint, ImmutableMap.of(), "", null);
assertEquals(
"Expected 200 response but got: " + getPipelineResponse.getStatusLine().getStatusCode(),
200,
getPipelineResponse.getStatusLine().getStatusCode()
);
assertThat(ex.getMessage(), containsString(CommonMessages.CAN_NOT_CHANGE_FLATTEN_RESULT_INDEX));
}

public void testCreateAnomalyDetector() throws Exception {
Expand Down
Loading