From 21658883a975c1783eee0419d2e862371ea8e4b6 Mon Sep 17 00:00:00 2001
From: Alexander Velikiy <39257464+VargBurz@users.noreply.github.com>
Date: Fri, 14 Feb 2020 13:39:45 +0300
Subject: [PATCH] Refactor get_bounds_for_segment method (#843)

---
Review notes (commentary only: text between the three-dash line and the
first "diff --git" line is not part of the commit message or the change):

* Line breaks, blank context lines and indentation in this copy were
  restored from a whitespace-collapsed version of the patch. The hunk
  line counts were used as a check, but verify against the original
  commit before applying.
* get_segment_bound() rebinds its `bound` parameter to a list and then to
  the pd.Series it returns, so the name stops referring to the requested
  Bound value partway through the method.
* get_segment_bound() selects operator.gt only for Bound.UPPER; any other
  value selects operator.le. The caller shown in the same hunk raises
  ValueError for an unknown bound type, the new method does not.
* The test hunks reference `Bound`, and no hunk in this patch adds an
  import for it to test_detectors.py -- presumably it is already imported
  there; confirm.

 .../analytics/detectors/anomaly_detector.py   | 51 ++++++-------------
 analytics/tests/test_detectors.py             | 36 +++++++------
 2 files changed, 36 insertions(+), 51 deletions(-)

diff --git a/analytics/analytics/detectors/anomaly_detector.py b/analytics/analytics/detectors/anomaly_detector.py
index 2d8ec5a..21ee9b7 100644
--- a/analytics/analytics/detectors/anomaly_detector.py
+++ b/analytics/analytics/detectors/anomaly_detector.py
@@ -4,6 +4,7 @@ import numpy as np
 import pandas as pd
 import math
 from typing import Optional, Union, List, Tuple, Generator
+import operator
 
 from analytic_types import AnalyticUnitId, ModelCache
 from analytic_types.detector_typing import DetectionResult, ProcessingResult, Bound
@@ -195,54 +196,34 @@ class AnomalyDetector(ProcessingDetector):
                 continue
             if (idx - offset) % seasonality == 0:
                 if bound_type == Bound.UPPER:
-                    upper_segment_bound = self.get_bounds_for_segment(segment)[0]
+                    upper_segment_bound = self.get_segment_bound(segment, Bound.UPPER)
                     data = data.add(pd.Series(upper_segment_bound.values, index = segment.index + idx), fill_value = 0)
                 elif bound_type == Bound.LOWER:
-                    lower_segment_bound = self.get_bounds_for_segment(segment)[1]
+                    lower_segment_bound = self.get_segment_bound(segment, Bound.LOWER)
                     data = data.add(pd.Series(lower_segment_bound.values * -1, index = segment.index + idx), fill_value = 0)
                 else:
                     raise ValueError(f'unknown bound type: {bound_type.value}')
 
         return data[:len_smoothed_data]
 
-    def get_bounds_for_segment(self, segment: pd.Series) -> Tuple[pd.Series, pd.Series]:
+    def get_segment_bound(self, segment: pd.Series, bound: Bound) -> pd.Series:
         '''
-        segment is divided by the median to determine its top and bottom parts
-        parts are smoothed and raised so the segment is between them
+        segment is divided by the median to determine its top or bottom part
+        the part is smoothed and raised above the segment or put down below the segment
         '''
         if len(segment) < 2:
-            return segment, segment
+            return segment
+        comparison_operator = operator.gt if bound == Bound.UPPER else operator.le
         segment = segment - segment.min()
         segment_median = segment.median()
-        top_part = []
-        bottom_part = []
-        for val in segment.values:
-            if val > segment_median:
-                top_part.append(val)
-                bottom_part.append(segment_median)
-            else:
-                bottom_part.append(val)
-                top_part.append(segment_median)
-        top_part = pd.Series(top_part, index = segment.index)
-        bottom_part = pd.Series(bottom_part, index = segment.index)
-        smoothed_top_part = utils.exponential_smoothing(top_part, BASIC_ALPHA)
-        smoothed_bottom_part = utils.exponential_smoothing(bottom_part, BASIC_ALPHA)
-        top_difference = []
-        bottom_difference = []
-        for idx, val in enumerate(top_part):
-            top_difference.append(abs(val - smoothed_top_part[idx]))
-            bottom_difference.append(abs(bottom_part[idx] - smoothed_bottom_part[idx]))
-        max_diff_top = max(top_difference)
-        max_diff_bot = max(bottom_difference)
-        upper_bound = []
-        lower_bound = []
-        for val in smoothed_top_part.values:
-            upper_bound.append(val + max_diff_top)
-        for val in smoothed_bottom_part.values:
-            lower_bound.append(val + max_diff_bot)
-        upper_bound = pd.Series(upper_bound, index = segment.index)
-        lower_bound = pd.Series(lower_bound, index = segment.index)
-        return upper_bound, lower_bound
+        part = [val if comparison_operator(val, segment_median) else segment_median for val in segment.values]
+        part = pd.Series(part, index = segment.index)
+        smoothed_part = utils.exponential_smoothing(part, BASIC_ALPHA)
+        difference = [abs(x - y) for x, y in zip(part, smoothed_part)]
+        max_diff = max(difference)
+        bound = [val + max_diff for val in smoothed_part.values]
+        bound = pd.Series(bound, index = segment.index)
+        return bound
 
     def get_seasonality_offset(self, from_timestamp: int, seasonality: int, data_start_time: int, time_step: int) -> int:
         season_count = math.ceil(abs(from_timestamp - data_start_time) / seasonality)
diff --git a/analytics/tests/test_detectors.py b/analytics/tests/test_detectors.py
index 573cc9c..e568dfe 100644
--- a/analytics/tests/test_detectors.py
+++ b/analytics/tests/test_detectors.py
@@ -211,7 +211,7 @@ class TestAnomalyDetector(unittest.TestCase):
         result = [{ 'from': 1523889000010, 'to': 1523889000010 }]
         self.assertEqual(result, detected_segments)
 
-    def test_get_bounds_for_segment(self):
+    def test_get_segment_bound(self):
         detector = anomaly_detector.AnomalyDetector('test_id')
         peak_segment = pd.Series([1,2,3,4,3,2,1])
         trough_segment = pd.Series([4,3,2,1,2,3,4])
@@ -223,39 +223,43 @@ class TestAnomalyDetector(unittest.TestCase):
             'max_value': 3.5,
             'min_value': 2.75
         }
