diff --git a/analytics/analytics/analytic_types/segment.py b/analytics/analytics/analytic_types/segment.py
index e15033a..8c45427 100644
--- a/analytics/analytics/analytic_types/segment.py
+++ b/analytics/analytics/analytic_types/segment.py
@@ -27,3 +27,31 @@ class Segment:
         self.labeled = labeled
         self.deleted = deleted
         self.message = message
+
+@utils.meta.JSONClass
+class AnomalyDetectorSegment(Segment):
+    '''
+    Used for segment manipulation instead of a raw { 'from': ..., 'to': ..., 'data': ... } dict
+    '''
+
+    def __init__(
+        self,
+        from_timestamp: int,
+        to_timestamp: int,
+        data: Optional[list] = None,
+        _id: Optional[str] = None,
+        analytic_unit_id: Optional[str] = None,
+        labeled: Optional[bool] = None,
+        deleted: Optional[bool] = None,
+        message: Optional[str] = None
+    ):
+        super().__init__(
+            from_timestamp,
+            to_timestamp,
+            _id,
+            analytic_unit_id,
+            labeled,
+            deleted,
+            message
+        )
+        self.data = data if data is not None else []
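The cache stores anomaly segments as `{ 'from': ..., 'to': ..., 'data': ... }` dicts; `AnomalyDetectorSegment` wraps that shape in a typed object. Below is a minimal, dependency-free sketch of the idea — `SegmentSketch` and its hand-written `from_json` are illustrative stand-ins, since the real class gets its JSON mapping from the `utils.meta.JSONClass` decorator:

```python
from typing import List, Optional

class SegmentSketch:
    # hypothetical stand-in for AnomalyDetectorSegment, kept standalone
    def __init__(self, from_timestamp: int, to_timestamp: int, data: Optional[List[float]] = None):
        self.from_timestamp = from_timestamp
        self.to_timestamp = to_timestamp
        self.data = data if data is not None else []

    @classmethod
    def from_json(cls, json_dict: dict) -> 'SegmentSketch':
        # mirrors the cache format used by AnomalyDetector: { 'from', 'to', 'data' }
        return cls(json_dict['from'], json_dict['to'], json_dict.get('data'))

segment = SegmentSketch.from_json({'from': 1523889000001, 'to': 1523889000002, 'data': [1]})
assert segment.to_timestamp - segment.from_timestamp == 1
assert segment.data == [1]
```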
diff --git a/analytics/analytics/detectors/__init__.py b/analytics/analytics/detectors/__init__.py
index 370f0f2..1c76014 100644
--- a/analytics/analytics/detectors/__init__.py
+++ b/analytics/analytics/detectors/__init__.py
@@ -1,4 +1,4 @@
 from detectors.detector import Detector, ProcessingDetector
 from detectors.pattern_detector import PatternDetector
 from detectors.threshold_detector import ThresholdDetector
-from detectors.anomaly_detector import AnomalyDetector
+from detectors.anomaly_detector import AnomalyDetector, Bound
diff --git a/analytics/analytics/detectors/anomaly_detector.py b/analytics/analytics/detectors/anomaly_detector.py
index 7ab342d..0f2fd39 100644
--- a/analytics/analytics/detectors/anomaly_detector.py
+++ b/analytics/analytics/detectors/anomaly_detector.py
@@ -3,12 +3,12 @@
 import logging
 import numpy as np
 import pandas as pd
 import math
-from typing import Optional, Union, List, Tuple
+from typing import Optional, Union, List, Tuple, Generator

 from analytic_types import AnalyticUnitId, ModelCache
 from analytic_types.detector_typing import DetectionResult, ProcessingResult
 from analytic_types.data_bucket import DataBucket
-from analytic_types.segment import Segment
+from analytic_types.segment import Segment, AnomalyDetectorSegment
 from detectors import Detector, ProcessingDetector
 import utils
@@ -45,16 +45,21 @@ class AnomalyDetector(ProcessingDetector):
             seasonality = payload.get('seasonality')
             assert seasonality is not None and seasonality > 0, \
                 f'{self.analytic_unit_id} got invalid seasonality {seasonality}'
+            parsed_segments = map(Segment.from_json, segments)

-            for segment in segments:
-                segment_len = (int(segment['to']) - int(segment['from']))
+            for segment in parsed_segments:
+                segment_len = (int(segment.to_timestamp) - int(segment.from_timestamp))
                 assert segment_len <= seasonality, \
-                    f'seasonality {seasonality} must be great then segment length {segment_len}'
+                    f'seasonality {seasonality} must be greater than segment length {segment_len}'

-                from_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['from'], unit='ms'))
-                to_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['to'], unit='ms'))
+                from_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment.from_timestamp, unit='ms'))
+                to_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment.to_timestamp, unit='ms'))
                 segment_data = dataframe[from_index : to_index]
-                prepared_segments.append({'from': segment['from'], 'data': segment_data.value.tolist()})
+                prepared_segments.append({
+                    'from': segment.from_timestamp,
+                    'to': segment.to_timestamp,
+                    'data': segment_data.value.tolist()
+                })

             new_cache['seasonality'] = seasonality
             new_cache['segments'] = prepared_segments
@@ -81,6 +86,7 @@ class AnomalyDetector(ProcessingDetector):
         upper_bound = smoothed_data + confidence

         if segments is not None:
+            parsed_segments = map(AnomalyDetectorSegment.from_json, segments)
             time_step = self.get_value_from_cache(cache, 'timeStep', required = True)
             seasonality = self.get_value_from_cache(cache, 'seasonality', required = True)
@@ -89,40 +95,20 @@ class AnomalyDetector(ProcessingDetector):
             data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])

-            for segment in segments:
+            for segment in parsed_segments:
                 seasonality_index = seasonality // time_step
-                season_count = math.ceil(abs(segment['from'] - data_start_time) / seasonality)
-                start_seasonal_segment = segment['from'] + seasonality * season_count
-                seasonality_offset = (abs(start_seasonal_segment - data_start_time) % seasonality) // time_step
-
-                segment_data = pd.Series(segment['data'])
+                seasonality_offset = self.get_seasonality_offset(segment.from_timestamp, seasonality, data_start_time, time_step)
+                segment_data = pd.Series(segment.data)

                 lower_bound = self.add_season_to_data(lower_bound, segment_data, seasonality_offset, seasonality_index, Bound.LOWER)
                 upper_bound = self.add_season_to_data(upper_bound, segment_data, seasonality_offset, seasonality_index, Bound.UPPER)

-        anomaly_indexes = []
-        for idx, val in enumerate(data.values):
-            if val > upper_bound.values[idx]:
-                if enable_bounds == Bound.UPPER or enable_bounds == Bound.ALL:
-                    anomaly_indexes.append(data.index[idx])
-
-            if val < lower_bound.values[idx]:
-                if enable_bounds == Bound.LOWER or enable_bounds == Bound.ALL:
-                    anomaly_indexes.append(data.index[idx])
-
-        # TODO: use Segment in utils
-        segments = utils.close_filtering(anomaly_indexes, 1)
-        segments = utils.get_start_and_end_of_segments(segments)
-        segments = [Segment(
-            utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][segment[0]]),
-            utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][segment[1]]),
-            f'{data[segment[0]]} out of bound'
-        ) for segment in segments]
+        detected_segments = list(self.detections_generator(dataframe, upper_bound, lower_bound, enable_bounds))

         last_dataframe_time = dataframe.iloc[-1]['timestamp']
         last_detection_time = utils.convert_pd_timestamp_to_ms(last_dataframe_time)
-        return DetectionResult(cache, segments, last_detection_time)
+        return DetectionResult(cache, detected_segments, last_detection_time)

     def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
         if cache is None:
@@ -186,6 +172,7 @@ class AnomalyDetector(ProcessingDetector):
         upper_bound = smoothed_data + confidence

         if segments is not None:
+            segments: List[AnomalyDetectorSegment] = list(map(AnomalyDetectorSegment.from_json, segments))
             seasonality = self.get_value_from_cache(cache, 'seasonality', required = True)
             assert seasonality > 0, \
                 f'{self.analytic_unit_id} got invalid seasonality {seasonality}'
@@ -197,10 +184,8 @@ class AnomalyDetector(ProcessingDetector):
             for segment in segments:
                 seasonality_index = seasonality // time_step
                 # TODO: move it to utils and add tests
-                season_count = math.ceil(abs(segment['from'] - data_start_time) / seasonality)
-                start_seasonal_segment = segment['from'] + seasonality * season_count
-                seasonality_offset = (abs(start_seasonal_segment - data_start_time) % seasonality) // time_step
-                segment_data = pd.Series(segment['data'])
+                seasonality_offset = self.get_seasonality_offset(segment.from_timestamp, seasonality, data_start_time, time_step)
+                segment_data = pd.Series(segment.data)
                 lower_bound = self.add_season_to_data(lower_bound, segment_data, seasonality_offset, seasonality_index, Bound.LOWER)
                 upper_bound = self.add_season_to_data(upper_bound, segment_data, seasonality_offset, seasonality_index, Bound.UPPER)
@@ -277,3 +262,54 @@ class AnomalyDetector(ProcessingDetector):
         upper_bound = pd.Series(upper_bound, index = segment.index)
         lower_bound = pd.Series(lower_bound, index = segment.index)
         return upper_bound, lower_bound
+
+    def get_seasonality_offset(self, from_timestamp: int, seasonality: int, data_start_time: int, time_step: int) -> int:
+        season_count = math.ceil(abs(from_timestamp - data_start_time) / seasonality)
+        start_seasonal_segment = from_timestamp + seasonality * season_count
+        seasonality_time_offset = abs(start_seasonal_segment - data_start_time) % seasonality
+        seasonality_offset = math.ceil(seasonality_time_offset / time_step)
+        return seasonality_offset
+
+    def detections_generator(
+        self,
+        dataframe: pd.DataFrame,
+        upper_bound: pd.Series,
+        lower_bound: pd.Series,
+        enable_bounds: Bound
+    ) -> Generator[Segment, None, None]:
+        in_segment = False
+        segment_start = 0
+        bound: Optional[Bound] = None
+        for idx, val in enumerate(dataframe['value'].values):
+            if val > upper_bound.values[idx]:
+                if enable_bounds == Bound.UPPER or enable_bounds == Bound.ALL:
+                    if not in_segment:
+                        in_segment = True
+                        segment_start = dataframe['timestamp'][idx]
+                        bound = Bound.UPPER
+                    continue
+
+            if val < lower_bound.values[idx]:
+                if enable_bounds == Bound.LOWER or enable_bounds == Bound.ALL:
+                    if not in_segment:
+                        in_segment = True
+                        segment_start = dataframe['timestamp'][idx]
+                        bound = Bound.LOWER
+                    continue
+
+            if in_segment:
+                segment_end = dataframe['timestamp'][idx - 1]
+                yield Segment(
+                    utils.convert_pd_timestamp_to_ms(segment_start),
+                    utils.convert_pd_timestamp_to_ms(segment_end),
+                    message=f'{val} out of {bound.value} bound'
+                )
+                in_segment = False
+        else:
+            if in_segment:
+                segment_end = dataframe['timestamp'][idx]
+                yield Segment(
+                    utils.convert_pd_timestamp_to_ms(segment_start),
+                    utils.convert_pd_timestamp_to_ms(segment_end),
+                    message=f'{val} out of {bound.value} bound'
+                )
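One behavioral detail of the extracted helper: the old inline code floored the offset with `// time_step`, while `get_seasonality_offset` rounds up with `math.ceil`. A standalone copy of the same arithmetic, checked against the values from `test_get_seasonality_offset` below:

```python
import math

def get_seasonality_offset(from_timestamp: int, seasonality: int, data_start_time: int, time_step: int) -> int:
    # same steps as the method above, kept standalone for illustration
    season_count = math.ceil(abs(from_timestamp - data_start_time) / seasonality)          # ceil(2193027 / 3600000) = 1
    start_seasonal_segment = from_timestamp + seasonality * season_count                   # 1573704573027
    seasonality_time_offset = abs(start_seasonal_segment - data_start_time) % seasonality  # 5793027 % 3600000 = 2193027
    return math.ceil(seasonality_time_offset / time_step)                                  # ceil(73.1...) = 74

assert get_seasonality_offset(1573700973027, 3600000, 1573698780000, 30000) == 74
```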
diff --git a/analytics/tests/test_dataset.py b/analytics/tests/test_dataset.py
index 4bc7d98..d74f2bc 100644
--- a/analytics/tests/test_dataset.py
+++ b/analytics/tests/test_dataset.py
@@ -5,6 +5,7 @@
 from utils import prepare_data
 import models
 import random
 import scipy.signal
+from typing import List

 from analytic_types.segment import Segment
@@ -372,11 +373,14 @@
 if __name__ == '__main__':
     unittest.main()

 def create_dataframe(data_val: list) -> pd.DataFrame:
-    data_ind = [1523889000000 + i for i in range(len(data_val))]
+    data_ind = create_list_of_timestamps(len(data_val))
     data = {'timestamp': data_ind, 'value': data_val}
     dataframe = pd.DataFrame(data)
     dataframe['timestamp'] = pd.to_datetime(dataframe['timestamp'], unit='ms')
     return dataframe

+def create_list_of_timestamps(length: int) -> List[int]:
+    return [1523889000000 + i for i in range(length)]
+
 def create_random_model(window_size: int) -> list:
     return [random.randint(0, 100) for _ in range(window_size * 2 + 1)]
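For reference, the extracted helper produces millisecond timestamps spaced 1 ms apart, which is why the detector tests pair it with `'timeStep': 1`. A quick usage sketch:

```python
import pandas as pd
from typing import List

def create_list_of_timestamps(length: int) -> List[int]:
    # same helper as above: millisecond timestamps, 1 ms apart
    return [1523889000000 + i for i in range(length)]

timestamps = create_list_of_timestamps(3)
assert timestamps == [1523889000000, 1523889000001, 1523889000002]
# create_dataframe builds its 'timestamp' column the same way:
dataframe = pd.DataFrame({'timestamp': pd.to_datetime(timestamps, unit='ms'), 'value': [1, 2, 3]})
```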
diff --git a/analytics/tests/test_detectors.py b/analytics/tests/test_detectors.py
index 66a13f6..8916ceb 100644
--- a/analytics/tests/test_detectors.py
+++ b/analytics/tests/test_detectors.py
@@ -1,10 +1,11 @@
 import unittest
 import pandas as pd

-from detectors import pattern_detector, threshold_detector, anomaly_detector
+from detectors import pattern_detector, threshold_detector, anomaly_detector, Bound
 from analytic_types.detector_typing import DetectionResult, ProcessingResult
 from analytic_types.segment import Segment
-from tests.test_dataset import create_dataframe
+from tests.test_dataset import create_dataframe, create_list_of_timestamps
+from utils import convert_pd_timestamp_to_ms


 class TestPatternDetector(unittest.TestCase):
@@ -136,7 +137,7 @@ class TestAnomalyDetector(unittest.TestCase):
             'alpha': 0.1,
             'timeStep': 1,
             'seasonality': 5,
-            'segments': [{ 'from': 1523889000001, 'data': [1] }]
+            'segments': [{ 'from': 1523889000001, 'to': 1523889000002, 'data': [1] }]
         }
         detect_result: ProcessingResult = detector.process_data(dataframe, cache)
         expected_result = {
@@ -163,3 +164,28 @@ class TestAnomalyDetector(unittest.TestCase):
             (1523889000008, 3.4343868900000007)
         ]}
         self.assertEqual(detect_result.to_json(), expected_result)
+
+    def test_get_seasonality_offset(self):
+        detector = anomaly_detector.AnomalyDetector('test_id')
+        from_timestamp = 1573700973027
+        seasonality = 3600000
+        data_start_time = 1573698780000
+        time_step = 30000
+        detected_offset = detector.get_seasonality_offset(from_timestamp, seasonality, data_start_time, time_step)
+        expected_offset = 74
+        self.assertEqual(detected_offset, expected_offset)
+
+    def test_segment_generator(self):
+        detector = anomaly_detector.AnomalyDetector('test_id')
+        data = [1, 1, 5, 1, -4, 5, 5, 5, -3, 1]
+        timestamps = create_list_of_timestamps(len(data))
+        dataframe = create_dataframe(data)
+        upper_bound = pd.Series([2, 2, 2, 2, 2, 2, 2, 2, 2, 2])
+        lower_bound = pd.Series([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+        segments = list(detector.detections_generator(dataframe, upper_bound, lower_bound, enable_bounds=Bound.ALL))
+
+        segments_borders = list(map(lambda s: [s.from_timestamp, s.to_timestamp], segments))
+        self.assertEqual(segments_borders, [[timestamps[2], timestamps[2]], [timestamps[4], timestamps[8]]])
+
+if __name__ == '__main__':
+    unittest.main()
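`test_segment_generator` exercises the state machine inside `detections_generator`. A simplified, index-based sketch of the same logic (timestamps and the `Bound` filter are dropped for brevity; the data and expected borders come straight from the test):

```python
from typing import Generator, List, Tuple

def detections(values: List[float], upper: List[float], lower: List[float]) -> Generator[Tuple[int, int], None, None]:
    # simplified detections_generator: yields (start, end) index pairs of runs
    # where a value escapes either bound; both bounds are always enabled here
    in_segment = False
    segment_start = 0
    for idx, val in enumerate(values):
        if val > upper[idx] or val < lower[idx]:
            if not in_segment:
                in_segment = True
                segment_start = idx
            continue
        if in_segment:
            # the previous point was the last one out of bounds
            yield (segment_start, idx - 1)
            in_segment = False
    else:
        if in_segment:
            # the series ended while still out of bounds
            yield (segment_start, idx)

data = [1, 1, 5, 1, -4, 5, 5, 5, -3, 1]
assert list(detections(data, [2] * 10, [0] * 10)) == [(2, 2), (4, 8)]
```

Note that the sketch yields the final open segment in the `for ... else` branch rather than returning it; a `return` there would set the generator's `StopIteration` value, which `list()` discards.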