Skip to content

Commit 328f901

Browse files
[Backport 2.x] add a feature that flattens custom result index when enabled (#1401) (#1405)
* add a feature that flattens custom result index when enabled (#1401) * add a feature that flattens custom result index when enabled Signed-off-by: Jackie Han <jkhanjob@gmail.com> * clean up Signed-off-by: Jackie Han <jkhanjob@gmail.com> * add IT Signed-off-by: Jackie Han <hnyng@amazon.com> * cleanup Signed-off-by: Jackie Han <hnyng@amazon.com> * address comments Signed-off-by: Jackie Han <hnyng@amazon.com> * update IT Signed-off-by: Jackie Han <hnyng@amazon.com> * update IT Signed-off-by: Jackie Han <hnyng@amazon.com> * clean up Signed-off-by: Jackie Han <hnyng@amazon.com> * add more IT Signed-off-by: Jackie Han <hnyng@amazon.com> * add more IT Signed-off-by: Jackie Han <hnyng@amazon.com> * add more IT Signed-off-by: Jackie Han <hnyng@amazon.com> * address comments * address comments Signed-off-by: Jackie Han <hnyng@amazon.com> * address comments Signed-off-by: Jackie Han <hnyng@amazon.com> * utlizing a node state manager when writing results into flattened result index Signed-off-by: Jackie Han <hnyng@amazon.com> * build flatten resuilt index enabled into the ResultWriteRequest Signed-off-by: Jackie Han <hnyng@amazon.com> * unbind ingest pipeline with flattened result index when it's disabled Signed-off-by: Jackie Han <hnyng@amazon.com> * test * address comments * cleanup Signed-off-by: Jackie Han <hnyng@amazon.com> * make flatten result index use detector name Signed-off-by: Jackie Han <hnyng@amazon.com> --------- Signed-off-by: Jackie Han <jkhanjob@gmail.com> Signed-off-by: Jackie Han <hnyng@amazon.com> * spotless apply Signed-off-by: Jackie Han <hnyng@amazon.com> * update recency_emphasis to be greater than 1 in test cases Signed-off-by: Jackie Han <hnyng@amazon.com> --------- Signed-off-by: Jackie Han <jkhanjob@gmail.com> Signed-off-by: Jackie Han <hnyng@amazon.com>
1 parent 78e2b5e commit 328f901

33 files changed

+888
-103
lines changed

src/main/java/org/opensearch/ad/indices/ADIndexManagement.java

+19
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.io.IOException;
2424
import java.util.EnumMap;
25+
import java.util.Map;
2526

2627
import org.apache.logging.log4j.LogManager;
2728
import org.apache.logging.log4j.Logger;
@@ -45,6 +46,8 @@
4546
import org.opensearch.timeseries.indices.IndexManagement;
4647
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
4748

49+
import com.fasterxml.jackson.databind.ObjectMapper;
50+
4851
/**
4952
* This class provides utility methods for various anomaly detection indices.
5053
*/
@@ -122,6 +125,22 @@ public static String getResultMappings() throws IOException {
122125
return getMappings(ANOMALY_RESULTS_INDEX_MAPPING_FILE);
123126
}
124127

128+
/**
129+
* Retrieves the JSON mapping for the flattened result index with the "dynamic" field set to true
130+
* @return JSON mapping for the flattened result index.
131+
* @throws IOException if the mapping file cannot be read.
132+
*/
133+
public static String getFlattenedResultMappings() throws IOException {
134+
ObjectMapper objectMapper = new ObjectMapper();
135+
136+
Map<String, Object> mapping = objectMapper
137+
.readValue(ADIndexManagement.class.getClassLoader().getResourceAsStream(ANOMALY_RESULTS_INDEX_MAPPING_FILE), Map.class);
138+
139+
mapping.put("dynamic", true);
140+
141+
return objectMapper.writeValueAsString(mapping);
142+
}
143+
125144
/**
126145
* Get anomaly detector state index mapping json content.
127146
*

src/main/java/org/opensearch/ad/model/AnomalyDetector.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -592,7 +592,7 @@ public static AnomalyDetector parse(
592592
case RESULT_INDEX_FIELD_TTL:
593593
customResultIndexTTL = onlyParseNumberValue(parser);
594594
break;
595-
case FLATTEN_RESULT_INDEX_MAPPING:
595+
case FLATTEN_CUSTOM_RESULT_INDEX:
596596
flattenResultIndexMapping = onlyParseBooleanValue(parser);
597597
break;
598598
case BREAKING_UI_CHANGE_TIME:

src/main/java/org/opensearch/ad/ratelimit/ADResultWriteRequest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ public ADResultWriteRequest(
2525
String detectorId,
2626
RequestPriority priority,
2727
AnomalyResult result,
28-
String resultIndex
28+
String resultIndex,
29+
String flattenResultIndex
2930
) {
30-
super(expirationEpochMs, detectorId, priority, result, resultIndex);
31+
super(expirationEpochMs, detectorId, priority, result, resultIndex, flattenResultIndex);
3132
}
3233

3334
public ADResultWriteRequest(StreamInput in) throws IOException {

src/main/java/org/opensearch/ad/ratelimit/ADResultWriteWorker.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,9 @@ protected ADResultWriteRequest createResultWriteRequest(
103103
String configId,
104104
RequestPriority priority,
105105
AnomalyResult result,
106-
String resultIndex
106+
String resultIndex,
107+
String flattenResultIndex
107108
) {
108-
return new ADResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex);
109+
return new ADResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex, flattenResultIndex);
109110
}
110111
}

src/main/java/org/opensearch/ad/ratelimit/ADSaveResultStrategy.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ public void saveResult(AnomalyResult result, Config config) {
8686
config.getId(),
8787
result.getAnomalyGrade() > 0 ? RequestPriority.HIGH : RequestPriority.MEDIUM,
8888
result,
89-
config.getCustomResultIndexOrAlias()
89+
config.getCustomResultIndexOrAlias(),
90+
config.getFlattenResultIndexAlias()
9091
)
9192
);
9293
}

src/main/java/org/opensearch/ad/transport/ADResultBulkTransportAction.java

+57-17
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
public class ADResultBulkTransportAction extends ResultBulkTransportAction<AnomalyResult, ADResultWriteRequest, ADResultBulkRequest> {
4040

4141
private static final Logger LOG = LogManager.getLogger(ADResultBulkTransportAction.class);
42+
private final ClusterService clusterService;
43+
private final Client client;
4244

4345
@Inject
4446
public ADResultBulkTransportAction(
@@ -61,39 +63,77 @@ public ADResultBulkTransportAction(
6163
ADCommonName.ANOMALY_RESULT_INDEX_ALIAS,
6264
ADResultBulkRequest::new
6365
);
66+
this.clusterService = clusterService;
67+
this.client = client;
6468
clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_INDEX_PRESSURE_SOFT_LIMIT, it -> softLimit = it);
6569
clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_INDEX_PRESSURE_HARD_LIMIT, it -> hardLimit = it);
6670
}
6771

72+
/**
73+
* Prepares a {@link BulkRequest} for indexing anomaly detection results.
74+
*
75+
* This method processes a list of anomaly detection results provided in the {@link ADResultBulkRequest}.
76+
* Each result is evaluated based on the current indexing pressure and result priority. If a flattened
77+
* result index exists for the result, the result is also added to the flattened index.
78+
*
79+
* @param indexingPressurePercent the current percentage of indexing pressure. This value influences
80+
* whether a result is indexed based on predefined thresholds and probabilities.
81+
* @param request the {@link ADResultBulkRequest} containing anomaly detection results
82+
* to be processed.
83+
* @return a {@link BulkRequest} containing all results that are eligible for indexing.
84+
*
85+
* <p><b>Behavior:</b></p>
86+
* <ul>
87+
* <li>Results are added to the bulk request if the indexing pressure is within acceptable limits
88+
* or the result has high priority.</li>
89+
* <li>If a flattened result index exists for a result, it is added to the flattened index in addition
90+
* to the primary index.</li>
91+
* </ul>
92+
*
93+
* <p><b>Indexing Pressure Thresholds:</b></p>
94+
* <ul>
95+
* <li>Below the soft limit: All results are added.</li>
96+
* <li>Between the soft limit and the hard limit: High-priority results are always added, and
97+
* other results are added based on a probability that decreases with increasing pressure.</li>
98+
* <li>Above the hard limit: Only high-priority results are added.</li>
99+
* </ul>
100+
*
101+
* @see ADResultBulkRequest
102+
* @see BulkRequest
103+
* @see ADResultWriteRequest
104+
*/
68105
@Override
69106
protected BulkRequest prepareBulkRequest(float indexingPressurePercent, ADResultBulkRequest request) {
70107
BulkRequest bulkRequest = new BulkRequest();
71108
List<ADResultWriteRequest> results = request.getResults();
72109

73-
if (indexingPressurePercent <= softLimit) {
74-
for (ADResultWriteRequest resultWriteRequest : results) {
75-
addResult(bulkRequest, resultWriteRequest.getResult(), resultWriteRequest.getResultIndex());
110+
for (ADResultWriteRequest resultWriteRequest : results) {
111+
AnomalyResult result = resultWriteRequest.getResult();
112+
String resultIndex = resultWriteRequest.getResultIndex();
113+
114+
if (shouldAddResult(indexingPressurePercent, result)) {
115+
addResult(bulkRequest, result, resultIndex);
116+
if (resultWriteRequest.getFlattenResultIndex() != null) {
117+
addResult(bulkRequest, result, resultWriteRequest.getFlattenResultIndex());
118+
}
76119
}
120+
}
121+
122+
return bulkRequest;
123+
}
124+
125+
private boolean shouldAddResult(float indexingPressurePercent, AnomalyResult result) {
126+
if (indexingPressurePercent <= softLimit) {
127+
// Always add when below soft limit
128+
return true;
77129
} else if (indexingPressurePercent <= hardLimit) {
78130
// exceed soft limit (60%) but smaller than hard limit (90%)
79131
float acceptProbability = 1 - indexingPressurePercent;
80-
for (ADResultWriteRequest resultWriteRequest : results) {
81-
AnomalyResult result = resultWriteRequest.getResult();
82-
if (result.isHighPriority() || random.nextFloat() < acceptProbability) {
83-
addResult(bulkRequest, result, resultWriteRequest.getResultIndex());
84-
}
85-
}
132+
return result.isHighPriority() || random.nextFloat() < acceptProbability;
86133
} else {
87134
// if exceeding hard limit, only index non-zero grade or error result
88-
for (ADResultWriteRequest resultWriteRequest : results) {
89-
AnomalyResult result = resultWriteRequest.getResult();
90-
if (result.isHighPriority()) {
91-
addResult(bulkRequest, result, resultWriteRequest.getResultIndex());
92-
}
93-
}
135+
return result.isHighPriority();
94136
}
95-
96-
return bulkRequest;
97137
}
98138

99139
private void addResult(BulkRequest bulkRequest, AnomalyResult result, String resultIndex) {

src/main/java/org/opensearch/ad/transport/ADSingleStreamResultTransportAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ public ADResultWriteRequest createResultWriteRequest(Config config, AnomalyResul
7272
config.getId(),
7373
RequestPriority.MEDIUM,
7474
result,
75-
config.getCustomResultIndexOrAlias()
75+
config.getCustomResultIndexOrAlias(),
76+
config.getFlattenResultIndexAlias()
7677
);
7778
}
7879

src/main/java/org/opensearch/forecast/model/Forecaster.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ public static Forecaster parse(
437437
case RESULT_INDEX_FIELD_TTL:
438438
customResultIndexTTL = parser.intValue();
439439
break;
440-
case FLATTEN_RESULT_INDEX_MAPPING:
440+
case FLATTEN_CUSTOM_RESULT_INDEX:
441441
flattenResultIndexMapping = parser.booleanValue();
442442
break;
443443
case BREAKING_UI_CHANGE_TIME:

src/main/java/org/opensearch/forecast/ratelimit/ForecastResultWriteRequest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ public ForecastResultWriteRequest(
2525
String forecasterId,
2626
RequestPriority priority,
2727
ForecastResult result,
28-
String resultIndex
28+
String resultIndex,
29+
String flattenResultIndex
2930
) {
30-
super(expirationEpochMs, forecasterId, priority, result, resultIndex);
31+
super(expirationEpochMs, forecasterId, priority, result, resultIndex, flattenResultIndex);
3132
}
3233

3334
public ForecastResultWriteRequest(StreamInput in) throws IOException {

src/main/java/org/opensearch/forecast/ratelimit/ForecastResultWriteWorker.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,9 @@ protected ForecastResultWriteRequest createResultWriteRequest(
103103
String configId,
104104
RequestPriority priority,
105105
ForecastResult result,
106-
String resultIndex
106+
String resultIndex,
107+
String flattenResultIndex
107108
) {
108-
return new ForecastResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex);
109+
return new ForecastResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex, flattenResultIndex);
109110
}
110111
}

src/main/java/org/opensearch/forecast/ratelimit/ForecastSaveResultStrategy.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ public void saveResult(ForecastResult result, Config config) {
8383
config.getId(),
8484
RequestPriority.MEDIUM,
8585
result,
86-
config.getCustomResultIndexOrAlias()
86+
config.getCustomResultIndexOrAlias(),
87+
config.getFlattenResultIndexAlias()
8788
)
8889
);
8990
}

src/main/java/org/opensearch/forecast/transport/ForecastSingleStreamResultTransportAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ public ForecastResultWriteRequest createResultWriteRequest(Config config, Foreca
7676
config.getId(),
7777
RequestPriority.MEDIUM,
7878
result,
79-
config.getCustomResultIndexOrAlias()
79+
config.getCustomResultIndexOrAlias(),
80+
config.getFlattenResultIndexAlias()
8081
);
8182
}
8283
}