-        peak_detector_result = detector.get_bounds_for_segment(peak_segment)
-        trough_detector_result = detector.get_bounds_for_segment(trough_segment)
+        peak_detector_result_upper = detector.get_segment_bound(peak_segment, Bound.UPPER)
+        peak_detector_result_lower = detector.get_segment_bound(peak_segment, Bound.LOWER)
+        trough_detector_result_upper = detector.get_segment_bound(trough_segment, Bound.UPPER)
+        trough_detector_result_lower = detector.get_segment_bound(trough_segment, Bound.LOWER)
 
         self.assertGreaterEqual(
-            max(peak_detector_result[0]),
+            max(peak_detector_result_upper),
             expected_peak_segment_results['max_value']
         )
         self.assertLessEqual(
-            max(peak_detector_result[1]),
+            max(peak_detector_result_lower),
             expected_peak_segment_results['min_value']
         )
         self.assertGreaterEqual(
-            max(trough_detector_result[0]),
+            max(trough_detector_result_upper),
             expected_trough_segment_results['max_value']
         )
         self.assertLessEqual(
-            max(trough_detector_result[1]),
+            max(trough_detector_result_lower),
             expected_trough_segment_results['min_value']
         )
 
-    def test_get_bounds_for_segment_corner_cases(self):
+    def test_get_segment_bound_corner_cases(self):
         detector = anomaly_detector.AnomalyDetector('test_id')
         empty_segment = pd.Series([])
         same_values_segment = pd.Series([2,2,2,2,2,2])
-        empty_detector_result = detector.get_bounds_for_segment(empty_segment)
-        same_values_detector_result = detector.get_bounds_for_segment(same_values_segment)
+        empty_detector_result_upper = detector.get_segment_bound(empty_segment, Bound.UPPER)
+        empty_detector_result_lower = detector.get_segment_bound(empty_segment, Bound.LOWER)
+        same_values_detector_result_upper = detector.get_segment_bound(same_values_segment, Bound.UPPER)
+        same_values_detector_result_lower = detector.get_segment_bound(same_values_segment, Bound.LOWER)
 
-        self.assertEqual(len(empty_detector_result[0]), 0)
-        self.assertEqual(len(empty_detector_result[1]), 0)
-        self.assertEqual(min(same_values_detector_result[0]), 0)
-        self.assertEqual(max(same_values_detector_result[0]), 0)
-        self.assertEqual(min(same_values_detector_result[1]), 0)
-        self.assertEqual(max(same_values_detector_result[1]), 0)
+        self.assertEqual(len(empty_detector_result_upper), 0)
+        self.assertEqual(len(empty_detector_result_lower), 0)
+        self.assertEqual(min(same_values_detector_result_upper), 0)
+        self.assertEqual(max(same_values_detector_result_upper), 0)
+        self.assertEqual(min(same_values_detector_result_lower), 0)
+        self.assertEqual(max(same_values_detector_result_lower), 0)
 
 if __name__ == '__main__':
     unittest.main()