Browse Source

Refactor get_bounds_for_segment method (#843)

pull/1/head
Alexander Velikiy 4 years ago committed by GitHub
parent
commit
21658883a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 51
      analytics/analytics/detectors/anomaly_detector.py
  2. 36
      analytics/tests/test_detectors.py

51
analytics/analytics/detectors/anomaly_detector.py

@ -4,6 +4,7 @@ import numpy as np
import pandas as pd
import math
from typing import Optional, Union, List, Tuple, Generator
import operator
from analytic_types import AnalyticUnitId, ModelCache
from analytic_types.detector_typing import DetectionResult, ProcessingResult, Bound
@ -195,54 +196,34 @@ class AnomalyDetector(ProcessingDetector):
continue
if (idx - offset) % seasonality == 0:
if bound_type == Bound.UPPER:
upper_segment_bound = self.get_bounds_for_segment(segment)[0]
upper_segment_bound = self.get_segment_bound(segment, Bound.UPPER)
data = data.add(pd.Series(upper_segment_bound.values, index = segment.index + idx), fill_value = 0)
elif bound_type == Bound.LOWER:
lower_segment_bound = self.get_bounds_for_segment(segment)[1]
lower_segment_bound = self.get_segment_bound(segment, Bound.LOWER)
data = data.add(pd.Series(lower_segment_bound.values * -1, index = segment.index + idx), fill_value = 0)
else:
raise ValueError(f'unknown bound type: {bound_type.value}')
return data[:len_smoothed_data]
def get_bounds_for_segment(self, segment: pd.Series) -> Tuple[pd.Series, pd.Series]:
def get_segment_bound(self, segment: pd.Series, bound: Bound) -> pd.Series:
'''
segment is divided by the median to determine its top and bottom parts
parts are smoothed and raised so the segment is between them
segment is divided by the median to determine its top or bottom part
the part is smoothed and raised above the segment or put down below the segment
'''
if len(segment) < 2:
return segment, segment
return segment
comparison_operator = operator.gt if bound == Bound.UPPER else operator.le
segment = segment - segment.min()
segment_median = segment.median()
top_part = []
bottom_part = []
for val in segment.values:
if val > segment_median:
top_part.append(val)
bottom_part.append(segment_median)
else:
bottom_part.append(val)
top_part.append(segment_median)
top_part = pd.Series(top_part, index = segment.index)
bottom_part = pd.Series(bottom_part, index = segment.index)
smoothed_top_part = utils.exponential_smoothing(top_part, BASIC_ALPHA)
smoothed_bottom_part = utils.exponential_smoothing(bottom_part, BASIC_ALPHA)
top_difference = []
bottom_difference = []
for idx, val in enumerate(top_part):
top_difference.append(abs(val - smoothed_top_part[idx]))
bottom_difference.append(abs(bottom_part[idx] - smoothed_bottom_part[idx]))
max_diff_top = max(top_difference)
max_diff_bot = max(bottom_difference)
upper_bound = []
lower_bound = []
for val in smoothed_top_part.values:
upper_bound.append(val + max_diff_top)
for val in smoothed_bottom_part.values:
lower_bound.append(val + max_diff_bot)
upper_bound = pd.Series(upper_bound, index = segment.index)
lower_bound = pd.Series(lower_bound, index = segment.index)
return upper_bound, lower_bound
part = [val if comparison_operator(val, segment_median) else segment_median for val in segment.values]
part = pd.Series(part, index = segment.index)
smoothed_part = utils.exponential_smoothing(part, BASIC_ALPHA)
difference = [abs(x - y) for x, y in zip(part, smoothed_part)]
max_diff = max(difference)
bound = [val + max_diff for val in smoothed_part.values]
bound = pd.Series(bound, index = segment.index)
return bound
def get_seasonality_offset(self, from_timestamp: int, seasonality: int, data_start_time: int, time_step: int) -> int:
season_count = math.ceil(abs(from_timestamp - data_start_time) / seasonality)

36
analytics/tests/test_detectors.py

@ -211,7 +211,7 @@ class TestAnomalyDetector(unittest.TestCase):
result = [{ 'from': 1523889000010, 'to': 1523889000010 }]
self.assertEqual(result, detected_segments)
def test_get_bounds_for_segment(self):
def test_get_segment_bound(self):
detector = anomaly_detector.AnomalyDetector('test_id')
peak_segment = pd.Series([1,2,3,4,3,2,1])
trough_segment = pd.Series([4,3,2,1,2,3,4])
@ -223,39 +223,43 @@ class TestAnomalyDetector(unittest.TestCase):
'max_value': 3.5,
'min_value': 2.75
}
peak_detector_result = detector.get_bounds_for_segment(peak_segment)
trough_detector_result = detector.get_bounds_for_segment(trough_segment)
peak_detector_result_upper = detector.get_segment_bound(peak_segment, Bound.UPPER)
peak_detector_result_lower = detector.get_segment_bound(peak_segment, Bound.LOWER)
trough_detector_result_upper = detector.get_segment_bound(trough_segment, Bound.UPPER)
trough_detector_result_lower = detector.get_segment_bound(trough_segment, Bound.LOWER)
self.assertGreaterEqual(
max(peak_detector_result[0]),
max(peak_detector_result_upper),
expected_peak_segment_results['max_value']
)
self.assertLessEqual(
max(peak_detector_result[1]),
max(peak_detector_result_lower),
expected_peak_segment_results['min_value']
)
self.assertGreaterEqual(
max(trough_detector_result[0]),
max(trough_detector_result_upper),
expected_trough_segment_results['max_value']
)
self.assertLessEqual(
max(trough_detector_result[1]),
max(trough_detector_result_lower),
expected_trough_segment_results['min_value']
)
def test_get_bounds_for_segment_corner_cases(self):
def test_get_segment_bound_corner_cases(self):
detector = anomaly_detector.AnomalyDetector('test_id')
empty_segment = pd.Series([])
same_values_segment = pd.Series([2,2,2,2,2,2])
empty_detector_result = detector.get_bounds_for_segment(empty_segment)
same_values_detector_result = detector.get_bounds_for_segment(same_values_segment)
empty_detector_result_upper = detector.get_segment_bound(empty_segment, Bound.UPPER)
empty_detector_result_lower = detector.get_segment_bound(empty_segment, Bound.LOWER)
same_values_detector_result_upper = detector.get_segment_bound(same_values_segment, Bound.UPPER)
same_values_detector_result_lower = detector.get_segment_bound(same_values_segment, Bound.LOWER)
self.assertEqual(len(empty_detector_result[0]), 0)
self.assertEqual(len(empty_detector_result[1]), 0)
self.assertEqual(min(same_values_detector_result[0]), 0)
self.assertEqual(max(same_values_detector_result[0]), 0)
self.assertEqual(min(same_values_detector_result[1]), 0)
self.assertEqual(max(same_values_detector_result[1]), 0)
self.assertEqual(len(empty_detector_result_upper), 0)
self.assertEqual(len(empty_detector_result_lower), 0)
self.assertEqual(min(same_values_detector_result_upper), 0)
self.assertEqual(max(same_values_detector_result_upper), 0)
self.assertEqual(min(same_values_detector_result_lower), 0)
self.assertEqual(max(same_values_detector_result_lower), 0)
if __name__ == '__main__':
unittest.main()

Loading…
Cancel
Save