src/main/java/org/opensearch/timeseries/constant/CommonMessages.java

+1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public static String getTooManyCategoricalFieldErr(int limit) {
4444
public static String FAIL_TO_FIND_CONFIG_MSG = "Can't find config with id: ";
4545
public static final String CAN_NOT_CHANGE_CATEGORY_FIELD = "Can't change category field";
4646
public static final String CAN_NOT_CHANGE_CUSTOM_RESULT_INDEX = "Can't change custom result index";
47+
public static final String CAN_NOT_CHANGE_FLATTEN_RESULT_INDEX = "Can't change flatten result index";
4748
public static final String CATEGORICAL_FIELD_TYPE_ERR_MSG = "Categorical field %s must be of type keyword or ip.";
4849
// Modifying message for FEATURE below may break the parseADValidationException method of ValidateAnomalyDetectorTransportAction
4950
public static final String FEATURE_INVALID_MSG_PREFIX = "Feature has an invalid query";

src/main/java/org/opensearch/timeseries/indices/IndexManagement.java

+58-8
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
package org.opensearch.timeseries.indices;
1313

14+
import static org.opensearch.ad.indices.ADIndexManagement.getFlattenedResultMappings;
1415
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
1516
import static org.opensearch.timeseries.util.RestHandlerUtils.createXContentParserFromRegistry;
1617

@@ -89,6 +90,7 @@
8990
import org.opensearch.timeseries.settings.TimeSeriesSettings;
9091
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
9192

93+
import com.fasterxml.jackson.databind.ObjectMapper;
9294
import com.google.common.base.Charsets;
9395
import com.google.common.io.Resources;
9496

@@ -136,6 +138,7 @@ public abstract class IndexManagement<IndexType extends Enum<IndexType> & TimeSe
136138
private NamedXContentRegistry xContentRegistry;
137139
protected BiCheckedFunction<XContentParser, String, ? extends Config, IOException> configParser;
138140
protected String customResultIndexPrefix;
141+
private final ObjectMapper objectMapper = new ObjectMapper();
139142

140143
protected class IndexState {
141144
// keep track of whether the mapping version is up-to-date
@@ -272,6 +275,11 @@ protected static String getMappings(String mappingFileRelativePath) throws IOExc
272275
return Resources.toString(url, Charsets.UTF_8);
273276
}
274277

278+
public static String getScripts(String scriptFileRelativePath) throws IOException {
279+
URL url = IndexManagement.class.getClassLoader().getResource(scriptFileRelativePath);
280+
return Resources.toString(url, Charsets.UTF_8);
281+
}
282+
275283
protected void choosePrimaryShards(CreateIndexRequest request, boolean hiddenIndex) {
276284
request
277285
.settings(
@@ -1008,6 +1016,45 @@ public <T> void initCustomResultIndexAndExecute(String resultIndexOrAlias, Execu
10081016
}
10091017
}
10101018

1019+
/**
1020+
* creates flattened result index
1021+
* @param flattenedResultIndexAlias the flattened result index alias
1022+
* @param actionListener the action listener
1023+
*/
1024+
public void initFlattenedResultIndex(String flattenedResultIndexAlias, ActionListener<CreateIndexResponse> actionListener) {
1025+
try {
1026+
String indexName = getCustomResultIndexPattern(flattenedResultIndexAlias);
1027+
logger.info("Initializing flattened result index: {}", indexName);
1028+
1029+
CreateIndexRequest request = new CreateIndexRequest(indexName)
1030+
.mapping(getFlattenedResultMappings(), XContentType.JSON)
1031+
.settings(settings);
1032+
1033+
if (flattenedResultIndexAlias != null) {
1034+
request.alias(new Alias(flattenedResultIndexAlias));
1035+
}
1036+
1037+
choosePrimaryShards(request, false);
1038+
1039+
adminClient.indices().create(request, ActionListener.wrap(response -> {
1040+
if (response.isAcknowledged()) {
1041+
logger.info("Successfully created flattened result index: {} with alias: {}", indexName, flattenedResultIndexAlias);
1042+
actionListener.onResponse(response);
1043+
} else {
1044+
String errorMsg = "Index creation not acknowledged for index: " + indexName;
1045+
logger.error(errorMsg);
1046+
actionListener.onFailure(new IllegalStateException(errorMsg));
1047+
}
1048+
}, exception -> {
1049+
logger.error("Failed to create flattened result index: {}", indexName, exception);
1050+
actionListener.onFailure(exception);
1051+
}));
1052+
} catch (Exception e) {
1053+
logger.error("Error while initializing flattened result index: {}", flattenedResultIndexAlias, e);
1054+
actionListener.onFailure(e);
1055+
}
1056+
}
1057+
10111058
public <T> void validateCustomIndexForBackendJob(
10121059
String resultIndexOrAlias,
10131060
String securityLogId,
@@ -1252,15 +1299,18 @@ protected void rolloverAndDeleteHistoryIndex(
12521299
}
12531300

12541301
// perform rollover and delete on found custom result index alias
1255-
candidateResultAliases.forEach(config -> handleCustomResultIndex(config, resultIndex));
1302+
candidateResultAliases.forEach(config -> {
1303+
handleResultIndexRolloverAndDelete(config.getCustomResultIndexOrAlias(), config, resultIndex);
1304+
if (config.getFlattenResultIndexMapping()) {
1305+
String flattenedResultIndexAlias = config.getFlattenResultIndexAlias();
1306+
handleResultIndexRolloverAndDelete(flattenedResultIndexAlias, config, resultIndex);
1307+
}
1308+
});
12561309
}, e -> { logger.error("Failed to get configs with custom result index alias.", e); }));
12571310
}
12581311

