Skip to content

Commit 051acf0

Browse files
committed
**Make Custom Result Index Name an Alias**
Previously, we used configured custom result index names as the index name, creating the index only when a detector was created. This approach caused several problems for index management: 1. **Index State Management (ISM) Incompatibility**: ISM requires an alias for rolling over an index, but we only had the index name. 2. **Index Recognition Issue**: Even if we managed to roll over an index, AD could not recognize it because the index would not be recreated unless the detector was recreated. 3. **Result History and Top Anomaly Results**: The result history on the dashboard and top anomaly results were reading from a single index instead of an index pattern. Thus, after rolling over an index, the result history and top anomaly results would be lost. This PR addresses these issues: 1. **Custom Result Index Creation**: We now create a custom result index with the name `<custom-name-history-{now/d}-1>` and use the alias `custom-name` to point to it. 2. **Index Recreation**: We recreate an index when failing to find a result index, updating the configured alias to point to the new index. This ensures continuity when an index is rolled over and new indices do not exist. 3. **Query Index Pattern**: The top anomaly result API now queries an index pattern instead of a single index. The result history on the dashboard follows the same approach. The dashboard code will be posted in a separate PR. Additionally, this PR updates the custom result index mapping when the mapping is outdated, similar to existing logic on the default result index. **Testing Done**: * Successfully updated the mapping of the custom result index when the mapping is outdated. * Verified that the frontend can still see old and new results after a rollover. * Confirmed that the backend can still write to new indices after a rollover. * Ensured all existing tests pass. More tests will be added in the following PRs. Signed-off-by: Kaituo Li <kaituo@amazon.com>
1 parent 9eed2ff commit 051acf0

File tree

70 files changed

+1024
-428
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+1024
-428
lines changed

.github/workflows/benchmark.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ jobs:
5252
-Dtests.seed=B4BA12CCF1D9E825 -Dtests.security.manager=false \
5353
-Dtests.jvm.argline='-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m' \
5454
-Dtests.locale=ar-JO -Dtests.timezone=Asia/Samarkand -Dmodel-benchmark=true \
55-
-Dtests.timeoutSuite=3600000! -Dtests.logs=true"
55+
-Dtests.timeoutSuite=3600000! -Dtest.logs=true"
5656
;;
5757
hc)
5858
su `id -un 1000` -c "./gradlew ':test' --tests 'org.opensearch.ad.ml.HCADModelPerfTests' \

src/main/java/org/opensearch/ad/ADJobProcessor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ protected void validateResultIndexAndRunJob(
8181
ExecuteADResultResponseRecorder recorder,
8282
Config detector
8383
) {
84-
String resultIndex = jobParameter.getCustomResultIndex();
84+
String resultIndex = jobParameter.getCustomResultIndexOrAlias();
8585
if (resultIndex == null) {
8686
runJob(jobParameter, lockService, lock, executionStartTime, executionEndTime, configId, user, roles, recorder, detector);
8787
return;

src/main/java/org/opensearch/ad/indices/ADIndex.java

+2-7
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ public enum ADIndex implements TimeSeriesIndex {
3737
false,
3838
ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getCheckpointMappings)
3939
),
40-
STATE(ADCommonName.DETECTION_STATE_INDEX, false, ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getStateMappings));
40+
STATE(ADCommonName.DETECTION_STATE_INDEX, false, ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getStateMappings)),
41+
CUSTOM_RESULT(CUSTOM_RESULT_INDEX, true, ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getResultMappings)),;
4142

4243
private final String indexName;
4344
// whether we use an alias for the index
@@ -64,10 +65,4 @@ public boolean isAlias() {
6465
public String getMapping() {
6566
return mapping;
6667
}
67-
68-
@Override
69-
public boolean isJobIndex() {
70-
return CommonName.JOB_INDEX.equals(indexName);
71-
}
72-
7368
}

