diff --git a/analytics/analytics/analytic_unit_manager.py b/analytics/analytics/analytic_unit_manager.py
index e81ad08..bbbffa9 100644
--- a/analytics/analytics/analytic_unit_manager.py
+++ b/analytics/analytics/analytic_unit_manager.py
@@ -17,7 +17,7 @@ def get_detector_by_type(
     if detector_type == 'pattern':
         return detectors.PatternDetector(analytic_unit_type, analytic_unit_id)
     elif detector_type == 'threshold':
-        return detectors.ThresholdDetector()
+        return detectors.ThresholdDetector(analytic_unit_id)
     elif detector_type == 'anomaly':
         return detectors.AnomalyDetector(analytic_unit_id)
 
diff --git a/analytics/analytics/detectors/anomaly_detector.py b/analytics/analytics/detectors/anomaly_detector.py
index 460f014..04d7d5a 100644
--- a/analytics/analytics/detectors/anomaly_detector.py
+++ b/analytics/analytics/detectors/anomaly_detector.py
@@ -1,8 +1,6 @@
 from enum import Enum
 import logging
 import numpy as np
-import operator
-from collections import OrderedDict
 import pandas as pd
 import math
 from typing import Optional, Union, List, Tuple
@@ -27,12 +25,12 @@ class Bound(Enum):
 
 class AnomalyDetector(ProcessingDetector):
 
     def __init__(self, analytic_unit_id: AnalyticUnitId):
-        self.analytic_unit_id = analytic_unit_id
+        super().__init__(analytic_unit_id)
         self.bucket = DataBucket()
 
     def train(self, dataframe: pd.DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache:
         segments = payload.get('segments')
-        enable_bounds: str = payload.get('enableBounds') or 'ALL'
+        enable_bounds = Bound(payload.get('enableBounds') or 'ALL')
         prepared_segments = []
         time_step = utils.find_interval(dataframe)
@@ -40,7 +38,7 @@ class AnomalyDetector(ProcessingDetector):
             'confidence': payload['confidence'],
             'alpha': payload['alpha'],
             'timeStep': time_step,
-            'enableBounds': enable_bounds
+            'enableBounds': enable_bounds.value
         }
 
         if segments is not None:
@@ -65,55 +63,53 @@ class AnomalyDetector(ProcessingDetector):
             'cache': new_cache
         }
 
-    # TODO: ModelCache -> ModelState
+    # TODO: ModelCache -> DetectorState
     def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> DetectionResult:
+        if cache is None:
+            raise ValueError(f'Analytic unit {self.analytic_unit_id} got empty cache')
         data = dataframe['value']
-        time_step = cache['timeStep']
-        segments = cache.get('segments')
-        enable_bounds: str = cache.get('enableBounds') or 'ALL'
-        smoothed_data = utils.exponential_smoothing(data, cache['alpha'])
-
-        # TODO: use class for cache to avoid using string literals and Bound.TYPE.value
-        bounds = OrderedDict()
-        bounds[Bound.LOWER.value] = ( smoothed_data - cache['confidence'], operator.lt )
-        bounds[Bound.UPPER.value] = ( smoothed_data + cache['confidence'], operator.gt )
+        # TODO: use class for cache to avoid using string literals
+        alpha = self.get_value_from_cache(cache, 'alpha', required = True)
+        confidence = self.get_value_from_cache(cache, 'confidence', required = True)
+        segments = self.get_value_from_cache(cache, 'segments')
+        enable_bounds = Bound(self.get_value_from_cache(cache, 'enableBounds') or 'ALL')
 
-        if enable_bounds == Bound.LOWER.value:
-            del bounds[Bound.UPPER.value]
-
-        if enable_bounds == Bound.UPPER.value:
-            del bounds[Bound.LOWER.value]
+        smoothed_data = utils.exponential_smoothing(data, alpha)
+        lower_bound = smoothed_data - confidence
+        upper_bound = smoothed_data + confidence
 
         if segments is not None:
-            seasonality = cache.get('seasonality')
-            assert seasonality is not None and seasonality > 0, \
+            time_step = self.get_value_from_cache(cache, 'timeStep', required = True)
+            seasonality = self.get_value_from_cache(cache, 'seasonality', required = True)
+            assert seasonality > 0, \
                 f'{self.analytic_unit_id} got invalid seasonality {seasonality}'
 
             data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])
