Skip to content

Commit 352075b

Browse files
committed
clean up
Signed-off-by: Jackie Han <jkhanjob@gmail.com>
1 parent 41d5949 commit 352075b

File tree

8 files changed

+152
-85
lines changed

8 files changed

+152
-85
lines changed

gradle.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,4 @@ systemProp.org.gradle.warning.mode=fail
2727
systemProp.jdk.tls.client.protocols=TLSv1.2
2828

2929
# jvm args for faster test execution by default
30-
systemProp.tests.jvm.argline=-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m
30+
systemProp.tests.jvm.argline=-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m

gradle/wrapper/gradle-wrapper.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip
44
networkTimeout=10000
55
validateDistributionUrl=true
66
zipStoreBase=GRADLE_USER_HOME
7-
zipStorePath=wrapper/dists
7+
zipStorePath=wrapper/dists

src/main/java/org/opensearch/ad/indices/ADIndexManagement.java

-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static org.opensearch.ad.settings.AnomalyDetectorSettings.ANOMALY_DETECTION_STATE_INDEX_MAPPING_FILE;
2020
import static org.opensearch.ad.settings.AnomalyDetectorSettings.ANOMALY_RESULTS_INDEX_MAPPING_FILE;
2121
import static org.opensearch.ad.settings.AnomalyDetectorSettings.CHECKPOINT_INDEX_MAPPING_FILE;
22-
import static org.opensearch.ad.settings.AnomalyDetectorSettings.FLATTENED_ANOMALY_RESULTS_INDEX_MAPPING_FILE;
2322

2423
import java.io.IOException;
2524
import java.util.EnumMap;

src/main/java/org/opensearch/timeseries/indices/IndexManagement.java

+20-18
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,11 @@ protected static String getMappings(String mappingFileRelativePath) throws IOExc
273273
return Resources.toString(url, Charsets.UTF_8);
274274
}
275275

