Browse Source

Segment intersection in anomaly detector #615 (#616)

pull/1/head
Alexandr Velikiy 5 years ago committed by Evgeny Smyshlyaev
parent
commit
74d45bf4f4
  1. 8
      analytics/analytics/analytic_unit_worker.py
  2. 60
      analytics/analytics/detectors/anomaly_detector.py
  3. 6
      analytics/analytics/detectors/detector.py
  4. 3
      analytics/analytics/detectors/pattern_detector.py
  5. 5
      analytics/analytics/detectors/threshold_detector.py
  6. 38
      analytics/analytics/utils/common.py
  7. 20
      analytics/tests/test_detectors.py
  8. 86
      analytics/tests/test_utils.py

8
analytics/analytics/analytic_unit_worker.py

@ -2,11 +2,11 @@ import config
import detectors
import logging
import pandas as pd
from typing import Optional, Union, Generator
from typing import Optional, Union, Generator, List
from models import ModelCache
import concurrent.futures
import asyncio
import utils
from utils import get_intersected_chunks, get_chunks, prepare_data
@ -62,7 +62,7 @@ class AnalyticUnitWorker:
chunk_dataframe = prepare_data(chunk)
detected = self._detector.detect(chunk_dataframe, cache)
self.__append_detection_result(detection_result, detected)
detection_result['segments'] = self._detector.get_intersections(detection_result['segments'])
return detection_result
def cancel(self):
@ -84,6 +84,8 @@ class AnalyticUnitWorker:
chunk_dataframe = prepare_data(chunk)
detected = self._detector.consume_data(chunk_dataframe, cache)
self.__append_detection_result(detection_result, detected)
detection_result['segments'] = self._detector.get_intersections(detection_result['segments'])
if detection_result['lastDetectionTime'] is None:
return None

60
analytics/analytics/detectors/anomaly_detector.py

@ -2,11 +2,14 @@ import logging
import pandas as pd
from typing import Optional, Union, List, Tuple
from analytic_types import AnalyticUnitId
from analytic_types.data_bucket import DataBucket
from detectors import Detector
from models import ModelCache
import utils
MAX_DEPENDENCY_LEVEL = 100
MIN_DEPENDENCY_FACTOR = 0.1
logger = logging.getLogger('ANOMALY_DETECTOR')
@ -27,22 +30,27 @@ class AnomalyDetector(Detector):
data = dataframe['value']
last_values = None
if cache is not None:
last_values = cache['last_values']
last_values = cache.get('last_values')
#TODO detection code here
smoth_data = utils.exponential_smoothing(data, cache['alpha'])
upper_bound = utils.exponential_smoothing(data + cache['confidence'], cache['alpha'])
lower_bound = utils.exponential_smoothing(data - cache['confidence'], cache['alpha'])
smothed_data = utils.exponential_smoothing(data, cache['alpha'])
upper_bound = smothed_data + cache['confidence']
lower_bound = smothed_data - cache['confidence']
segemnts = []
anomaly_indexes = []
for idx, val in enumerate(data.values):
if val > upper_bound[idx] or val < lower_bound[idx]:
segemnts.append(idx)
last_detection_time = dataframe['timestamp'][-1]
if val > upper_bound.values[idx] or val < lower_bound.values[idx]:
anomaly_indexes.append(data.index[idx])
segments = utils.close_filtering(anomaly_indexes, 1)
segments = utils.get_start_and_end_of_segments(segments)
segments = [(
utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][segment[0]]),
utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][segment[1]]),
) for segment in segments]
last_dataframe_time = dataframe.iloc[-1]['timestamp']
last_detection_time = utils.convert_pd_timestamp_to_ms(last_dataframe_time)
return {
'cache': cache,
'segments': segemnts,
'segments': segments,
'lastDetectionTime': last_detection_time
}
@ -50,29 +58,21 @@ class AnomalyDetector(Detector):
self.detect(data, cache)
def __smooth_data(self, dataframe: pd.DataFrame) -> List[Tuple[int, float]]:
'''
smooth data using exponential smoothing/moving average/weighted_average
'''
def __get_confidence_window(self, smooth_data: pd.Series, condfidence: float) -> Tuple[pd.Series, pd.Series]:
'''
build confidence interval above and below smoothed data
'''
def __get_dependency_level(self, alpha: float) -> int:
def get_window_size(self, cache: Optional[ModelCache]) -> int:
'''
get the number of values that will affect the next value
'''
for level in range(1, 100):
if (1 - alpha) ** level < 0.1:
break
return level
def get_window_size(self, cache: Optional[ModelCache]) -> int:
if cache is None:
raise ValueError('anomaly detector got None cache')
for level in range(1, MAX_DEPENDENCY_LEVEL):
if (1 - cache['alpha']) ** level < MIN_DEPENDENCY_FACTOR:
break
return level
#TODO: calculate value based on `alpha` value from cache
return 1
def get_intersections(self, segments: List[dict]) -> List[dict]:
    '''
    Collapse overlapping detected segments into non-intersecting ones.

    Converts each {'from': ..., 'to': ...} dict to a [start, end] pair,
    merges intersecting pairs via utils.merge_intersecting_intervals,
    and converts the merged pairs back to dicts.
    '''
    intervals = [[segment['from'], segment['to']] for segment in segments]
    merged = utils.merge_intersecting_intervals(intervals)
    return [{'from': start, 'to': end} for start, end in merged]

