
AnomalyCache class (#809)

pull/1/head
Alexandr Velikiy, 4 years ago, committed by rozetko
commit bcd70981c7
8 changed files:

  1. analytics/analytics/analytic_types/cache.py (38)
  2. analytics/analytics/analytic_types/detector_typing.py (6)
  3. analytics/analytics/detectors/__init__.py (2)
  4. analytics/analytics/detectors/anomaly_detector.py (137)
  5. analytics/analytics/utils/__init__.py (1)
  6. analytics/analytics/utils/meta.py (12)
  7. analytics/tests/test_detectors.py (10)
  8. analytics/tests/test_utils.py (7)

analytics/analytics/analytic_types/cache.py (38)

@@ -0,0 +1,38 @@
+from typing import Optional, List, Dict
+
+from analytic_types.segment import AnomalyDetectorSegment
+from analytic_types.detector_typing import Bound
+from utils.meta import JSONClass, SerializableList
+
+
+@JSONClass
+class AnomalyCache:
+    def __init__(
+        self,
+        alpha: float,
+        confidence: float,
+        enable_bounds: str,
+        seasonality: Optional[int] = None,
+        segments: Optional[List[Dict]] = None,
+        time_step: Optional[int] = None,
+    ):
+        self.alpha = alpha
+        self.confidence = confidence
+        self.enable_bounds = enable_bounds
+        if seasonality != None and seasonality < 0:
+            raise ValueError(f'Can`t create AnomalyCache: got invalid seasonality {seasonality}')
+        self.seasonality = seasonality
+        self.time_step = time_step
+        if segments != None:
+            anomaly_segments = map(AnomalyDetectorSegment.from_json, segments)
+            self.segments = SerializableList(anomaly_segments)
+        else:
+            self.segments = []
+
+    def set_segments(self, segments: List[AnomalyDetectorSegment]):
+        if len(segments) > 0:
+            self.segments = SerializableList(segments)
+
+    def get_enabled_bounds(self) -> Bound:
+        #TODO: use class with to_json()
+        return Bound(self.enable_bounds)
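Because AnomalyCache is decorated with JSONClass, it picks up from_json()/to_json() methods that translate between the camelCase keys used in payloads ('enableBounds', 'timeStep') and the snake_case attributes above. A minimal round-trip sketch (field values are illustrative only):

    payload = { 'alpha': 0.1, 'confidence': 2, 'enableBounds': 'UPPER', 'seasonality': 4 }
    cache = AnomalyCache.from_json(payload)
    assert cache.enable_bounds == 'UPPER'             # camelCase key -> snake_case attribute
    assert cache.get_enabled_bounds() is Bound.UPPER  # cached string mapped onto the enum
    assert cache.to_json()['enableBounds'] == 'UPPER' # snake_case -> camelCase on the way out
    assert 'timeStep' not in cache.to_json()          # None fields are skipped on serialization

Note that segments defaults to an empty list rather than None, which is what lets the detector code below replace `if segments is not None` checks with `len(segments) > 0`.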

analytics/analytics/analytic_types/detector_typing.py (6)

@@ -1,10 +1,16 @@
 from analytic_types import ModelCache, TimeSeries
 from analytic_types.segment import Segment
+from enum import Enum
 from typing import List, Optional, Tuple

 import utils.meta

+class Bound(Enum):
+    ALL = 'ALL'
+    UPPER = 'UPPER'
+    LOWER = 'LOWER'
+
 class DetectionResult:
     def __init__(
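Bound is a standard value-backed Enum, so the plain string stored in the cache maps back to a member via the Enum constructor, which is exactly what AnomalyCache.get_enabled_bounds() relies on. A quick illustration of the semantics:

    from enum import Enum

    class Bound(Enum):
        ALL = 'ALL'
        UPPER = 'UPPER'
        LOWER = 'LOWER'

    assert Bound('UPPER') is Bound.UPPER  # lookup by value
    assert Bound.UPPER.value == 'UPPER'   # back to the JSON-friendly string
    # Bound('up') raises ValueError: 'up' is not a valid Bound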

analytics/analytics/detectors/__init__.py (2)

@@ -1,4 +1,4 @@
 from detectors.detector import Detector, ProcessingDetector
 from detectors.pattern_detector import PatternDetector
 from detectors.threshold_detector import ThresholdDetector
-from detectors.anomaly_detector import AnomalyDetector, Bound
+from detectors.anomaly_detector import AnomalyDetector

analytics/analytics/detectors/anomaly_detector.py (137)

@@ -6,9 +6,10 @@ import math
 from typing import Optional, Union, List, Tuple, Generator

 from analytic_types import AnalyticUnitId, ModelCache
-from analytic_types.detector_typing import DetectionResult, ProcessingResult
+from analytic_types.detector_typing import DetectionResult, ProcessingResult, Bound
 from analytic_types.data_bucket import DataBucket
 from analytic_types.segment import Segment, AnomalyDetectorSegment
+from analytic_types.cache import AnomalyCache
 from detectors import Detector, ProcessingDetector
 import utils
@@ -17,10 +18,6 @@ MIN_DEPENDENCY_FACTOR = 0.1
 BASIC_ALPHA = 0.5

 logger = logging.getLogger('ANOMALY_DETECTOR')

-class Bound(Enum):
-    ALL = 'ALL'
-    UPPER = 'UPPER'
-    LOWER = 'LOWER'

 class AnomalyDetector(ProcessingDetector):
@@ -29,25 +26,15 @@ class AnomalyDetector(ProcessingDetector):
         self.bucket = DataBucket()

     def train(self, dataframe: pd.DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache:
-        segments = payload.get('segments')
-        enable_bounds = Bound(payload.get('enableBounds') or 'ALL')
-        prepared_segments = []
-        time_step = utils.find_interval(dataframe)
-
-        new_cache = {
-            'confidence': payload['confidence'],
-            'alpha': payload['alpha'],
-            'timeStep': time_step,
-            'enableBounds': enable_bounds.value
-        }
+        cache = AnomalyCache.from_json(payload)
+        cache.time_step = utils.find_interval(dataframe)
+        segments = cache.segments

-        if segments is not None:
-            seasonality = payload.get('seasonality')
-            assert seasonality is not None and seasonality > 0, \
-                f'{self.analytic_unit_id} got invalid seasonality {seasonality}'
-            parsed_segments = map(Segment.from_json, segments)
+        if len(segments) > 0:
+            seasonality = cache.seasonality
+            prepared_segments = []

-            for segment in parsed_segments:
+            for segment in segments:
                 segment_len = (int(segment.to_timestamp) - int(segment.from_timestamp))
                 assert segment_len <= seasonality, \
                     f'seasonality {seasonality} must be greater than segment length {segment_len}'
@@ -55,17 +42,17 @@ class AnomalyDetector(ProcessingDetector):
                 from_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment.from_timestamp, unit='ms'))
                 to_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment.to_timestamp, unit='ms'))
                 segment_data = dataframe[from_index : to_index]
-                prepared_segments.append({
-                    'from': segment.from_timestamp,
-                    'to': segment.to_timestamp,
-                    'data': segment_data.value.tolist()
-                })
-
-            new_cache['seasonality'] = seasonality
-            new_cache['segments'] = prepared_segments
+                prepared_segments.append(
+                    AnomalyDetectorSegment(
+                        segment.from_timestamp,
+                        segment.to_timestamp,
+                        segment_data.value.tolist()
+                    )
+                )
+            cache.set_segments(prepared_segments)

         return {
-            'cache': new_cache
+            'cache': cache.to_json()
         }

     # TODO: ModelCache -> DetectorState
@@ -74,41 +61,37 @@ class AnomalyDetector(ProcessingDetector):
             raise f'Analytic unit {self.analytic_unit_id} got empty cache'
         data = dataframe['value']

-        # TODO: use class for cache to avoid using string literals
-        alpha = self.get_value_from_cache(cache, 'alpha', required = True)
-        confidence = self.get_value_from_cache(cache, 'confidence', required = True)
-        segments = self.get_value_from_cache(cache, 'segments')
-        enable_bounds = Bound(self.get_value_from_cache(cache, 'enableBounds') or 'ALL')
-
-        smoothed_data = utils.exponential_smoothing(data, alpha)
-
-        lower_bound = smoothed_data - confidence
-        upper_bound = smoothed_data + confidence
+        cache = AnomalyCache.from_json(cache)
+        segments = cache.segments
+        enabled_bounds = cache.get_enabled_bounds()

-        if segments is not None:
-            parsed_segments = map(AnomalyDetectorSegment.from_json, segments)
+        smoothed_data = utils.exponential_smoothing(data, cache.alpha)

-            time_step = self.get_value_from_cache(cache, 'timeStep', required = True)
-            seasonality = self.get_value_from_cache(cache, 'seasonality', required = True)
-            assert seasonality > 0, \
-                f'{self.analytic_unit_id} got invalid seasonality {seasonality}'
+        lower_bound = smoothed_data - cache.confidence
+        upper_bound = smoothed_data + cache.confidence

+        if len(segments) > 0:
             data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])

-            for segment in parsed_segments:
-                seasonality_index = seasonality // time_step
-                seasonality_offset = self.get_seasonality_offset(segment.from_timestamp, seasonality, data_start_time, time_step)
+            for segment in segments:
+                seasonality_index = cache.seasonality // cache.time_step
+                seasonality_offset = self.get_seasonality_offset(
+                    segment.from_timestamp,
+                    cache.seasonality,
+                    data_start_time,
+                    cache.time_step
+                )
                 segment_data = pd.Series(segment.data)

                 lower_bound = self.add_season_to_data(lower_bound, segment_data, seasonality_offset, seasonality_index, Bound.LOWER)
                 upper_bound = self.add_season_to_data(upper_bound, segment_data, seasonality_offset, seasonality_index, Bound.UPPER)

-        detected_segments = list(self.detections_generator(dataframe, upper_bound, lower_bound, enable_bounds))
+        detected_segments = list(self.detections_generator(dataframe, upper_bound, lower_bound, enabled_bounds))

         last_dataframe_time = dataframe.iloc[-1]['timestamp']
         last_detection_time = utils.convert_pd_timestamp_to_ms(last_dataframe_time)

-        return DetectionResult(cache, detected_segments, last_detection_time)
+        return DetectionResult(cache.to_json(), detected_segments, last_detection_time)

     def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
         if cache is None:
@@ -138,14 +121,15 @@ class AnomalyDetector(ProcessingDetector):
         if cache is None:
             raise ValueError('anomaly detector got None cache')

+        cache = AnomalyCache.from_json(cache)
         for level in range(1, MAX_DEPENDENCY_LEVEL):
-            if (1 - cache['alpha']) ** level < MIN_DEPENDENCY_FACTOR:
+            if (1 - cache.alpha) ** level < MIN_DEPENDENCY_FACTOR:
                 break

         seasonality = 0
-        if cache.get('segments') is not None and cache['seasonality'] > 0:
-            seasonality = cache['seasonality'] // cache['timeStep']
+        if len(cache.segments) > 0:
+            seasonality = cache.seasonality // cache.time_step
         return max(level, seasonality)

     def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult:
@@ -160,31 +144,28 @@ class AnomalyDetector(ProcessingDetector):
     # TODO: remove duplication with detect()
     def process_data(self, dataframe: pd.DataFrame, cache: ModelCache) -> ProcessingResult:
-        segments = self.get_value_from_cache(cache, 'segments')
-        alpha = self.get_value_from_cache(cache, 'alpha', required = True)
-        confidence = self.get_value_from_cache(cache, 'confidence', required = True)
-        enable_bounds = Bound(self.get_value_from_cache(cache, 'enableBounds') or 'ALL')
+        cache = AnomalyCache.from_json(cache)
+        segments = cache.segments
+        enabled_bounds = cache.get_enabled_bounds()

         # TODO: exponential_smoothing should return dataframe with related timestamps
-        smoothed_data = utils.exponential_smoothing(dataframe['value'], alpha)
-        lower_bound = smoothed_data - confidence
-        upper_bound = smoothed_data + confidence
+        smoothed_data = utils.exponential_smoothing(dataframe['value'], cache.alpha)

-        if segments is not None:
-            segments: List[AnomalyDetectorSegment] = map(AnomalyDetectorSegment.from_json, segments)
-            seasonality = self.get_value_from_cache(cache, 'seasonality', required = True)
-            assert seasonality > 0, \
-                f'{self.analytic_unit_id} got invalid seasonality {seasonality}'
+        lower_bound = smoothed_data - cache.confidence
+        upper_bound = smoothed_data + cache.confidence

+        if len(segments) > 0:
             data_start_time = utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][0])
-            time_step = self.get_value_from_cache(cache, 'timeStep', required = True)

             for segment in segments:
-                seasonality_index = seasonality // time_step
+                seasonality_index = cache.seasonality // cache.time_step
                 # TODO: move it to utils and add tests
-                seasonality_offset = self.get_seasonality_offset(segment.from_timestamp, seasonality, data_start_time, time_step)
+                seasonality_offset = self.get_seasonality_offset(
+                    segment.from_timestamp,
+                    cache.seasonality,
+                    data_start_time,
+                    cache.time_step
+                )
                 segment_data = pd.Series(segment.data)

                 lower_bound = self.add_season_to_data(lower_bound, segment_data, seasonality_offset, seasonality_index, Bound.LOWER)
@@ -196,11 +177,11 @@ class AnomalyDetector(ProcessingDetector):
         lower_bound_timeseries = list(zip(timestamps, lower_bound.values.tolist()))
         upper_bound_timeseries = list(zip(timestamps, upper_bound.values.tolist()))

-        if enable_bounds == Bound.ALL:
+        if enabled_bounds == Bound.ALL:
             return ProcessingResult(lower_bound_timeseries, upper_bound_timeseries)
-        elif enable_bounds == Bound.UPPER:
+        elif enabled_bounds == Bound.UPPER:
             return ProcessingResult(upper_bound = upper_bound_timeseries)
-        elif enable_bounds == Bound.LOWER:
+        elif enabled_bounds == Bound.LOWER:
             return ProcessingResult(lower_bound = lower_bound_timeseries)

     def add_season_to_data(self, data: pd.Series, segment: pd.Series, offset: int, seasonality: int, bound_type: Bound) -> pd.Series:
@@ -275,14 +256,14 @@ class AnomalyDetector(ProcessingDetector):
         dataframe: pd.DataFrame,
         upper_bound: pd.DataFrame,
         lower_bound: pd.DataFrame,
-        enable_bounds: Bound
+        enabled_bounds: Bound
     ) -> Generator[Segment, None, Segment]:
         in_segment = False
         segment_start = 0
         bound: Bound = None
         for idx, val in enumerate(dataframe['value'].values):
             if val > upper_bound.values[idx]:
-                if enable_bounds == Bound.UPPER or enable_bounds == Bound.ALL:
+                if enabled_bounds == Bound.UPPER or enabled_bounds == Bound.ALL:
                     if not in_segment:
                         in_segment = True
                         segment_start = dataframe['timestamp'][idx]
@@ -290,7 +271,7 @@ class AnomalyDetector(ProcessingDetector):
                 continue
             if val < lower_bound.values[idx]:
-                if enable_bounds == Bound.LOWER or enable_bounds == Bound.ALL:
+                if enabled_bounds == Bound.LOWER or enabled_bounds == Bound.ALL:
                     if not in_segment:
                         in_segment = True
                         segment_start = dataframe['timestamp'][idx]
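None of the detection math changes in this commit: the detector still smooths the series exponentially, pads the smoothed line by ±confidence (plus any stored seasonal segment data), and yields a segment wherever values escape the enabled bound. A self-contained sketch of that core idea, using pandas' ewm as a stand-in for utils.exponential_smoothing (the real implementation may weight points differently):

    import pandas as pd

    values = pd.Series([1, 1, 10, 1, 1, -5, 1])
    alpha, confidence = 0.5, 2

    smoothed = values.ewm(alpha=alpha).mean()  # stand-in for utils.exponential_smoothing
    lower, upper = smoothed - confidence, smoothed + confidence

    # detections_generator additionally groups consecutive hits into
    # Segment(from_timestamp, to_timestamp) and respects Bound.UPPER/LOWER
    anomalies = [i for i, v in enumerate(values) if v > upper[i] or v < lower[i]]
    print(anomalies)  # indices of points outside the band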

analytics/analytics/utils/__init__.py (1)

@@ -1,3 +1,4 @@
 from utils.common import *
 from utils.time import *
 from utils.dataframe import *
+from utils.meta import *

analytics/analytics/utils/meta.py (12)

@@ -22,6 +22,12 @@ def is_field_private(field_name: str) -> Optional[str]:
     m = re.match(r'_[^(__)]+__', field_name)
     return m is not None

+def serialize(obj):
+    if hasattr(obj, 'to_json') == True:
+        return obj.to_json()
+    else:
+        return obj
+
 def inited_params(target_init):
     target_params = signature(target_init).parameters.values()
     if len(target_params) < 1:
@@ -55,7 +61,7 @@ def JSONClass(target_class):
         where all None - values and private fileds are skipped
         """
         return {
-            underscore_to_camel(k): v for k, v in self.__dict__.items()
+            underscore_to_camel(k): serialize(v) for k, v in self.__dict__.items()
             if v is not None and not is_field_private(k)
         }
@@ -69,3 +75,7 @@ def JSONClass(target_class):
     target_class.to_json = to_json
     target_class.from_json = from_json
     return target_class

+class SerializableList(list):
+    def to_json(self):
+        return list(map(lambda s: s.to_json(), self))
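serialize() defers to to_json() when the object provides one and passes plain values through unchanged, while SerializableList applies to_json() to each element; together they let JSONClass.to_json() handle nested objects like the segment list inside AnomalyCache. A short sketch mirroring the new test in test_utils.py (it assumes Segment serializes to { 'from': ..., 'to': ... }):

    from utils.meta import SerializableList, serialize
    from analytic_types.segment import Segment

    assert serialize(42) == 42  # no to_json() -> value passes through as-is
    segments = SerializableList([Segment(100, 200)])
    assert serialize(segments) == [{ 'from': 100, 'to': 200 }]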

analytics/tests/test_detectors.py (10)

@@ -1,8 +1,8 @@
 import unittest
 import pandas as pd

-from detectors import pattern_detector, threshold_detector, anomaly_detector, Bound
-from analytic_types.detector_typing import DetectionResult, ProcessingResult
+from detectors import pattern_detector, threshold_detector, anomaly_detector
+from analytic_types.detector_typing import DetectionResult, ProcessingResult, Bound
 from analytic_types.segment import Segment
 from tests.test_dataset import create_dataframe, create_list_of_timestamps
 from utils import convert_pd_timestamp_to_ms
@@ -73,6 +73,7 @@ class TestAnomalyDetector(unittest.TestCase):
         cache = {
             'confidence': 2,
             'alpha': 0.1,
+            'enableBounds': 'ALL',
             'timeStep': 1
         }
         detector = anomaly_detector.AnomalyDetector('test_id')
@@ -85,6 +86,7 @@ class TestAnomalyDetector(unittest.TestCase):
         cache = {
             'confidence': 2,
             'alpha': 0.1,
+            'enableBounds': 'ALL',
             'timeStep': 1,
             'seasonality': 4,
             'segments': [{ 'from': 1523889000001, 'to': 1523889000002, 'data': [10] }]
@@ -103,6 +105,7 @@ class TestAnomalyDetector(unittest.TestCase):
         cache = {
             'confidence': 2,
             'alpha': 0.1,
+            'enableBounds': 'ALL',
             'timeStep': 1
         }
         detector = anomaly_detector.AnomalyDetector('test_id')
@@ -135,6 +138,7 @@ class TestAnomalyDetector(unittest.TestCase):
         cache = {
             'confidence': 2,
             'alpha': 0.1,
+            'enableBounds': 'ALL',
             'timeStep': 1,
             'seasonality': 5,
             'segments': [{ 'from': 1523889000001, 'to': 1523889000002,'data': [1] }]
@@ -182,7 +186,7 @@ class TestAnomalyDetector(unittest.TestCase):
         dataframe = create_dataframe(data)
         upper_bound = pd.Series([2, 2, 2, 2, 2, 2, 2, 2, 2, 2])
         lower_bound = pd.Series([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
-        segments = list(detector.detections_generator(dataframe, upper_bound, lower_bound, enable_bounds=Bound.ALL))
+        segments = list(detector.detections_generator(dataframe, upper_bound, lower_bound, enabled_bounds=Bound.ALL))
         segments_borders = list(map(lambda s: [s.from_timestamp, s.to_timestamp], segments))
         self.assertEqual(segments_borders, [[timestamps[2], timestamps[2]], [timestamps[4], timestamps[8]]])
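The new 'enableBounds' entries in these test caches are needed because enable_bounds is a required positional parameter of AnomalyCache.__init__, so a cache dict without it can no longer be deserialized; presumably something like:

    # hypothetical: a cache dict missing 'enableBounds'
    AnomalyCache.from_json({ 'confidence': 2, 'alpha': 0.1, 'timeStep': 1 })
    # TypeError: __init__() missing 1 required positional argument: 'enable_bounds'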

analytics/tests/test_utils.py (7)

@@ -362,5 +362,12 @@ class TestUtils(unittest.TestCase):
         self.assertEqual(got.from_timestamp, expected[0])
         self.assertEqual(got.to_timestamp, expected[1])

+    def test_serialize(self):
+        segment_list = [Segment(100,200)]
+        serialize_list = utils.meta.SerializableList(segment_list)
+        meta_result = utils.meta.serialize(serialize_list)
+        expected_result = [{ 'from': 100, 'to': 200 }]
+        self.assertEqual(meta_result, expected_result)
+
 if __name__ == '__main__':
     unittest.main()
