Skip to content

Commit 62768c5

Browse files
committed
add more IT
Signed-off-by: Jackie Han <hnyng@amazon.com>
1 parent b876a22 commit 62768c5

File tree

2 files changed

+48
-41
lines changed

2 files changed

+48
-41
lines changed

src/main/java/org/opensearch/timeseries/indices/IndexManagement.java

+28-25
Original file line numberDiff line numberDiff line change
@@ -1020,36 +1020,39 @@ public <T> void initCustomResultIndexAndExecute(String resultIndexOrAlias, Execu
10201020
* creates flattened result index
10211021
* @param flattenedResultIndexAlias the flattened result index alias
10221022
* @param actionListener the action listener
1023-
* @throws IOException
10241023
*/
1025-
public void initFlattenedResultIndex(String flattenedResultIndexAlias, ActionListener<CreateIndexResponse> actionListener)
1026-
throws IOException {
1027-
String indexName = getCustomResultIndexPattern(flattenedResultIndexAlias);
1028-
logger.info("Initializing flattened result index: {}", indexName);
1024+
public void initFlattenedResultIndex(String flattenedResultIndexAlias, ActionListener<CreateIndexResponse> actionListener) {
1025+
try {
1026+
String indexName = getCustomResultIndexPattern(flattenedResultIndexAlias);
1027+
logger.info("Initializing flattened result index: {}", indexName);
10291028

1030-
CreateIndexRequest request = new CreateIndexRequest(indexName)
1031-
.mapping(getFlattenedResultMappings(), XContentType.JSON)
1032-
.settings(settings);
1029+
CreateIndexRequest request = new CreateIndexRequest(indexName)
1030+
.mapping(getFlattenedResultMappings(), XContentType.JSON)
1031+
.settings(settings);
10331032

1034-
if (flattenedResultIndexAlias != null) {
1035-
request.alias(new Alias(flattenedResultIndexAlias));
1036-
}
1033+
if (flattenedResultIndexAlias != null) {
1034+
request.alias(new Alias(flattenedResultIndexAlias));
1035+
}
10371036

1038-
choosePrimaryShards(request, false);
1037+
choosePrimaryShards(request, false);
10391038

1040-
adminClient.indices().create(request, ActionListener.wrap(response -> {
1041-
if (response.isAcknowledged()) {
1042-
logger.info("Successfully created flattened result index: {} with alias: {}", indexName, flattenedResultIndexAlias);
1043-
actionListener.onResponse(response);
1044-
} else {
1045-
String errorMsg = "Index creation not acknowledged for index: " + indexName;
1046-
logger.error(errorMsg);
1047-
actionListener.onFailure(new IllegalStateException(errorMsg));
1048-
}
1049-
}, exception -> {
1050-
logger.error("Failed to create flattened result index: {}", indexName, exception);
1051-
actionListener.onFailure(exception);
1052-
}));
1039+
adminClient.indices().create(request, ActionListener.wrap(response -> {
1040+
if (response.isAcknowledged()) {
1041+
logger.info("Successfully created flattened result index: {} with alias: {}", indexName, flattenedResultIndexAlias);
1042+
actionListener.onResponse(response);
1043+
} else {
1044+
String errorMsg = "Index creation not acknowledged for index: " + indexName;
1045+
logger.error(errorMsg);
1046+
actionListener.onFailure(new IllegalStateException(errorMsg));
1047+
}
1048+
}, exception -> {
1049+
logger.error("Failed to create flattened result index: {}", indexName, exception);
1050+
actionListener.onFailure(exception);
1051+
}));
1052+
} catch (IOException e) {
1053+
logger.error("Error while building mappings for flattened result index: {}", flattenedResultIndexAlias, e);
1054+
actionListener.onFailure(e);
1055+
}
10531056
}
10541057

10551058
public String getFlattenedResultIndexAlias(String indexOrAliasName, String configId) {

src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java

+20-16
Original file line numberDiff line numberDiff line change
@@ -466,27 +466,31 @@ private void handlePostRequest(boolean indexingDryRun, ActionListener<T> listene
466466
if (shouldHandleFlattening(indexingDryRun, createConfigResponse)) {
467467
IndexAnomalyDetectorResponse response = (IndexAnomalyDetectorResponse) createConfigResponse;
468468
String configId = response.getId();
469-
String flattenedResultIndexAlias = timeSeriesIndices
470-
.getFlattenedResultIndexAlias(config.getCustomResultIndexOrAlias(), configId);
471-
String pipelineId = timeSeriesIndices.getFlattenResultIndexIngestPipelineId(configId);
472-
473-
timeSeriesIndices
474-
.initFlattenedResultIndex(
475-
flattenedResultIndexAlias,
476-
ActionListener.wrap(initResponse -> setupIngestPipeline(configId, ActionListener.wrap(pipelineResponse -> {
477-
updateResultIndexSetting(
478-
pipelineId,
479-
flattenedResultIndexAlias,
480-
ActionListener.wrap(updateResponse -> listener.onResponse(createConfigResponse), listener::onFailure)
481-
);
482-
}, listener::onFailure)), listener::onFailure)
483-
);
469+
initFlattenedResultIndexAndPipeline(
470+
configId,
471+
ActionListener.wrap(initResponse -> listener.onResponse(createConfigResponse), listener::onFailure)
472+
);
484473
} else {
485474
listener.onResponse(createConfigResponse);
486475
}
487476
}, listener::onFailure));
488477
}
489478

479+
private void initFlattenedResultIndexAndPipeline(String configId, ActionListener<T> listener) {
480+
String flattenedResultIndexAlias = timeSeriesIndices.getFlattenedResultIndexAlias(config.getCustomResultIndexOrAlias(), configId);
481+
String pipelineId = timeSeriesIndices.getFlattenResultIndexIngestPipelineId(configId);
482+
483+
timeSeriesIndices.initFlattenedResultIndex(flattenedResultIndexAlias, ActionListener.wrap(initResponse -> {
484+
setupIngestPipeline(configId, ActionListener.wrap(pipelineResponse -> {
485+
updateResultIndexSetting(
486+
pipelineId,
487+
flattenedResultIndexAlias,
488+
ActionListener.wrap(updateResponse -> listener.onResponse(null), listener::onFailure)
489+
);
490+
}, listener::onFailure));
491+
}, listener::onFailure));
492+
}
493+
490494
private boolean shouldHandleFlattening(boolean indexingDryRun, Object createConfigResponse) {
491495
Boolean flattenResultIndexMapping = config.getFlattenResultIndexMapping();
492496

@@ -579,7 +583,7 @@ private void handleFlattenResultIndexMappingUpdate(ActionListener<T> listener) {
579583
return;
580584
}
581585
if (config.getFlattenResultIndexMapping() != null && config.getFlattenResultIndexMapping()) {
582-
setupIngestPipeline(id, listener);
586+
initFlattenedResultIndexAndPipeline(config.getId(), listener);
583587
} else {
584588
String pipelineId = timeSeriesIndices.getFlattenResultIndexIngestPipelineId(config.getId());
585589
client.admin().cluster().deletePipeline(new DeletePipelineRequest(pipelineId), new ActionListener<AcknowledgedResponse>() {

0 commit comments

Comments
 (0)