Skip to content

Commit 590fb5a

Browse files
committed
build flatten resuilt index enabled into the ResultWriteRequest
Signed-off-by: Jackie Han <hnyng@amazon.com>
1 parent cf8bed4 commit 590fb5a

11 files changed

+37
-31
lines changed

src/main/java/org/opensearch/ad/ratelimit/ADResultWriteRequest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ public ADResultWriteRequest(
2525
String detectorId,
2626
RequestPriority priority,
2727
AnomalyResult result,
28-
String resultIndex
28+
String resultIndex,
29+
boolean flattenResultIndex
2930
) {
30-
super(expirationEpochMs, detectorId, priority, result, resultIndex);
31+
super(expirationEpochMs, detectorId, priority, result, resultIndex, flattenResultIndex);
3132
}
3233

3334
public ADResultWriteRequest(StreamInput in) throws IOException {

src/main/java/org/opensearch/ad/ratelimit/ADResultWriteWorker.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,9 @@ protected ADResultWriteRequest createResultWriteRequest(
103103
String configId,
104104
RequestPriority priority,
105105
AnomalyResult result,
106-
String resultIndex
106+
String resultIndex,
107+
boolean flattenResultIndex
107108
) {
108-
return new ADResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex);
109+
return new ADResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex, flattenResultIndex);
109110
}
110111
}

src/main/java/org/opensearch/ad/ratelimit/ADSaveResultStrategy.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ public void saveResult(AnomalyResult result, Config config) {
8686
config.getId(),
8787
result.getAnomalyGrade() > 0 ? RequestPriority.HIGH : RequestPriority.MEDIUM,
8888
result,
89-
config.getCustomResultIndexOrAlias()
89+
config.getCustomResultIndexOrAlias(),
90+
config.getFlattenResultIndexMapping()
9091
)
9192
);
9293
}

src/main/java/org/opensearch/ad/transport/ADResultBulkTransportAction.java

+4-15
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,10 @@ protected BulkRequest prepareBulkRequest(float indexingPressurePercent, ADResult
120120

121121
if (shouldAddResult(indexingPressurePercent, result)) {
122122
addResult(bulkRequest, result, resultIndex);
123-
addToFlattenedIndexIfExists(bulkRequest, result, resultIndex);
123+
if (resultWriteRequest.getFlattenResultIndex()) {
124+
String flattenedResultIndexAlias = resultIndex + "_flattened_" + result.getDetectorId().toLowerCase(Locale.ROOT);
125+
addResult(bulkRequest, result, flattenedResultIndexAlias);
126+
}
124127
}
125128
}
126129

@@ -141,20 +144,6 @@ private boolean shouldAddResult(float indexingPressurePercent, AnomalyResult res
141144
}
142145
}
143146

144-
private void addToFlattenedIndexIfExists(BulkRequest bulkRequest, AnomalyResult result, String resultIndex) {
145-
String flattenedResultIndexAlias = resultIndex + "_flattened_" + result.getDetectorId().toLowerCase(Locale.ROOT);
146-
String configId = result.getConfigId();
147-
nodeStateManager.getConfig(configId, AnalysisType.AD, ActionListener.wrap(configOptional -> {
148-
if (configOptional.isEmpty()) {
149-
return;
150-
}
151-
Config config = configOptional.get();
152-
if (config.getFlattenResultIndexMapping()) {
153-
addResult(bulkRequest, result, flattenedResultIndexAlias);
154-
}
155-
}, e -> LOG.error("Fail to get config", e)));
156-
}
157-
158147
private void addResult(BulkRequest bulkRequest, AnomalyResult result, String resultIndex) {
159148
String index = resultIndex == null ? indexName : resultIndex;
160149
try (XContentBuilder builder = jsonBuilder()) {

src/main/java/org/opensearch/ad/transport/ADSingleStreamResultTransportAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ public ADResultWriteRequest createResultWriteRequest(Config config, AnomalyResul
7272
config.getId(),
7373
RequestPriority.MEDIUM,
7474
result,
75-
config.getCustomResultIndexOrAlias()
75+
config.getCustomResultIndexOrAlias(),
76+
config.getFlattenResultIndexMapping()
7677
);
7778
}
7879

src/main/java/org/opensearch/forecast/ratelimit/ForecastResultWriteRequest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ public ForecastResultWriteRequest(
2525
String forecasterId,
2626
RequestPriority priority,
2727
ForecastResult result,
28-
String resultIndex
28+
String resultIndex,
29+
boolean flattenResultIndex
2930
) {
30-
super(expirationEpochMs, forecasterId, priority, result, resultIndex);
31+
super(expirationEpochMs, forecasterId, priority, result, resultIndex, flattenResultIndex);
3132
}
3233

3334
public ForecastResultWriteRequest(StreamInput in) throws IOException {

src/main/java/org/opensearch/forecast/ratelimit/ForecastResultWriteWorker.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,9 @@ protected ForecastResultWriteRequest createResultWriteRequest(
103103
String configId,
104104
RequestPriority priority,
105105
ForecastResult result,
106-
String resultIndex
106+
String resultIndex,
107+
boolean flattenResultIndex
107108
) {
108-
return new ForecastResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex);
109+
return new ForecastResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex, flattenResultIndex);
109110
}
110111
}