6
analytics/analytics/detectors/detector.py

@ -1,7 +1,7 @@
from models import ModelCache
from abc import ABC, abstractmethod
from pandas import DataFrame
from typing import Optional, Union
from typing import Optional, Union, List
class Detector(ABC):
@ -24,3 +24,7 @@ class Detector(ABC):
@abstractmethod
def get_window_size(self, cache: Optional[ModelCache]) -> int:
pass
@abstractmethod
def get_intersections(self, segments: List[dict]) -> List[dict]:
pass

3
analytics/analytics/detectors/pattern_detector.py

@ -128,3 +128,6 @@ class PatternDetector(Detector):
def get_window_size(self, cache: Optional[ModelCache]) -> int:
if cache is None: return self.DEFAULT_WINDOW_SIZE
return cache.get('windowSize', self.DEFAULT_WINDOW_SIZE)
def get_intersections(self, segments: List[dict]) -> List[dict]:
    '''Identity pass-through: this detector does no segment merging.'''
    return segments

5
analytics/analytics/detectors/threshold_detector.py

@ -2,7 +2,7 @@ import logging as log
import pandas as pd
import numpy as np
from typing import Optional
from typing import Optional, List
from detectors import Detector
from models import ModelCache
@ -80,3 +80,6 @@ class ThresholdDetector(Detector):
def get_window_size(self, cache: Optional[ModelCache]) -> int:
return self.WINDOW_SIZE
def get_intersections(self, segments: List[dict]) -> List[dict]:
    '''Identity pass-through: this detector does no segment merging.'''
    return segments

38
analytics/analytics/utils/common.py

@ -114,7 +114,7 @@ def get_same_length(patterns_list):
pat.extend(added_values)
return patterns_list
def close_filtering(pattern_list, win_size):
def close_filtering(pattern_list: List[int], win_size: int) -> List[Tuple[int, int]]:
if len(pattern_list) == 0:
return []
s = [[pattern_list[0]]]
@ -127,6 +127,42 @@ def close_filtering(pattern_list, win_size):
s.append([pattern_list[i]])
return s
def merge_intersecting_intervals(intervals: List[Tuple[int, int]]) -> List[Tuple[int, int]]:
    '''
    Merge every group of intersecting (or touching) intervals into one.

    Each interval is a [start, end] pair; intervals whose start is <= the
    previous merged interval's end are merged (so [5, 10] and [10, 20]
    become [5, 20]).

    The original implementation assigned into the elements of the input
    list — which mutated the caller's data and would raise TypeError for
    actual tuples despite the annotation.  This version leaves the input
    untouched and returns a new list of [start, end] lists, sorted by start.
    '''
    if len(intervals) < 2:
        return intervals
    result = []
    for start, end in sorted(intervals):
        if result and start <= result[-1][1]:
            # Overlaps or touches the previous merged interval: extend it.
            result[-1][1] = max(result[-1][1], end)
        else:
            result.append([start, end])
    return result
