13
13
14
14
import java .time .Clock ;
15
15
import java .time .Duration ;
16
+ import java .time .Instant ;
17
+ import java .util .ArrayList ;
16
18
import java .util .List ;
17
19
20
+ import org .apache .commons .lang3 .tuple .Pair ;
18
21
import org .apache .logging .log4j .LogManager ;
19
22
import org .apache .logging .log4j .Logger ;
20
23
import org .opensearch .ad .indices .ADIndex ;
21
24
import org .opensearch .ad .indices .ADIndexManagement ;
22
25
import org .opensearch .ad .ml .IgnoreSimilarExtractor .ThresholdArrays ;
23
26
import org .opensearch .ad .model .AnomalyDetector ;
27
+ import org .opensearch .ad .model .AnomalyResult ;
24
28
import org .opensearch .ad .ratelimit .ADCheckpointWriteWorker ;
29
+ import org .opensearch .forecast .ml .RCFCasterResult ;
30
+ import org .opensearch .forecast .model .ForecastResult ;
25
31
import org .opensearch .threadpool .ThreadPool ;
26
32
import org .opensearch .timeseries .AnalysisType ;
27
33
import org .opensearch .timeseries .NodeStateManager ;
34
40
import org .opensearch .timeseries .model .Config ;
35
41
import org .opensearch .timeseries .ratelimit .RequestPriority ;
36
42
import org .opensearch .timeseries .settings .TimeSeriesSettings ;
43
+ import org .opensearch .timeseries .util .ModelUtil ;
44
+ import org .opensearch .timeseries .util .ParseUtils ;
37
45
38
46
import com .amazon .randomcutforest .config .ForestMode ;
39
47
import com .amazon .randomcutforest .config .Precision ;
40
48
import com .amazon .randomcutforest .config .TransformMethod ;
49
+ import com .amazon .randomcutforest .parkservices .AnomalyDescriptor ;
41
50
import com .amazon .randomcutforest .parkservices .ThresholdedRandomCutForest ;
42
51
43
52
/**
44
53
* Training models for HCAD detectors
45
54
*
46
55
*/
47
56
public class ADColdStart extends
48
- ModelColdStart <ThresholdedRandomCutForest , ADIndex , ADIndexManagement , ADCheckpointDao , ADCheckpointWriteWorker > {
57
+ ModelColdStart <ThresholdedRandomCutForest , ADIndex , ADIndexManagement , ADCheckpointDao , ADCheckpointWriteWorker , AnomalyResult > {
49
58
private static final Logger logger = LogManager .getLogger (ADColdStart .class );
50
59
51
60
/**
@@ -87,7 +96,8 @@ public ADColdStart(
87
96
ADCheckpointWriteWorker checkpointWriteWorker ,
88
97
long rcfSeed ,
89
98
int maxRoundofColdStart ,
90
- int coolDownMinutes
99
+ int coolDownMinutes ,
100
+ int resultSchemaVersion
91
101
) {
92
102
super (
93
103
modelTtl ,
@@ -107,7 +117,8 @@ public ADColdStart(
107
117
featureManager ,
108
118
maxRoundofColdStart ,
109
119
TimeSeriesAnalyticsPlugin .AD_THREAD_POOL_NAME ,
110
- AnalysisType .AD
120
+ AnalysisType .AD ,
121
+ resultSchemaVersion
111
122
);
112
123
}
113
124
@@ -126,7 +137,8 @@ public ADColdStart(
126
137
Duration modelTtl ,
127
138
ADCheckpointWriteWorker checkpointWriteQueue ,
128
139
int maxRoundofColdStart ,
129
- int coolDownMinutes
140
+ int coolDownMinutes ,
141
+ int resultSchemaVersion
130
142
) {
131
143
this (
132
144
clock ,
@@ -144,7 +156,8 @@ public ADColdStart(
144
156
checkpointWriteQueue ,
145
157
-1 ,
146
158
maxRoundofColdStart ,
147
- coolDownMinutes
159
+ coolDownMinutes ,
160
+ resultSchemaVersion
148
161
);
149
162
}
150
163
@@ -158,7 +171,7 @@ public ADColdStart(
158
171
* training data in result index so that the frontend can plot it.
159
172
*/
160
173
@ Override
161
- protected List <Sample > trainModelFromDataSegments (
174
+ protected List <AnomalyResult > trainModelFromDataSegments (
162
175
List <Sample > pointSamples ,
163
176
ModelState <ThresholdedRandomCutForest > entityState ,
164
177
Config config ,
@@ -185,6 +198,7 @@ protected List<Sample> trainModelFromDataSegments(
185
198
.numberOfTrees (numberOfTrees )
186
199
.timeDecay (config .getTimeDecay ())
187
200
.transformDecay (config .getTimeDecay ())
201
+ // allow enough samples before emitting scores to park service
188
202
.outputAfter (Math .max (shingleSize , numMinSamples ))
189
203
.initialAcceptFraction (initialAcceptFraction )
190
204
.parallelExecutionEnabled (false )
@@ -221,21 +235,71 @@ protected List<Sample> trainModelFromDataSegments(
221
235
// Use build() instead of new TRCF(Builder) because build() performs extra validation and initialization
222
236
ThresholdedRandomCutForest trcf = rcfBuilder .build ();
223
237
238
+ // Prepare for sequential processing
239
+ double [][] sequentialData = new double [pointSamples .size ()][];
240
+ List <Pair <Instant , Instant >> sequentialTime = new ArrayList <>();
241
+
242
+ // Convert the list of Sample objects into a 2D array + a parallel list of time pairs
224
243
for (int i = 0 ; i < pointSamples .size (); i ++) {
225
244
Sample dataSample = pointSamples .get (i );
226
245
double [] dataValue = dataSample .getValueList ();
227
- // We don't keep missing values during cold start as the actual data may not be reconstructed during the early stage.
228
- trcf .process (dataValue , dataSample .getDataEndTime ().getEpochSecond ());
246
+
247
+ sequentialData [i ] = dataValue ;
248
+ // Store start and end times together
249
+ sequentialTime .add (Pair .of (dataSample .getDataStartTime (), dataSample .getDataEndTime ()));
250
+ }
251
+
252
+ // Process data in one go
253
+ List <AnomalyDescriptor > descriptors = trcf .processSequentially (sequentialData , x -> true );
254
+
255
+ // Check for size mismatch
256
+ if (descriptors .size () != sequentialTime .size ()) {
257
+ logger .warn (
258
+ "processSequentially returned a different size than expected: got [{}], expected [{}]." ,
259
+ descriptors .size (),
260
+ sequentialTime .size ()
261
+ );
262
+ return null ;
263
+ }
264
+
265
+ // Build anomaly results from sequential descriptors
266
+ List <AnomalyResult > results = new ArrayList <>();
267
+ for (int i = 0 ; i < descriptors .size (); i ++) {
268
+ AnomalyDescriptor descriptor = descriptors .get (i );
269
+ double [] dataValue = sequentialData [i ];
270
+ Pair <Instant , Instant > time = sequentialTime .get (i );
271
+
272
+ // Convert the descriptor into a ThresholdingResult, then expand it into indexable AnomalyResult entries
273
+ ThresholdingResult thresholdingResult =
274
+ ModelUtil .toResult (trcf .getForest (), descriptor , dataValue , false , config );
275
+
276
+ Instant now = Instant .now ();
277
+ results .addAll (
278
+ thresholdingResult .toIndexableResults (
279
+ config ,
280
+ time .getLeft (), // Data start time
281
+ time .getRight (), // Data end time
282
+ now ,
283
+ now ,
284
+ ParseUtils .getFeatureData (dataValue , config ),
285
+ entityState .getEntity (),
286
+ resultMappingVersion ,
287
+ entityState .getModelId (),
288
+ taskId ,
289
+ null
290
+ )
291
+ );
229
292
}
230
293
294
+
231
295
entityState .setModel (trcf );
232
296
233
297
entityState .setLastUsedTime (clock .instant ());
234
298
235
299
// save to checkpoint
236
300
checkpointWriteWorker .write (entityState , true , RequestPriority .MEDIUM );
237
301
238
- return pointSamples ;
302
+ return results ;
239
303
}
240
304
241
305
public static void applyRule (ThresholdedRandomCutForest .Builder rcfBuilder , AnomalyDetector detector ) {
0 commit comments