Skip to content

Commit 1130a1b

Browse files
authored
Refactoring NodeStateManager etc. to support forecasting functionality (opensearch-project#965)
* Refactoring NodeStateManager etc. to support forecasting functionality This commit extends the codebase to support both Anomaly Detection (AD) and forecasting. It contains a mixture of refactoring, renaming, removal of unused code, and package moving tasks. Here are the details: Refactoring: - `NodeStateManager.getAnomalyDetector` is now `getConfig`, with added functionality to fetch a Forecaster. The method comments are updated for clarity. - Existing methods (`getFeatureSamplesForPeriods`, `getColdStartSamplesForPeriods`, `createPreviewSearchRequest`, `getMinDataTime`) have been added in `SearchFeatureDao` to handle forecasting logic. - Adjusted `SecurityClientUtil` and `ParseUtils` to handle forecasting logic. - Cleaned up `NodeState` to differentiate state for AD and forecasting. Renaming: - `AnomalyDetectorJob` is renamed to `Job` to facilitate reuse for forecasting. - `NodeStateManager.getAnomalyDetectorJob` is renamed to `getJob`. - Certain settings in `AnomalyDetectorSettings` are renamed to reflect they are meant for the AD setting. They have been marked as deprecated and new settings are used in `TimeSeriesSettings` instead. - `IndexAnomalyDetectorJobActionHandler.getAnomalyDetectorJobForWrite` is renamed to `getJobForWrite`. - `ADSafeSecurityInjector` is renamed to `TimeSeriesSafeSecurityInjector`. Removing unused code: - Synchronous code in `ClientUtil`, `IndexUtils`, and `CheckpointDao` is removed. - The unused class `Throttler` is deleted. - Mapping file names are changed, and the code referencing these files is adjusted. Package moving: - Several classes (`ClientUtil`, `MultiResponsesDelegateActionListener`, `SafeSecurityInjector`, `SecurityUtil`, `ExceptionUtil`, `SearchFeatureDao`, `CleanState`, `ExpiringState`, `MaintenanceState`, `NodeState`, `SingleStreamModelIdMapper`, `BackPressureRouting`) are moved to the respective `org.opensearch.timeseries` packages. Miscellaneous: - Fixed compiler failures caused by changes in opensearch-project/OpenSearch#8730 by replacing `DoubleArrayList` with `java.util.ArrayList`. - Updates the Backwards Compatibility (bwc) version to align with the core's incremented bwc version as per [OpenSearch PR #8670](opensearch-project/OpenSearch#8670). This change prevents the issue described in [OpenSearch Issue #5076](opensearch-project/OpenSearch#5076). Testing: - Executed a `gradle build`. - Added new tests for `ClientUtil` and `NodeStateManager`. Signed-off-by: Kaituo Li <kaituo@amazon.com> * improve comment Signed-off-by: Kaituo Li <kaituo@amazon.com> * fix compiler error and comments Signed-off-by: Kaituo Li <kaituo@amazon.com> --------- Signed-off-by: Kaituo Li <kaituo@amazon.com>
1 parent f0ed43b commit 1130a1b

File tree

165 files changed

+1708
-1655
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

165 files changed

+1708
-1655
lines changed

build.gradle

+1-3
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ buildscript {
3434
js_resource_folder = "src/test/resources/job-scheduler"
3535
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
3636
job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build)
37-
bwcVersionShort = "2.9.0"
37+
bwcVersionShort = "2.10.0"
3838
bwcVersion = bwcVersionShort + ".0"
3939
bwcOpenSearchADDownload = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + bwcVersionShort + '/latest/linux/x64/tar/builds/' +
4040
'opensearch/plugins/opensearch-anomaly-detection-' + bwcVersion + '.zip'
@@ -672,8 +672,6 @@ List<String> jacocoExclusions = [
672672
'org.opensearch.timeseries.settings.TimeSeriesSettings',
673673
'org.opensearch.forecast.settings.ForecastSettings',
674674

675-
'org.opensearch.ad.util.ClientUtil',
676-
677675
'org.opensearch.ad.transport.CronRequest',
678676
'org.opensearch.ad.AnomalyDetectorRunner',
679677

src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java

+20-18
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,12 @@
3434
import org.opensearch.ad.indices.ADIndexManagement;
3535
import org.opensearch.ad.model.ADTaskState;
3636
import org.opensearch.ad.model.AnomalyDetector;
37-
import org.opensearch.ad.model.AnomalyDetectorJob;
3837
import org.opensearch.ad.settings.AnomalyDetectorSettings;
3938
import org.opensearch.ad.task.ADTaskManager;
4039
import org.opensearch.ad.transport.AnomalyResultAction;
4140
import org.opensearch.ad.transport.AnomalyResultRequest;
4241
import org.opensearch.ad.transport.AnomalyResultResponse;
4342
import org.opensearch.ad.transport.AnomalyResultTransportAction;
44-
import org.opensearch.ad.util.SecurityUtil;
4543
import org.opensearch.client.Client;
4644
import org.opensearch.common.settings.Settings;
4745
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
@@ -58,11 +56,15 @@
5856
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
5957
import org.opensearch.jobscheduler.spi.utils.LockService;
6058
import org.opensearch.threadpool.ThreadPool;
59+
import org.opensearch.timeseries.AnalysisType;
60+
import org.opensearch.timeseries.NodeStateManager;
6161
import org.opensearch.timeseries.common.exception.EndRunException;
6262
import org.opensearch.timeseries.common.exception.InternalFailure;
6363
import org.opensearch.timeseries.common.exception.TimeSeriesException;
6464
import org.opensearch.timeseries.constant.CommonName;
6565
import org.opensearch.timeseries.function.ExecutorFunction;
66+
import org.opensearch.timeseries.model.Job;
67+
import org.opensearch.timeseries.util.SecurityUtil;
6668

6769
import com.google.common.base.Throwables;
6870

@@ -134,12 +136,12 @@ public void runJob(ScheduledJobParameter scheduledJobParameter, JobExecutionCont
134136
String detectorId = scheduledJobParameter.getName();
135137
log.info("Start to run AD job {}", detectorId);
136138
adTaskManager.refreshRealtimeJobRunTime(detectorId);
137-
if (!(scheduledJobParameter instanceof AnomalyDetectorJob)) {
139+
if (!(scheduledJobParameter instanceof Job)) {
138140
throw new IllegalArgumentException(
139-
"Job parameter is not instance of AnomalyDetectorJob, type: " + scheduledJobParameter.getClass().getCanonicalName()
141+
"Job parameter is not instance of Job, type: " + scheduledJobParameter.getClass().getCanonicalName()
140142
);
141143
}
142-
AnomalyDetectorJob jobParameter = (AnomalyDetectorJob) scheduledJobParameter;
144+
Job jobParameter = (Job) scheduledJobParameter;
143145
Instant executionStartTime = Instant.now();
144146
IntervalSchedule schedule = (IntervalSchedule) jobParameter.getSchedule();
145147
Instant detectionStartTime = executionStartTime.minus(schedule.getInterval(), schedule.getUnit());
@@ -148,12 +150,12 @@ public void runJob(ScheduledJobParameter scheduledJobParameter, JobExecutionCont
148150

149151
Runnable runnable = () -> {
150152
try {
151-
nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> {
153+
nodeStateManager.getConfig(detectorId, AnalysisType.AD, ActionListener.wrap(detectorOptional -> {
152154
if (!detectorOptional.isPresent()) {
153155
log.error(new ParameterizedMessage("fail to get detector [{}]", detectorId));
154156
return;
155157
}
156-
AnomalyDetector detector = detectorOptional.get();
158+
AnomalyDetector detector = (AnomalyDetector) detectorOptional.get();
157159

158160
if (jobParameter.getLockDurationSeconds() != null) {
159161
lockService
@@ -216,7 +218,7 @@ public void runJob(ScheduledJobParameter scheduledJobParameter, JobExecutionCont
216218
* @param detector associated detector accessor
217219
*/
218220
protected void runAdJob(
219-
AnomalyDetectorJob jobParameter,
221+
Job jobParameter,
220222
LockService lockService,
221223
LockModel lock,
222224
Instant detectionStartTime,
@@ -284,7 +286,7 @@ protected void runAdJob(
284286
}
285287

286288
private void runAnomalyDetectionJob(
287-
AnomalyDetectorJob jobParameter,
289+
Job jobParameter,
288290
LockService lockService,
289291
LockModel lock,
290292
Instant detectionStartTime,
@@ -393,7 +395,7 @@ private void runAnomalyDetectionJob(
393395
* @param detector associated detector accessor
394396
*/
395397
protected void handleAdException(
396-
AnomalyDetectorJob jobParameter,
398+
Job jobParameter,
397399
LockService lockService,
398400
LockModel lock,
399401
Instant detectionStartTime,
@@ -482,7 +484,7 @@ protected void handleAdException(
482484
}
483485

484486
private void stopAdJobForEndRunException(
485-
AnomalyDetectorJob jobParameter,
487+
Job jobParameter,
486488
LockService lockService,
487489
LockModel lock,
488490
Instant detectionStartTime,
@@ -524,9 +526,9 @@ private void stopAdJob(String detectorId, ExecutorFunction function) {
524526
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getSourceAsString())
525527
) {
526528
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
527-
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
529+
Job job = Job.parse(parser);
528530
if (job.isEnabled()) {
529-
AnomalyDetectorJob newJob = new AnomalyDetectorJob(
531+
Job newJob = new Job(
530532
job.getName(),
531533
job.getSchedule(),
532534
job.getWindowDelay(),
@@ -566,7 +568,7 @@ private void stopAdJob(String detectorId, ExecutorFunction function) {
566568
}
567569

568570
private void indexAnomalyResult(
569-
AnomalyDetectorJob jobParameter,
571+
Job jobParameter,
570572
LockService lockService,
571573
LockModel lock,
572574
Instant detectionStartTime,
@@ -590,7 +592,7 @@ private void indexAnomalyResult(
590592
}
591593

592594
private void indexAnomalyResultException(
593-
AnomalyDetectorJob jobParameter,
595+
Job jobParameter,
594596
LockService lockService,
595597
LockModel lock,
596598
Instant detectionStartTime,
@@ -621,7 +623,7 @@ private void indexAnomalyResultException(
621623
}
622624

623625
private void indexAnomalyResultException(
624-
AnomalyDetectorJob jobParameter,
626+
Job jobParameter,
625627
LockService lockService,
626628
LockModel lock,
627629
Instant detectionStartTime,
@@ -646,7 +648,7 @@ private void indexAnomalyResultException(
646648
}
647649

648650
private void indexAnomalyResultException(
649-
AnomalyDetectorJob jobParameter,
651+
Job jobParameter,
650652
LockService lockService,
651653
LockModel lock,
652654
Instant detectionStartTime,
@@ -666,7 +668,7 @@ private void indexAnomalyResultException(
666668
}
667669
}
668670

669-
private void releaseLock(AnomalyDetectorJob jobParameter, LockService lockService, LockModel lock) {
671+
private void releaseLock(Job jobParameter, LockService lockService, LockModel lock) {
670672
lockService
671673
.release(
672674
lock,

src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java

+11-8
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.opensearch.ad.constant.ADCommonName;
3636
import org.opensearch.ad.model.ADTaskType;
3737
import org.opensearch.ad.model.AnomalyDetector;
38-
import org.opensearch.ad.model.AnomalyDetectorJob;
3938
import org.opensearch.ad.model.DetectorProfile;
4039
import org.opensearch.ad.model.DetectorProfileName;
4140
import org.opensearch.ad.model.DetectorState;
@@ -49,9 +48,6 @@
4948
import org.opensearch.ad.transport.RCFPollingAction;
5049
import org.opensearch.ad.transport.RCFPollingRequest;
5150
import org.opensearch.ad.transport.RCFPollingResponse;
52-
import org.opensearch.ad.util.ExceptionUtil;
53-
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
54-
import org.opensearch.ad.util.SecurityClientUtil;
5551
import org.opensearch.client.Client;
5652
import org.opensearch.cluster.node.DiscoveryNode;
5753
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
@@ -68,11 +64,16 @@
6864
import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder;
6965
import org.opensearch.search.aggregations.metrics.InternalCardinality;
7066
import org.opensearch.search.builder.SearchSourceBuilder;
67+
import org.opensearch.timeseries.AnalysisType;
7168
import org.opensearch.timeseries.common.exception.NotSerializedExceptionName;
7269
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
7370
import org.opensearch.timeseries.constant.CommonName;
7471
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
72+
import org.opensearch.timeseries.model.Job;
7573
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
74+
import org.opensearch.timeseries.util.ExceptionUtil;
75+
import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener;
76+
import org.opensearch.timeseries.util.SecurityClientUtil;
7677
import org.opensearch.transport.TransportService;
7778

7879
public class AnomalyDetectorProfileRunner extends AbstractProfileRunner {
@@ -159,7 +160,7 @@ private void prepareProfile(
159160
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())
160161
) {
161162
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
162-
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
163+
Job job = Job.parse(parser);
163164
long enabledTimeMs = job.getEnabledTime().toEpochMilli();
164165

165166
boolean isMultiEntityDetector = detector.isHighCardinality();
@@ -315,6 +316,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
315316
client::search,
316317
detector.getId(),
317318
client,
319+
AnalysisType.AD,
318320
searchResponseListener
319321
);
320322
} else {
@@ -368,6 +370,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
368370
client::search,
369371
detector.getId(),
370372
client,
373+
AnalysisType.AD,
371374
searchResponseListener
372375
);
373376
}
@@ -418,7 +421,7 @@ private void profileStateRelated(
418421
private void profileModels(
419422
AnomalyDetector detector,
420423
Set<DetectorProfileName> profiles,
421-
AnomalyDetectorJob job,
424+
Job job,
422425
boolean forMultiEntityDetector,
423426
MultiResponsesDelegateActionListener<DetectorProfile> listener
424427
) {
@@ -430,7 +433,7 @@ private void profileModels(
430433
private ActionListener<ProfileResponse> onModelResponse(
431434
AnomalyDetector detector,
432435
Set<DetectorProfileName> profilesToCollect,
433-
AnomalyDetectorJob job,
436+
Job job,
434437
MultiResponsesDelegateActionListener<DetectorProfile> listener
435438
) {
436439
boolean isMultientityDetector = detector.isHighCardinality();
@@ -464,7 +467,7 @@ private ActionListener<ProfileResponse> onModelResponse(
464467
}
465468

466469
private void profileMultiEntityDetectorStateRelated(
467-
AnomalyDetectorJob job,
470+
Job job,
468471
Set<DetectorProfileName> profilesToCollect,
469472
ProfileResponse profileResponse,
470473
DetectorProfile.Builder profileBuilder,

src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@
3333
import org.opensearch.ad.model.AnomalyDetector;
3434
import org.opensearch.ad.model.AnomalyResult;
3535
import org.opensearch.ad.model.EntityAnomalyResult;
36-
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
3736
import org.opensearch.common.util.concurrent.ThreadContext;
3837
import org.opensearch.timeseries.model.Entity;
3938
import org.opensearch.timeseries.model.Feature;
4039
import org.opensearch.timeseries.model.FeatureData;
40+
import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener;
4141

4242
/**
4343
* Runner to trigger an anomaly detector.

src/main/java/org/opensearch/ad/EntityProfileRunner.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.opensearch.ad.constant.ADCommonMessages;
2929
import org.opensearch.ad.constant.ADCommonName;
3030
import org.opensearch.ad.model.AnomalyDetector;
31-
import org.opensearch.ad.model.AnomalyDetectorJob;
3231
import org.opensearch.ad.model.AnomalyResult;
3332
import org.opensearch.ad.model.EntityProfile;
3433
import org.opensearch.ad.model.EntityProfileName;
@@ -38,8 +37,6 @@
3837
import org.opensearch.ad.transport.EntityProfileAction;
3938
import org.opensearch.ad.transport.EntityProfileRequest;
4039
import org.opensearch.ad.transport.EntityProfileResponse;
41-
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
42-
import org.opensearch.ad.util.SecurityClientUtil;
4340
import org.opensearch.client.Client;
4441
import org.opensearch.cluster.routing.Preference;
4542
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
@@ -53,11 +50,15 @@
5350
import org.opensearch.index.query.TermQueryBuilder;
5451
import org.opensearch.search.aggregations.AggregationBuilders;
5552
import org.opensearch.search.builder.SearchSourceBuilder;
53+
import org.opensearch.timeseries.AnalysisType;
5654
import org.opensearch.timeseries.constant.CommonMessages;
5755
import org.opensearch.timeseries.constant.CommonName;
5856
import org.opensearch.timeseries.model.Entity;
5957
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
58+
import org.opensearch.timeseries.model.Job;
59+
import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener;
6060
import org.opensearch.timeseries.util.ParseUtils;
61+
import org.opensearch.timeseries.util.SecurityClientUtil;
6162

6263
public class EntityProfileRunner extends AbstractProfileRunner {
6364
private final Logger logger = LogManager.getLogger(EntityProfileRunner.class);
@@ -188,6 +189,7 @@ private void validateEntity(
188189
client::search,
189190
detector.getId(),
190191
client,
192+
AnalysisType.AD,
191193
searchResponseListener
192194
);
193195

@@ -228,7 +230,7 @@ private void getJob(
228230
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())
229231
) {
230232
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
231-
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
233+
Job job = Job.parse(parser);
232234

233235
int totalResponsesToWait = 0;
234236
if (profilesToCollect.contains(EntityProfileName.INIT_PROGRESS)
@@ -331,7 +333,7 @@ private void profileStateRelated(
331333
Entity entityValue,
332334
Set<EntityProfileName> profilesToCollect,
333335
AnomalyDetector detector,
334-
AnomalyDetectorJob job,
336+
Job job,
335337
MultiResponsesDelegateActionListener<EntityProfile> delegateListener
336338
) {
337339
if (totalUpdates == 0) {

src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -38,20 +38,22 @@
3838
import org.opensearch.ad.transport.RCFPollingAction;
3939
import org.opensearch.ad.transport.RCFPollingRequest;
4040
import org.opensearch.ad.transport.handler.AnomalyIndexHandler;
41-
import org.opensearch.ad.util.ExceptionUtil;
4241
import org.opensearch.client.Client;
4342
import org.opensearch.cluster.node.DiscoveryNode;
4443
import org.opensearch.common.unit.TimeValue;
4544
import org.opensearch.commons.authuser.User;
4645
import org.opensearch.search.SearchHits;
4746
import org.opensearch.threadpool.ThreadPool;
47+
import org.opensearch.timeseries.AnalysisType;
48+
import org.opensearch.timeseries.NodeStateManager;
4849
import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin;
4950
import org.opensearch.timeseries.common.exception.EndRunException;
5051
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
5152
import org.opensearch.timeseries.common.exception.TimeSeriesException;
5253
import org.opensearch.timeseries.model.FeatureData;
5354
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
5455
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
56+
import org.opensearch.timeseries.util.ExceptionUtil;
5557

5658
public class ExecuteADResultResponseRecorder {
5759
private static final Logger log = LogManager.getLogger(ExecuteADResultResponseRecorder.class);
@@ -337,20 +339,20 @@ private void confirmTotalRCFUpdatesFound(
337339
String error,
338340
ActionListener<Long> listener
339341
) {
340-
nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> {
342+
nodeStateManager.getConfig(detectorId, AnalysisType.AD, ActionListener.wrap(detectorOptional -> {
341343
if (!detectorOptional.isPresent()) {
342344
listener.onFailure(new TimeSeriesException(detectorId, "fail to get detector"));
343345
return;
344346
}
345-
nodeStateManager.getAnomalyDetectorJob(detectorId, ActionListener.wrap(jobOptional -> {
347+
nodeStateManager.getJob(detectorId, ActionListener.wrap(jobOptional -> {
346348
if (!jobOptional.isPresent()) {
347349
listener.onFailure(new TimeSeriesException(detectorId, "fail to get job"));
348350
return;
349351
}
350352

351353
ProfileUtil
352354
.confirmDetectorRealtimeInitStatus(
353-
detectorOptional.get(),
355+
(AnomalyDetector) detectorOptional.get(),
354356
jobOptional.get().getEnabledTime().toEpochMilli(),
355357
client,
356358
ActionListener.wrap(searchResponse -> {

src/main/java/org/opensearch/ad/caching/CacheBuffer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.commons.lang.builder.HashCodeBuilder;
2626
import org.apache.logging.log4j.LogManager;
2727
import org.apache.logging.log4j.Logger;
28-
import org.opensearch.ad.ExpiringState;
2928
import org.opensearch.ad.MemoryTracker;
3029
import org.opensearch.ad.MemoryTracker.Origin;
3130
import org.opensearch.ad.ml.EntityModel;
@@ -36,6 +35,7 @@
3635
import org.opensearch.ad.ratelimit.CheckpointWriteWorker;
3736
import org.opensearch.ad.ratelimit.RequestPriority;
3837
import org.opensearch.ad.util.DateUtils;
38+
import org.opensearch.timeseries.ExpiringState;
3939

4040
/**
4141
* We use a layered cache to manage active entities’ states. We have a two-level

0 commit comments

Comments
 (0)