def get_start_and_end_of_segments(segments: List[List[int]]) -> List[Tuple[int, int]]:
    '''
    Replace each segment by its [first, last] pair: [1, 2, 3, 4] -> [1, 4].
    A one-element segment is doubled: [7] -> [7, 7].
    Empty segments are dropped from the output.
    '''
    return [[segment[0], segment[-1]] for segment in segments if segment]
def best_pattern(pattern_list: list, data: pd.Series, dir: str) -> list:
new_pattern_list = []
for val in pattern_list:

20
analytics/tests/test_detectors.py

@ -1,7 +1,7 @@
import unittest
import pandas as pd
from detectors import pattern_detector, threshold_detector
from detectors import pattern_detector, threshold_detector, anomaly_detector
class TestPatternDetector(unittest.TestCase):
@ -28,3 +28,21 @@ class TestThresholdDetector(unittest.TestCase):
with self.assertRaises(ValueError):
detector.detect([], {})
class TestAnomalyDetector(unittest.TestCase):

    def test_dataframe(self):
        '''The detector flags the single spike (value 10 at index 5) as one segment.'''
        values = [0, 1, 2, 1, 2, 10, 1, 2, 1]
        timestamps = [1523889000000 + i for i in range(len(values))]
        dataframe = pd.DataFrame({'timestamp': timestamps, 'value': values})
        dataframe['timestamp'] = pd.to_datetime(dataframe['timestamp'], unit='ms')
        cache = {
            'confidence': 2,
            'alpha': 0.1,
        }
        detector = anomaly_detector.AnomalyDetector()
        detect_result = detector.detect(dataframe, cache)
        expected_segments = [(1523889000005.0, 1523889000005.0)]
        self.assertEqual(expected_segments, detect_result['segments'])

86
analytics/tests/test_utils.py

@ -270,7 +270,7 @@ class TestUtils(unittest.TestCase):
data = pd.Series([5,4,3,2,1,0,1,2,3])
result_list = [4, 5, 6]
self.assertIn(utils.get_end_of_segment(data, False), result_list)
def test_get_borders_of_peaks(self):
data = pd.Series([1,0,1,2,3,2,1,0,0,1,2,3,4,3,2,2,1,0,1,2,3,4,5,3,2,1,0])
pattern_center = [4, 12, 22]
@ -278,7 +278,7 @@ class TestUtils(unittest.TestCase):
confidence = 1.5
result = [(1, 7), (9, 15), (19, 25)]
self.assertEqual(utils.get_borders_of_peaks(pattern_center, data, ws, confidence), result)
def test_get_borders_of_peaks_for_trough(self):
data = pd.Series([4,4,5,5,3,1,3,5,5,6,3,2])
pattern_center = [5]
@ -287,5 +287,87 @@ class TestUtils(unittest.TestCase):
result = [(3, 7)]
self.assertEqual(utils.get_borders_of_peaks(pattern_center, data, ws, confidence, inverse = True), result)
def test_get_start_and_end_of_segments(self):
    '''Each segment collapses to its [first, last] pair; empty segments are dropped.'''
    segments = [[1, 2, 3, 4], [5, 6, 7], [8], [], [12, 12]]
    result = [[1, 4], [5, 7], [8, 8], [12, 12]]
    utils_result = utils.get_start_and_end_of_segments(segments)
    # Compare whole lists: the original element-wise loop never asserted the
    # lengths matched, so a too-short result passed silently.
    self.assertEqual(result, [list(segment) for segment in utils_result])