src/main/java/org/opensearch/ad/indices/ADIndexManagement.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@
3030
import org.opensearch.action.delete.DeleteRequest;
3131
import org.opensearch.action.index.IndexRequest;
3232
import org.opensearch.ad.constant.ADCommonName;
33+
import org.opensearch.ad.model.AnomalyDetector;
3334
import org.opensearch.ad.model.AnomalyResult;
3435
import org.opensearch.client.Client;
3536
import org.opensearch.cluster.service.ClusterService;
3637
import org.opensearch.common.settings.Settings;
3738
import org.opensearch.common.xcontent.XContentType;
3839
import org.opensearch.core.action.ActionListener;
40+
import org.opensearch.core.xcontent.NamedXContentRegistry;
3941
import org.opensearch.core.xcontent.ToXContent;
4042
import org.opensearch.core.xcontent.XContentBuilder;
4143
import org.opensearch.threadpool.ThreadPool;
@@ -64,6 +66,7 @@ public class ADIndexManagement extends IndexManagement<ADIndex> {
6466
* @param settings OS cluster setting
6567
* @param nodeFilter Used to filter eligible nodes to host AD indices
6668
* @param maxUpdateRunningTimes max number of retries to update index mapping and setting
69+
* @param xContentRegistry registry for json parser
6770
* @throws IOException
6871
*/
6972
public ADIndexManagement(
@@ -72,7 +75,8 @@ public ADIndexManagement(
7275
ThreadPool threadPool,
7376
Settings settings,
7477
DiscoveryNodeFilterer nodeFilter,
75-
int maxUpdateRunningTimes
78+
int maxUpdateRunningTimes,
79+
NamedXContentRegistry xContentRegistry
7680
)
7781
throws IOException {
7882
super(
@@ -87,7 +91,10 @@ public ADIndexManagement(
8791
AD_RESULT_HISTORY_ROLLOVER_PERIOD.get(settings),
8892
AD_RESULT_HISTORY_MAX_DOCS_PER_SHARD.get(settings),
8993
AD_RESULT_HISTORY_RETENTION_PERIOD.get(settings),
90-
ADIndex.RESULT.getMapping()
94+
ADIndex.RESULT.getMapping(),
95+
xContentRegistry,
96+
AnomalyDetector::parse,
97+
ADCommonName.CUSTOM_RESULT_INDEX_PREFIX + "*"
9198
);
9299
this.clusterService.addLocalNodeClusterManagerListener(this);
93100

@@ -181,7 +188,7 @@ public void initDefaultResultIndexDirectly(ActionListener<CreateIndexResponse> a
181188
AD_RESULT_HISTORY_INDEX_PATTERN,
182189
ADCommonName.ANOMALY_RESULT_INDEX_ALIAS,
183190
true,
184-
AD_RESULT_HISTORY_INDEX_PATTERN,
191+
true,
185192
ADIndex.RESULT,
186193
actionListener
187194
);
@@ -270,6 +277,6 @@ protected DeleteRequest createDummyDeleteRequest(String resultIndex) throws IOEx
270277

271278
@Override
272279
public void initCustomResultIndexDirectly(String resultIndex, ActionListener<CreateIndexResponse> actionListener) {
273-
initResultIndexDirectly(resultIndex, null, false, AD_RESULT_HISTORY_INDEX_PATTERN, ADIndex.RESULT, actionListener);
280+
initResultIndexDirectly(getCustomResultIndexPattern(resultIndex), resultIndex, false, false, ADIndex.RESULT, actionListener);
274281
}
275282
}

src/main/java/org/opensearch/ad/model/ADTask.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ public static ADTask parse(XContentParser parser, String taskId) throws IOExcept
336336
detector.getLastUpdateTime(),
337337
detector.getCategoryFields(),
338338
detector.getUser(),
339-
detector.getCustomResultIndex(),
339+
detector.getCustomResultIndexOrAlias(),
340340
detector.getImputationOption(),
341341
detector.getRecencyEmphasis(),
342342
detector.getSeasonIntervals(),

src/main/java/org/opensearch/ad/model/AnomalyDetector.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ public AnomalyDetector(StreamInput input) throws IOException {
265265
} else {
266266
this.uiMetadata = null;
267267
}
268-
customResultIndex = input.readOptionalString();
268+
customResultIndexOrAlias = input.readOptionalString();
269269
if (input.readBoolean()) {
270270
this.imputationOption = new ImputationOption(input);
271271
} else {
@@ -326,7 +326,7 @@ public void writeTo(StreamOutput output) throws IOException {
326326
} else {
327327
output.writeBoolean(false);
328328
}
329-
output.writeOptionalString(customResultIndex);
329+
output.writeOptionalString(customResultIndexOrAlias);
330330
if (imputationOption != null) {
331331
output.writeBoolean(true);
332332
imputationOption.writeTo(output);

src/main/java/org/opensearch/ad/ratelimit/ADSaveResultStrategy.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public void saveResult(AnomalyResult result, Config config) {
8787
config.getId(),
8888
result.getAnomalyGrade() > 0 ? RequestPriority.HIGH : RequestPriority.MEDIUM,
8989
result,
90-
config.getCustomResultIndex()
90+
config.getCustomResultIndexOrAlias()
9191
)
9292
);
9393
}

src/main/java/org/opensearch/ad/rest/AbstractADSearchAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import org.opensearch.ad.constant.ADCommonMessages;
1414
import org.opensearch.ad.settings.ADEnabledSetting;
1515
import org.opensearch.core.xcontent.ToXContentObject;
16-
import org.opensearch.timeseries.AbstractSearchAction;
16+
import org.opensearch.timeseries.rest.AbstractSearchAction;
1717

1818
public abstract class AbstractADSearchAction<T extends ToXContentObject> extends AbstractSearchAction<T> {
1919

src/main/java/org/opensearch/ad/rest/handler/AbstractAnomalyDetectorActionHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ protected AnomalyDetector copyConfig(User user, Config config) {
231231
Instant.now(),
232232
config.getCategoryFields(),
233233
user,
234-
config.getCustomResultIndex(),
234+
config.getCustomResultIndexOrAlias(),
235235
config.getImputationOption(),
236236
config.getRecencyEmphasis(),
237237
config.getSeasonIntervals(),

src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1158,7 +1158,7 @@ private void detectAnomaly(
11581158
user = adTask.getUser().getName();
11591159
roles = adTask.getUser().getRoles();
11601160
}
1161-
String resultIndex = adTask.getDetector().getCustomResultIndex();
1161+
String resultIndex = adTask.getDetector().getCustomResultIndexOrAlias();
11621162

11631163
if (resultIndex == null) {
11641164
// if result index is null, store anomaly result directly

src/main/java/org/opensearch/ad/task/ADTaskManager.java

+48-13
Original file line numberDiff line numberDiff line change
@@ -571,31 +571,66 @@ protected void scaleTaskLaneOnCoordinatingNode(
571571
TransportService transportService,
572572
ActionListener<JobResponse> listener
573573
) {
574-
DiscoveryNode coordinatingNode = getCoordinatingNode(adTask);
575574
transportService
576575
.sendRequest(
577-
coordinatingNode,
576+
getCoordinatingNode(adTask),
578577
ForwardADTaskAction.NAME,
579578
new ForwardADTaskRequest(adTask, approvedTaskSlot, ADTaskAction.SCALE_ENTITY_TASK_SLOTS),
580579
transportRequestOptions,
581580
new ActionListenerResponseHandler<>(listener, JobResponse::new)
582581
);
583582
}
584583

584+
/**
585+
* Retrieves the coordinating node for the given ADTask.
586+
*
587+
* This method looks for a node in the list of eligible data nodes that matches the coordinating node ID
588+
* and version specified in the ADTask. The matching criteria are:
589+
* 1. The node ID must be equal to the coordinating node ID.
590+
* 2. Both node versions must be either null or equal.
591+
*
592+
* If the coordinating node ID and the local node have different software versions, this method will
593+
* throw a ResourceNotFoundException.
594+
*
595+
* @param adTask the ADTask containing the coordinating node information.
596+
* @return a DiscoveryNode containing the matching DiscoveryNode if found, or throws ResourceNotFoundException if no match is found.
597+
* The caller is supposed to handle the thrown exception.
598+
* @throws ResourceNotFoundException if the coordinating node has a different version than the local node or if the coordinating node is not found.
599+
*/
585600
private DiscoveryNode getCoordinatingNode(ADTask adTask) {
586-
String coordinatingNode = adTask.getCoordinatingNode();
587-
DiscoveryNode[] eligibleDataNodes = nodeFilter.getEligibleDataNodes();
588-
DiscoveryNode targetNode = null;
589-
for (DiscoveryNode node : eligibleDataNodes) {
590-
if (node.getId().equals(coordinatingNode)) {
591-
targetNode = node;
592-
break;
601+
try {
602+
String coordinatingNodeId = adTask.getCoordinatingNode();
603+
Version coordinatingNodeVersion = hashRing.getVersion(coordinatingNodeId);
604+
Version localNodeVersion = hashRing.getVersion(clusterService.localNode().getId());
605+
if (!isSameVersion(coordinatingNodeVersion, localNodeVersion)) {
606+
throw new ResourceNotFoundException(
607+
adTask.getConfigId(),
608+
"AD task coordinating node has different version than local node"
609+
);
593610
}
594-
}
595-
if (targetNode == null) {
611+
612+
DiscoveryNode[] eligibleDataNodes = nodeFilter.getEligibleDataNodes();
613+
614+
for (DiscoveryNode node : eligibleDataNodes) {
615+
String nodeId = node.getId();
616+
if (nodeId == null) {
617+
continue;
618+
}
619+
620+
if (nodeId.equals(coordinatingNodeId)) {
621+
return node;
622+
}
623+
}
624+
625+
throw new ResourceNotFoundException(adTask.getConfigId(), "AD task coordinating node not found");
626+
} catch (Exception e) {
627+
logger.error("Error locating coordinating node", e);
596628
throw new ResourceNotFoundException(adTask.getConfigId(), "AD task coordinating node not found");
597629
}
598-
return targetNode;
630+
}
631+
632+
private boolean isSameVersion(Version version1, Version version2) {
633+
return (version1 == null && version2 == null) || (version1 != null && version2 != null && version1.compareTo(version2) == 0);
599634
}
600635

601636
@Override
@@ -791,7 +826,7 @@ public <T> void cleanConfigCache(
791826
} catch (ResourceNotFoundException e) {
792827
logger
793828
.warn(
794-
"Task coordinating node left cluster, taskId: {}, detectorId: {}, coordinatingNode: {}",
829+
"Task coordinating node left cluster or has different software version, taskId: {}, detectorId: {}, coordinatingNode: {}",
795830
taskId,
796831
detectorId,
797832
coordinatingNode

src/main/java/org/opensearch/ad/transport/ADResultBulkTransportAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public ADResultBulkTransportAction(
6868
@Override
6969
protected BulkRequest prepareBulkRequest(float indexingPressurePercent, ADResultBulkRequest request) {
7070
BulkRequest bulkRequest = new BulkRequest();
71-
List<ADResultWriteRequest> results = request.getAnomalyResults();
71+
List<ADResultWriteRequest> results = request.getResults();
7272

7373
if (indexingPressurePercent <= softLimit) {
7474
for (ADResultWriteRequest resultWriteRequest : results) {

src/main/java/org/opensearch/ad/transport/ADSingleStreamResultTransportAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public ADResultWriteRequest createResultWriteRequest(Config config, AnomalyResul
8080
config.getId(),
8181
RequestPriority.MEDIUM,
8282
result,
83-
config.getCustomResultIndex()
83+
config.getCustomResultIndexOrAlias()
8484
);
8585
}
8686

src/main/java/org/opensearch/ad/transport/SearchTopAnomalyResultTransportAction.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -301,10 +301,10 @@ protected void doExecute(Task task, SearchTopAnomalyResultRequest request, Actio
301301
SearchRequest searchRequest = generateSearchRequest(request);
302302

303303
// Adding search over any custom result indices
304-
String rawCustomResultIndex = getAdResponse.getDetector().getCustomResultIndex();
305-
String customResultIndex = rawCustomResultIndex == null ? null : rawCustomResultIndex.trim();
306-
if (!Strings.isNullOrEmpty(customResultIndex)) {
307-
searchRequest.indices(defaultIndex, customResultIndex);
304+
String rawCustomResultIndexPattern = getAdResponse.getDetector().getCustomResultIndexPattern();
305+
String customResultIndexPattern = rawCustomResultIndexPattern == null ? null : rawCustomResultIndexPattern.trim();
306+
if (!Strings.isNullOrEmpty(customResultIndexPattern)) {
307+
searchRequest.indices(defaultIndex, customResultIndexPattern);
308308
}
309309

310310
// Utilizing the existing search() from SearchHandler to handle security permissions. Both user role
@@ -321,7 +321,7 @@ protected void doExecute(Task task, SearchTopAnomalyResultRequest request, Actio
321321
clock.millis() + TOP_ANOMALY_RESULT_TIMEOUT_IN_MILLIS,
322322
request.getSize(),
323323
orderType,
324-
customResultIndex
324+
customResultIndexPattern
325325
)
326326
);
327327

src/main/java/org/opensearch/ad/transport/handler/ADIndexMemoryPressureAwareResultHandler.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import org.apache.logging.log4j.Logger;
1616
import org.opensearch.ad.indices.ADIndex;
1717
import org.opensearch.ad.indices.ADIndexManagement;
18+
import org.opensearch.ad.model.AnomalyResult;
19+
import org.opensearch.ad.ratelimit.ADResultWriteRequest;
1820
import org.opensearch.ad.transport.ADResultBulkAction;
1921
import org.opensearch.ad.transport.ADResultBulkRequest;
2022
import org.opensearch.client.Client;
@@ -27,7 +29,7 @@
2729
import org.opensearch.timeseries.transport.handler.IndexMemoryPressureAwareResultHandler;
2830

2931
public class ADIndexMemoryPressureAwareResultHandler extends
30-
IndexMemoryPressureAwareResultHandler<ADResultBulkRequest, ResultBulkResponse, ADIndex, ADIndexManagement> {
32+
IndexMemoryPressureAwareResultHandler<AnomalyResult, ADResultWriteRequest, ADResultBulkRequest, ResultBulkResponse, ADIndex, ADIndexManagement> {
3133
private static final Logger LOG = LogManager.getLogger(ADIndexMemoryPressureAwareResultHandler.class);
3234

3335
@Inject

src/main/java/org/opensearch/forecast/ForecastJobProcessor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ protected void validateResultIndexAndRunJob(
8585
Exception exception = new EndRunException(configId, e.getMessage(), false);
8686
handleException(jobParameter, lockService, lock, executionStartTime, executionEndTime, exception, recorder, detector);
8787
});
88-
String resultIndex = jobParameter.getCustomResultIndex();
88+
String resultIndex = jobParameter.getCustomResultIndexOrAlias();
8989
if (resultIndex == null) {
9090
indexManagement.validateDefaultResultIndexForBackendJob(configId, user, roles, () -> {
9191
listener.onResponse(true);

src/main/java/org/opensearch/forecast/constant/ForecastCommonMessages.java

-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ public class ForecastCommonMessages {
4646
// ======================================
4747
// Used for custom forecast result index
4848
// ======================================
49-
public static String CAN_NOT_FIND_RESULT_INDEX = "Can't find result index ";
5049
public static String INVALID_RESULT_INDEX_PREFIX = "Result index must start with " + CUSTOM_RESULT_INDEX_PREFIX;
5150

5251
// ======================================

src/main/java/org/opensearch/forecast/indices/ForecastIndex.java

+2-6
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ public enum ForecastIndex implements TimeSeriesIndex {
3737
ForecastCommonName.FORECAST_STATE_INDEX,
3838
false,
3939
ThrowingSupplierWrapper.throwingSupplierWrapper(ForecastIndexManagement::getStateMappings)
40-
);
40+
),
41+
CUSTOM_RESULT(CUSTOM_RESULT_INDEX, true, ThrowingSupplierWrapper.throwingSupplierWrapper(ForecastIndexManagement::getResultMappings));
4142

4243
private final String indexName;
4344
// whether we use an alias for the index
@@ -64,9 +65,4 @@ public boolean isAlias() {
6465
public String getMapping() {
6566
return mapping;
6667
}
67-
68-
@Override
69-
public boolean isJobIndex() {
70-
return CommonName.JOB_INDEX.equals(indexName);
71-
}
7268
}

0 commit comments

Comments
 (0)