
Commit b9345e7

Fix exceptions in IntervalCalculation and ResultIndexingHandler (#1379) (#1386)
* Fix race condition in PageListener

This PR:
- Introduced an `AtomicInteger` called `pagesInFlight` to track the number of pages currently being processed.
- Incremented `pagesInFlight` before processing each page and decremented it after processing is complete.
- Adjusted the condition in `scheduleImputeHCTask` to check both `pagesInFlight.get() == 0` (all pages have been processed) and `sentOutPages.get() == receivedPages.get()` (all responses have been received) before scheduling the `imputeHC` task.
- Removed the previous final check in `onResponse` that decided when to schedule `imputeHC`, relying instead on the updated counters for accurate synchronization.

These changes address the race condition where `sentOutPages` might not have been incremented in time before checking whether to schedule the `imputeHC` task. By accurately tracking the number of in-flight pages and sent pages, we ensure that `imputeHC` is executed only after all pages have been fully processed and all responses have been received.

Testing done:
1. Reproduced the race condition by starting two detectors with imputation; the race produces an out-of-order IllegalArgumentException from RCF. Verified the change fixes the problem.
2. Added an IT for the above scenario.

* Fix exceptions in IntervalCalculation and ResultIndexingHandler

- **IntervalCalculation**: Prevent an `ArrayIndexOutOfBoundsException` by returning early when there are fewer than two timestamps. Previously, the code assumed at least two timestamps, causing an exception when only one was present.
- **ResultIndexingHandler**: Handle exceptions from asynchronous calls by logging error messages instead of throwing exceptions. Since the caller does not wait for these asynchronous operations, throwing exceptions had no effect and could lead to unhandled exceptions. Logging provides visibility without disrupting the caller's flow.

Testing done:
1. Added UT and ITs.

---------

Signed-off-by: Kaituo Li <kaituo@amazon.com>
1 parent c95c430 commit b9345e7
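
For illustration, a minimal sketch of the counter pattern described in the commit message. Only the counter names (`pagesInFlight`, `sentOutPages`, `receivedPages`) and the scheduling condition come from the message above; the class skeleton and method bodies are assumptions, not the actual PageListener code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: not the actual PageListener implementation.
class PageCounterSketch {
    private final AtomicInteger pagesInFlight = new AtomicInteger();
    private final AtomicInteger sentOutPages = new AtomicInteger();
    private final AtomicInteger receivedPages = new AtomicInteger();

    void onPage(Object page) {
        // Count the page as in flight before any of its requests go out, so the
        // scheduling check below cannot fire while this page is still being processed.
        pagesInFlight.incrementAndGet();
        try {
            sentOutPages.incrementAndGet(); // stand-in for sending one request for this page
        } finally {
            pagesInFlight.decrementAndGet();
            scheduleImputeHCTask();
        }
    }

    void onResponse() {
        receivedPages.incrementAndGet();
        scheduleImputeHCTask();
    }

    private void scheduleImputeHCTask() {
        // imputeHC runs only after all pages are processed and every sent request has a response.
        if (pagesInFlight.get() == 0 && sentOutPages.get() == receivedPages.get()) {
            imputeHC();
        }
    }

    private void imputeHC() {
        // placeholder for the task the real listener schedules
    }
}
```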

12 files changed: +1664 -131 lines changed

build.gradle (-3)

```diff
@@ -699,9 +699,6 @@ List<String> jacocoExclusions = [
 
     // TODO: add test coverage (kaituo)
     'org.opensearch.forecast.*',
-    'org.opensearch.timeseries.transport.ResultBulkTransportAction',
-    'org.opensearch.timeseries.transport.handler.IndexMemoryPressureAwareResultHandler',
-    'org.opensearch.timeseries.transport.handler.ResultIndexingHandler',
     'org.opensearch.timeseries.ml.Sample',
     'org.opensearch.timeseries.ratelimit.FeatureRequest',
     'org.opensearch.ad.transport.ADHCImputeNodeRequest',
```

release-notes/opensearch-anomaly-detection.release-notes-2.18.0.0.md (+1)

```diff
@@ -7,6 +7,7 @@ Compatible with OpenSearch 2.18.0
 
 ### Bug Fixes
 * Bump RCF Version and Fix Default Rules Bug in AnomalyDetector ([#1334](https://github.com/opensearch-project/anomaly-detection/pull/1334))
+* Fix race condition in PageListener ([#1351](https://github.com/opensearch-project/anomaly-detection/pull/1351))
 
 ### Infrastructure
 * forward port flaky test fix and add forecasting security tests ([#1329](https://github.com/opensearch-project/anomaly-detection/pull/1329))
```

src/main/java/org/opensearch/forecast/transport/ForecastResultBulkTransportAction.java (+1 -1)

```diff
@@ -58,7 +58,7 @@ public ForecastResultBulkTransportAction(
     }
 
     @Override
-    protected BulkRequest prepareBulkRequest(float indexingPressurePercent, ForecastResultBulkRequest request) {
+    public BulkRequest prepareBulkRequest(float indexingPressurePercent, ForecastResultBulkRequest request) {
         BulkRequest bulkRequest = new BulkRequest();
         List<ForecastResultWriteRequest> results = request.getResults();
 
```

src/main/java/org/opensearch/timeseries/rest/handler/IntervalCalculation.java (+3 -2)

```diff
@@ -252,8 +252,9 @@ private void findMinimumInterval(LongBounds timeStampBounds, ActionListener<Inte
             .createSearchRequest(new IntervalTimeConfiguration(1, ChronoUnit.MINUTES), timeStampBounds, topEntity);
         final ActionListener<SearchResponse> searchResponseListener = ActionListener.wrap(response -> {
             List<Long> timestamps = aggregationPrep.getTimestamps(response);
-            if (timestamps.isEmpty()) {
-                logger.warn("empty data, return one minute by default");
+            if (timestamps.size() < 2) {
+                // to calculate the difference we need at least 2 timestamps
+                logger.warn("not enough data, return one minute by default");
                 listener.onResponse(new IntervalTimeConfiguration(1, ChronoUnit.MINUTES));
                 return;
             }
```
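
For context, a self-contained sketch of why the guard requires at least two timestamps: a minimum interval is derived from gaps between consecutive bucket timestamps, which is undefined with fewer than two values. The helper below is illustrative only and does not mirror the project's IntervalCalculation or AggregationPrep code.

```java
import java.time.Duration;
import java.util.List;

// Illustrative only: computing a gap needs at least two timestamps,
// which is what the size() < 2 guard in the patch protects against.
final class MinimumIntervalSketch {
    static Duration minimumGap(List<Long> timestampsMillis) {
        if (timestampsMillis.size() < 2) {
            // mirror the patched fallback: one minute by default
            return Duration.ofMinutes(1);
        }
        long smallest = Long.MAX_VALUE;
        for (int i = 1; i < timestampsMillis.size(); i++) {
            smallest = Math.min(smallest, timestampsMillis.get(i) - timestampsMillis.get(i - 1));
        }
        return Duration.ofMillis(smallest);
    }
}
```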

src/main/java/org/opensearch/timeseries/transport/handler/ResultIndexingHandler.java (+60 -71)

```diff
@@ -34,7 +34,6 @@
 import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
 import org.opensearch.core.xcontent.XContentBuilder;
 import org.opensearch.threadpool.ThreadPool;
-import org.opensearch.timeseries.common.exception.EndRunException;
 import org.opensearch.timeseries.common.exception.TimeSeriesException;
 import org.opensearch.timeseries.indices.IndexManagement;
 import org.opensearch.timeseries.indices.TimeSeriesIndex;
@@ -109,88 +108,78 @@ public void setFixedDoc(boolean fixedDoc) {
     }
 
     // TODO: check if user has permission to index.
-    public void index(ResultType toSave, String detectorId, String indexOrAliasName) {
-        try {
-            if (indexOrAliasName != null) {
-                if (indexUtils.checkIndicesBlocked(clusterService.state(), ClusterBlockLevel.WRITE, indexOrAliasName)) {
-                    LOG.warn(String.format(Locale.ROOT, CANNOT_SAVE_ERR_MSG, detectorId));
-                    return;
-                }
-                // We create custom result index when creating a detector. Custom result index can be rolled over and thus we may need to
-                // create a new one.
-                if (!timeSeriesIndices.doesIndexExist(indexOrAliasName) && !timeSeriesIndices.doesAliasExist(indexOrAliasName)) {
-                    timeSeriesIndices.initCustomResultIndexDirectly(indexOrAliasName, ActionListener.wrap(response -> {
-                        if (response.isAcknowledged()) {
-                            save(toSave, detectorId, indexOrAliasName);
-                        } else {
-                            throw new TimeSeriesException(
-                                detectorId,
+    /**
+     * Run async index operation. Cannot guarantee index is done after finishing executing the function as several calls
+     * in the method are asynchronous.
+     * @param toSave Result to save
+     * @param configId config id
+     * @param indexOrAliasName custom index or alias name
+     */
+    public void index(ResultType toSave, String configId, String indexOrAliasName) {
+        if (indexOrAliasName != null) {
+            if (indexUtils.checkIndicesBlocked(clusterService.state(), ClusterBlockLevel.WRITE, indexOrAliasName)) {
+                LOG.warn(String.format(Locale.ROOT, CANNOT_SAVE_ERR_MSG, configId));
+                return;
+            }
+            // We create custom result index when creating a detector. Custom result index can be rolled over and thus we may need to
+            // create a new one.
+            if (!timeSeriesIndices.doesIndexExist(indexOrAliasName) && !timeSeriesIndices.doesAliasExist(indexOrAliasName)) {
+                timeSeriesIndices.initCustomResultIndexDirectly(indexOrAliasName, ActionListener.wrap(response -> {
+                    if (response.isAcknowledged()) {
+                        save(toSave, configId, indexOrAliasName);
+                    } else {
+                        LOG
+                            .error(
                                 String
                                     .format(
                                         Locale.ROOT,
                                         "Creating custom result index %s with mappings call not acknowledged",
                                         indexOrAliasName
                                     )
                             );
-                        }
-                    }, exception -> {
-                        if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) {
-                            // It is possible the index has been created while we sending the create request
-                            save(toSave, detectorId, indexOrAliasName);
-                        } else {
-                            throw new TimeSeriesException(
-                                detectorId,
-                                String.format(Locale.ROOT, "cannot create result index %s", indexOrAliasName),
-                                exception
-                            );
-                        }
-                    }));
-                } else {
-                    timeSeriesIndices.validateResultIndexMapping(indexOrAliasName, ActionListener.wrap(valid -> {
-                        if (!valid) {
-                            throw new EndRunException(detectorId, "wrong index mapping of custom AD result index", true);
-                        } else {
-                            save(toSave, detectorId, indexOrAliasName);
-                        }
-                    }, exception -> {
-                        throw new TimeSeriesException(
-                            detectorId,
-                            String.format(Locale.ROOT, "cannot validate result index %s", indexOrAliasName),
-                            exception
-                        );
-                    }));
-                }
+                    }
+                }, exception -> {
+                    if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) {
+                        // It is possible the index has been created while we sending the create request
+                        save(toSave, configId, indexOrAliasName);
+                    } else {
+                        LOG.error(String.format(Locale.ROOT, "cannot create result index %s", indexOrAliasName), exception);
+                    }
+                }));
             } else {
-                if (indexUtils.checkIndicesBlocked(clusterService.state(), ClusterBlockLevel.WRITE, this.defaultResultIndexName)) {
-                    LOG.warn(String.format(Locale.ROOT, CANNOT_SAVE_ERR_MSG, detectorId));
-                    return;
-                }
-                if (!timeSeriesIndices.doesDefaultResultIndexExist()) {
-                    timeSeriesIndices
-                        .initDefaultResultIndexDirectly(
-                            ActionListener.wrap(initResponse -> onCreateIndexResponse(initResponse, toSave, detectorId), exception -> {
-                                if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) {
-                                    // It is possible the index has been created while we sending the create request
-                                    save(toSave, detectorId);
-                                } else {
-                                    throw new TimeSeriesException(
-                                        detectorId,
+                timeSeriesIndices.validateResultIndexMapping(indexOrAliasName, ActionListener.wrap(valid -> {
+                    if (!valid) {
+                        LOG.error("wrong index mapping of custom result index");
+                    } else {
+                        save(toSave, configId, indexOrAliasName);
+                    }
+                }, exception -> { LOG.error(String.format(Locale.ROOT, "cannot validate result index %s", indexOrAliasName), exception); })
+                );
+            }
+        } else {
+            if (indexUtils.checkIndicesBlocked(clusterService.state(), ClusterBlockLevel.WRITE, this.defaultResultIndexName)) {
+                LOG.warn(String.format(Locale.ROOT, CANNOT_SAVE_ERR_MSG, configId));
+                return;
+            }
+            if (!timeSeriesIndices.doesDefaultResultIndexExist()) {
+                timeSeriesIndices
+                    .initDefaultResultIndexDirectly(
+                        ActionListener.wrap(initResponse -> onCreateIndexResponse(initResponse, toSave, configId), exception -> {
+                            if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) {
+                                // It is possible the index has been created while we sending the create request
+                                save(toSave, configId);
+                            } else {
+                                LOG
+                                    .error(
                                         String.format(Locale.ROOT, "Unexpected error creating index %s", defaultResultIndexName),
                                         exception
                                     );
-                                }
-                            })
-                        );
-                } else {
-                    save(toSave, detectorId);
-                }
+                            }
+                        })
+                    );
+            } else {
+                save(toSave, configId);
             }
-        } catch (Exception e) {
-            throw new TimeSeriesException(
-                detectorId,
-                String.format(Locale.ROOT, "Error in saving %s for detector %s", defaultResultIndexName, detectorId),
-                e
-            );
         }
     }
 
```
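
To illustrate the rationale for logging instead of throwing: by the time an asynchronous callback runs, the call to `index(...)` has already returned, so an exception thrown inside the callback can never reach the caller. A simplified sketch using `CompletableFuture` rather than OpenSearch's `ActionListener` (names and structure are assumptions, not the handler's code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.logging.Logger;

// Simplified illustration, not the handler itself.
final class AsyncErrorHandlingSketch {
    private static final Logger LOG = Logger.getLogger(AsyncErrorHandlingSketch.class.getName());

    void index(String configId) {
        CompletableFuture
            .runAsync(() -> { throw new IllegalStateException("cannot create result index"); })
            .whenComplete((ignored, exception) -> {
                if (exception != null) {
                    // Throwing here would never propagate to the caller of index(); log instead.
                    LOG.severe("cannot save result for config " + configId + ": " + exception.getMessage());
                }
            });
        // index() returns here, typically before the callback above has run.
    }
}
```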
