Skip to content

Commit 7079636

Browse files
authored
Add serde for RCFCaster (aws#360)
* Add serde for RCFCaster This PR adds state and mapper classes for RCFCaster. This PR also formats CPUTest and V2TRCFToV3StateConverterTest using spotless. The content of those two files is not changed. Testing done: 1. added unit tests. * address comments
1 parent e8bf267 commit 7079636

File tree

11 files changed

+562
-75
lines changed

11 files changed

+562
-75
lines changed

.gitignore

+7
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,10 @@ build
22
target
33
.idea
44
*.iml
5+
.project
6+
.settings
7+
.classpath
8+
._.DS_Store
9+
.DS_Store
10+
Java/*/bin/
11+

Java/core/src/main/java/com/amazon/randomcutforest/state/Version.java

+1
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,5 @@ public class Version {
1919
public static final String V2_0 = "2.0";
2020
public static final String V2_1 = "2.1";
2121
public static final String V3_0 = "3.0";
22+
public static final String V3_5 = "3.5";
2223
}

Java/core/src/test/java/com/amazon/randomcutforest/CPUTest.java

+56-52
Original file line numberDiff line numberDiff line change
@@ -15,35 +15,39 @@
1515

1616
package com.amazon.randomcutforest;
1717

18-
import com.amazon.randomcutforest.testutils.ShingledMultiDimDataWithKeys;
18+
import java.util.Arrays;
19+
import java.util.concurrent.ForkJoinPool;
20+
1921
import org.junit.jupiter.api.Tag;
2022
import org.junit.jupiter.api.Test;
2123

22-
import java.util.Arrays;
23-
import java.util.concurrent.ForkJoinPool;
24+
import com.amazon.randomcutforest.testutils.ShingledMultiDimDataWithKeys;
2425

2526
/**
26-
* The following "test" is intended to provide an approximate estimate of the improvement
27-
* from parallelization. At the outset, we remark that running the test from inside
28-
* an IDE/environment may reflect more of the environment. Issues such as warming are not
29-
* reflected in this test.
27+
* The following "test" is intended to provide an approximate estimate of the
28+
* improvement from parallelization. At the outset, we remark that running the
29+
* test from inside an IDE/environment may reflect more of the environment.
30+
* Issues such as warming are not reflected in this test.
3031
*
31-
* Users who wish to obtain more calibrated estimates should use a benchmark -- preferably
32-
* using their own "typical" data and their end to end setup. Performance of RCF is data dependent.
33-
* Such users may be invoking RCF functions differently from a standard "impute, score, update"
34-
* process recommended for streaming time series data.
32+
* Users who wish to obtain more calibrated estimates should use a benchmark --
33+
* preferably using their own "typical" data and their end to end setup.
34+
* Performance of RCF is data dependent. Such users may be invoking RCF
35+
* functions differently from a standard "impute, score, update" process
36+
* recommended for streaming time series data.
3537
*
36-
* Moreover, in the context of a large number of models, the rate at which the models require
37-
* updates is also a factor and not controlled herein.
38+
* Moreover, in the context of a large number of models, the rate at which the
39+
* models require updates is also a factor and not controlled herein.
3840
*
39-
* The two tests should produce near identical sum of scores, and (root) mean squared error of
40-
* the impute up to machine precision (since the order of the arithmetic operations would vary).
41+
* The two tests should produce near identical sum of scores, and (root) mean
42+
* squared error of the impute up to machine precision (since the order of the
43+
* arithmetic operations would vary).
4144
*
42-
* To summarize the lessons, it appears that parallelism almost always helps (upto resource limitations).
43-
* If an user is considering a single model -- say from a console or dashboard, they should consider
44-
* having parallel threads enabled. For large number of models, it may be worthwhile
45-
* to also investigate different ways of achieving parallelism and not just attempt to
46-
* change the executor framework.
45+
* To summarize the lessons, it appears that parallelism almost always helps
46+
* (up to resource limitations). If a user is considering a single model -- say
47+
* from a console or dashboard, they should consider having parallel threads
48+
* enabled. For a large number of models, it may be worthwhile to also investigate
49+
* different ways of achieving parallelism and not just attempt to change the
50+
* executor framework.
4751
*
4852
*/
4953

@@ -65,23 +69,23 @@ public class CPUTest {
6569

6670
@Test
6771
public void profileTestSync() {
68-
double [] mse = new double [numberOfForests];
69-
int [] mseCount = new int[numberOfForests];
70-
double [] score =new double[numberOfForests];
71-
72-
double[][] data = ShingledMultiDimDataWithKeys.getMultiDimData(DATA_SIZE, 60, 100, 5, 0, numberOfAttributes).data;
73-
74-
RandomCutForest [] forests = new RandomCutForest [numberOfForests];
75-
for (int k = 0;k<numberOfForests; k++) {
76-
forests[k] = RandomCutForest.builder().numberOfTrees(numberOfTrees).dimensions(dimensions).shingleSize(shingleSize)
77-
.boundingBoxCacheFraction(boundingBoxCacheFraction).randomSeed(99+k).outputAfter(10)
78-
.parallelExecutionEnabled(true)
79-
.threadPoolSize(numberOfThreads)
72+
double[] mse = new double[numberOfForests];
73+
int[] mseCount = new int[numberOfForests];
74+
double[] score = new double[numberOfForests];
75+
76+
double[][] data = ShingledMultiDimDataWithKeys.getMultiDimData(DATA_SIZE, 60, 100, 5, 0,
77+
numberOfAttributes).data;
78+
79+
RandomCutForest[] forests = new RandomCutForest[numberOfForests];
80+
for (int k = 0; k < numberOfForests; k++) {
81+
forests[k] = RandomCutForest.builder().numberOfTrees(numberOfTrees).dimensions(dimensions)
82+
.shingleSize(shingleSize).boundingBoxCacheFraction(boundingBoxCacheFraction).randomSeed(99 + k)
83+
.outputAfter(10).parallelExecutionEnabled(true).threadPoolSize(numberOfThreads)
8084
.internalShinglingEnabled(true).initialAcceptFraction(0.1).sampleSize(sampleSize).build();
8185
}
8286

8387
for (int j = 0; j < data.length; j++) {
84-
for (int k = 0;k<numberOfForests; k++) {
88+
for (int k = 0; k < numberOfForests; k++) {
8589
score[k] += forests[k].getAnomalyScore(data[j]);
8690
if (j % 10 == 0 && j > 0) {
8791
double[] result = forests[k].extrapolate(1);
@@ -97,7 +101,7 @@ public void profileTestSync() {
97101
forests[k].update(data[j]);
98102
}
99103
}
100-
for(int k=0;k<numberOfForests;k++) {
104+
for (int k = 0; k < numberOfForests; k++) {
101105
System.out.println(" Forest " + k);
102106
System.out.println(" MSE " + mse[k] / mseCount[k]);
103107
System.out.println(" scoresum " + score[k] / data.length);
@@ -106,30 +110,30 @@ public void profileTestSync() {
106110

107111
@Test
108112
public void profileTestASync() {
109-
double [] mse = new double [numberOfForests];
110-
int [] mseCount = new int[numberOfForests];
111-
double [] score =new double[numberOfForests];
112-
113-
double[][] data = ShingledMultiDimDataWithKeys.getMultiDimData(DATA_SIZE, 60, 100, 5, 0, numberOfAttributes).data;
114-
115-
RandomCutForest [] forests = new RandomCutForest [numberOfForests];
116-
for (int k = 0;k<numberOfForests; k++) {
117-
forests[k] = RandomCutForest.builder().numberOfTrees(numberOfTrees).dimensions(dimensions).shingleSize(shingleSize)
118-
.boundingBoxCacheFraction(boundingBoxCacheFraction).randomSeed(99+k).outputAfter(10)
119-
.parallelExecutionEnabled(false)
120-
.internalShinglingEnabled(true).initialAcceptFraction(0.1).sampleSize(sampleSize).build();
113+
double[] mse = new double[numberOfForests];
114+
int[] mseCount = new int[numberOfForests];
115+
double[] score = new double[numberOfForests];
116+
117+
double[][] data = ShingledMultiDimDataWithKeys.getMultiDimData(DATA_SIZE, 60, 100, 5, 0,
118+
numberOfAttributes).data;
119+
120+
RandomCutForest[] forests = new RandomCutForest[numberOfForests];
121+
for (int k = 0; k < numberOfForests; k++) {
122+
forests[k] = RandomCutForest.builder().numberOfTrees(numberOfTrees).dimensions(dimensions)
123+
.shingleSize(shingleSize).boundingBoxCacheFraction(boundingBoxCacheFraction).randomSeed(99 + k)
124+
.outputAfter(10).parallelExecutionEnabled(false).internalShinglingEnabled(true)
125+
.initialAcceptFraction(0.1).sampleSize(sampleSize).build();
121126
}
122127

123128
ForkJoinPool forkJoinPool = new ForkJoinPool(numberOfThreads);
124-
int [] indices = new int[numberOfForests];
125-
for(int k=0;k<numberOfForests;k++){
129+
int[] indices = new int[numberOfForests];
130+
for (int k = 0; k < numberOfForests; k++) {
126131
indices[k] = k;
127132
}
128133

129134
for (int j = 0; j < data.length; j++) {
130-
int finalJ=j;
131-
forkJoinPool.submit( () ->
132-
Arrays.stream(indices).parallel().forEach(k -> {
135+
int finalJ = j;
136+
forkJoinPool.submit(() -> Arrays.stream(indices).parallel().forEach(k -> {
133137
score[k] += forests[k].getAnomalyScore(data[finalJ]);
134138
if (finalJ % 10 == 0 && finalJ > 0) {
135139
double[] result = forests[k].extrapolate(1);
@@ -145,7 +149,7 @@ public void profileTestASync() {
145149
forests[k].update(data[finalJ]);
146150
})).join();
147151
}
148-
for(int k=0;k<numberOfForests;k++) {
152+
for (int k = 0; k < numberOfForests; k++) {
149153
System.out.println(" Forest " + k);
150154
System.out.println(" MSE " + mse[k] / mseCount[k]);
151155
System.out.println(" scoresum " + score[k] / data.length);

Java/parkservices/src/main/java/com/amazon/randomcutforest/parkservices/ErrorHandler.java

+22-12
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import java.util.Arrays;
2323
import java.util.function.BiFunction;
2424

25+
import lombok.Getter;
26+
import lombok.Setter;
27+
2528
import com.amazon.randomcutforest.parkservices.calibration.Calibration;
2629
import com.amazon.randomcutforest.returntypes.DiVector;
2730
import com.amazon.randomcutforest.returntypes.RangeVector;
@@ -36,6 +39,8 @@
3639
// can be involved and out of current scope of this library. We simplify the issue to calibrating two
3740
// fixed quantiles and hence additive updates are reasonable.
3841

42+
@Getter
43+
@Setter
3944
public class ErrorHandler {
4045

4146
/**
@@ -101,40 +106,45 @@ public ErrorHandler(RCFCaster.Builder builder) {
101106
* the following would be useful when states and mappers get written
102107
*/
103108
public ErrorHandler(int errorHorizon, int forecastHorizon, int sequenceIndex, double percentile, int inputLength,
104-
int dimensions, float[] actualsFlattened, float[] pastForecastsFlattened, float[] auxilliary) {
109+
float[] actualsFlattened, float[] pastForecastsFlattened, float[] auxilliary) {
105110
checkArgument(forecastHorizon > 0, " incorrect forecast horizon");
106111
checkArgument(errorHorizon >= forecastHorizon, "incorrect error horizon");
107112
checkArgument(actualsFlattened != null || pastForecastsFlattened == null,
108113
" actuals and forecasts are a mismatch");
109-
checkArgument(inputLength > 0 && dimensions > 0 && dimensions % inputLength == 0, "incorrect parameters");
114+
checkArgument(inputLength > 0, "incorrect parameters");
110115
this.sequenceIndex = sequenceIndex;
111116
this.errorHorizon = errorHorizon;
112117
this.percentile = percentile;
113118
this.forecastHorizon = forecastHorizon;
114119
int currentLength = (actualsFlattened == null) ? 0 : actualsFlattened.length;
115120
checkArgument(currentLength % inputLength == 0, "actuals array is incorrect");
116121
int forecastLength = (pastForecastsFlattened == null) ? 0 : pastForecastsFlattened.length;
117-
checkArgument(forecastLength == currentLength * dimensions * 3 / inputLength, "misaligned forecasts");
122+
118123
int arrayLength = max(forecastHorizon + errorHorizon, currentLength / inputLength);
119124
this.pastForecasts = new RangeVector[arrayLength];
120125
this.actuals = new float[arrayLength][inputLength];
121126

122127
int length = forecastHorizon * inputLength;
128+
// currentLength = (number of actual time steps stored) x inputLength and for
129+
// each of the stored time steps we get a forecast whose length is
130+
// forecastHorizon x inputLength (and then upper and lower for each, hence x 3)
131+
// so forecastLength = number of actual time steps stored x forecastHorizon x
132+
// inputLength x 3
133+
// = currentLength x forecastHorizon x 3
134+
checkArgument(forecastLength == currentLength * 3 * forecastHorizon, "misaligned forecasts");
123135

124136
this.errorMean = new float[length];
125137
this.errorRMSE = new DiVector(length);
126138
this.intervalPrecision = new float[length];
139+
this.adders = new RangeVector(length);
127140
this.multipliers = new RangeVector(length);
128141
this.errorDistribution = new RangeVector(length);
129142

130143
if (pastForecastsFlattened != null) {
131-
for (int i = 0; i < currentLength / inputLength; i++) {
132-
float[] values = Arrays.copyOfRange(pastForecastsFlattened, i * 3 * dimensions,
133-
(i * 3 + 1) * dimensions);
134-
float[] upper = Arrays.copyOfRange(pastForecastsFlattened, (i * 3 + 1) * dimensions,
135-
(i * 3 + 2) * dimensions);
136-
float[] lower = Arrays.copyOfRange(pastForecastsFlattened, (i * 3 + 3) * dimensions,
137-
(i * 3 + 3) * dimensions);
144+
for (int i = 0; i < arrayLength; i++) {
145+
float[] values = Arrays.copyOfRange(pastForecastsFlattened, i * 3 * length, (i * 3 + 1) * length);
146+
float[] upper = Arrays.copyOfRange(pastForecastsFlattened, (i * 3 + 1) * length, (i * 3 + 2) * length);
147+
float[] lower = Arrays.copyOfRange(pastForecastsFlattened, (i * 3 + 2) * length, (i * 3 + 3) * length);
138148
pastForecasts[i] = new RangeVector(values, upper, lower);
139149
System.arraycopy(actualsFlattened, i * inputLength, actuals[i], 0, inputLength);
140150
}
@@ -145,7 +155,7 @@ public ErrorHandler(int errorHorizon, int forecastHorizon, int sequenceIndex, do
145155
/**
146156
* the following is the core subroutine, which calibrates; but the application of
147157
* the calibration is controlled
148-
*
158+
*
149159
* @param descriptor the current forecast
150160
* @param calibrationMethod the choice of the calibration
151161
*/
@@ -212,7 +222,7 @@ public RangeVector computeErrorPercentile(double percentile, BiFunction<Float, F
212222
* the following function is provided such that the calibration of errors can be
213223
* performed using a different function. e.g., SMAPE type evaluation using
214224
* RCFCaster.alternateError
215-
*
225+
*
216226
* @param percentile the desired percentile (we recommend leaving this at 0.1 --
217227
* the algorithm likely will never have sufficiently many
218228
* observations to have a very fine grain distribution;

Java/parkservices/src/main/java/com/amazon/randomcutforest/parkservices/RCFCaster.java

+19
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,19 @@
2121

2222
import java.util.function.BiFunction;
2323

24+
import lombok.Getter;
25+
import lombok.Setter;
26+
27+
import com.amazon.randomcutforest.RandomCutForest;
2428
import com.amazon.randomcutforest.config.ForestMode;
2529
import com.amazon.randomcutforest.config.TransformMethod;
2630
import com.amazon.randomcutforest.parkservices.calibration.Calibration;
31+
import com.amazon.randomcutforest.parkservices.preprocessor.Preprocessor;
2732
import com.amazon.randomcutforest.parkservices.returntypes.TimedRangeVector;
2833
import com.amazon.randomcutforest.returntypes.RangeVector;
2934

35+
@Getter
36+
@Setter
3037
public class RCFCaster extends ThresholdedRandomCutForest {
3138

3239
public static double DEFAULT_ERROR_PERCENTILE = 0.1;
@@ -76,6 +83,7 @@ public Builder calibration(Calibration calibrationMethod) {
7683
return this;
7784
}
7885

86+
@Override
7987
public RCFCaster build() {
8088
checkArgument(forecastHorizon > 0, "need non-negative horizon");
8189
checkArgument(shingleSize > 0, "need shingle size > 1");
@@ -110,6 +118,17 @@ public RCFCaster(Builder builder) {
110118
calibrationMethod = builder.calibrationMethod;
111119
}
112120

121+
// for mappers
122+
public RCFCaster(ForestMode forestMode, TransformMethod transformMethod, RandomCutForest forest,
123+
PredictorCorrector predictorCorrector, Preprocessor preprocessor, RCFComputeDescriptor descriptor,
124+
int forecastHorizon, ErrorHandler errorHandler, int errorHorizon, Calibration calibrationMethod) {
125+
super(forestMode, transformMethod, forest, predictorCorrector, preprocessor, descriptor);
126+
this.forecastHorizon = forecastHorizon;
127+
this.errorHandler = errorHandler;
128+
this.errorHorizon = errorHorizon;
129+
this.calibrationMethod = calibrationMethod;
130+
}
131+
113132
/**
114133
* a single call that preprocesses data, compute score/grade, generates forecast
115134
* and updates state

0 commit comments

Comments
 (0)