Skip to content

Commit 1947887

Browse files
update put route when flattening result index (#1409)
* update put route when flattening result index Signed-off-by: Jackie Han <hnyng@amazon.com> * clean up unused code Signed-off-by: Jackie Han <hnyng@amazon.com> --------- Signed-off-by: Jackie Han <hnyng@amazon.com> (cherry picked from commit d4ce6cf) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 328f901 commit 1947887

File tree

3 files changed

+64
-91
lines changed

3 files changed

+64
-91
lines changed

src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java

+46-59
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.io.IOException;
1717
import java.time.Clock;
1818
import java.util.*;
19+
import java.util.function.Consumer;
1920
import java.util.stream.Collectors;
2021

2122
import org.apache.commons.lang.StringUtils;
@@ -467,18 +468,10 @@ private void handlePutRequest(boolean indexingDryRun, ActionListener<T> listener
467468
private void handlePostRequest(boolean indexingDryRun, ActionListener<T> listener) {
468469
createConfig(indexingDryRun, ActionListener.wrap(createConfigResponse -> {
469470
if (shouldHandleFlattening(indexingDryRun)) {
470-
String configId = RestHandlerUtils.getConfigIdFromIndexResponse(createConfigResponse);
471471
String flattenedResultIndexAlias = config.getFlattenResultIndexAlias();
472472

473-
timeSeriesIndices
474-
.initFlattenedResultIndex(
475-
flattenedResultIndexAlias,
476-
ActionListener
477-
.wrap(
478-
initResponse -> setupIngestPipeline(flattenedResultIndexAlias, configId, listener, createConfigResponse),
479-
listener::onFailure
480-
)
481-
);
473+
initAndSetupPipeline(flattenedResultIndexAlias, listener, l -> l.onResponse(createConfigResponse));
474+
482475
} else {
483476
listener.onResponse(createConfigResponse);
484477
}
@@ -491,23 +484,25 @@ private boolean shouldHandleFlattening(boolean indexingDryRun) {
491484
return !indexingDryRun && config.getCustomResultIndexOrAlias() != null && Boolean.TRUE.equals(flattenResultIndexMapping);
492485
}
493486

494-
protected void setupIngestPipeline(
495-
String flattenedResultIndexAlias,
496-
String configId,
497-
ActionListener<T> listener,
498-
T createConfigResponse
499-
) {
487+
private void initAndSetupPipeline(String flattenedResultIndexAlias, ActionListener<T> listener, Consumer<ActionListener<T>> onSuccess) {
488+
timeSeriesIndices
489+
.initFlattenedResultIndex(
490+
flattenedResultIndexAlias,
491+
ActionListener
492+
.wrap(initResponse -> setupIngestPipeline(flattenedResultIndexAlias, listener, onSuccess), listener::onFailure)
493+
);
494+
}
495+
496+
private void setupIngestPipeline(String flattenedResultIndexAlias, ActionListener<T> listener, Consumer<ActionListener<T>> onSuccess) {
500497
String pipelineId = config.getFlattenResultIndexIngestPipelineName();
501498

502499
try {
503500
BytesReference pipelineSource = createPipelineDefinition(flattenedResultIndexAlias);
504-
505501
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, pipelineSource, XContentType.JSON);
506502

507503
client.admin().cluster().putPipeline(putPipelineRequest, ActionListener.wrap(putPipelineResponse -> {
508504
logger.info("Ingest pipeline created successfully for pipelineId: {}", pipelineId);
509-
bindIngestPipelineWithFlattenedResultIndex(pipelineId, configId, flattenedResultIndexAlias, listener, createConfigResponse);
510-
505+
bindIngestPipelineWithFlattenedResultIndex(pipelineId, flattenedResultIndexAlias, listener, onSuccess);
511506
}, exception -> {
512507
logger.error("Error while creating ingest pipeline for pipelineId: {}", pipelineId, exception);
513508
listener.onFailure(exception);
@@ -519,6 +514,23 @@ protected void setupIngestPipeline(
519514
}
520515
}
521516

517+
private void bindIngestPipelineWithFlattenedResultIndex(
518+
String pipelineId,
519+
String flattenedResultIndexAlias,
520+
ActionListener<T> listener,
521+
Consumer<ActionListener<T>> onSuccess
522+
) {
523+
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(flattenedResultIndexAlias, pipelineId);
524+
525+
client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(updateSettingsResponse -> {
526+
logger.info("Successfully updated settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId);
527+
onSuccess.accept(listener);
528+
}, exception -> {
529+
logger.error("Failed to update settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId, exception);
530+
listener.onFailure(exception);
531+
}));
532+
}
533+
522534
private BytesReference createPipelineDefinition(String indexName) throws IOException {
523535
XContentBuilder pipelineBuilder = XContentFactory.jsonBuilder();
524536
pipelineBuilder.startObject();
@@ -544,11 +556,7 @@ private BytesReference createPipelineDefinition(String indexName) throws IOExcep
544556
return BytesReference.bytes(pipelineBuilder);
545557
}
546558

547-
private UpdateSettingsRequest buildUpdateSettingsRequest(
548-
String flattenedResultIndexAlias,
549-
String defaultPipelineName,
550-
String configId
551-
) {
559+
private UpdateSettingsRequest buildUpdateSettingsRequest(String flattenedResultIndexAlias, String defaultPipelineName) {
552560
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest();
553561
updateSettingsRequest.indices(flattenedResultIndexAlias);
554562

@@ -560,24 +568,6 @@ private UpdateSettingsRequest buildUpdateSettingsRequest(
560568
return updateSettingsRequest;
561569
}
562570

563-
protected void bindIngestPipelineWithFlattenedResultIndex(
564-
String pipelineId,
565-
String configId,
566-
String flattenedResultIndexAlias,
567-
ActionListener<T> listener,
568-
T createConfigResponse
569-
) {
570-
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(flattenedResultIndexAlias, pipelineId, configId);
571-
572-
client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(updateSettingsResponse -> {
573-
logger.info("Successfully updated settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId);
574-
listener.onResponse(createConfigResponse);
575-
}, exception -> {
576-
logger.error("Failed to update settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId, exception);
577-
listener.onFailure(exception);
578-
}));
579-
}
580-
581571
protected void updateConfig(String id, boolean indexingDryRun, ActionListener<T> listener) {
582572
GetRequest request = new GetRequest(CommonName.CONFIG_INDEX, id);
583573
client
@@ -619,18 +609,6 @@ private void onGetConfigResponse(GetResponse response, boolean indexingDryRun, S
619609
);
620610
return;
621611
}
622-
if (!existingConfig.getFlattenResultIndexMapping()
623-
&& config.getFlattenResultIndexMapping()
624-
&& existingConfig.getCustomResultIndexOrAlias() != null) {
625-
// customers can choose to use a flattened result index for newly created detectors and disable it for those detectors.
626-
// however, since enabling the flattened result index creates additional resources and due to bwc concerns,
627-
// we do not allow customers to enable this feature for existing running detectors.
628-
listener
629-
.onFailure(
630-
new OpenSearchStatusException(CommonMessages.CAN_NOT_CHANGE_FLATTEN_RESULT_INDEX, RestStatus.BAD_REQUEST)
631-
);
632-
return;
633-
}
634612
} else {
635613
if (!ParseUtils.listEqualsWithoutConsideringOrder(existingConfig.getCategoryFields(), config.getCategoryFields())
636614
|| !Objects.equals(existingConfig.getCustomResultIndexOrAlias(), config.getCustomResultIndexOrAlias())) {
@@ -650,6 +628,19 @@ private void onGetConfigResponse(GetResponse response, boolean indexingDryRun, S
650628
listener::onFailure
651629
);
652630

631+
} else if (!existingConfig.getFlattenResultIndexMapping()
632+
&& config.getFlattenResultIndexMapping()
633+
&& existingConfig.getCustomResultIndexOrAlias() != null) {
634+
confirmBatchRunningListener = ActionListener
635+
.wrap(
636+
r -> initAndSetupPipeline(
637+
config.getFlattenResultIndexAlias(),
638+
listener,
639+
l -> searchConfigInputIndices(id, indexingDryRun, l)
640+
),
641+
listener::onFailure
642+
);
643+
653644
} else {
654645
confirmBatchRunningListener = ActionListener
655646
.wrap(
@@ -673,11 +664,7 @@ private void unbindIngestPipelineWithFlattenedResultIndex(
673664
boolean indexingDryRun
674665
) {
675666
// The pipeline name _none specifies that the index does not have an ingest pipeline.
676-
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(
677-
existingConfig.getFlattenResultIndexAlias(),
678-
"_none",
679-
existingConfig.getId()
680-
);
667+
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(existingConfig.getFlattenResultIndexAlias(), "_none");
681668
client
682669
.admin()
683670
.indices()

src/main/java/org/opensearch/timeseries/util/RestHandlerUtils.java

-20
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,17 @@
2626
import org.opensearch.OpenSearchStatusException;
2727
import org.opensearch.action.search.SearchPhaseExecutionException;
2828
import org.opensearch.action.search.ShardSearchFailure;
29-
import org.opensearch.ad.transport.IndexAnomalyDetectorResponse;
3029
import org.opensearch.common.Nullable;
3130
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
3231
import org.opensearch.common.xcontent.XContentHelper;
3332
import org.opensearch.common.xcontent.XContentType;
3433
import org.opensearch.core.action.ActionListener;
35-
import org.opensearch.core.action.ActionResponse;
3634
import org.opensearch.core.common.Strings;
3735
import org.opensearch.core.common.bytes.BytesReference;
3836
import org.opensearch.core.rest.RestStatus;
3937
import org.opensearch.core.xcontent.NamedXContentRegistry;
4038
import org.opensearch.core.xcontent.ToXContent;
4139
import org.opensearch.core.xcontent.XContentParser;
42-
import org.opensearch.forecast.transport.IndexForecasterResponse;
4340
import org.opensearch.index.IndexNotFoundException;
4441
import org.opensearch.indices.InvalidIndexNameException;
4542
import org.opensearch.rest.RestChannel;
@@ -293,21 +290,4 @@ public static Entity buildEntity(RestRequest request, String detectorId) throws
293290
// not a valid profile request with correct entity information
294291
return null;
295292
}
296-
297-
public static String getConfigIdFromIndexResponse(ActionResponse actionResponse) {
298-
String configId;
299-
if (actionResponse instanceof IndexAnomalyDetectorResponse) {
300-
IndexAnomalyDetectorResponse response = (IndexAnomalyDetectorResponse) actionResponse;
301-
configId = response.getId();
302-
logger.info("Handling IndexAnomalyDetectorResponse for configId: {}", configId);
303-
} else if (actionResponse instanceof IndexForecasterResponse) {
304-
IndexForecasterResponse response = (IndexForecasterResponse) actionResponse;
305-
configId = response.getId();
306-
logger.info("Handling IndexForecasterResponse for configId: {}", configId);
307-
} else {
308-
throw new IllegalStateException("Unexpected response type: " + actionResponse.getClass().getName());
309-
}
310-
return configId;
311-
}
312-
313293
}

src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java

+18-12
Original file line numberDiff line numberDiff line change
@@ -445,19 +445,25 @@ public void testUpdateAnomalyDetectorFlattenResultIndexField() throws Exception
445445
detector.getLastBreakingUIChangeTime()
446446
);
447447

448-
Exception ex = expectThrows(
449-
ResponseException.class,
450-
() -> TestHelpers
451-
.makeRequest(
452-
client(),
453-
"PUT",
454-
TestHelpers.AD_BASE_DETECTORS_URI + "/" + id + "?refresh=true",
455-
ImmutableMap.of(),
456-
TestHelpers.toHttpEntity(newDetector),
457-
null
458-
)
448+
Response updateResponse = TestHelpers
449+
.makeRequest(
450+
client(),
451+
"PUT",
452+
TestHelpers.AD_BASE_DETECTORS_URI + "/" + id + "?refresh=true",
453+
ImmutableMap.of(),
454+
TestHelpers.toHttpEntity(newDetector),
455+
null
456+
);
457+
458+
assertEquals("Update anomaly detector failed", RestStatus.OK, TestHelpers.restStatus(updateResponse));
459+
String expectedPipelineId = "flatten_result_index_ingest_pipeline_" + detector.getName().toLowerCase(Locale.ROOT);
460+
String getIngestPipelineEndpoint = String.format(Locale.ROOT, "_ingest/pipeline/%s", expectedPipelineId);
461+
Response getPipelineResponse = TestHelpers.makeRequest(client(), "GET", getIngestPipelineEndpoint, ImmutableMap.of(), "", null);
462+
assertEquals(
463+
"Expected 200 response but got: " + getPipelineResponse.getStatusLine().getStatusCode(),
464+
200,
465+
getPipelineResponse.getStatusLine().getStatusCode()
459466
);
460-
assertThat(ex.getMessage(), containsString(CommonMessages.CAN_NOT_CHANGE_FLATTEN_RESULT_INDEX));
461467
}
462468

463469
public void testCreateAnomalyDetector() throws Exception {

0 commit comments

Comments
 (0)