Skip to content

Commit 85a7563

Browse files
committed
address comments
Signed-off-by: Jackie Han <hnyng@amazon.com>
1 parent 9fced72 commit 85a7563

File tree

3 files changed

+54
-26
lines changed

3 files changed

+54
-26
lines changed

src/main/java/org/opensearch/ad/indices/ADIndexManagement.java

+21
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.io.IOException;
2424
import java.util.EnumMap;
25+
import java.util.Map;
2526

2627
import org.apache.logging.log4j.LogManager;
2728
import org.apache.logging.log4j.Logger;
@@ -45,6 +46,8 @@
4546
import org.opensearch.timeseries.indices.IndexManagement;
4647
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
4748

49+
import com.fasterxml.jackson.databind.ObjectMapper;
50+
4851
/**
4952
* This class provides utility methods for various anomaly detection indices.
5053
*/
@@ -57,6 +60,8 @@ public class ADIndexManagement extends IndexManagement<ADIndex> {
5760
// The index name pattern to query all AD result, history and current AD result
5861
public static final String ALL_AD_RESULTS_INDEX_PATTERN = ".opendistro-anomaly-results*";
5962

63+
// private static final ObjectMapper objectMapper = new ObjectMapper();
64+
6065
/**
6166
* Constructor function
6267
*
@@ -122,6 +127,22 @@ public static String getResultMappings() throws IOException {
122127
return getMappings(ANOMALY_RESULTS_INDEX_MAPPING_FILE);
123128
}
124129

130+
/**
131+
* Retrieves the JSON mapping for the flattened result index with the "dynamic" field set to true
132+
* @return JSON mapping for the flattened result index.
133+
* @throws IOException if the mapping file cannot be read.
134+
*/
135+
public static String getFlattenedResultMappings() throws IOException {
136+
ObjectMapper objectMapper = new ObjectMapper();
137+
138+
Map<String, Object> mapping = objectMapper
139+
.readValue(ADIndexManagement.class.getClassLoader().getResourceAsStream(ANOMALY_RESULTS_INDEX_MAPPING_FILE), Map.class);
140+
141+
mapping.put("dynamic", true);
142+
143+
return objectMapper.writeValueAsString(mapping);
144+
}
145+
125146
/**
126147
* Get anomaly detector state index mapping json content.
127148
*

src/main/java/org/opensearch/timeseries/indices/IndexManagement.java

+14-14
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
package org.opensearch.timeseries.indices;
1313

14-
import static org.opensearch.ad.settings.AnomalyDetectorSettings.FLATTENED_ANOMALY_RESULTS_INDEX_MAPPING_FILE;
14+
import static org.opensearch.ad.indices.ADIndexManagement.getFlattenedResultMappings;
1515
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
1616
import static org.opensearch.timeseries.util.RestHandlerUtils.createXContentParserFromRegistry;
1717

@@ -90,6 +90,7 @@
9090
import org.opensearch.timeseries.settings.TimeSeriesSettings;
9191
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
9292

93+
import com.fasterxml.jackson.databind.ObjectMapper;
9394
import com.google.common.base.Charsets;
9495
import com.google.common.io.Resources;
9596

@@ -137,6 +138,7 @@ public abstract class IndexManagement<IndexType extends Enum<IndexType> & TimeSe
137138
private NamedXContentRegistry xContentRegistry;
138139
protected BiCheckedFunction<XContentParser, String, ? extends Config, IOException> configParser;
139140
protected String customResultIndexPrefix;
141+
private final ObjectMapper objectMapper = new ObjectMapper();
140142

141143
protected class IndexState {
142144
// keep track of whether the mapping version is up-to-date
@@ -1016,21 +1018,28 @@ public <T> void initCustomResultIndexAndExecute(String resultIndexOrAlias, Execu
10161018

10171019
/**
10181020
* creates flattened result index
1019-
* @param indexName the index name
1021+
* @param flattenedResultIndexAlias the flattened result index alias
10201022
* @param actionListener the action listener
10211023
* @throws IOException
10221024
*/
1023-
public void initFlattenedResultIndex(String indexName, ActionListener<CreateIndexResponse> actionListener) throws IOException {
1025+
public void initFlattenedResultIndex(String flattenedResultIndexAlias, ActionListener<CreateIndexResponse> actionListener)
1026+
throws IOException {
1027+
String indexName = getCustomResultIndexPattern(flattenedResultIndexAlias);
10241028
logger.info("Initializing flattened result index: {}", indexName);
10251029

10261030
CreateIndexRequest request = new CreateIndexRequest(indexName)
1027-
.mapping(getFlattenedResultIndexMappings(), XContentType.JSON)
1031+
.mapping(getFlattenedResultMappings(), XContentType.JSON)
10281032
.settings(settings);
1033+
1034+
if (flattenedResultIndexAlias != null) {
1035+
request.alias(new Alias(flattenedResultIndexAlias));
1036+
}
1037+
10291038
choosePrimaryShards(request, false);
10301039

10311040
adminClient.indices().create(request, ActionListener.wrap(response -> {
10321041
if (response.isAcknowledged()) {
1033-
logger.info("Successfully created flattened result index: {}", indexName);
1042+
logger.info("Successfully created flattened result index: {} with alias: {}", indexName, flattenedResultIndexAlias);
10341043
actionListener.onResponse(response);
10351044
} else {
10361045
String errorMsg = "Index creation not acknowledged for index: " + indexName;
@@ -1043,15 +1052,6 @@ public void initFlattenedResultIndex(String indexName, ActionListener<CreateInde
10431052
}));
10441053
}
10451054

1046-
/**
1047-
* Get flattened result index mapping json content
1048-
* @return flattened result index mapping
1049-
* @throws IOException
1050-
*/
1051-
public String getFlattenedResultIndexMappings() throws IOException {
1052-
return getMappings(FLATTENED_ANOMALY_RESULTS_INDEX_MAPPING_FILE);
1053-
}
1054-
10551055
public <T> void validateCustomIndexForBackendJob(
10561056
String resultIndexOrAlias,
10571057
String securityLogId,

src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java

+19-12
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,14 @@ protected void prepareConfigIndexing(boolean indexingDryRun, ActionListener<T> l
454454
}
455455
}
456456

457+
private String getFlattenedResultIndexAlias(String configId) {
458+
return config.getCustomResultIndexOrAlias() + "_flattened_" + configId.toLowerCase(Locale.ROOT);
459+
}
460+
461+
private String getFlattenResultIndexIngestPipelineId(String configId) {
462+
return "flatten_result_index_ingest_pipeline" + configId.toLowerCase(Locale.ROOT);
463+
}
464+
457465
private void handlePutRequest(boolean indexingDryRun, ActionListener<T> listener) {
458466
handler.confirmJobRunning(clusterService, client, id, listener, () -> {
459467
handleFlattenResultIndexMappingUpdate(listener);
@@ -465,17 +473,17 @@ private void handlePostRequest(boolean indexingDryRun, ActionListener<T> listene
465473
createConfig(indexingDryRun, ActionListener.wrap(createConfigResponse -> {
466474
if (shouldHandleFlattening(indexingDryRun, createConfigResponse)) {
467475
IndexAnomalyDetectorResponse response = (IndexAnomalyDetectorResponse) createConfigResponse;
468-
String detectorId = response.getId();
469-
String indexName = config.getCustomResultIndexOrAlias() + "_flattened_" + detectorId.toLowerCase(Locale.ROOT);
470-
String pipelineId = "anomaly_detection_ingest_pipeline_" + detectorId.toLowerCase(Locale.ROOT);
476+
String configId = response.getId();
477+
String flattenedResultIndexAlias = getFlattenedResultIndexAlias(configId);
478+
String pipelineId = getFlattenResultIndexIngestPipelineId(configId);
471479

472480
timeSeriesIndices
473481
.initFlattenedResultIndex(
474-
indexName,
475-
ActionListener.wrap(initResponse -> setupIngestPipeline(detectorId, ActionListener.wrap(pipelineResponse -> {
482+
flattenedResultIndexAlias,
483+
ActionListener.wrap(initResponse -> setupIngestPipeline(configId, ActionListener.wrap(pipelineResponse -> {
476484
updateResultIndexSetting(
477485
pipelineId,
478-
indexName,
486+
flattenedResultIndexAlias,
479487
ActionListener.wrap(updateResponse -> listener.onResponse(createConfigResponse), listener::onFailure)
480488
);
481489
}, listener::onFailure)), listener::onFailure)
@@ -493,12 +501,12 @@ private boolean shouldHandleFlattening(boolean indexingDryRun, Object createConf
493501
&& createConfigResponse instanceof IndexAnomalyDetectorResponse;
494502
}
495503

496-
protected void setupIngestPipeline(String detectorId, ActionListener<T> listener) {
497-
String indexName = config.getCustomResultIndexOrAlias() + "_flattened_" + detectorId.toLowerCase(Locale.ROOT);
498-
String pipelineId = "anomaly_detection_ingest_pipeline_" + detectorId.toLowerCase(Locale.ROOT);
504+
protected void setupIngestPipeline(String configId, ActionListener<T> listener) {
505+
String flattenedResultIndexAlias = getFlattenedResultIndexAlias(configId);
506+
String pipelineId = getFlattenResultIndexIngestPipelineId(configId);
499507

500508
try {
501-
BytesReference pipelineSource = createPipelineDefinition(indexName);
509+
BytesReference pipelineSource = createPipelineDefinition(flattenedResultIndexAlias);
502510

503511
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, pipelineSource, XContentType.JSON);
504512

@@ -576,10 +584,9 @@ private void handleFlattenResultIndexMappingUpdate(ActionListener<T> listener) {
576584
return;
577585
}
578586
if (config.getFlattenResultIndexMapping() != null && config.getFlattenResultIndexMapping()) {
579-
// if field value is true, create the pipeline. No need to get and compare with previous value
580587
setupIngestPipeline(id, listener);
581588
} else {
582-
String pipelineId = "anomaly_detection_ingest_pipeline_" + config.getId();
589+
String pipelineId = getFlattenResultIndexIngestPipelineId(config.getId());
583590
client.admin().cluster().deletePipeline(new DeletePipelineRequest(pipelineId), new ActionListener<AcknowledgedResponse>() {
584591

585592
@Override

0 commit comments

Comments
 (0)