1259-
private void handleCustomResultIndex(Config config, IndexType resultIndex) {
1260-
RolloverRequest rolloverRequest = buildRolloverRequest(
1261-
config.getCustomResultIndexOrAlias(),
1262-
getCustomResultIndexPattern(config.getCustomResultIndexOrAlias())
1263-
);
1312+
private void handleResultIndexRolloverAndDelete(String indexAlias, Config config, IndexType resultIndex) {
1313+
RolloverRequest rolloverRequest = buildRolloverRequest(indexAlias, getCustomResultIndexPattern(indexAlias));
12641314

12651315
// add rollover conditions if found in config
12661316
if (config.getCustomResultIndexMinAge() != null) {
@@ -1272,9 +1322,9 @@ private void handleCustomResultIndex(Config config, IndexType resultIndex) {
12721322

12731323
// perform rollover and delete on custom result index alias
12741324
proceedWithRolloverAndDelete(
1275-
config.getCustomResultIndexOrAlias(),
1325+
indexAlias,
12761326
rolloverRequest,
1277-
getAllCustomResultIndexPattern(config.getCustomResultIndexOrAlias()),
1327+
getAllCustomResultIndexPattern(indexAlias),
12781328
resultIndex,
12791329
config.getCustomResultIndexTTL()
12801330
);

0 commit comments

Comments
 (0)