src/main/java/org/opensearch/forecast/ratelimit/ForecastSaveResultStrategy.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ public void saveResult(ForecastResult result, Config config) {
8383
config.getId(),
8484
RequestPriority.MEDIUM,
8585
result,
86-
config.getCustomResultIndexOrAlias()
86+
config.getCustomResultIndexOrAlias(),
87+
config.getFlattenResultIndexMapping()
8788
)
8889
);
8990
}

src/main/java/org/opensearch/forecast/transport/ForecastSingleStreamResultTransportAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ public ForecastResultWriteRequest createResultWriteRequest(Config config, Foreca
7676
config.getId(),
7777
RequestPriority.MEDIUM,
7878
result,
79-
config.getCustomResultIndexOrAlias()
79+
config.getCustomResultIndexOrAlias(),
80+
config.getFlattenResultIndexMapping()
8081
);
8182
}
8283
}

src/main/java/org/opensearch/timeseries/ratelimit/ResultWriteRequest.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,19 @@ public abstract class ResultWriteRequest<ResultType extends IndexableResult> ext
2222
private final ResultType result;
2323
// If resultIndex is null, result will be stored in default result index.
2424
private final String resultIndex;
25+
private final boolean flattenResultIndex;
2526

26-
public ResultWriteRequest(long expirationEpochMs, String configId, RequestPriority priority, ResultType result, String resultIndex) {
27+
public ResultWriteRequest(long expirationEpochMs, String configId, RequestPriority priority, ResultType result, String resultIndex, boolean flattenResultIndex) {
2728
super(expirationEpochMs, configId, priority);
2829
this.result = result;
2930
this.resultIndex = resultIndex;
31+
this.flattenResultIndex = flattenResultIndex;
3032
}
3133

32-
public ResultWriteRequest(StreamInput in, Writeable.Reader<ResultType> resultReader) throws IOException {
34+
public ResultWriteRequest(StreamInput in, Reader<ResultType> resultReader) throws IOException {
3335
this.result = resultReader.read(in);
3436
this.resultIndex = in.readOptionalString();
37+
this.flattenResultIndex = in.readBoolean();
3538
}
3639

3740
@Override
@@ -47,4 +50,8 @@ public ResultType getResult() {
4750
public String getResultIndex() {
4851
return resultIndex;
4952
}
53+
54+
public boolean getFlattenResultIndex() {
55+
return flattenResultIndex;
56+
}
5057
}

src/main/java/org/opensearch/timeseries/ratelimit/ResultWriteWorker.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,8 @@ private ActionListener<Optional<? extends Config>> onGetConfig(
199199
id,
200200
resultToRetry.isHighPriority() ? RequestPriority.HIGH : RequestPriority.MEDIUM,
201201
resultToRetry,
202-
config.getCustomResultIndexOrAlias()
202+
config.getCustomResultIndexOrAlias(),
203+
config.getFlattenResultIndexMapping()
203204
)
204205
);
205206

@@ -216,6 +217,7 @@ protected abstract ResultWriteRequestType createResultWriteRequest(
216217
String configId,
217218
RequestPriority priority,
218219
ResultType result,
219-
String resultIndex
220+
String resultIndex,
221+
boolean flattenResultIndex
220222
);
221223
}

0 commit comments

Comments
 (0)