-            data_second_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][1])
 
             for segment in segments:
                 seasonality_index = seasonality // time_step
                 season_count = math.ceil(abs(segment['from'] - data_start_time) / seasonality)
                 start_seasonal_segment = segment['from'] + seasonality * season_count
                 seasonality_offset = (abs(start_seasonal_segment - data_start_time) % seasonality) // time_step
-                #TODO: upper and lower bounds for segment_data
+
                 segment_data = pd.Series(segment['data'])
-                for bound_type, bound_data in bounds.items():
-                    bound_data, _ = bound_data
-                    bounds[bound_type] = self.add_season_to_data(bound_data, segment_data, seasonality_offset, seasonality_index, bound_type)
-                    assert len(smoothed_data) == len(bounds[bound_type]), \
-                        f'len smoothed {len(smoothed_data)} != len seasonality {len(bounds[bound_type])}'
+
+                lower_bound = self.add_season_to_data(lower_bound, segment_data, seasonality_offset, seasonality_index, Bound.LOWER)
+                upper_bound = self.add_season_to_data(upper_bound, segment_data, seasonality_offset, seasonality_index, Bound.UPPER)
 
         anomaly_indexes = []
         for idx, val in enumerate(data.values):
-            for bound_type, bound_data in bounds.items():
-                bound_data, comparator = bound_data
-                if comparator(val, bound_data.values[idx]):
+            if val > upper_bound.values[idx]:
+                if enable_bounds == Bound.UPPER or enable_bounds == Bound.ALL:
                     anomaly_indexes.append(data.index[idx])
+
+            if val < lower_bound.values[idx]:
+                if enable_bounds == Bound.LOWER or enable_bounds == Bound.ALL:
+                    anomaly_indexes.append(data.index[idx])
+
         # TODO: use Segment in utils
         segments = utils.close_filtering(anomaly_indexes, 1)
         segments = utils.get_start_and_end_of_segments(segments)
@@ -176,34 +172,27 @@ class AnomalyDetector(ProcessingDetector):
         result.segments = utils.merge_intersecting_segments(result.segments, time_step)
         return result
 
-    # TODO: ModelCache -> ModelState (don't use string literals)
+    # TODO: remove duplication with detect()
     def process_data(self, dataframe: pd.DataFrame, cache: ModelCache) -> AnomalyProcessingResult:
-        segments = cache.get('segments')
-        enable_bounds: str = cache.get('enableBounds') or 'ALL'
+        segments = self.get_value_from_cache(cache, 'segments')
+        alpha = self.get_value_from_cache(cache, 'alpha', required = True)
+        confidence = self.get_value_from_cache(cache, 'confidence', required = True)
+        enable_bounds = Bound(self.get_value_from_cache(cache, 'enableBounds') or 'ALL')
 
         # TODO: exponential_smoothing should return dataframe with related timestamps
-        smoothed_data = utils.exponential_smoothing(dataframe['value'], cache['alpha'])
-
-        bounds = OrderedDict()
-        bounds[Bound.LOWER.value] = smoothed_data - cache['confidence']
-        bounds[Bound.UPPER.value] = smoothed_data + cache['confidence']
+        smoothed_data = utils.exponential_smoothing(dataframe['value'], alpha)
 
-        if enable_bounds == Bound.LOWER.value:
-            del bounds[Bound.UPPER.value]
-
-        if enable_bounds == Bound.UPPER.value:
-            del bounds[Bound.LOWER.value]
-
-
-        # TODO: remove duplication with detect()
+        lower_bound = smoothed_data - confidence
+        upper_bound = smoothed_data + confidence
 
         if segments is not None:
-            seasonality = cache.get('seasonality')
-            assert seasonality is not None and seasonality > 0, \
+            seasonality = self.get_value_from_cache(cache, 'seasonality', required = True)
+            assert seasonality > 0, \
                 f'{self.analytic_unit_id} got invalid seasonality {seasonality}'
 
             data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])
-            time_step = cache['timeStep']
+
+            time_step = self.get_value_from_cache(cache, 'timeStep', required = True)
 
             for segment in segments:
@@ -212,19 +201,22 @@ class AnomalyDetector(ProcessingDetector):
                 start_seasonal_segment = segment['from'] + seasonality * season_count
                 seasonality_offset = (abs(start_seasonal_segment - data_start_time) % seasonality) // time_step
                 segment_data = pd.Series(segment['data'])
-                for bound_type, bound_data in bounds.items():
-                    bounds[bound_type] = self.add_season_to_data(bound_data, segment_data, seasonality_offset, seasonality_index, bound_type)
-                    assert len(smoothed_data) == len(bounds[bound_type]), \
-                        f'len smoothed {len(smoothed_data)} != len seasonality {len(bounds[bound_type])}'
+
+                lower_bound = self.add_season_to_data(lower_bound, segment_data, seasonality_offset, seasonality_index, Bound.LOWER)
+                upper_bound = self.add_season_to_data(upper_bound, segment_data, seasonality_offset, seasonality_index, Bound.UPPER)
 
         # TODO: support multiple segments
 
         timestamps = utils.convert_series_to_timestamp_list(dataframe.timestamp)
-        result_bounds = {}
-        for bound_type, bound_data in bounds.items():
-            result_bounds[bound_type] = list(zip(timestamps, bound_data.values.tolist()))
-        result = AnomalyProcessingResult(lower_bound=result_bounds.get(Bound.LOWER.value), upper_bound=result_bounds.get(Bound.UPPER.value))
-        return result
+        lower_bound_timeseries = list(zip(timestamps, lower_bound.values.tolist()))
+        upper_bound_timeseries = list(zip(timestamps, upper_bound.values.tolist()))
+
+        if enable_bounds == Bound.ALL:
+            return AnomalyProcessingResult(lower_bound_timeseries, upper_bound_timeseries)
+        elif enable_bounds == Bound.UPPER:
+            return AnomalyProcessingResult(upper_bound = upper_bound_timeseries)
+        elif enable_bounds == Bound.LOWER:
+            return AnomalyProcessingResult(lower_bound = lower_bound_timeseries)
 
     def add_season_to_data(self, data: pd.Series, segment: pd.Series, offset: int, seasonality: int, bound_type: Bound) -> pd.Series:
         #data - smoothed data to which seasonality will be added
@@ -236,14 +228,14 @@ class AnomalyDetector(ProcessingDetector):
                 #TODO: add seasonality for non empty parts
                 continue
             if (idx - offset) % seasonality == 0:
-                if bound_type == Bound.UPPER.value:
+                if bound_type == Bound.UPPER:
                     upper_segment_bound = self.get_bounds_for_segment(segment)[0]
                     data = data.add(pd.Series(upper_segment_bound.values, index = segment.index + idx), fill_value = 0)
-                elif bound_type == Bound.LOWER.value:
+                elif bound_type == Bound.LOWER:
                     lower_segment_bound = self.get_bounds_for_segment(segment)[1]
                     data = data.add(pd.Series(lower_segment_bound.values * -1, index = segment.index + idx), fill_value = 0)
                 else:
-                    raise ValueError(f'unknown {bound_type}')
+                    raise ValueError(f'unknown bound type: {bound_type.value}')
 
         return data[:len_smoothed_data]
 
diff --git a/analytics/analytics/detectors/detector.py b/analytics/analytics/detectors/detector.py
index f7ae2ca..3920fda 100644
--- a/analytics/analytics/detectors/detector.py
+++ b/analytics/analytics/detectors/detector.py
@@ -2,13 +2,16 @@ from abc import ABC, abstractmethod
 from pandas import DataFrame
 from typing import Optional, Union, List
 
-from analytic_types import ModelCache, TimeSeries
+from analytic_types import ModelCache, TimeSeries, AnalyticUnitId
 from analytic_types.detector_typing import DetectionResult, ProcessingResult
 from analytic_types.segment import Segment
 
 
 class Detector(ABC):
 