def test_get_start_and_end_of_segments_empty(self):
    '''No segments in -> no segments out.'''
    self.assertEqual([], utils.get_start_and_end_of_segments([]))
def test_merge_intersecting_intervals(self):
    '''Disjoint intervals are returned unchanged.'''
    index = [[10, 20], [30, 40]]
    result = [[10, 20], [30, 40]]
    utils_result = utils.merge_intersecting_intervals(index)
    # Compare whole lists: the original element-wise loop never asserted the
    # lengths matched, so a too-short result passed silently.
    self.assertEqual(result, [list(interval) for interval in utils_result])
def test_merge_intersecting_intervals_1(self):
    '''A chain of overlapping intervals collapses into a single one.'''
    index = [[10, 20], [13, 23], [15, 17], [20, 40]]
    result = [[10, 40]]
    utils_result = utils.merge_intersecting_intervals(index)
    # Compare whole lists: the original element-wise loop never asserted the
    # lengths matched, so extra merged intervals would raise IndexError
    # instead of failing cleanly.
    self.assertEqual(result, [list(interval) for interval in utils_result])
def test_merge_intersecting_intervals_empty(self):
    '''An empty interval list merges to an empty list.'''
    self.assertEqual([], utils.merge_intersecting_intervals([]))
def test_merge_intersecting_intervals_one(self):
    '''A single interval is returned untouched.'''
    single = [[10, 20]]
    self.assertEqual([[10, 20]], utils.merge_intersecting_intervals(single))
def test_merge_intersecting_intervals_2(self):
    '''Only the overlapping pair merges; disjoint intervals stay separate.'''
    index = [[10, 20], [13, 23], [25, 30], [35, 40]]
    result = [[10, 23], [25, 30], [35, 40]]
    utils_result = utils.merge_intersecting_intervals(index)
    # Compare whole lists: the original element-wise loop never asserted the
    # lengths matched, so a too-short result passed silently.
    self.assertEqual(result, [list(interval) for interval in utils_result])
def test_merge_intersecting_intervals_3(self):
    '''Unsorted, nested and overlapping intervals collapse into one.'''
    index = [[10, 50], [5, 40], [15, 25], [6, 50]]
    result = [[5, 50]]
    utils_result = utils.merge_intersecting_intervals(index)
    # Compare whole lists: the original element-wise loop never asserted the
    # lengths matched, so extra intervals would raise IndexError instead of
    # failing cleanly.
    self.assertEqual(result, [list(interval) for interval in utils_result])
def test_merge_intersecting_intervals_4(self):
    '''Intervals that merely touch (end == next start) are merged too.'''
    index = [[5, 10], [10, 20], [25, 50]]
    result = [[5, 20], [25, 50]]
    utils_result = utils.merge_intersecting_intervals(index)
    # Compare whole lists: the original element-wise loop never asserted the
    # lengths matched, so a too-short result passed silently.
    self.assertEqual(result, [list(interval) for interval in utils_result])
def test_merge_intersecting_intervals_5(self):
    '''Disjoint unsorted intervals come back sorted and unmerged.'''
    index = [[20, 40], [10, 15], [50, 60]]
    result = [[10, 15], [20, 40], [50, 60]]
    utils_result = utils.merge_intersecting_intervals(index)
    # Compare whole lists: the original element-wise loop never asserted the
    # lengths matched, so a too-short result passed silently.
    self.assertEqual(result, [list(interval) for interval in utils_result])
def test_merge_intersecting_intervals_6(self):
    '''Unsorted touching intervals merge; the disjoint one survives.'''
    index = [[20, 40], [10, 20], [50, 60]]
    result = [[10, 40], [50, 60]]
    utils_result = utils.merge_intersecting_intervals(index)
    # Compare whole lists: the original element-wise loop never asserted the
    # lengths matched, so a too-short result passed silently.
    self.assertEqual(result, [list(interval) for interval in utils_result])
# Allow running this test module directly: `python test_utils.py`.
if __name__ == '__main__':
    unittest.main()

Loading…
Cancel
Save