diff --git a/analytics/analytics/analytic_types/cache.py b/analytics/analytics/analytic_types/cache.py
new file mode 100644
index 0000000..efe3261
--- /dev/null
+++ b/analytics/analytics/analytic_types/cache.py
@@ -0,0 +1,38 @@
+from typing import Optional, List, Dict
+
+from analytic_types.segment import AnomalyDetectorSegment
+from analytic_types.detector_typing import Bound
+
+from utils.meta import JSONClass, SerializableList
+
+@JSONClass
+class AnomalyCache:
+    def __init__(
+        self,
+        alpha: float,
+        confidence: float,
+        enable_bounds: str,
+        seasonality: Optional[int] = None,
+        segments: Optional[List[Dict]] = None,
+        time_step: Optional[int] = None,
+    ):
+        self.alpha = alpha
+        self.confidence = confidence
+        self.enable_bounds = enable_bounds
+        if seasonality is not None and seasonality < 0:
+            raise ValueError(f"Can't create AnomalyCache: got invalid seasonality {seasonality}")
+        self.seasonality = seasonality
+        self.time_step = time_step
+        if segments is not None:
+            anomaly_segments = map(AnomalyDetectorSegment.from_json, segments)
+            self.segments = SerializableList(anomaly_segments)
+        else:
+            self.segments = []
+
+    def set_segments(self, segments: List[AnomalyDetectorSegment]):
+        if len(segments) > 0:
+            self.segments = SerializableList(segments)
+
+    def get_enabled_bounds(self) -> Bound:
+        # TODO: use class with to_json()
+        return Bound(self.enable_bounds)
diff --git a/analytics/analytics/analytic_types/detector_typing.py b/analytics/analytics/analytic_types/detector_typing.py
index 5e3fd35..87585cc 100644
--- a/analytics/analytics/analytic_types/detector_typing.py
+++ b/analytics/analytics/analytic_types/detector_typing.py
@@ -1,10 +1,16 @@
 from analytic_types import ModelCache, TimeSeries
 from analytic_types.segment import Segment
 
+from enum import Enum
 from typing import List, Optional, Tuple
 
 import utils.meta
 
+class Bound(Enum):
+    ALL = 'ALL'
+    UPPER = 'UPPER'
+    LOWER = 'LOWER'
+
 class DetectionResult:
 
     def __init__(
diff --git a/analytics/analytics/detectors/__init__.py b/analytics/analytics/detectors/__init__.py
index 1c76014..370f0f2 100644
--- a/analytics/analytics/detectors/__init__.py
+++ b/analytics/analytics/detectors/__init__.py
@@ -1,4 +1,4 @@
 from detectors.detector import Detector, ProcessingDetector
 from detectors.pattern_detector import PatternDetector
 from detectors.threshold_detector import ThresholdDetector
-from detectors.anomaly_detector import AnomalyDetector, Bound
+from detectors.anomaly_detector import AnomalyDetector
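
The new AnomalyCache relies on the JSONClass machinery in utils/meta.py to map camelCase payload keys (enableBounds, timeStep) onto snake_case constructor arguments and back. A minimal standalone sketch of that round-trip: underscore_to_camel mirrors the helper that already exists in utils/meta.py, while camel_to_underscore is an assumed counterpart added here for illustration.

```python
# Standalone sketch of the key-name round-trip AnomalyCache depends on.
# underscore_to_camel mirrors utils/meta.py; camel_to_underscore is assumed.
import re

def underscore_to_camel(name: str) -> str:
    # time_step -> timeStep
    return re.sub(r'_([a-z])', lambda m: m.group(1).upper(), name)

def camel_to_underscore(name: str) -> str:
    # timeStep -> time_step
    return re.sub(r'([A-Z])', lambda m: '_' + m.group(1).lower(), name)

payload = {'alpha': 0.5, 'confidence': 2, 'enableBounds': 'ALL', 'timeStep': 1}
kwargs = {camel_to_underscore(k): v for k, v in payload.items()}
assert kwargs == {'alpha': 0.5, 'confidence': 2, 'enable_bounds': 'ALL', 'time_step': 1}
```
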
diff --git a/analytics/analytics/detectors/anomaly_detector.py b/analytics/analytics/detectors/anomaly_detector.py
index 0f2fd39..2d8ec5a 100644
--- a/analytics/analytics/detectors/anomaly_detector.py
+++ b/analytics/analytics/detectors/anomaly_detector.py
@@ -6,9 +6,10 @@ import math
 from typing import Optional, Union, List, Tuple, Generator
 
 from analytic_types import AnalyticUnitId, ModelCache
-from analytic_types.detector_typing import DetectionResult, ProcessingResult
+from analytic_types.detector_typing import DetectionResult, ProcessingResult, Bound
 from analytic_types.data_bucket import DataBucket
 from analytic_types.segment import Segment, AnomalyDetectorSegment
+from analytic_types.cache import AnomalyCache
 from detectors import Detector, ProcessingDetector
 
 import utils
@@ -17,10 +18,6 @@ MIN_DEPENDENCY_FACTOR = 0.1
 BASIC_ALPHA = 0.5
 logger = logging.getLogger('ANOMALY_DETECTOR')
 
-class Bound(Enum):
-    ALL = 'ALL'
-    UPPER = 'UPPER'
-    LOWER = 'LOWER'
-
 
 class AnomalyDetector(ProcessingDetector):
@@ -29,25 +26,15 @@ class AnomalyDetector(ProcessingDetector):
         self.bucket = DataBucket()
 
     def train(self, dataframe: pd.DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache:
-        segments = payload.get('segments')
-        enable_bounds = Bound(payload.get('enableBounds') or 'ALL')
-        prepared_segments = []
-        time_step = utils.find_interval(dataframe)
-
-        new_cache = {
-            'confidence': payload['confidence'],
-            'alpha': payload['alpha'],
-            'timeStep': time_step,
-            'enableBounds': enable_bounds.value
-        }
+        cache = AnomalyCache.from_json(payload)
+        cache.time_step = utils.find_interval(dataframe)
+        segments = cache.segments
 
-        if segments is not None:
-            seasonality = payload.get('seasonality')
-            assert seasonality is not None and seasonality > 0, \
-                f'{self.analytic_unit_id} got invalid seasonality {seasonality}'
-            parsed_segments = map(Segment.from_json, segments)
+        if len(segments) > 0:
+            seasonality = cache.seasonality
+            prepared_segments = []
 
-            for segment in parsed_segments:
+            for segment in segments:
                 segment_len = (int(segment.to_timestamp) - int(segment.from_timestamp))
                 assert segment_len <= seasonality, \
                     f'seasonality {seasonality} must be greater than segment length {segment_len}'
@@ -55,17 +42,17 @@ class AnomalyDetector(ProcessingDetector):
                 from_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment.from_timestamp, unit='ms'))
                 to_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment.to_timestamp, unit='ms'))
                 segment_data = dataframe[from_index : to_index]
-                prepared_segments.append({
-                    'from': segment.from_timestamp,
-                    'to': segment.to_timestamp,
-                    'data': segment_data.value.tolist()
-                })
-
-            new_cache['seasonality'] = seasonality
-            new_cache['segments'] = prepared_segments
+                prepared_segments.append(
+                    AnomalyDetectorSegment(
+                        segment.from_timestamp,
+                        segment.to_timestamp,
+                        segment_data.value.tolist()
+                    )
+                )
+            cache.set_segments(prepared_segments)
 
         return {
-            'cache': new_cache
+            'cache': cache.to_json()
         }
 
     # TODO: ModelCache -> DetectorState
@@ -74,41 +61,37 @@ class AnomalyDetector(ProcessingDetector):
             raise f'Analytic unit {self.analytic_unit_id} got empty cache'
 
         data = dataframe['value']
-
-        # TODO: use class for cache to avoid using string literals
-        alpha = self.get_value_from_cache(cache, 'alpha', required = True)
-        confidence = self.get_value_from_cache(cache, 'confidence', required = True)
-        segments = self.get_value_from_cache(cache, 'segments')
-        enable_bounds = Bound(self.get_value_from_cache(cache, 'enableBounds') or 'ALL')
-
-        smoothed_data = utils.exponential_smoothing(data, alpha)
-
-        lower_bound = smoothed_data - confidence
-        upper_bound = smoothed_data + confidence
+        cache = AnomalyCache.from_json(cache)
+        segments = cache.segments
+        enabled_bounds = cache.get_enabled_bounds()
 
-        if segments is not None:
-            parsed_segments = map(AnomalyDetectorSegment.from_json, segments)
+        smoothed_data = utils.exponential_smoothing(data, cache.alpha)
 
-            time_step = self.get_value_from_cache(cache, 'timeStep', required = True)
-            seasonality = self.get_value_from_cache(cache, 'seasonality', required = True)
-            assert seasonality > 0, \
-                f'{self.analytic_unit_id} got invalid seasonality {seasonality}'
+        lower_bound = smoothed_data - cache.confidence
+        upper_bound = smoothed_data + cache.confidence
 
+        if len(segments) > 0:
             data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])
 
-            for segment in parsed_segments:
-                seasonality_index = seasonality // time_step
-                seasonality_offset = self.get_seasonality_offset(segment.from_timestamp, seasonality, data_start_time, time_step)
+            for segment in segments:
+                seasonality_index = cache.seasonality // cache.time_step
+                seasonality_offset = self.get_seasonality_offset(
+                    segment.from_timestamp,
+                    cache.seasonality,
+                    data_start_time,
+                    cache.time_step
+                )
                 segment_data = pd.Series(segment.data)
 
                 lower_bound = self.add_season_to_data(lower_bound, segment_data, seasonality_offset, seasonality_index, Bound.LOWER)
                 upper_bound = self.add_season_to_data(upper_bound, segment_data, seasonality_offset, seasonality_index, Bound.UPPER)
 
-        detected_segments = list(self.detections_generator(dataframe, upper_bound, lower_bound, enable_bounds))
+        detected_segments = list(self.detections_generator(dataframe, upper_bound, lower_bound, enabled_bounds))
 
         last_dataframe_time = dataframe.iloc[-1]['timestamp']
         last_detection_time = utils.convert_pd_timestamp_to_ms(last_dataframe_time)
 
-        return DetectionResult(cache, detected_segments, last_detection_time)
+        return DetectionResult(cache.to_json(), detected_segments, last_detection_time)
 
     def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
         if cache is None:
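
With the cache object in place, detect() derives both detection bounds from one smoothed series: smoothed ± confidence. A runnable sketch of that construction, assuming utils.exponential_smoothing is a plain exponentially weighted moving average (its body is not part of this diff):

```python
# Sketch of the bound construction in detect()/process_data(), under the
# assumption that utils.exponential_smoothing is a simple EWMA.
import pandas as pd

def exponential_smoothing_sketch(data: pd.Series, alpha: float) -> pd.Series:
    # Higher alpha -> bounds follow the raw signal more closely.
    return data.ewm(alpha=alpha, adjust=False).mean()

data = pd.Series([1.0, 1.2, 0.9, 5.0, 1.1])
smoothed = exponential_smoothing_sketch(data, alpha=0.5)
confidence = 2.0
lower_bound = smoothed - confidence  # dips below this are LOWER anomalies
upper_bound = smoothed + confidence  # spikes above this are UPPER anomalies
print(list(zip(lower_bound.round(2), upper_bound.round(2))))
```
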
@@ -138,14 +121,15 @@ class AnomalyDetector(ProcessingDetector):
         if cache is None:
             raise ValueError('anomaly detector got None cache')
 
+        cache = AnomalyCache.from_json(cache)
         for level in range(1, MAX_DEPENDENCY_LEVEL):
-            if (1 - cache['alpha']) ** level < MIN_DEPENDENCY_FACTOR:
+            if (1 - cache.alpha) ** level < MIN_DEPENDENCY_FACTOR:
                 break
 
         seasonality = 0
-        if cache.get('segments') is not None and cache['seasonality'] > 0:
-            seasonality = cache['seasonality'] // cache['timeStep']
+        if len(cache.segments) > 0:
+            seasonality = cache.seasonality // cache.time_step
         return max(level, seasonality)
 
     def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult:
@@ -160,31 +144,28 @@ class AnomalyDetector(ProcessingDetector):
 
     # TODO: remove duplication with detect()
     def process_data(self, dataframe: pd.DataFrame, cache: ModelCache) -> ProcessingResult:
-        segments = self.get_value_from_cache(cache, 'segments')
-        alpha = self.get_value_from_cache(cache, 'alpha', required = True)
-        confidence = self.get_value_from_cache(cache, 'confidence', required = True)
-        enable_bounds = Bound(self.get_value_from_cache(cache, 'enableBounds') or 'ALL')
+        cache = AnomalyCache.from_json(cache)
+        segments = cache.segments
+        enabled_bounds = cache.get_enabled_bounds()
 
         # TODO: exponential_smoothing should return dataframe with related timestamps
-        smoothed_data = utils.exponential_smoothing(dataframe['value'], alpha)
-
-        lower_bound = smoothed_data - confidence
-        upper_bound = smoothed_data + confidence
+        smoothed_data = utils.exponential_smoothing(dataframe['value'], cache.alpha)
 
-        if segments is not None:
-            segments: List[AnomalyDetectorSegment] = map(AnomalyDetectorSegment.from_json, segments)
-            seasonality = self.get_value_from_cache(cache, 'seasonality', required = True)
-            assert seasonality > 0, \
-                f'{self.analytic_unit_id} got invalid seasonality {seasonality}'
+        lower_bound = smoothed_data - cache.confidence
+        upper_bound = smoothed_data + cache.confidence
 
+        if len(segments) > 0:
             data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])
-            time_step = self.get_value_from_cache(cache, 'timeStep', required = True)
-
             for segment in segments:
-                seasonality_index = seasonality // time_step
+                seasonality_index = cache.seasonality // cache.time_step
                 # TODO: move it to utils and add tests
-                seasonality_offset = self.get_seasonality_offset(segment.from_timestamp, seasonality, data_start_time, time_step)
+                seasonality_offset = self.get_seasonality_offset(
+                    segment.from_timestamp,
+                    cache.seasonality,
+                    data_start_time,
+                    cache.time_step
+                )
                 segment_data = pd.Series(segment.data)
 
                 lower_bound = self.add_season_to_data(lower_bound, segment_data, seasonality_offset, seasonality_index, Bound.LOWER)
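
The get_window_size() change above keeps the old rule but reads it from the cache object: take the first dependency level at which the smoothing weight (1 - alpha) ** level decays below MIN_DEPENDENCY_FACTOR, then widen the window to one full season when segments are cached. A standalone sketch (the value of MAX_DEPENDENCY_LEVEL is assumed here; it is defined elsewhere in the module):

```python
# Standalone sketch of the window-size rule in get_window_size().
MAX_DEPENDENCY_LEVEL = 100   # assumed; defined elsewhere in anomaly_detector.py
MIN_DEPENDENCY_FACTOR = 0.1  # as in the module

def window_size_sketch(alpha: float, seasonality: int = 0, time_step: int = 1) -> int:
    level = 1
    for level in range(1, MAX_DEPENDENCY_LEVEL):
        # Stop once older points contribute less than MIN_DEPENDENCY_FACTOR.
        if (1 - alpha) ** level < MIN_DEPENDENCY_FACTOR:
            break
    season_index = seasonality // time_step if seasonality else 0
    return max(level, season_index)

assert window_size_sketch(alpha=0.5) == 4                   # 0.5 ** 4 < 0.1
assert window_size_sketch(alpha=0.5, seasonality=10) == 10  # season dominates
```
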
@@ -196,11 +177,11 @@ class AnomalyDetector(ProcessingDetector):
         lower_bound_timeseries = list(zip(timestamps, lower_bound.values.tolist()))
         upper_bound_timeseries = list(zip(timestamps, upper_bound.values.tolist()))
 
-        if enable_bounds == Bound.ALL:
+        if enabled_bounds == Bound.ALL:
             return ProcessingResult(lower_bound_timeseries, upper_bound_timeseries)
-        elif enable_bounds == Bound.UPPER:
+        elif enabled_bounds == Bound.UPPER:
             return ProcessingResult(upper_bound = upper_bound_timeseries)
-        elif enable_bounds == Bound.LOWER:
+        elif enabled_bounds == Bound.LOWER:
             return ProcessingResult(lower_bound = lower_bound_timeseries)
 
     def add_season_to_data(self, data: pd.Series, segment: pd.Series, offset: int, seasonality: int, bound_type: Bound) -> pd.Series:
@@ -275,14 +256,14 @@ class AnomalyDetector(ProcessingDetector):
         dataframe: pd.DataFrame,
         upper_bound: pd.DataFrame,
         lower_bound: pd.DataFrame,
-        enable_bounds: Bound
+        enabled_bounds: Bound
     ) -> Generator[Segment, None, Segment]:
         in_segment = False
         segment_start = 0
         bound: Bound = None
         for idx, val in enumerate(dataframe['value'].values):
             if val > upper_bound.values[idx]:
-                if enable_bounds == Bound.UPPER or enable_bounds == Bound.ALL:
+                if enabled_bounds == Bound.UPPER or enabled_bounds == Bound.ALL:
                     if not in_segment:
                         in_segment = True
                         segment_start = dataframe['timestamp'][idx]
@@ -290,7 +271,7 @@ class AnomalyDetector(ProcessingDetector):
                 continue
 
             if val < lower_bound.values[idx]:
-                if enable_bounds == Bound.LOWER or enable_bounds == Bound.ALL:
+                if enabled_bounds == Bound.LOWER or enabled_bounds == Bound.ALL:
                     if not in_segment:
                         in_segment = True
                         segment_start = dataframe['timestamp'][idx]
diff --git a/analytics/analytics/utils/__init__.py b/analytics/analytics/utils/__init__.py
index d2c3555..21077e6 100644
--- a/analytics/analytics/utils/__init__.py
+++ b/analytics/analytics/utils/__init__.py
@@ -1,3 +1,4 @@
 from utils.common import *
 from utils.time import *
 from utils.dataframe import *
+from utils.meta import *
diff --git a/analytics/analytics/utils/meta.py b/analytics/analytics/utils/meta.py
index b6365d7..c2771a1 100644
--- a/analytics/analytics/utils/meta.py
+++ b/analytics/analytics/utils/meta.py
@@ -22,6 +22,12 @@ def is_field_private(field_name: str) -> Optional[str]:
     m = re.match(r'_[^(__)]+__', field_name)
     return m is not None
 
+def serialize(obj):
+    if hasattr(obj, 'to_json'):
+        return obj.to_json()
+    else:
+        return obj
+
 def inited_params(target_init):
     target_params = signature(target_init).parameters.values()
     if len(target_params) < 1:
@@ -55,7 +61,7 @@ def JSONClass(target_class):
         where all None - values and private fileds are skipped
         """
         return {
-            underscore_to_camel(k): v for k, v in self.__dict__.items()
+            underscore_to_camel(k): serialize(v) for k, v in self.__dict__.items()
             if v is not None and not is_field_private(k)
         }
 
@@ -69,3 +75,7 @@ def JSONClass(target_class):
     target_class.to_json = to_json
     target_class.from_json = from_json
     return target_class
+
+class SerializableList(list):
+    def to_json(self):
+        return list(map(lambda s: s.to_json(), self))
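
The serialize()/SerializableList pair is what lets AnomalyCache.to_json() nest segment objects inside the cache dict. A self-contained sketch of the behavior, with a hypothetical FakeSegment standing in for the real Segment class:

```python
# Self-contained sketch of serialize()/SerializableList from utils/meta.py.
class FakeSegment:  # hypothetical stand-in for analytic_types.segment.Segment
    def __init__(self, from_ts: int, to_ts: int):
        self.from_ts, self.to_ts = from_ts, to_ts

    def to_json(self) -> dict:
        return {'from': self.from_ts, 'to': self.to_ts}

class SerializableList(list):
    def to_json(self) -> list:
        return [item.to_json() for item in self]

def serialize(obj):
    # Objects that know how to serialize themselves do so; plain values pass through.
    return obj.to_json() if hasattr(obj, 'to_json') else obj

assert serialize(42) == 42
assert serialize(SerializableList([FakeSegment(100, 200)])) == [{'from': 100, 'to': 200}]
```
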
diff --git a/analytics/tests/test_detectors.py b/analytics/tests/test_detectors.py
index 8916ceb..35951fd 100644
--- a/analytics/tests/test_detectors.py
+++ b/analytics/tests/test_detectors.py
@@ -1,8 +1,8 @@
 import unittest
 
 import pandas as pd
-from detectors import pattern_detector, threshold_detector, anomaly_detector, Bound
-from analytic_types.detector_typing import DetectionResult, ProcessingResult
+from detectors import pattern_detector, threshold_detector, anomaly_detector
+from analytic_types.detector_typing import DetectionResult, ProcessingResult, Bound
 from analytic_types.segment import Segment
 from tests.test_dataset import create_dataframe, create_list_of_timestamps
 from utils import convert_pd_timestamp_to_ms
@@ -73,6 +73,7 @@ class TestAnomalyDetector(unittest.TestCase):
         cache = {
             'confidence': 2,
             'alpha': 0.1,
+            'enableBounds': 'ALL',
             'timeStep': 1
         }
         detector = anomaly_detector.AnomalyDetector('test_id')
@@ -85,6 +86,7 @@ class TestAnomalyDetector(unittest.TestCase):
         cache = {
             'confidence': 2,
             'alpha': 0.1,
+            'enableBounds': 'ALL',
             'timeStep': 1,
             'seasonality': 4,
             'segments': [{ 'from': 1523889000001, 'to': 1523889000002, 'data': [10] }]
@@ -103,6 +105,7 @@ class TestAnomalyDetector(unittest.TestCase):
         cache = {
             'confidence': 2,
             'alpha': 0.1,
+            'enableBounds': 'ALL',
             'timeStep': 1
         }
         detector = anomaly_detector.AnomalyDetector('test_id')
@@ -135,6 +138,7 @@ class TestAnomalyDetector(unittest.TestCase):
         cache = {
             'confidence': 2,
             'alpha': 0.1,
+            'enableBounds': 'ALL',
             'timeStep': 1,
             'seasonality': 5,
             'segments': [{ 'from': 1523889000001, 'to': 1523889000002, 'data': [1] }]
@@ -182,7 +186,7 @@ class TestAnomalyDetector(unittest.TestCase):
         dataframe = create_dataframe(data)
         upper_bound = pd.Series([2, 2, 2, 2, 2, 2, 2, 2, 2, 2])
         lower_bound = pd.Series([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
-        segments = list(detector.detections_generator(dataframe, upper_bound, lower_bound, enable_bounds=Bound.ALL))
+        segments = list(detector.detections_generator(dataframe, upper_bound, lower_bound, enabled_bounds=Bound.ALL))
         segments_borders = list(map(lambda s: [s.from_timestamp, s.to_timestamp], segments))
         self.assertEqual(segments_borders, [[timestamps[2], timestamps[2]], [timestamps[4], timestamps[8]]])
diff --git a/analytics/tests/test_utils.py b/analytics/tests/test_utils.py
index e1a81a4..b691069 100644
--- a/analytics/tests/test_utils.py
+++ b/analytics/tests/test_utils.py
@@ -362,5 +362,12 @@ class TestUtils(unittest.TestCase):
         self.assertEqual(got.from_timestamp, expected[0])
         self.assertEqual(got.to_timestamp, expected[1])
 
+    def test_serialize(self):
+        segment_list = [Segment(100, 200)]
+        serialize_list = utils.meta.SerializableList(segment_list)
+        meta_result = utils.meta.serialize(serialize_list)
+        expected_result = [{ 'from': 100, 'to': 200 }]
+        self.assertEqual(meta_result, expected_result)
+
 if __name__ == '__main__':
     unittest.main()
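
For reference, a simplified sketch of the contract the last test checks for detections_generator: a value crossing an enabled bound opens a segment, and returning inside both bounds closes it. Indices stand in for pd.Timestamp values here, and only the Bound.ALL case is modeled:

```python
# Simplified model of detections_generator with Bound.ALL; indices replace timestamps.
def detections_sketch(values, lower, upper):
    segments, start = [], None
    for i, val in enumerate(values):
        if val > upper[i] or val < lower[i]:
            if start is None:
                start = i                    # open a segment at the first out-of-bounds point
        elif start is not None:
            segments.append((start, i - 1))  # close at the last out-of-bounds point
            start = None
    if start is not None:
        segments.append((start, len(values) - 1))
    return segments

assert detections_sketch([1, 1, 3, 1, -1, -1, 1], [0] * 7, [2] * 7) == [(2, 2), (4, 5)]
```
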