diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py index 3501334..2414e2f 100644 --- a/analytics/analytics/analytic_unit_worker.py +++ b/analytics/analytics/analytic_unit_worker.py @@ -2,11 +2,11 @@ import config import detectors import logging import pandas as pd -from typing import Optional, Union, Generator +from typing import Optional, Union, Generator, List from models import ModelCache import concurrent.futures import asyncio - +import utils from utils import get_intersected_chunks, get_chunks, prepare_data @@ -62,7 +62,7 @@ class AnalyticUnitWorker: chunk_dataframe = prepare_data(chunk) detected = self._detector.detect(chunk_dataframe, cache) self.__append_detection_result(detection_result, detected) - + detection_result['segments'] = self._detector.get_intersections(detection_result['segments']) return detection_result def cancel(self): @@ -84,6 +84,8 @@ class AnalyticUnitWorker: chunk_dataframe = prepare_data(chunk) detected = self._detector.consume_data(chunk_dataframe, cache) self.__append_detection_result(detection_result, detected) + + detection_result['segments'] = self._detector.get_intersections(detection_result['segments']) if detection_result['lastDetectionTime'] is None: return None diff --git a/analytics/analytics/detectors/anomaly_detector.py b/analytics/analytics/detectors/anomaly_detector.py index 06887c1..400ea3a 100644 --- a/analytics/analytics/detectors/anomaly_detector.py +++ b/analytics/analytics/detectors/anomaly_detector.py @@ -2,11 +2,14 @@ import logging import pandas as pd from typing import Optional, Union, List, Tuple +from analytic_types import AnalyticUnitId from analytic_types.data_bucket import DataBucket from detectors import Detector from models import ModelCache import utils +MAX_DEPENDENCY_LEVEL = 100 +MIN_DEPENDENCY_FACTOR = 0.1 logger = logging.getLogger('ANOMALY_DETECTOR') @@ -27,22 +30,27 @@ class AnomalyDetector(Detector): data = dataframe['value'] last_values = None if cache is not None: - last_values = cache['last_values'] + last_values = cache.get('last_values') - #TODO detection code here - smoth_data = utils.exponential_smoothing(data, cache['alpha']) - upper_bound = utils.exponential_smoothing(data + cache['confidence'], cache['alpha']) - lower_bound = utils.exponential_smoothing(data - cache['confidence'], cache['alpha']) + smothed_data = utils.exponential_smoothing(data, cache['alpha']) + upper_bound = smothed_data + cache['confidence'] + lower_bound = smothed_data - cache['confidence'] - segemnts = [] + anomaly_indexes = [] for idx, val in enumerate(data.values): - if val > upper_bound[idx] or val < lower_bound[idx]: - segemnts.append(idx) - - last_detection_time = dataframe['timestamp'][-1] + if val > upper_bound.values[idx] or val < lower_bound.values[idx]: + anomaly_indexes.append(data.index[idx]) + segments = utils.close_filtering(anomaly_indexes, 1) + segments = utils.get_start_and_end_of_segments(segments) + segments = [( + utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][segment[0]]), + utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][segment[1]]), + ) for segment in segments] + last_dataframe_time = dataframe.iloc[-1]['timestamp'] + last_detection_time = utils.convert_pd_timestamp_to_ms(last_dataframe_time) return { 'cache': cache, - 'segments': segemnts, + 'segments': segments, 'lastDetectionTime': last_detection_time } @@ -50,29 +58,21 @@ class AnomalyDetector(Detector): self.detect(data, cache) - def __smooth_data(self, dataframe: pd.DataFrame) -> List[Tuple[int, float]]: - ''' - smooth data using exponential smoothing/moving average/weighted_average - ''' - - def __get_confidence_window(self, smooth_data: pd.Series, condfidence: float) -> Tuple[pd.Series, pd.Series]: - ''' - build confidence interval above and below smoothed data - ''' - - def __get_dependency_level(self, alpha: float) -> int: + def get_window_size(self, cache: Optional[ModelCache]) -> int: ''' get the number of values that will affect the next value ''' - for level in range(1, 100): - if (1 - alpha) ** level < 0.1: - break - return level - - def get_window_size(self, cache: Optional[ModelCache]) -> int: if cache is None: raise ValueError('anomaly detector got None cache') + + for level in range(1, MAX_DEPENDENCY_LEVEL): + if (1 - cache['alpha']) ** level < MIN_DEPENDENCY_FACTOR: + break + return level - #TODO: calculate value based on `alpha` value from cache - return 1 + def get_intersections(self, segments: List[dict]) -> List[dict]: + segments = [[segment['from'], segment['to']] for segment in segments] + segments = utils.merge_intersecting_intervals(segments) + segments = [{'from': segment[0], 'to': segment[1]} for segment in segments] + return segments diff --git a/analytics/analytics/detectors/detector.py b/analytics/analytics/detectors/detector.py index 6b44161..def7c3c 100644 --- a/analytics/analytics/detectors/detector.py +++ b/analytics/analytics/detectors/detector.py @@ -1,7 +1,7 @@ from models import ModelCache from abc import ABC, abstractmethod from pandas import DataFrame -from typing import Optional, Union +from typing import Optional, Union, List class Detector(ABC): @@ -24,3 +24,7 @@ class Detector(ABC): @abstractmethod def get_window_size(self, cache: Optional[ModelCache]) -> int: pass + + @abstractmethod + def get_intersections(self, segments: List[dict]) -> List[dict]: + pass diff --git a/analytics/analytics/detectors/pattern_detector.py b/analytics/analytics/detectors/pattern_detector.py index caaa053..9bcb06a 100644 --- a/analytics/analytics/detectors/pattern_detector.py +++ b/analytics/analytics/detectors/pattern_detector.py @@ -128,3 +128,6 @@ class PatternDetector(Detector): def get_window_size(self, cache: Optional[ModelCache]) -> int: if cache is None: return self.DEFAULT_WINDOW_SIZE return cache.get('windowSize', self.DEFAULT_WINDOW_SIZE) + + def get_intersections(self, segments: List[dict]) -> List[dict]: + return segments diff --git a/analytics/analytics/detectors/threshold_detector.py b/analytics/analytics/detectors/threshold_detector.py index ad4fd13..8e787aa 100644 --- a/analytics/analytics/detectors/threshold_detector.py +++ b/analytics/analytics/detectors/threshold_detector.py @@ -2,7 +2,7 @@ import logging as log import pandas as pd import numpy as np -from typing import Optional +from typing import Optional, List from detectors import Detector from models import ModelCache @@ -80,3 +80,6 @@ class ThresholdDetector(Detector): def get_window_size(self, cache: Optional[ModelCache]) -> int: return self.WINDOW_SIZE + + def get_intersections(self, segments: List[dict]) -> List[dict]: + return segments diff --git a/analytics/analytics/utils/common.py b/analytics/analytics/utils/common.py index 76482e0..06bacff 100644 --- a/analytics/analytics/utils/common.py +++ b/analytics/analytics/utils/common.py @@ -114,7 +114,7 @@ def get_same_length(patterns_list): pat.extend(added_values) return patterns_list -def close_filtering(pattern_list, win_size): +def close_filtering(pattern_list: List[int], win_size: int) -> List[Tuple[int, int]]: if len(pattern_list) == 0: return [] s = [[pattern_list[0]]] @@ -127,6 +127,42 @@ def close_filtering(pattern_list, win_size): s.append([pattern_list[i]]) return s + +def merge_intersecting_intervals(intervals: List[Tuple[int, int]]) -> List[Tuple[int, int]]: + ''' + At the entrance - list of intervals with start and end. + Find intersecting intervals in this list and merge it. + ''' + if len(intervals) < 2: + return intervals + intervals = sorted(intervals) + last_couple = intervals[0] + for i in range(1,len(intervals)): + if intervals[i][0] <= last_couple[1]: + intervals[i][0] = min(last_couple[0], intervals[i][0]) + intervals[i][1] = max(last_couple[1], intervals[i][1]) + intervals[i-1] = [] + last_couple = intervals[i] + intervals = [x for x in intervals if x != []] + return intervals + +def get_start_and_end_of_segments(segments: List[List[int]]) -> List[Tuple[int, int]]: + ''' + find start and end of segment: [1, 2, 3, 4] -> [1, 4] + if segment is 1 index - it will be doubled: [7] -> [7, 7] + ''' + result = [] + for segment in segments: + if len(segment) == 0: + continue + elif len(segment) > 1: + segment = [segment[0], segment[-1]] + else: + segment = [segment[0], segment[0]] + result.append(segment) + return result + + def best_pattern(pattern_list: list, data: pd.Series, dir: str) -> list: new_pattern_list = [] for val in pattern_list: diff --git a/analytics/tests/test_detectors.py b/analytics/tests/test_detectors.py index 05d98e2..08ce2d8 100644 --- a/analytics/tests/test_detectors.py +++ b/analytics/tests/test_detectors.py @@ -1,7 +1,7 @@ import unittest import pandas as pd -from detectors import pattern_detector, threshold_detector +from detectors import pattern_detector, threshold_detector, anomaly_detector class TestPatternDetector(unittest.TestCase): @@ -28,3 +28,21 @@ class TestThresholdDetector(unittest.TestCase): with self.assertRaises(ValueError): detector.detect([], {}) + + +class TestAnomalyDetector(unittest.TestCase): + + def test_dataframe(self): + data_val = [0, 1, 2, 1, 2, 10, 1, 2, 1] + data_ind = [1523889000000 + i for i in range(len(data_val))] + data = {'timestamp': data_ind, 'value': data_val} + dataframe = pd.DataFrame(data = data) + dataframe['timestamp'] = pd.to_datetime(dataframe['timestamp'], unit='ms') + cache = { + 'confidence': 2, + 'alpha': 0.1, + } + detector = anomaly_detector.AnomalyDetector() + detect_result = detector.detect(dataframe, cache) + result = [(1523889000005.0, 1523889000005.0)] + self.assertEqual(result, detect_result['segments']) diff --git a/analytics/tests/test_utils.py b/analytics/tests/test_utils.py index b7a0d01..494e70e 100644 --- a/analytics/tests/test_utils.py +++ b/analytics/tests/test_utils.py @@ -270,7 +270,7 @@ class TestUtils(unittest.TestCase): data = pd.Series([5,4,3,2,1,0,1,2,3]) result_list = [4, 5, 6] self.assertIn(utils.get_end_of_segment(data, False), result_list) - + def test_get_borders_of_peaks(self): data = pd.Series([1,0,1,2,3,2,1,0,0,1,2,3,4,3,2,2,1,0,1,2,3,4,5,3,2,1,0]) pattern_center = [4, 12, 22] @@ -278,7 +278,7 @@ class TestUtils(unittest.TestCase): confidence = 1.5 result = [(1, 7), (9, 15), (19, 25)] self.assertEqual(utils.get_borders_of_peaks(pattern_center, data, ws, confidence), result) - + def test_get_borders_of_peaks_for_trough(self): data = pd.Series([4,4,5,5,3,1,3,5,5,6,3,2]) pattern_center = [5] @@ -287,5 +287,87 @@ class TestUtils(unittest.TestCase): result = [(3, 7)] self.assertEqual(utils.get_borders_of_peaks(pattern_center, data, ws, confidence, inverse = True), result) + def test_get_start_and_end_of_segments(self): + segments = [[1, 2, 3, 4], [5, 6, 7], [8], [], [12, 12]] + result = [[1, 4], [5, 7], [8, 8], [12, 12]] + utils_result = utils.get_start_and_end_of_segments(segments) + for idx, val in enumerate(utils_result): + self.assertEqual(result[idx][0], val[0]) + self.assertEqual(result[idx][1], val[1]) + + def test_get_start_and_end_of_segments_empty(self): + segments = [] + result = [] + utils_result = utils.get_start_and_end_of_segments(segments) + self.assertEqual(result, utils_result) + + def test_merge_intersecting_intervals(self): + index = [[10, 20], [30, 40]] + result = [[10, 20], [30, 40]] + utils_result = utils.merge_intersecting_intervals(index) + for idx, val in enumerate(utils_result): + self.assertEqual(result[idx][0], val[0]) + self.assertEqual(result[idx][1], val[1]) + + def test_merge_intersecting_intervals_1(self): + index = [[10, 20], [13, 23], [15, 17], [20, 40]] + result = [[10, 40]] + utils_result = utils.merge_intersecting_intervals(index) + for idx, val in enumerate(utils_result): + self.assertEqual(result[idx][0], val[0]) + self.assertEqual(result[idx][1], val[1]) + + def test_merge_intersecting_intervals_empty(self): + index = [] + result = [] + utils_result = utils.merge_intersecting_intervals(index) + self.assertEqual(result, utils_result) + + def test_merge_intersecting_intervals_one(self): + index = [[10, 20]] + result = [[10, 20]] + utils_result = utils.merge_intersecting_intervals(index) + self.assertEqual(result, utils_result) + + def test_merge_intersecting_intervals_2(self): + index = [[10, 20], [13, 23], [25, 30], [35, 40]] + result = [[10, 23], [25, 30], [35, 40]] + utils_result = utils.merge_intersecting_intervals(index) + for idx, val in enumerate(utils_result): + self.assertEqual(result[idx][0], val[0]) + self.assertEqual(result[idx][1], val[1]) + + def test_merge_intersecting_intervals_3(self): + index = [[10, 50], [5, 40], [15, 25], [6, 50]] + result = [[5, 50]] + utils_result = utils.merge_intersecting_intervals(index) + for idx, val in enumerate(utils_result): + self.assertEqual(result[idx][0], val[0]) + self.assertEqual(result[idx][1], val[1]) + + def test_merge_intersecting_intervals_4(self): + index = [[5, 10], [10, 20], [25, 50]] + result = [[5, 20], [25, 50]] + utils_result = utils.merge_intersecting_intervals(index) + for idx, val in enumerate(utils_result): + self.assertEqual(result[idx][0], val[0]) + self.assertEqual(result[idx][1], val[1]) + + def test_merge_intersecting_intervals_5(self): + index = [[20, 40], [10, 15], [50, 60]] + result = [[10, 15], [20, 40], [50, 60]] + utils_result = utils.merge_intersecting_intervals(index) + for idx, val in enumerate(utils_result): + self.assertEqual(result[idx][0], val[0]) + self.assertEqual(result[idx][1], val[1]) + + def test_merge_intersecting_intervals_6(self): + index = [[20, 40], [10, 20], [50, 60]] + result = [[10, 40], [50, 60]] + utils_result = utils.merge_intersecting_intervals(index) + for idx, val in enumerate(utils_result): + self.assertEqual(result[idx][0], val[0]) + self.assertEqual(result[idx][1], val[1]) + if __name__ == '__main__': unittest.main()