+    def __init__(self, analytic_unit_id: AnalyticUnitId):
+        self.analytic_unit_id = analytic_unit_id
+
     @abstractmethod
     def train(self, dataframe: DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache:
         """
@@ -39,6 +42,12 @@ class Detector(ABC):
             result.cache = detection.cache
         return result
 
+    def get_value_from_cache(self, cache: ModelCache, key: str, required = False):
+        value = cache.get(key)
+        if value is None and required:
+            raise ValueError(f'Missing required "{key}" field in cache for analytic unit {self.analytic_unit_id}')
+        return value
+
 
 class ProcessingDetector(Detector):
 
diff --git a/analytics/analytics/detectors/pattern_detector.py b/analytics/analytics/detectors/pattern_detector.py
index b95b9c1..936e6bc 100644
--- a/analytics/analytics/detectors/pattern_detector.py
+++ b/analytics/analytics/detectors/pattern_detector.py
@@ -41,7 +41,7 @@ class PatternDetector(Detector):
     DEFAULT_WINDOW_SIZE = 1
 
     def __init__(self, pattern_type: str, analytic_unit_id: AnalyticUnitId):
-        self.analytic_unit_id = analytic_unit_id
+        super().__init__(analytic_unit_id)
         self.pattern_type = pattern_type
         self.model = resolve_model_by_pattern(self.pattern_type)
         self.bucket = DataBucket()
diff --git a/analytics/analytics/detectors/threshold_detector.py b/analytics/analytics/detectors/threshold_detector.py
index 1e3352f..979e7f1 100644
--- a/analytics/analytics/detectors/threshold_detector.py
+++ b/analytics/analytics/detectors/threshold_detector.py
@@ -5,7 +5,7 @@ import pandas as pd
 import numpy as np
 from typing import Optional, List
 
-from analytic_types import ModelCache
+from analytic_types import ModelCache, AnalyticUnitId
 from analytic_types.detector_typing import DetectionResult
 from analytic_types.segment import Segment
 from detectors import Detector
@@ -20,8 +20,8 @@ class ThresholdDetector(Detector):
 
     WINDOW_SIZE = 3
 
-    def __init__(self):
-        pass
+    def __init__(self, analytic_unit_id: AnalyticUnitId):
+        super().__init__(analytic_unit_id)
 
     def train(self, dataframe: pd.DataFrame, threshold: dict, cache: Optional[ModelCache]) -> ModelCache:
         time_step = utils.find_interval(dataframe)
diff --git a/analytics/analytics/utils/common.py b/analytics/analytics/utils/common.py
index 191a980..96a3a91 100644
--- a/analytics/analytics/utils/common.py
+++ b/analytics/analytics/utils/common.py
@@ -36,6 +36,9 @@ def exponential_smoothing(series: pd.Series, alpha: float, last_smoothed_value:
             series.values[n] = result[n]
         else:
             result.append(alpha * series[n] + (1 - alpha) * result[n - 1])
+
+    assert len(result) == len(series), \
+        f'len of smoothed data {len(result)} != len of original dataset {len(series)}'
     return pd.Series(result, index = series.index)
 
 def find_pattern(data: pd.Series, height: float, length: int, pattern_type: str) -> list:
diff --git a/analytics/tests/test_detectors.py b/analytics/tests/test_detectors.py
index 6af58e5..9981625 100644
--- a/analytics/tests/test_detectors.py
+++ b/analytics/tests/test_detectors.py
@@ -2,7 +2,7 @@ import unittest
 import pandas as pd
 
 from detectors import pattern_detector, threshold_detector, anomaly_detector
-from analytic_types.detector_typing import DetectionResult
+from analytic_types.detector_typing import DetectionResult, ProcessingResult
 
 
 class TestPatternDetector(unittest.TestCase):
@@ -10,10 +10,9 @@ class TestPatternDetector(unittest.TestCase):
 
         data = [[0,1], [1,2]]
         dataframe = pd.DataFrame(data, columns=['timestamp', 'values'])
-        cache = {'windowSize': 10}
+        cache = { 'windowSize': 10 }
 
         detector = pattern_detector.PatternDetector('GENERAL', 'test_id')
-
         with self.assertRaises(ValueError):
             detector.detect(dataframe, cache)
 
@@ -22,8 +21,8 @@ class TestThresholdDetector(unittest.TestCase):
 
     def test_invalid_cache(self):
-        detector = threshold_detector.ThresholdDetector()
-
+        detector = threshold_detector.ThresholdDetector('test_id')
+
         with self.assertRaises(ValueError):
             detector.detect([], None)
 
 
@@ -33,7 +32,7 @@ class TestThresholdDetector(unittest.TestCase):
 
 class TestAnomalyDetector(unittest.TestCase):
 
-    def test_dataframe(self):
+    def test_detect(self):
         data_val = [0, 1, 2, 1, 2, 10, 1, 2, 1]
         data_ind = [1523889000000 + i for i in range(len(data_val))]
         data = {'timestamp': data_ind, 'value': data_val}
@@ -45,8 +44,91 @@ class TestAnomalyDetector(unittest.TestCase):
             'timeStep': 1
         }
         detector = anomaly_detector.AnomalyDetector('test_id')
-        detect_result: DetectionResult = detector.detect(dataframe, cache) 
+        detect_result: DetectionResult = detector.detect(dataframe, cache)
         detected_segments = list(map(lambda s: {'from': s.from_timestamp, 'to': s.to_timestamp}, detect_result.segments))
 
         result = [{ 'from': 1523889000005.0, 'to': 1523889000005.0 }]
         self.assertEqual(result, detected_segments)
+
+        cache = {
+            'confidence': 2,
+            'alpha': 0.1,
+            'timeStep': 1,
+            'seasonality': 4,
+            'segments': [{ 'from': 1523889000001, 'to': 1523889000002, 'data': [10] }]
+        }
+        detect_result: DetectionResult = detector.detect(dataframe, cache)
+        detected_segments = list(map(lambda s: {'from': s.from_timestamp, 'to': s.to_timestamp}, detect_result.segments))
+        result = []
+        self.assertEqual(result, detected_segments)
+
+    def test_process_data(self):
+        data_val = [0, 1, 2, 1, 2, 10, 1, 2, 1]
+        data_ind = [1523889000000 + i for i in range(len(data_val))]
+        data = {'timestamp': data_ind, 'value': data_val}
+        dataframe = pd.DataFrame(data = data)
+        dataframe['timestamp'] = pd.to_datetime(dataframe['timestamp'], unit='ms')
+        cache = {
+            'confidence': 2,
+            'alpha': 0.1,
+            'timeStep': 1
+        }
+        detector = anomaly_detector.AnomalyDetector('test_id')
+        detect_result: ProcessingResult = detector.process_data(dataframe, cache)
+        expected_result = {
+            'lowerBound': [
+                (1523889000000, -2.0),
+                (1523889000001, -1.9),
+                (1523889000002, -1.71),
+                (1523889000003, -1.6389999999999998),
+                (1523889000004, -1.4750999999999999),
+                (1523889000005, -0.5275899999999998),
+                (1523889000006, -0.5748309999999996),
+                (1523889000007, -0.5173478999999996),
+                (1523889000008, -0.5656131099999995)
+            ],
+            'upperBound': [
+                (1523889000000, 2.0),
+                (1523889000001, 2.1),
+                (1523889000002, 2.29),
+                (1523889000003, 2.361),
+                (1523889000004, 2.5249),
+                (1523889000005, 3.47241),
+                (1523889000006, 3.4251690000000004),
+                (1523889000007, 3.4826521),
+                (1523889000008, 3.4343868900000007)
+            ]}
+        self.assertEqual(detect_result.to_json(), expected_result)
+
+        cache = {
+            'confidence': 2,
+            'alpha': 0.1,
+            'timeStep': 1,
+            'seasonality': 5,
+            'segments': [{ 'from': 1523889000001, 'to': 1523889000002, 'data': [1] }]
+        }
+        detect_result: ProcessingResult = detector.process_data(dataframe, cache)
+        expected_result = {
+            'lowerBound': [
+                (1523889000000, -2.0),
+                (1523889000001, -2.9),
+                (1523889000002, -1.71),
+                (1523889000003, -1.6389999999999998),
+                (1523889000004, -1.4750999999999999),
+                (1523889000005, -0.5275899999999998),
+                (1523889000006, -1.5748309999999996),
+                (1523889000007, -0.5173478999999996),
+                (1523889000008, -0.5656131099999995)
+            ],
+            'upperBound': [
+                (1523889000000, 2.0),
+                (1523889000001, 3.1),
+                (1523889000002, 2.29),
+                (1523889000003, 2.361),
+                (1523889000004, 2.5249),
+                (1523889000005, 3.47241),
+                (1523889000006, 4.425169),
+                (1523889000007, 3.4826521),
+                (1523889000008, 3.4343868900000007)
+            ]}
+        self.assertEqual(detect_result.to_json(), expected_result)