diff --git a/README.md b/README.md index 6922d00..6bc416c 100755 --- a/README.md +++ b/README.md @@ -10,15 +10,56 @@ Although there are some repos for python to run twitter's anomaly detection algo This repo aims for rewriting twitter's Anomaly Detection algorithms in Python, and providing same functions for user. - ## Install ``` pip3 install tad ``` +## Requirements + +1. The data must have a datetime index. Only a single series is processed, so pass one numeric series at a time. +2. Plotting is based on matplotlib; the plot is returned in the results so that the user can change its appearance. ## Usage ``` import tad + +import pandas as pd +import matplotlib.pyplot as plt + +a = pd.DataFrame({'numeric_data_col1': + [1,1, 1, 1, 1, 1, 10, 1, 1, 1, 1, 1, 1, 1]}, + index=pd.to_datetime(['2020-01-01', '2020-01-02', '2020-01-03', + '2020-01-04', '2020-01-05','2020-01-06','2020-01-07','2020-01-08', + '2020-01-09','2020-01-10','2020-01-11','2020-01-12','2020-01-13', + '2020-01-14'])) + +results = anomaly_detect_ts(a['numeric_data_col1'], + direction='both', alpha=0.02, + max_anoms=0.20, + plot=True, longterm=True) +if results['plot']: #some anoms were detected and plot was also True. + plt.show() + ``` +results +{'anoms': 2020-01-14 1 + 2020-01-07 10 + dtype: int64, + 'expected': None, + 'plot': <matplotlib Axes object>} + +The output is returned in the results dict + +results['anoms'] contains the anomalies detected + +results['plot'] contains a matplotlib plot if anoms were detected and plot was True + +results['expected'] tries to return expected values for certain dates. 
TODO: inconsistent, as it provides different outputs compared to anoms + +![Sample Script output](/resources/images/sample_execution.png) + +## Other Sample Images + +![Another sample of detection using default parameters](/resources/images/sample_01.png) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 6552e43..b7aa2fd 100755 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ numpy scipy pandas -statsmodels \ No newline at end of file +statsmodels +matplotlib \ No newline at end of file diff --git a/resources/images/sample_01.png b/resources/images/sample_01.png new file mode 100644 index 0000000..569a213 Binary files /dev/null and b/resources/images/sample_01.png differ diff --git a/resources/images/sample_execution.png b/resources/images/sample_execution.png new file mode 100644 index 0000000..e7c1c31 Binary files /dev/null and b/resources/images/sample_execution.png differ diff --git a/tad/anomaly_detect_ts.py b/tad/anomaly_detect_ts.py index a042a46..0316905 100755 --- a/tad/anomaly_detect_ts.py +++ b/tad/anomaly_detect_ts.py @@ -58,10 +58,10 @@ title: Title for the output plot. verbose: Enable debug messages - - resampling: whether ms or sec granularity should be resampled to min granularity. + + resampling: whether ms or sec granularity should be resampled to min granularity. Defaults to False. - + period_override: Override the auto-generated period Defaults to None @@ -141,7 +141,7 @@ import datetime import statsmodels.api as sm import logging - +import matplotlib.pyplot as plt #this will be used for plotting. logger = logging.getLogger(__name__) @@ -155,7 +155,10 @@ def _handle_granularity_error(level): level : String the granularity that is below the min threshold """ - e_message = '%s granularity is not supported. 
Ensure granularity => minute or enable resampling' % level + #improving the message as if user selects Timestamp, Dimension, Value sort of data then repeated timelines + #will cause issues with the module. Ideally, user should only supply single KPI for a single dimension with timestamp. + + e_message = '%s granularity is not supported. Ensure granularity => minute or enable resampling. Please check if you are using multiple dimensions with same timestamps in the data which cause repetition of same timestamps.' % level raise ValueError(e_message) @@ -325,20 +328,26 @@ def _get_only_last_results(data, all_anoms, granularity, only_last): only_last : string day | hr The subset of anomalies to be returned """ - start_date = data.index[-1] - datetime.timedelta(days=7) + + #Unused variables start_date and x_subset_week were commented by aliasgherman + # on 2020-06-13 as the plot logic does not utilize them for now. + #start_date = data.index[-1] - datetime.timedelta(days=7) start_anoms = data.index[-1] - datetime.timedelta(days=1) if only_last == 'hr': # We need to change start_date and start_anoms for the hourly only_last option - start_date = datetime.datetime.combine( - (data.index[-1] - datetime.timedelta(days=2)).date(), datetime.time.min) + #start_date = datetime.datetime.combine( + # (data.index[-1] - datetime.timedelta(days=2)).date(), datetime.time.min) start_anoms = data.index[-1] - datetime.timedelta(hours=1) # subset the last days worth of data x_subset_single_day = data.loc[data.index > start_anoms] # When plotting anoms for the last day only we only show the previous weeks data - x_subset_week = data.loc[lambda df: ( - df.index <= start_anoms) & (df.index > start_date)] + ## Below was commented out by aliasgherman as the plot logic (v001) + ## does not use this variable and plots whole dataset. 
+ ##x_subset_week = data.loc[lambda df: ( + ## df.index <= start_anoms) & (df.index > start_date)] + # return all_anoms.loc[all_anoms.index >= x_subset_single_day.index[0]] @@ -394,8 +403,9 @@ def _get_max_outliers(data, max_percent_anomalies): the input maximum number of anomalies per percent of data set values """ max_outliers = int(np.trunc(data.size * max_percent_anomalies)) - assert max_outliers, 'With longterm=True, AnomalyDetection splits the data into 2 week periods by default. You have {0} observations in a period, which is too few. Set a higher piecewise_median_period_weeks.'.format( - data.size) + if not max_outliers: + raise ValueError('With longterm=True, AnomalyDetection splits the data into 2 week periods by default. You have {0} observations in a period, which is too few. Set a higher piecewise_median_period_weeks.'.format( + data.size)) return max_outliers @@ -425,28 +435,38 @@ def anomaly_detect_ts(x, max_anoms=0.1, direction="pos", alpha=0.05, only_last=N logger.debug("The debug logs will be logged because verbose=%s", verbose) # validation - assert isinstance(x, pd.Series), 'Data must be a series(Pandas.Series)' - assert x.values.dtype in [int, float], 'Values of the series must be number' - assert x.index.dtype == np.dtype('datetime64[ns]'), 'Index of the series must be datetime' - assert max_anoms <= 0.49 and max_anoms >= 0, 'max_anoms must be non-negative and less than 50% ' - assert direction in ['pos', 'neg', 'both'], 'direction options: pos | neg | both' - assert only_last in [None, 'day', 'hr'], 'only_last options: None | day | hr' - assert threshold in [None, 'med_max', 'p95', 'p99'], 'threshold options: None | med_max | p95 | p99' - assert piecewise_median_period_weeks >= 2, 'piecewise_median_period_weeks must be greater than 2 weeks' + if isinstance(x, pd.Series) == False: + raise AssertionError('Data must be a series(Pandas.Series)') + #changing below as apparantly the large integer data like int64 was not captured by below + if 
x.values.dtype not in [int, float, 'int64']: + raise ValueError('Values of the series must be number') + if x.index.dtype != np.dtype('datetime64[ns]'): + raise ValueError('Index of the series must be datetime') + if max_anoms > 0.49 or max_anoms < 0: + raise AttributeError('max_anoms must be non-negative and less than 50% ') + if direction not in ['pos', 'neg', 'both']: + raise AttributeError('direction options: pos | neg | both') + if only_last not in [None, 'day', 'hr']: + raise AttributeError('only_last options: None | day | hr') + if threshold not in [None, 'med_max', 'p95', 'p99']: + raise AttributeError('threshold options: None | med_max | p95 | p99') + if piecewise_median_period_weeks < 2: + raise AttributeError('piecewise_median_period_weeks must be greater than 2 weeks') logger.debug('Completed validation of input parameters') if alpha < 0.01 or alpha > 0.1: logger.warning('alpha is the statistical significance and is usually between 0.01 and 0.1') data, period, granularity = _get_data_tuple(x, period_override, resampling) - if granularity is 'day': + if granularity == 'day': num_days_per_line = 7 + logger.info("Recording the variable in case plot function needs it. gran = day. 
{}".format(num_days_per_line)) only_last = 'day' if only_last == 'hr' else only_last max_anoms = _get_max_anoms(data, max_anoms) # If longterm is enabled, break the data into subset data frames and store in all_data - all_data = _process_long_term_data(data, period, granularity, piecewise_median_period_weeks) if longterm else [data] + all_data = _process_long_term_data(data, period, granularity, piecewise_median_period_weeks) if longterm else [data] all_anoms = pd.Series() seasonal_plus_trend = pd.Series() @@ -488,19 +508,45 @@ def anomaly_detect_ts(x, max_anoms=0.1, direction="pos", alpha=0.05, only_last=N 'plot': None } + ret_val = { + 'anoms': all_anoms, + 'expected': seasonal_plus_trend if e_value else None, + 'plot': 'TODO' if plot else None + } + if plot: # TODO additional refactoring and logic needed to support plotting - num_days_per_line + #num_days_per_line #breaks = _get_plot_breaks(granularity, only_last) # x_subset_week - raise Exception('TODO: Unsupported now') + ret_plot = _plot_anomalies(data, ret_val) + ret_val['plot'] = ret_plot - return { - 'anoms': all_anoms, - 'expected': seasonal_plus_trend if e_value else None, - 'plot': 'TODO' if plot else None - } + #raise Exception('TODO: Unsupported now') + + return ret_val + +def _plot_anomalies(data, results): + """ + Tries to plot the data and the anomalies detected in this data. + + ArgsL + data: Time series on which we are performing the anomaly detection. (full data) + results: the results dictionary which contains anomalies grouped in the key called 'anoms' + """ + anoms = pd.DataFrame(results) + df_plot = pd.DataFrame(data).join(anoms, how='left') + #df_plot = df_plot.fillna(0) #if no anomaly, then we will plot a zero. can be improved. 
+ df_plot['anoms'].unique() + _, ax = plt.subplots(figsize=(14,6)) + ax.plot(df_plot['anoms'], color='r', marker='o', label='Anomaly', linestyle="None") + ax.plot(data, label=data.name) + ax.set_title(data.name) + ax.legend(loc='best') + ax.grid(b=True) + #plt.show() + return ax def _detect_anoms(data, k=0.49, alpha=0.05, num_obs_per_period=None, use_decomp=True, use_esd=False, direction="pos", verbose=False): @@ -522,11 +568,26 @@ def _detect_anoms(data, k=0.49, alpha=0.05, num_obs_per_period=None, """ # validation + assert num_obs_per_period, "must supply period length for time series decomposition" assert direction in ['pos', 'neg', 'both'], 'direction options: pos | neg | both' - assert data.size >= num_obs_per_period * \ - 2, 'Anomaly detection needs at least 2 periods worth of data' + ########################################################################### + # Changing below code. If the data contains broken dates then the data.size may be less than observation periods + # so for such cases, we should return empty obsevations + ########################################################################### + #assert data.size >= num_obs_per_period * \ + # 2, 'Anomaly detection needs at least 2 periods worth of data' + if data.size < num_obs_per_period * 2: + return { + 'anoms': pd.Series(), #return empty series + 'stl': data #return untouched data... + } + # test case can be any data set which has large gapes in the dates. + # like data contains dates from year 2000 till 2020 but for 2001, 2001-01-01 till 2001-01-04 and then from 2001-06-01. + # this will break the obs_period and data.size check. So I have just removed anomaly detection for these small patches. + ########################################################################### + assert data[data.isnull( )].empty, 'Data contains NA. 
We suggest replacing NA with interpolated values before detecting anomaly' diff --git a/tests/test_detect_ts.py b/tests/test_detect_ts.py index e9ce44c..684c462 100755 --- a/tests/test_detect_ts.py +++ b/tests/test_detect_ts.py @@ -24,22 +24,22 @@ def setUp(self): self.data1 = pd.read_csv(TEST_DATA_DIR / 'test_data_1.csv', index_col='timestamp', parse_dates=True, squeeze=True, date_parser=self.dparserfunc) - + self.data2 = pd.read_csv(TEST_DATA_DIR / 'test_data_2.csv', index_col='timestamp', parse_dates=True, squeeze=True, date_parser=self.dparserfunc) - + self.data3 = pd.read_csv(TEST_DATA_DIR / 'test_data_3.csv', index_col='timestamp', parse_dates=True, squeeze=True, date_parser=self.dparserfunc) - + self.data4 = pd.read_csv(TEST_DATA_DIR / 'test_data_4.csv', index_col='timestamp', parse_dates=True, squeeze=True, date_parser=self.dparserfunc) - + self.data5 = pd.read_csv(TEST_DATA_DIR / 'test_data_5.csv', index_col='timestamp', parse_dates=True, squeeze=True, - date_parser=self.dparserfunc) + date_parser=self.dparserfunc) def get_test_value(self, raw_value): return np.float64(raw_value) @@ -51,9 +51,9 @@ def test_anomaly_detect_ts_1(self): results = anomaly_detect_ts(self.data1, direction='both', alpha=0.05, plot=False, longterm=True) - values = results['anoms'].get_values() + values = results['anoms'].array - self.assertEquals(132, len(values)) + self.assertEqual(132, len(values)) self.assertTrue(self.get_test_value(40.0) in values) self.assertTrue(self.get_test_value(250.0) in values) self.assertTrue(self.get_test_value(210.0) in values) @@ -74,82 +74,82 @@ def test_anomaly_detect_ts_1(self): self.assertTrue(self.get_test_value(151.549) in values) self.assertTrue(self.get_test_value(147.028) in values) self.assertTrue(self.get_test_value(31.2614) in values) - + def test_anomaly_detect_ts_2(self): results = anomaly_detect_ts(self.data2, direction='both', alpha=0.02, max_anoms=0.02, plot=False, longterm=True) - values = results['anoms'].get_values() - - 
self.assertEquals(2, len(values)) + values = results['anoms'].array + + self.assertEqual(2, len(values)) self.assertTrue(self.get_test_value(-549.97419676451) in values) self.assertTrue(self.get_test_value(-3241.79887765979) in values) - + def test_anomaly_detect_ts_3(self): results = anomaly_detect_ts(self.data3, direction='both', alpha=0.02, max_anoms=0.02, plot=False, longterm=True) - values = results['anoms'].get_values() - - self.assertEquals(6, len(values)) + values = results['anoms'].array + + self.assertEqual(6, len(values)) self.assertTrue(self.get_test_value(677.306772096232) in values) self.assertTrue(self.get_test_value(3003.3770260296196) in values) - self.assertTrue(self.get_test_value(375.68211544563) in values) + self.assertTrue(self.get_test_value(375.68211544563) in values) self.assertTrue(self.get_test_value(4244.34731650009) in values) self.assertTrue(self.get_test_value(2030.44357652981) in values) self.assertTrue(self.get_test_value(4223.461867236129) in values) - + def test_anomaly_detect_ts_4(self): results = anomaly_detect_ts(self.data4, direction='both', alpha=0.02, max_anoms=0.02, plot=False, longterm=True) - values = results['anoms'].get_values() - - self.assertEquals(1, len(values)) + values = results['anoms'].array + + self.assertEqual(1, len(values)) self.assertTrue(self.get_test_value(-1449.62440286) in values) - + def test_anomaly_detect_ts_5(self): results = anomaly_detect_ts(self.data5, direction='both', alpha=0.02, max_anoms=0.02, plot=False, longterm=True) - values = results['anoms'].get_values() - - self.assertEquals(4, len(values)) + values = results['anoms'].array + + self.assertEqual(4, len(values)) self.assertTrue(self.get_test_value(-3355.47215640248) in values) self.assertTrue(self.get_test_value(941.905602754994) in values) self.assertTrue(self.get_test_value(-2428.98882200991) in values) self.assertTrue(self.get_test_value(-1263.4494013677302) in values) - + def test_detect_anoms(self): shesd = _detect_anoms(self.data1, 
k=0.02, alpha=0.05, num_obs_per_period=1440, use_decomp=True, use_esd=False, direction='both') - self.assertEquals(133, len(shesd['anoms'])) - + self.assertEqual(133, len(shesd['anoms'])) + def test__detect_anoms_pos(self): shesd = _detect_anoms(self.data1, k=0.02, alpha=0.05, num_obs_per_period=1440, use_decomp=True, use_esd=False, direction='pos') - self.assertEquals(50, len(shesd['anoms'])) + self.assertEqual(50, len(shesd['anoms'])) def test__detect_anoms_neg(self): shesd = _detect_anoms(self.data1, k=0.02, alpha=0.05, num_obs_per_period=1440, use_decomp=True, use_esd=False, direction='neg') - self.assertEquals(85, len(shesd['anoms'])) + self.assertEqual(85, len(shesd['anoms'])) def test__detect_anoms_use_decomp_false(self): shesd = _detect_anoms(self.data1, k=0.02, alpha=0.05, num_obs_per_period=1440, use_decomp=False, use_esd=False, direction='both') - self.assertEquals(133, len(shesd['anoms'])) + self.assertEqual(133, len(shesd['anoms'])) def test__detect_anoms_no_num_obs_per_period(self): - with self.assertRaises(AssertionError): + with self.assertRaises(AssertionError): _detect_anoms(self.data1, k=0.02, alpha=0.05, num_obs_per_period=None, use_decomp=False, use_esd=False, @@ -160,125 +160,131 @@ def test__detect_anoms_use_esd_true(self): num_obs_per_period=1440, use_decomp=True, use_esd=True, direction='both') - self.assertEquals(133, len(shesd['anoms'])) - + self.assertEqual(133, len(shesd['anoms'])) + def test_anomaly_detect_ts_last_only_none(self): results = anomaly_detect_ts(self.data1, max_anoms=0.02, direction='both', only_last=None, plot=False) - self.assertEquals(132, len(results['anoms'])) + self.assertEqual(132, len(results['anoms'])) def test_anomaly_detect_ts_last_only_day(self): results = anomaly_detect_ts(self.data1, max_anoms=0.02, direction='both', only_last='day', plot=False) - self.assertEquals(23, len(results['anoms'])) + self.assertEqual(23, len(results['anoms'])) def test_anomaly_detect_ts_last_only_hr(self): results = 
anomaly_detect_ts(self.data1, max_anoms=0.02, direction='both', only_last='hr', plot=False) - values = results['anoms'].get_values() - - self.assertEquals(3, len(values)) + values = results['anoms'].array + + self.assertEqual(3, len(values)) self.assertTrue(self.get_test_value(40.0) in values) self.assertTrue(self.get_test_value(250.0) in values) - self.assertTrue(self.get_test_value(210.0) in values) - + self.assertTrue(self.get_test_value(210.0) in values) + def test_anomaly_detect_ts_pos_only(self): results = anomaly_detect_ts(self.data1, max_anoms=0.02, - direction='pos', + direction='pos', only_last=None, plot=False) - self.assertEquals(50, len(results['anoms'])) - + self.assertEqual(50, len(results['anoms'])) + def test_anomaly_detect_ts_neg_only(self): results = anomaly_detect_ts(self.data1, max_anoms=0.02, - direction='neg', + direction='neg', only_last=None, plot=False) - self.assertEquals(84, len(results['anoms'])) + self.assertEqual(84, len(results['anoms'])) def test_anomaly_detect_ts_med_max_threshold(self): results = anomaly_detect_ts(self.data1, max_anoms=0.02, direction='both', threshold='med_max', only_last=None, plot=False) - values = results['anoms'].get_values() + values = results['anoms'].array - self.assertEquals(4, len(values)) + self.assertEqual(4, len(values)) self.assertTrue(self.get_test_value(203.231) in values) self.assertTrue(self.get_test_value(203.90099999999998) in values) - self.assertTrue(self.get_test_value(250.0) in values) - self.assertTrue(self.get_test_value(210.0) in values) + self.assertTrue(self.get_test_value(250.0) in values) + self.assertTrue(self.get_test_value(210.0) in values) def test_anomaly_detect_ts_longterm(self): results = anomaly_detect_ts(self.data1, max_anoms=0.02, direction='both', threshold=None, only_last=None, longterm=True) - self.assertEquals(132, len(results['anoms'])) + self.assertEqual(132, len(results['anoms'])) def test_anomaly_detect_ts_piecewise_median_period_weeks(self): results = 
anomaly_detect_ts(self.data1, max_anoms=0.02, piecewise_median_period_weeks=4, direction='both', threshold=None, only_last=None, longterm=False) - self.assertEquals(132, len(results['anoms'])) + self.assertEqual(132, len(results['anoms'])) def test_invalid_data_parameter(self): - with self.assertRaises(AssertionError): + with self.assertRaises(AssertionError): anomaly_detect_ts(['invalid'], max_anoms=0.02, direction='both', threshold=None, only_last=None, longterm=False) def test_invalid_piecewise_median_period_weeks(self): - with self.assertRaises(AssertionError): + with self.assertRaises(AssertionError): anomaly_detect_ts(['invalid'], max_anoms=0.02, piecewise_median_period_weeks=1, direction='both', threshold=None, only_last=None, longterm=False, plot=False) - + def test_get_data_tuple(self): d_tuple = _get_data_tuple(self.data1, 24, None) raw_data = d_tuple[0] period = d_tuple[1] granularity = d_tuple[2] - - self.assertTrue(isinstance(raw_data, Series)) + + self.assertTrue(isinstance(raw_data, Series)) self.assertTrue(isinstance(period, int)) - self.assertTrue(isinstance(granularity, str)) - - self.assertEquals(24, period) - self.assertEquals('min', granularity) - self.assertEquals(14398, len(raw_data)) - + self.assertTrue(isinstance(granularity, str)) + + self.assertEqual(24, period) + self.assertEqual('min', granularity) + self.assertEqual(14398, len(raw_data)) + def test_get_max_outliers(self): - self.assertEquals(719, _get_max_outliers(self.data1, 0.05)) - + self.assertEqual(719, _get_max_outliers(self.data1, 0.05)) + def test_get_decomposed_data_tuple(self): data, smoothed_data = _get_decomposed_data_tuple(self.data1, 1440) self.assertTrue(isinstance(data, Series)) self.assertTrue(isinstance(smoothed_data, Series)) - self.assertEquals(14398, len(data)) - self.assertEquals(14398, len(smoothed_data)) - + self.assertEqual(14398, len(data)) + self.assertEqual(14398, len(smoothed_data)) + def test_perform_threshold_filter(self): results = 
anomaly_detect_ts(self.data1, max_anoms=0.02, direction='both', only_last=None, plot=False) periodic_max = self.data1.resample('1D').max() filtered_results = _perform_threshold_filter(results['anoms'], periodic_max, 'med_max') self.assertTrue(isinstance(filtered_results, Series)) - self.assertEquals(4, len(filtered_results)) - + self.assertEqual(4, len(filtered_results)) + def test_get_plot_breaks(self): - self.assertEquals(36, _get_plot_breaks('day', 'day')) - self.assertEquals(12, _get_plot_breaks('min', 'day')) - self.assertEquals(3, _get_plot_breaks('min', 'min')) - + self.assertEqual(36, _get_plot_breaks('day', 'day')) + self.assertEqual(12, _get_plot_breaks('min', 'day')) + self.assertEqual(3, _get_plot_breaks('min', 'min')) + def test_get_only_last_results(self): - results = anomaly_detect_ts(self.data1, max_anoms=0.02, direction='both', + results = anomaly_detect_ts(self.data1, max_anoms=0.02, direction='both', only_last=None, plot=False) last_day = _get_only_last_results(self.data1, results['anoms'], 'min', 'day') last_hr = _get_only_last_results(self.data1, results['anoms'], 'min', 'hr') - self.assertEquals(23, len(last_day)) - self.assertEquals(3, len(last_hr)) + self.assertEqual(23, len(last_day)) + self.assertEqual(3, len(last_hr)) + + def test_plot_data_files(self): + results = anomaly_detect_ts(self.data1, + direction='both', alpha=0.02, max_anoms=0.35, + plot=True, longterm=True) + self.assertEqual("count", results['plot'].get_title()) #just checking that the plot was returned for the test file if __name__ == '__main__': unittest.main()