Skip to content

Commit cf8bed4

Browse files
committed
utlizing a node state manager when writing results into flattened result index
Signed-off-by: Jackie Han <hnyng@amazon.com>
1 parent a623fcf commit cf8bed4

File tree

4 files changed

+29
-9
lines changed

4 files changed

+29
-9
lines changed

src/main/java/org/opensearch/ad/transport/ADResultBulkTransportAction.java

+18-5
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,12 @@
3131
import org.opensearch.cluster.service.ClusterService;
3232
import org.opensearch.common.inject.Inject;
3333
import org.opensearch.common.settings.Settings;
34+
import org.opensearch.core.action.ActionListener;
3435
import org.opensearch.core.xcontent.XContentBuilder;
3536
import org.opensearch.index.IndexingPressure;
37+
import org.opensearch.timeseries.AnalysisType;
38+
import org.opensearch.timeseries.NodeStateManager;
39+
import org.opensearch.timeseries.model.Config;
3640
import org.opensearch.timeseries.transport.ResultBulkTransportAction;
3741
import org.opensearch.timeseries.util.RestHandlerUtils;
3842
import org.opensearch.transport.TransportService;
@@ -50,7 +54,8 @@ public ADResultBulkTransportAction(
5054
IndexingPressure indexingPressure,
5155
Settings settings,
5256
ClusterService clusterService,
53-
Client client
57+
Client client,
58+
NodeStateManager stateManager
5459
) {
5560
super(
5661
ADResultBulkAction.NAME,
@@ -62,7 +67,8 @@ public ADResultBulkTransportAction(
6267
AD_INDEX_PRESSURE_SOFT_LIMIT.get(settings),
6368
AD_INDEX_PRESSURE_HARD_LIMIT.get(settings),
6469
ADCommonName.ANOMALY_RESULT_INDEX_ALIAS,
65-
ADResultBulkRequest::new
70+
ADResultBulkRequest::new,
71+
stateManager
6672
);
6773
this.clusterService = clusterService;
6874
this.client = client;
@@ -137,9 +143,16 @@ private boolean shouldAddResult(float indexingPressurePercent, AnomalyResult res
137143

138144
private void addToFlattenedIndexIfExists(BulkRequest bulkRequest, AnomalyResult result, String resultIndex) {
139145
String flattenedResultIndexAlias = resultIndex + "_flattened_" + result.getDetectorId().toLowerCase(Locale.ROOT);
140-
if (clusterService.state().metadata().hasAlias(flattenedResultIndexAlias)) {
141-
addResult(bulkRequest, result, flattenedResultIndexAlias);
142-
}
146+
String configId = result.getConfigId();
147+
nodeStateManager.getConfig(configId, AnalysisType.AD, ActionListener.wrap(configOptional -> {
148+
if (configOptional.isEmpty()) {
149+
return;
150+
}
151+
Config config = configOptional.get();
152+
if (config.getFlattenResultIndexMapping()) {
153+
addResult(bulkRequest, result, flattenedResultIndexAlias);
154+
}
155+
}, e -> LOG.error("Fail to get config", e)));
143156
}
144157

145158
private void addResult(BulkRequest bulkRequest, AnomalyResult result, String resultIndex) {

src/main/java/org/opensearch/forecast/transport/ForecastResultBulkTransportAction.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.opensearch.forecast.model.ForecastResult;
2727
import org.opensearch.forecast.ratelimit.ForecastResultWriteRequest;
2828
import org.opensearch.index.IndexingPressure;
29+
import org.opensearch.timeseries.NodeStateManager;
2930
import org.opensearch.timeseries.transport.ResultBulkTransportAction;
3031
import org.opensearch.transport.TransportService;
3132

@@ -39,7 +40,8 @@ public ForecastResultBulkTransportAction(
3940
IndexingPressure indexingPressure,
4041
Settings settings,
4142
ClusterService clusterService,
42-
Client client
43+
Client client,
44+
NodeStateManager stateManager
4345
) {
4446
super(
4547
ForecastResultBulkAction.NAME,
@@ -51,7 +53,8 @@ public ForecastResultBulkTransportAction(
5153
FORECAST_INDEX_PRESSURE_SOFT_LIMIT.get(settings),
5254
FORECAST_INDEX_PRESSURE_HARD_LIMIT.get(settings),
5355
ForecastIndex.RESULT.getIndexName(),
54-
ForecastResultBulkRequest::new
56+
ForecastResultBulkRequest::new,
57+
stateManager
5558
);
5659
clusterService.getClusterSettings().addSettingsUpdateConsumer(FORECAST_INDEX_PRESSURE_SOFT_LIMIT, it -> softLimit = it);
5760
clusterService.getClusterSettings().addSettingsUpdateConsumer(FORECAST_INDEX_PRESSURE_HARD_LIMIT, it -> hardLimit = it);

src/main/java/org/opensearch/timeseries/transport/ResultBulkTransportAction.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.opensearch.index.IndexingPressure;
3535
import org.opensearch.tasks.Task;
3636
import org.opensearch.threadpool.ThreadPool;
37+
import org.opensearch.timeseries.NodeStateManager;
3738
import org.opensearch.timeseries.model.IndexableResult;
3839
import org.opensearch.timeseries.ratelimit.ResultWriteRequest;
3940
import org.opensearch.timeseries.util.BulkUtil;
@@ -51,6 +52,7 @@ public abstract class ResultBulkTransportAction<ResultType extends IndexableResu
5152
protected String indexName;
5253
private Client client;
5354
protected Random random;
55+
protected NodeStateManager nodeStateManager;
5456

5557
public ResultBulkTransportAction(
5658
String actionName,
@@ -62,7 +64,8 @@ public ResultBulkTransportAction(
6264
float softLimit,
6365
float hardLimit,
6466
String indexName,
65-
Writeable.Reader<ResultBulkRequestType> requestReader
67+
Writeable.Reader<ResultBulkRequestType> requestReader,
68+
NodeStateManager nodeStateManager
6669
) {
6770
super(actionName, transportService, actionFilters, requestReader, ThreadPool.Names.SAME);
6871
this.indexingPressure = indexingPressure;
@@ -72,6 +75,7 @@ public ResultBulkTransportAction(
7275
this.softLimit = softLimit;
7376
this.hardLimit = hardLimit;
7477
this.indexName = indexName;
78+
this.nodeStateManager = nodeStateManager;
7579

7680
// random seed is 42. Can be any number
7781
this.random = new Random(42);

src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ public void testCreateAnomalyDetector_withFlattenedResultIndex() throws Exceptio
241241
.format(Locale.ROOT, "/opensearch-ad-plugin-result-test_flattened_%s", id.toLowerCase(Locale.ROOT));
242242
// wait for the detector starts writing result
243243
try {
244-
Thread.sleep(60 * 1000);
244+
Thread.sleep(30 * 1000);
245245
} catch (InterruptedException e) {
246246
Thread.currentThread().interrupt();
247247
throw new RuntimeException("Thread was interrupted while waiting", e);

0 commit comments

Comments
 (0)