276+
public static String getScripts(String scriptFileRelativePath) throws IOException {
277+
URL url = IndexManagement.class.getClassLoader().getResource(scriptFileRelativePath);
278+
return Resources.toString(url, Charsets.UTF_8);
279+
}
280+
276281
protected void choosePrimaryShards(CreateIndexRequest request, boolean hiddenIndex) {
277282
request
278283
.settings(
@@ -1019,26 +1024,23 @@ public void initFlattenedResultIndex(String indexName, ActionListener<CreateInde
10191024
logger.info("Initializing flattened result index: {}", indexName);
10201025

10211026
CreateIndexRequest request = new CreateIndexRequest(indexName)
1022-
.mapping(getFlattenedResultIndexMappings(), XContentType.JSON)
1023-
.settings(settings);
1027+
.mapping(getFlattenedResultIndexMappings(), XContentType.JSON)
1028+
.settings(settings);
10241029
choosePrimaryShards(request, false);
10251030

1026-
adminClient.indices().create(request, ActionListener.wrap(
1027-
response -> {
1028-
if (response.isAcknowledged()) {
1029-
logger.info("Successfully created flattened result index: {}", indexName);
1030-
actionListener.onResponse(response);
1031-
} else {
1032-
String errorMsg = "Index creation not acknowledged for index: " + indexName;
1033-
logger.error(errorMsg);
1034-
actionListener.onFailure(new IllegalStateException(errorMsg));
1035-
}
1036-
},
1037-
exception -> {
1038-
logger.error("Failed to create flattened result index: {}", indexName, exception);
1039-
actionListener.onFailure(exception);
1040-
}
1041-
));
1031+
adminClient.indices().create(request, ActionListener.wrap(response -> {
1032+
if (response.isAcknowledged()) {
1033+
logger.info("Successfully created flattened result index: {}", indexName);
1034+
actionListener.onResponse(response);
1035+
} else {
1036+
String errorMsg = "Index creation not acknowledged for index: " + indexName;
1037+
logger.error(errorMsg);
1038+
actionListener.onFailure(new IllegalStateException(errorMsg));
1039+
}
1040+
}, exception -> {
1041+
logger.error("Failed to create flattened result index: {}", indexName, exception);
1042+
actionListener.onFailure(exception);
1043+
}));
10421044
}
10431045

10441046
/**

src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java

+61-63
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
99
import static org.opensearch.timeseries.constant.CommonMessages.CATEGORICAL_FIELD_TYPE_ERR_MSG;
1010
import static org.opensearch.timeseries.constant.CommonMessages.TIMESTAMP_VALIDATION_FAILED;
11+
import static org.opensearch.timeseries.indices.IndexManagement.getScripts;
1112
import static org.opensearch.timeseries.util.ParseUtils.parseAggregators;
1213
import static org.opensearch.timeseries.util.RestHandlerUtils.XCONTENT_WITH_TYPE;
1314
import static org.opensearch.timeseries.util.RestHandlerUtils.isExceptionCausedByInvalidQuery;
@@ -25,14 +26,18 @@
2526
import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsAction;
2627
import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsRequest;
2728
import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsResponse;
29+
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
2830
import org.opensearch.action.get.GetRequest;
2931
import org.opensearch.action.get.GetResponse;
3032
import org.opensearch.action.index.IndexRequest;
3133
import org.opensearch.action.index.IndexResponse;
34+
import org.opensearch.action.ingest.DeletePipelineRequest;
35+
import org.opensearch.action.ingest.PutPipelineRequest;
3236
import org.opensearch.action.search.SearchRequest;
3337
import org.opensearch.action.search.SearchResponse;
3438
import org.opensearch.action.support.IndicesOptions;
3539
import org.opensearch.action.support.WriteRequest;
40+
import org.opensearch.action.support.master.AcknowledgedResponse;
3641
import org.opensearch.action.support.replication.ReplicationResponse;
3742
import org.opensearch.ad.transport.IndexAnomalyDetectorResponse;
3843
import org.opensearch.client.Client;
@@ -41,11 +46,14 @@
4146
import org.opensearch.common.unit.TimeValue;
4247
import org.opensearch.common.util.concurrent.ThreadContext;
4348
import org.opensearch.common.xcontent.XContentFactory;
49+
import org.opensearch.common.xcontent.XContentType;
4450
import org.opensearch.commons.authuser.User;
4551
import org.opensearch.core.action.ActionListener;
4652
import org.opensearch.core.action.ActionResponse;
53+
import org.opensearch.core.common.bytes.BytesReference;
4754
import org.opensearch.core.rest.RestStatus;
4855
import org.opensearch.core.xcontent.NamedXContentRegistry;
56+
import org.opensearch.core.xcontent.XContentBuilder;
4957
import org.opensearch.core.xcontent.XContentParser;
5058
import org.opensearch.index.query.BoolQueryBuilder;
5159
import org.opensearch.index.query.QueryBuilder;
@@ -68,6 +76,7 @@
6876
import org.opensearch.timeseries.model.TimeSeriesTask;
6977
import org.opensearch.timeseries.model.ValidationAspect;
7078
import org.opensearch.timeseries.model.ValidationIssueType;
79+
import org.opensearch.timeseries.settings.TimeSeriesSettings;
7180
import org.opensearch.timeseries.task.TaskCacheManager;
7281
import org.opensearch.timeseries.task.TaskManager;
7382
import org.opensearch.timeseries.util.*;
@@ -454,39 +463,35 @@ private void handlePutRequest(boolean indexingDryRun, ActionListener<T> listener
454463
}
455464

456465
private void handlePostRequest(boolean indexingDryRun, ActionListener<T> listener) {
457-
createConfig(indexingDryRun, ActionListener.wrap(
458-
createConfigResponse -> {
459-
if (shouldHandleFlattening(indexingDryRun, createConfigResponse)) {
460-
IndexAnomalyDetectorResponse response = (IndexAnomalyDetectorResponse) createConfigResponse;
461-
String detectorId = response.getId();
462-
String indexName = config.getCustomResultIndexOrAlias() + "_flattened_" + detectorId.toLowerCase();
463-
String pipelineId = "anomaly_detection_ingest_pipeline_" + detectorId.toLowerCase();
464-
465-
timeSeriesIndices.initFlattenedResultIndex(indexName, ActionListener.wrap(
466-
initResponse -> setupIngestPipeline(detectorId, ActionListener.wrap(
467-
pipelineResponse -> {
468-
updateResultIndexSetting(pipelineId, indexName, ActionListener.wrap(
469-
updateResponse -> listener.onResponse(createConfigResponse),
470-
listener::onFailure
471-
));
472-
},
473-
listener::onFailure
474-
)),
475-
listener::onFailure
476-
));
477-
} else {
478-
listener.onResponse(createConfigResponse);
479-
}
480-
},
481-
listener::onFailure
482-
));
466+
createConfig(indexingDryRun, ActionListener.wrap(createConfigResponse -> {
467+
if (shouldHandleFlattening(indexingDryRun, createConfigResponse)) {
468+
IndexAnomalyDetectorResponse response = (IndexAnomalyDetectorResponse) createConfigResponse;
469+
String detectorId = response.getId();
470+
String indexName = config.getCustomResultIndexOrAlias() + "_flattened_" + detectorId.toLowerCase();
471+
String pipelineId = "anomaly_detection_ingest_pipeline_" + detectorId.toLowerCase();
472+
473+
timeSeriesIndices
474+
.initFlattenedResultIndex(
475+
indexName,
476+
ActionListener.wrap(initResponse -> setupIngestPipeline(detectorId, ActionListener.wrap(pipelineResponse -> {
477+
updateResultIndexSetting(
478+
pipelineId,
479+
indexName,
480+
ActionListener.wrap(updateResponse -> listener.onResponse(createConfigResponse), listener::onFailure)
481+
);
482+
}, listener::onFailure)), listener::onFailure)
483+
);
484+
} else {
485+
listener.onResponse(createConfigResponse);
486+
}
487+
}, listener::onFailure));
483488
}
484489

485490
private boolean shouldHandleFlattening(boolean indexingDryRun, Object createConfigResponse) {
486491
return !indexingDryRun
487-
&& config.getCustomResultIndexOrAlias() != null
488-
&& config.getFlattenResultIndexMapping()
489-
&& createConfigResponse instanceof IndexAnomalyDetectorResponse;
492+
&& config.getCustomResultIndexOrAlias() != null
493+
&& config.getFlattenResultIndexMapping()
494+
&& createConfigResponse instanceof IndexAnomalyDetectorResponse;
490495
}
491496

492497
protected void setupIngestPipeline(String detectorId, ActionListener<T> listener) {
@@ -498,22 +503,19 @@ protected void setupIngestPipeline(String detectorId, ActionListener<T> listener
498503

499504
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, pipelineSource, XContentType.JSON);
500505

501-
client.admin().cluster().putPipeline(putPipelineRequest, ActionListener.wrap(
502-
response -> {
503-
if (response.isAcknowledged()) {
504-
logger.info("Ingest pipeline created successfully for pipelineId: {}", pipelineId);
505-
listener.onResponse(null);
506-
} else {
507-
String errorMessage = "Ingest pipeline creation was not acknowledged for pipelineId: " + pipelineId;
508-
logger.error(errorMessage);
509-
listener.onFailure(new OpenSearchStatusException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR));
510-
}
511-
},
512-
exception -> {
513-
logger.error("Error while creating ingest pipeline for pipelineId: {}", pipelineId, exception);
514-
listener.onFailure(exception);
515-
}
516-
));
506+
client.admin().cluster().putPipeline(putPipelineRequest, ActionListener.wrap(response -> {
507+
if (response.isAcknowledged()) {
508+
logger.info("Ingest pipeline created successfully for pipelineId: {}", pipelineId);
509+
listener.onResponse(null);
510+
} else {
511+
String errorMessage = "Ingest pipeline creation was not acknowledged for pipelineId: " + pipelineId;
512+
logger.error(errorMessage);
513+
listener.onFailure(new OpenSearchStatusException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR));
514+
}
515+
}, exception -> {
516+
logger.error("Error while creating ingest pipeline for pipelineId: {}", pipelineId, exception);
517+
listener.onFailure(exception);
518+
}));
517519

518520
} catch (IOException e) {
519521
logger.error("Exception while building ingest pipeline definition for pipeline ID: {}", pipelineId, e);
@@ -555,22 +557,19 @@ protected void updateResultIndexSetting(String pipelineId, String flattenedResul
555557

556558
updateSettingsRequest.settings(settingsBuilder);
557559

558-
client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(
559-
response -> {
560-
if (response.isAcknowledged()) {
561-
logger.info("Successfully updated settings for index: {} with pipeline: {}", flattenedResultIndex, pipelineId);
562-
listener.onResponse(null);
563-
} else {
564-
String errorMsg = "Settings update not acknowledged for index: " + flattenedResultIndex;
565-
logger.error(errorMsg);
566-
listener.onFailure(new OpenSearchStatusException(errorMsg, RestStatus.INTERNAL_SERVER_ERROR));
567-
}
568-
},
569-
exception -> {
570-
logger.error("Failed to update settings for index: {} with pipeline: {}", flattenedResultIndex, pipelineId, exception);
571-
listener.onFailure(exception);
572-
}
573-
));
560+
client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(response -> {
561+
if (response.isAcknowledged()) {
562+
logger.info("Successfully updated settings for index: {} with pipeline: {}", flattenedResultIndex, pipelineId);
563+
listener.onResponse(null);
564+
} else {
565+
String errorMsg = "Settings update not acknowledged for index: " + flattenedResultIndex;
566+
logger.error(errorMsg);
567+
listener.onFailure(new OpenSearchStatusException(errorMsg, RestStatus.INTERNAL_SERVER_ERROR));
568+
}
569+
}, exception -> {
570+
logger.error("Failed to update settings for index: {} with pipeline: {}", flattenedResultIndex, pipelineId, exception);
571+
listener.onFailure(exception);
572+
}));
574573
}
575574

576575
private void handleFlattenResultIndexMappingUpdate(ActionListener<T> listener) {
@@ -611,7 +610,6 @@ public void onFailure(Exception e) {
611610
}
612611
}
613612
});
614-
>>>>>>> 2a322387 (add a feature that flattens custom result index when enabled)
615613
}
616614
}
617615

@@ -1018,7 +1016,7 @@ public void onFailure(Exception e) {
10181016
});
10191017
}
10201018

1021-
protected void onCreateMappingsResponse(CreateIndexResponse response, boolean indexingDryRun, ActionListener<T> listener) throws IOException {
1019+
protected void onCreateMappingsResponse(CreateIndexResponse response, boolean indexingDryRun, ActionListener<T> listener) {
10221020
if (response.isAcknowledged()) {
10231021
logger.info("Created {} with mappings.", CommonName.CONFIG_INDEX);
10241022
prepareConfigIndexing(indexingDryRun, listener);

src/main/java/org/opensearch/timeseries/settings/TimeSeriesSettings.java

+2
Original file line numberDiff line numberDiff line change
@@ -289,4 +289,6 @@ public class TimeSeriesSettings {
289289

290290
// max entities to track per detector
291291
public static final int MAX_TRACKING_ENTITIES = 1000000;
292+
293+
public static final String FLATTEN_CUSTOM_RESULT_INDEX_PAINLESS = "scripts/flatten-custom-result-index-painless.txt";
292294
}

src/main/java/org/opensearch/timeseries/transport/ResultBulkTransportAction.java

-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.opensearch.action.support.ActionFilters;
2727
import org.opensearch.action.support.HandledTransportAction;
2828
import org.opensearch.client.Client;
29-
import org.opensearch.cluster.service.ClusterService;
3029
import org.opensearch.common.settings.Settings;
3130
import org.opensearch.core.action.ActionListener;
3231
import org.opensearch.core.common.io.stream.Writeable;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Create a map to store the relationship between feature_id and feature_name from feature_data
2+
def featureNameMap = [:];
3+
4+
// Populate the map from feature_data
5+
if (ctx.containsKey('feature_data') && ctx.feature_data != null) {
6+
for (int i = 0; i < ctx.feature_data.length; i++) {
7+
def feature = ctx.feature_data[i];
8+
if (feature != null && feature.containsKey('feature_id') && feature.containsKey('feature_name')) {
9+
featureNameMap[feature.feature_id] = feature.feature_name;
10+
ctx['feature_data_' + feature.feature_name] = feature.data; // Flatten feature_data as before
11+
}
12+
}
13+
}
14+
15+
// Flatten nested entity field
16+
if (ctx.containsKey('entity') && ctx.entity != null) {
17+
for (int i = 0; i < ctx.entity.length; i++) {
18+
def entity = ctx.entity[i];
19+
if (entity != null && entity.containsKey('name') && entity.containsKey('value')) {
20+
ctx['entity_' + entity.name] = entity.value;
21+
}
22+
}
23+
}
24+
25+
// Flatten nested relevant_attribution field
26+
if (ctx.containsKey('relevant_attribution') && ctx.relevant_attribution != null) {
27+
for (int i = 0; i < ctx.relevant_attribution.length; i++) {
28+
def attribution = ctx.relevant_attribution[i];
29+
if (attribution != null && attribution.containsKey('feature_id') && attribution.containsKey('data')) {
30+
def featureName = featureNameMap[attribution.feature_id];
31+
if (featureName != null) {
32+
ctx['relevant_attribution_' + featureName] = attribution.data;
33+
}
34+
}
35+
}
36+
}
37+
38+
// Flatten nested expected_values field
39+
if (ctx.containsKey('expected_values') && ctx.expected_values != null) {
40+
for (int i = 0; i < ctx.expected_values.length; i++) {
41+
def expected = ctx.expected_values[i];
42+
if (expected != null && expected.containsKey('value_list') && expected.value_list != null) {
43+
for (int j = 0; j < expected.value_list.length; j++) {
44+
def value = expected.value_list[j];
45+
if (value != null && value.containsKey('feature_id') && value.containsKey('data')) {
46+
def featureName = featureNameMap[value.feature_id];
47+
if (featureName != null) {
48+
ctx['expected_values_' + featureName] = value.data;
49+
}
50+
}
51+
}
52+
}
53+
}
54+
}
55+
56+
// Flatten nested past_values field
57+
if (ctx.containsKey('past_values') && ctx.past_values != null) {
58+
for (int i = 0; i < ctx.past_values.length; i++) {
59+
def pastValue = ctx.past_values[i];
60+
if (pastValue != null && pastValue.containsKey('feature_id') && pastValue.containsKey('data')) {
61+
def featureName = featureNameMap[pastValue.feature_id];
62+
if (featureName != null) {
63+
ctx['past_value_' + featureName] = pastValue.data;
64+
}
65+
}
66+
}
67+
}

0 commit comments

Comments (0)