
Segment seasonality offset #805 (#806)

Alexandr Velikiy, 5 years ago (committed by rozetko). Commit: 4d28344ae8
5 changed files:
1. analytics/analytics/analytic_types/segment.py (28 changes)
2. analytics/analytics/detectors/__init__.py (2 changes)
3. analytics/analytics/detectors/anomaly_detector.py (110 changes)
4. analytics/tests/test_dataset.py (6 changes)
5. analytics/tests/test_detectors.py (30 changes)

analytics/analytics/analytic_types/segment.py (28 changes)

@@ -27,3 +27,31 @@ class Segment:
         self.labeled = labeled
         self.deleted = deleted
         self.message = message
+
+
+@utils.meta.JSONClass
+class AnomalyDetectorSegment(Segment):
+    '''
+    Used for segment manipulation instead of { 'from': ..., 'to': ..., 'data': ... } dict
+    '''
+    def __init__(
+        self,
+        from_timestamp: int,
+        to_timestamp: int,
+        data = [],
+        _id: Optional[str] = None,
+        analytic_unit_id: Optional[str] = None,
+        labeled: Optional[bool] = None,
+        deleted: Optional[bool] = None,
+        message: Optional[str] = None
+    ):
+        super().__init__(
+            from_timestamp,
+            to_timestamp,
+            _id,
+            analytic_unit_id,
+            labeled,
+            deleted,
+            message
+        )
+        self.data = data
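For context, instances of the new class are built through the from_json helper that the @utils.meta.JSONClass decorator generates (see its use in anomaly_detector.py below). A minimal round-trip sketch, assuming the decorator maps the JSON keys 'from' and 'to' onto from_timestamp and to_timestamp:

    # Hypothetical usage sketch, not part of the commit. Assumes
    # utils.meta.JSONClass generates from_json() and maps the 'from'/'to'
    # JSON keys onto the from_timestamp/to_timestamp constructor arguments.
    raw = {'from': 1523889000000, 'to': 1523889000005, 'data': [1.0, 2.0, 3.0]}
    segment = AnomalyDetectorSegment.from_json(raw)
    assert segment.from_timestamp == raw['from']
    assert segment.data == raw['data']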

analytics/analytics/detectors/__init__.py (2 changes)

@@ -1,4 +1,4 @@
 from detectors.detector import Detector, ProcessingDetector
 from detectors.pattern_detector import PatternDetector
 from detectors.threshold_detector import ThresholdDetector
-from detectors.anomaly_detector import AnomalyDetector
+from detectors.anomaly_detector import AnomalyDetector, Bound

analytics/analytics/detectors/anomaly_detector.py (110 changes)

@@ -3,12 +3,12 @@ import logging
 import numpy as np
 import pandas as pd
 import math

-from typing import Optional, Union, List, Tuple
+from typing import Optional, Union, List, Tuple, Generator

 from analytic_types import AnalyticUnitId, ModelCache
 from analytic_types.detector_typing import DetectionResult, ProcessingResult
 from analytic_types.data_bucket import DataBucket
-from analytic_types.segment import Segment
+from analytic_types.segment import Segment, AnomalyDetectorSegment
 from detectors import Detector, ProcessingDetector
 import utils
@@ -45,16 +45,21 @@ class AnomalyDetector(ProcessingDetector):
             seasonality = payload.get('seasonality')
             assert seasonality is not None and seasonality > 0, \
                 f'{self.analytic_unit_id} got invalid seasonality {seasonality}'
-            for segment in segments:
-                segment_len = (int(segment['to']) - int(segment['from']))
+            parsed_segments = map(Segment.from_json, segments)
+            for segment in parsed_segments:
+                segment_len = (int(segment.to_timestamp) - int(segment.from_timestamp))
                 assert segment_len <= seasonality, \
-                    f'seasonality {seasonality} must be great then segment length {segment_len}'
-                from_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['from'], unit='ms'))
-                to_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['to'], unit='ms'))
+                    f'seasonality {seasonality} must be greater than segment length {segment_len}'
+                from_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment.from_timestamp, unit='ms'))
+                to_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment.to_timestamp, unit='ms'))
                 segment_data = dataframe[from_index : to_index]
-                prepared_segments.append({'from': segment['from'], 'data': segment_data.value.tolist()})
+                prepared_segments.append({
+                    'from': segment.from_timestamp,
+                    'to': segment.to_timestamp,
+                    'data': segment_data.value.tolist()
+                })

             new_cache['seasonality'] = seasonality
             new_cache['segments'] = prepared_segments
@@ -81,6 +86,7 @@ class AnomalyDetector(ProcessingDetector):
             upper_bound = smoothed_data + confidence

         if segments is not None:
+            parsed_segments = map(AnomalyDetectorSegment.from_json, segments)
             time_step = self.get_value_from_cache(cache, 'timeStep', required = True)
             seasonality = self.get_value_from_cache(cache, 'seasonality', required = True)
@@ -89,40 +95,20 @@ class AnomalyDetector(ProcessingDetector):
             data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])

-            for segment in segments:
+            for segment in parsed_segments:
                 seasonality_index = seasonality // time_step
-                season_count = math.ceil(abs(segment['from'] - data_start_time) / seasonality)
-                start_seasonal_segment = segment['from'] + seasonality * season_count
-                seasonality_offset = (abs(start_seasonal_segment - data_start_time) % seasonality) // time_step
-                segment_data = pd.Series(segment['data'])
+                seasonality_offset = self.get_seasonality_offset(segment.from_timestamp, seasonality, data_start_time, time_step)
+                segment_data = pd.Series(segment.data)
                 lower_bound = self.add_season_to_data(lower_bound, segment_data, seasonality_offset, seasonality_index, Bound.LOWER)
                 upper_bound = self.add_season_to_data(upper_bound, segment_data, seasonality_offset, seasonality_index, Bound.UPPER)

-        anomaly_indexes = []
-        for idx, val in enumerate(data.values):
-            if val > upper_bound.values[idx]:
-                if enable_bounds == Bound.UPPER or enable_bounds == Bound.ALL:
-                    anomaly_indexes.append(data.index[idx])
-            if val < lower_bound.values[idx]:
-                if enable_bounds == Bound.LOWER or enable_bounds == Bound.ALL:
-                    anomaly_indexes.append(data.index[idx])
-
-        # TODO: use Segment in utils
-        segments = utils.close_filtering(anomaly_indexes, 1)
-        segments = utils.get_start_and_end_of_segments(segments)
-        segments = [Segment(
-            utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][segment[0]]),
-            utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][segment[1]]),
-            f'{data[segment[0]]} out of bound'
-        ) for segment in segments]
+        detected_segments = list(self.detections_generator(dataframe, upper_bound, lower_bound, enable_bounds))

         last_dataframe_time = dataframe.iloc[-1]['timestamp']
         last_detection_time = utils.convert_pd_timestamp_to_ms(last_dataframe_time)
-        return DetectionResult(cache, segments, last_detection_time)
+        return DetectionResult(cache, detected_segments, last_detection_time)

     def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
         if cache is None:
@@ -186,6 +172,7 @@ class AnomalyDetector(ProcessingDetector):
             upper_bound = smoothed_data + confidence

         if segments is not None:
+            segments: List[AnomalyDetectorSegment] = map(AnomalyDetectorSegment.from_json, segments)
             seasonality = self.get_value_from_cache(cache, 'seasonality', required = True)
             assert seasonality > 0, \
                 f'{self.analytic_unit_id} got invalid seasonality {seasonality}'
@@ -197,10 +184,8 @@ class AnomalyDetector(ProcessingDetector):
             for segment in segments:
                 seasonality_index = seasonality // time_step
                 # TODO: move it to utils and add tests
-                season_count = math.ceil(abs(segment['from'] - data_start_time) / seasonality)
-                start_seasonal_segment = segment['from'] + seasonality * season_count
-                seasonality_offset = (abs(start_seasonal_segment - data_start_time) % seasonality) // time_step
-                segment_data = pd.Series(segment['data'])
+                seasonality_offset = self.get_seasonality_offset(segment.from_timestamp, seasonality, data_start_time, time_step)
+                segment_data = pd.Series(segment.data)
                 lower_bound = self.add_season_to_data(lower_bound, segment_data, seasonality_offset, seasonality_index, Bound.LOWER)
                 upper_bound = self.add_season_to_data(upper_bound, segment_data, seasonality_offset, seasonality_index, Bound.UPPER)
@@ -277,3 +262,54 @@ class AnomalyDetector(ProcessingDetector):
         upper_bound = pd.Series(upper_bound, index = segment.index)
         lower_bound = pd.Series(lower_bound, index = segment.index)
         return upper_bound, lower_bound
+
+    def get_seasonality_offset(self, from_timestamp: int, seasonality: int, data_start_time: int, time_step: int) -> int:
+        season_count = math.ceil(abs(from_timestamp - data_start_time) / seasonality)
+        start_seasonal_segment = from_timestamp + seasonality * season_count
+        seasonality_time_offset = abs(start_seasonal_segment - data_start_time) % seasonality
+        seasonality_offset = math.ceil(seasonality_time_offset / time_step)
+        return seasonality_offset
+
+    def detections_generator(
+        self,
+        dataframe: pd.DataFrame,
+        upper_bound: pd.DataFrame,
+        lower_bound: pd.DataFrame,
+        enable_bounds: Bound
+    ) -> Generator[Segment, None, Segment]:
+        in_segment = False
+        segment_start = 0
+        bound: Bound = None
+        for idx, val in enumerate(dataframe['value'].values):
+            if val > upper_bound.values[idx]:
+                if enable_bounds == Bound.UPPER or enable_bounds == Bound.ALL:
+                    if not in_segment:
+                        in_segment = True
+                        segment_start = dataframe['timestamp'][idx]
+                        bound = Bound.UPPER
+                    continue
+            if val < lower_bound.values[idx]:
+                if enable_bounds == Bound.LOWER or enable_bounds == Bound.ALL:
+                    if not in_segment:
+                        in_segment = True
+                        segment_start = dataframe['timestamp'][idx]
+                        bound = Bound.LOWER
+                    continue
+            if in_segment:
+                segment_end = dataframe['timestamp'][idx - 1]
+                yield Segment(
+                    utils.convert_pd_timestamp_to_ms(segment_start),
+                    utils.convert_pd_timestamp_to_ms(segment_end),
+                    message=f'{val} out of {str(bound.value)} bound'
+                )
+                in_segment = False
+        else:
+            if in_segment:
+                segment_end = dataframe['timestamp'][idx]
+                return Segment(
+                    utils.convert_pd_timestamp_to_ms(segment_start),
+                    utils.convert_pd_timestamp_to_ms(segment_end),
+                    message=f'{val} out of {str(bound.value)} bound'
+                )
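Note that the extracted get_seasonality_offset helper also changes the rounding: the old inline code floored the offset with integer division by time_step, while the helper rounds up with math.ceil. A standalone sketch (not part of the commit) reproducing the arithmetic with the values from test_get_seasonality_offset below:

    # Standalone check of the offset arithmetic, using the unit test's values.
    import math

    from_timestamp = 1573700973027
    seasonality = 3600000            # one hour, in ms
    data_start_time = 1573698780000
    time_step = 30000                # 30 s, in ms

    season_count = math.ceil(abs(from_timestamp - data_start_time) / seasonality)  # 1
    start_seasonal_segment = from_timestamp + seasonality * season_count
    offset_ms = abs(start_seasonal_segment - data_start_time) % seasonality        # 2193027
    print(math.ceil(offset_ms / time_step))  # 74 (the old // rounding gave 73)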

analytics/tests/test_dataset.py (6 changes)

@@ -5,6 +5,7 @@ from utils import prepare_data
 import models
 import random
 import scipy.signal
+from typing import List

 from analytic_types.segment import Segment
@@ -372,11 +373,14 @@ if __name__ == '__main__':
     unittest.main()

 def create_dataframe(data_val: list) -> pd.DataFrame:
-    data_ind = [1523889000000 + i for i in range(len(data_val))]
+    data_ind = create_list_of_timestamps(len(data_val))
     data = {'timestamp': data_ind, 'value': data_val}
     dataframe = pd.DataFrame(data)
     dataframe['timestamp'] = pd.to_datetime(dataframe['timestamp'], unit='ms')
     return dataframe

+def create_list_of_timestamps(length: int) -> List[int]:
+    return [1523889000000 + i for i in range(length)]
+
 def create_random_model(window_size: int) -> list:
     return [random.randint(0, 100) for _ in range(window_size * 2 + 1)]
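A quick sanity check (not part of the commit) that the extracted helper still matches the timestamps create_dataframe produces, assuming the same base timestamp 1523889000000 and 1 ms spacing as the old inline list:

    df = create_dataframe([10, 20, 30])
    ts = create_list_of_timestamps(3)
    assert ts == [1523889000000, 1523889000001, 1523889000002]
    assert list(df['timestamp']) == list(pd.to_datetime(ts, unit='ms'))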

analytics/tests/test_detectors.py (30 changes)

@@ -1,10 +1,11 @@
 import unittest
 import pandas as pd

-from detectors import pattern_detector, threshold_detector, anomaly_detector
+from detectors import pattern_detector, threshold_detector, anomaly_detector, Bound
 from analytic_types.detector_typing import DetectionResult, ProcessingResult
 from analytic_types.segment import Segment
-from tests.test_dataset import create_dataframe
+from tests.test_dataset import create_dataframe, create_list_of_timestamps
+from utils import convert_pd_timestamp_to_ms


 class TestPatternDetector(unittest.TestCase):
@@ -163,3 +164,28 @@ class TestAnomalyDetector(unittest.TestCase):
             (1523889000008, 3.4343868900000007)
         ]}
         self.assertEqual(detect_result.to_json(), expected_result)
+
+    def test_get_seasonality_offset(self):
+        detector = anomaly_detector.AnomalyDetector('test_id')
+        from_timestamp = 1573700973027
+        seasonality = 3600000
+        data_start_time = 1573698780000
+        time_step = 30000
+        detected_offset = detector.get_seasonality_offset(from_timestamp, seasonality, data_start_time, time_step)
+        expected_offset = 74
+        self.assertEqual(detected_offset, expected_offset)
+
+    def test_segment_generator(self):
+        detector = anomaly_detector.AnomalyDetector('test_id')
+        data = [1, 1, 5, 1, -4, 5, 5, 5, -3, 1]
+        timestamps = create_list_of_timestamps(len(data))
+        dataframe = create_dataframe(data)
+        upper_bound = pd.Series([2, 2, 2, 2, 2, 2, 2, 2, 2, 2])
+        lower_bound = pd.Series([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
+        segments = list(detector.detections_generator(dataframe, upper_bound, lower_bound, enable_bounds=Bound.ALL))
+        segments_borders = list(map(lambda s: [s.from_timestamp, s.to_timestamp], segments))
+        self.assertEqual(segments_borders, [[timestamps[2], timestamps[2]], [timestamps[4], timestamps[8]]])

 if __name__ == '__main__':
     unittest.main()
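The expected borders in test_segment_generator can be read straight off the data; this trace (annotation only, not part of the commit) shows how detections_generator opens and closes the two segments:

    # idx:    0  1  2  3   4  5  6  7   8  9
    # value:  1  1  5  1  -4  5  5  5  -3  1    (upper bound 2, lower bound 0)
    # breach: .  .  U  .   L  U  U  U   L  .    (U/L = above upper / below lower)
    #
    # Segment 1 opens at idx 2 and closes when idx 3 is back inside the bounds,
    # yielding [timestamps[2], timestamps[2]].
    # Segment 2 opens at idx 4, stays open through the consecutive breaches at
    # idx 5-8, and closes when idx 9 is back inside: [timestamps[4], timestamps[8]].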
