Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add a feature that flattens custom result index when enabled #1401

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/main/java/org/opensearch/ad/indices/ADIndexManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.IOException;
import java.util.EnumMap;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -45,6 +46,8 @@
import org.opensearch.timeseries.indices.IndexManagement;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
* This class provides utility methods for various anomaly detection indices.
*/
Expand All @@ -57,6 +60,8 @@ public class ADIndexManagement extends IndexManagement<ADIndex> {
// The index name pattern to query all AD result, history and current AD result
public static final String ALL_AD_RESULTS_INDEX_PATTERN = ".opendistro-anomaly-results*";

// private static final ObjectMapper objectMapper = new ObjectMapper();

/**
* Constructor function
*
Expand Down Expand Up @@ -122,6 +127,22 @@ public static String getResultMappings() throws IOException {
return getMappings(ANOMALY_RESULTS_INDEX_MAPPING_FILE);
}

/**
* Retrieves the JSON mapping for the flattened result index with the "dynamic" field set to true
* @return JSON mapping for the flattened result index.
* @throws IOException if the mapping file cannot be read.
*/
public static String getFlattenedResultMappings() throws IOException {
ObjectMapper objectMapper = new ObjectMapper();

Map<String, Object> mapping = objectMapper
.readValue(ADIndexManagement.class.getClassLoader().getResourceAsStream(ANOMALY_RESULTS_INDEX_MAPPING_FILE), Map.class);

mapping.put("dynamic", true);

return objectMapper.writeValueAsString(mapping);
}

/**
* Get anomaly detector state index mapping json content.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ private AnomalyDetectorSettings() {}
);

public static final String ANOMALY_RESULTS_INDEX_MAPPING_FILE = "mappings/anomaly-results.json";
public static final String FLATTENED_ANOMALY_RESULTS_INDEX_MAPPING_FILE = "mappings/anomaly-results-flattened.json";
public static final String ANOMALY_DETECTION_STATE_INDEX_MAPPING_FILE = "mappings/anomaly-detection-state.json";
public static final String CHECKPOINT_INDEX_MAPPING_FILE = "mappings/anomaly-checkpoint.json";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ private boolean shouldAddResult(float indexingPressurePercent, AnomalyResult res
}

private void addToFlattenedIndexIfExists(BulkRequest bulkRequest, AnomalyResult result, String resultIndex) {
String flattenedResultIndexName = resultIndex + "_flattened_" + result.getDetectorId().toLowerCase(Locale.ROOT);
if (clusterService.state().metadata().hasIndex(flattenedResultIndexName)) {
addResult(bulkRequest, result, flattenedResultIndexName);
String flattenedResultIndexAlias = resultIndex + "_flattened_" + result.getDetectorId().toLowerCase(Locale.ROOT);
if (clusterService.state().metadata().hasAlias(flattenedResultIndexAlias)) {
addResult(bulkRequest, result, flattenedResultIndexAlias);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

package org.opensearch.timeseries.indices;

import static org.opensearch.ad.settings.AnomalyDetectorSettings.FLATTENED_ANOMALY_RESULTS_INDEX_MAPPING_FILE;
import static org.opensearch.ad.indices.ADIndexManagement.getFlattenedResultMappings;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.timeseries.util.RestHandlerUtils.createXContentParserFromRegistry;

Expand Down Expand Up @@ -90,6 +90,7 @@
import org.opensearch.timeseries.settings.TimeSeriesSettings;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.io.Resources;

Expand Down Expand Up @@ -137,6 +138,7 @@ public abstract class IndexManagement<IndexType extends Enum<IndexType> & TimeSe
private NamedXContentRegistry xContentRegistry;
protected BiCheckedFunction<XContentParser, String, ? extends Config, IOException> configParser;
protected String customResultIndexPrefix;
private final ObjectMapper objectMapper = new ObjectMapper();

protected class IndexState {
// keep track of whether the mapping version is up-to-date
Expand Down Expand Up @@ -1016,21 +1018,28 @@ public <T> void initCustomResultIndexAndExecute(String resultIndexOrAlias, Execu

/**
* creates flattened result index
* @param indexName the index name
* @param flattenedResultIndexAlias the flattened result index alias
* @param actionListener the action listener
* @throws IOException
*/
public void initFlattenedResultIndex(String indexName, ActionListener<CreateIndexResponse> actionListener) throws IOException {
public void initFlattenedResultIndex(String flattenedResultIndexAlias, ActionListener<CreateIndexResponse> actionListener)
throws IOException {
String indexName = getCustomResultIndexPattern(flattenedResultIndexAlias);
logger.info("Initializing flattened result index: {}", indexName);

CreateIndexRequest request = new CreateIndexRequest(indexName)
.mapping(getFlattenedResultIndexMappings(), XContentType.JSON)
.mapping(getFlattenedResultMappings(), XContentType.JSON)
.settings(settings);

if (flattenedResultIndexAlias != null) {
request.alias(new Alias(flattenedResultIndexAlias));
}

choosePrimaryShards(request, false);

adminClient.indices().create(request, ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
logger.info("Successfully created flattened result index: {}", indexName);
logger.info("Successfully created flattened result index: {} with alias: {}", indexName, flattenedResultIndexAlias);
actionListener.onResponse(response);
} else {
String errorMsg = "Index creation not acknowledged for index: " + indexName;
Expand All @@ -1043,13 +1052,12 @@ public void initFlattenedResultIndex(String indexName, ActionListener<CreateInde
}));
}

/**
* Get flattened result index mapping json content
* @return flattened result index mapping
* @throws IOException
*/
public String getFlattenedResultIndexMappings() throws IOException {
return getMappings(FLATTENED_ANOMALY_RESULTS_INDEX_MAPPING_FILE);
public String getFlattenedResultIndexAlias(String indexOrAliasName, String configId) {
return indexOrAliasName + "_flattened_" + configId.toLowerCase(Locale.ROOT);
}

public String getFlattenResultIndexIngestPipelineId(String configId) {
return "flatten_result_index_ingest_pipeline" + configId.toLowerCase(Locale.ROOT);
}

public <T> void validateCustomIndexForBackendJob(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public abstract class Config implements Writeable, ToXContentObject {
public static final String RESULT_INDEX_FIELD_MIN_SIZE = "result_index_min_size";
public static final String RESULT_INDEX_FIELD_MIN_AGE = "result_index_min_age";
public static final String RESULT_INDEX_FIELD_TTL = "result_index_ttl";
public static final String FLATTEN_RESULT_INDEX_MAPPING = "flatten_result_index_mapping";
public static final String FLATTEN_RESULT_INDEX_MAPPING = "flatten_custom_result_index";
// Changing categorical field, feature attributes, interval, windowDelay, time field, horizon, indices,
// result index would force us to display results only from the most recent update. Otherwise,
// the UI appear cluttered and unclear.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,17 +465,18 @@ private void handlePostRequest(boolean indexingDryRun, ActionListener<T> listene
createConfig(indexingDryRun, ActionListener.wrap(createConfigResponse -> {
if (shouldHandleFlattening(indexingDryRun, createConfigResponse)) {
IndexAnomalyDetectorResponse response = (IndexAnomalyDetectorResponse) createConfigResponse;
String detectorId = response.getId();
String indexName = config.getCustomResultIndexOrAlias() + "_flattened_" + detectorId.toLowerCase(Locale.ROOT);
String pipelineId = "anomaly_detection_ingest_pipeline_" + detectorId.toLowerCase(Locale.ROOT);
String configId = response.getId();
String flattenedResultIndexAlias = timeSeriesIndices
.getFlattenedResultIndexAlias(config.getCustomResultIndexOrAlias(), configId);
String pipelineId = timeSeriesIndices.getFlattenResultIndexIngestPipelineId(configId);

timeSeriesIndices
.initFlattenedResultIndex(
indexName,
ActionListener.wrap(initResponse -> setupIngestPipeline(detectorId, ActionListener.wrap(pipelineResponse -> {
flattenedResultIndexAlias,
ActionListener.wrap(initResponse -> setupIngestPipeline(configId, ActionListener.wrap(pipelineResponse -> {
updateResultIndexSetting(
pipelineId,
indexName,
flattenedResultIndexAlias,
ActionListener.wrap(updateResponse -> listener.onResponse(createConfigResponse), listener::onFailure)
);
}, listener::onFailure)), listener::onFailure)
Expand All @@ -487,18 +488,20 @@ private void handlePostRequest(boolean indexingDryRun, ActionListener<T> listene
}

private boolean shouldHandleFlattening(boolean indexingDryRun, Object createConfigResponse) {
Boolean flattenResultIndexMapping = config.getFlattenResultIndexMapping();

return !indexingDryRun
&& config.getCustomResultIndexOrAlias() != null
&& config.getFlattenResultIndexMapping()
&& Boolean.TRUE.equals(flattenResultIndexMapping)
&& createConfigResponse instanceof IndexAnomalyDetectorResponse;
}

protected void setupIngestPipeline(String detectorId, ActionListener<T> listener) {
String indexName = config.getCustomResultIndexOrAlias() + "_flattened_" + detectorId.toLowerCase(Locale.ROOT);
String pipelineId = "anomaly_detection_ingest_pipeline_" + detectorId.toLowerCase(Locale.ROOT);
protected void setupIngestPipeline(String configId, ActionListener<T> listener) {
String flattenedResultIndexAlias = timeSeriesIndices.getFlattenedResultIndexAlias(config.getCustomResultIndexOrAlias(), configId);
String pipelineId = timeSeriesIndices.getFlattenResultIndexIngestPipelineId(configId);

try {
BytesReference pipelineSource = createPipelineDefinition(indexName);
BytesReference pipelineSource = createPipelineDefinition(flattenedResultIndexAlias);

PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, pipelineSource, XContentType.JSON);

Expand Down Expand Up @@ -576,10 +579,9 @@ private void handleFlattenResultIndexMappingUpdate(ActionListener<T> listener) {
return;
}
if (config.getFlattenResultIndexMapping() != null && config.getFlattenResultIndexMapping()) {
// if field value is true, create the pipeline. No need to get and compare with previous value
setupIngestPipeline(id, listener);
} else {
String pipelineId = "anomaly_detection_ingest_pipeline_" + config.getId();
String pipelineId = timeSeriesIndices.getFlattenResultIndexIngestPipelineId(config.getId());
client.admin().cluster().deletePipeline(new DeletePipelineRequest(pipelineId), new ActionListener<AcknowledgedResponse>() {

@Override
Expand Down
173 changes: 0 additions & 173 deletions src/main/resources/mappings/anomaly-results-flattened.json

This file was deleted.

Loading
Loading