From 564ded152f515263562f182315dc5c34eddfc130 Mon Sep 17 00:00:00 2001
From: Alexandr Velikiy <39257464+VargBurz@users.noreply.github.com>
Date: Thu, 16 May 2019 18:01:03 +0300
Subject: [PATCH] Merge threshold segments #624 (#646)

---
 .../analytics/analytic_types/__init__.py      |  2 +-
 analytics/analytics/analytic_unit_worker.py   | 10 +++---
 .../analytics/detectors/anomaly_detector.py   |  4 +--
 analytics/analytics/detectors/detector.py     |  2 +-
 .../analytics/detectors/threshold_detector.py | 15 ++++++--
 analytics/analytics/utils/common.py           | 10 ++++--
 analytics/tests/test_utils.py                 | 34 +++++++++++++------
 7 files changed, 54 insertions(+), 23 deletions(-)

diff --git a/analytics/analytics/analytic_types/__init__.py b/analytics/analytics/analytic_types/__init__.py
index e42da50..5d85143 100644
--- a/analytics/analytics/analytic_types/__init__.py
+++ b/analytics/analytics/analytic_types/__init__.py
@@ -17,7 +17,7 @@ AnalyticUnitId = str
 ModelCache = dict
 
 # TODO: explicit timestamp / value
-TimeSeries = [List[Tuple[int, int]]]
+TimeSeries = List[Tuple[int, float]]
 
 """
 Example:
diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py
index 45f535c..90d2eb7 100644
--- a/analytics/analytics/analytic_unit_worker.py
+++ b/analytics/analytics/analytic_unit_worker.py
@@ -8,7 +8,7 @@ import asyncio
 import utils
 from utils import get_intersected_chunks, get_chunks, prepare_data
 
-from analytic_types import ModelCache
+from analytic_types import ModelCache, TimeSeries
 from analytic_types.detector_typing import DetectionResult
 
 logger = logging.getLogger('AnalyticUnitWorker')
@@ -69,14 +69,15 @@ class AnalyticUnitWorker:
         if len(detections) == 0:
             raise RuntimeError(f'do_detect for {self.analytic_unit_id} got empty detection results')
 
-        detection_result = self._detector.concat_detection_results(detections)
+        time_step = utils.find_interval(data)
+        detection_result = self._detector.concat_detection_results(detections, time_step)
         return detection_result.to_json()
 
     def cancel(self):
         if self._training_future is not None:
             self._training_future.cancel()
 
-    async def consume_data(self, data: list, cache: Optional[ModelCache]) -> Optional[dict]:
+    async def consume_data(self, data: TimeSeries, cache: Optional[ModelCache]) -> Optional[dict]:
         window_size = self._detector.get_window_size(cache)
 
         detections: List[DetectionResult] = []
@@ -91,7 +92,8 @@ class AnalyticUnitWorker:
         if len(detections) == 0:
             return None
         else:
-            detection_result = self._detector.concat_detection_results(detections)
+            time_step = utils.find_interval(data)
+            detection_result = self._detector.concat_detection_results(detections, time_step)
             return detection_result.to_json()
 
     async def process_data(self, data: list, cache: ModelCache) -> dict:
diff --git a/analytics/analytics/detectors/anomaly_detector.py b/analytics/analytics/detectors/anomaly_detector.py
index 878c950..af4b8d6 100644
--- a/analytics/analytics/detectors/anomaly_detector.py
+++ b/analytics/analytics/detectors/anomaly_detector.py
@@ -155,13 +155,13 @@ class AnomalyDetector(ProcessingDetector):
             seasonality = cache['seasonality'] // cache['timeStep']
         return max(level, seasonality)
 
-    def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult:
+    def concat_detection_results(self, detections: List[DetectionResult], time_step: int) -> DetectionResult:
         result = DetectionResult()
         for detection in detections:
             result.segments.extend(detection.segments)
             result.last_detection_time = detection.last_detection_time
             result.cache = detection.cache
-        result.segments = utils.merge_intersecting_segments(result.segments)
+        result.segments = utils.merge_intersecting_segments(result.segments, time_step)
         return result
 
     # TODO: ModelCache -> ModelState (don't use string literals)
diff --git a/analytics/analytics/detectors/detector.py b/analytics/analytics/detectors/detector.py
index 3a146b8..40df09e 100644
--- a/analytics/analytics/detectors/detector.py
+++ b/analytics/analytics/detectors/detector.py
@@ -31,7 +31,7 @@ class Detector(ABC):
     def is_detection_intersected(self) -> bool:
         return True
 
-    def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult:
+    def concat_detection_results(self, detections: List[DetectionResult], time_step: int) -> DetectionResult:
         result = DetectionResult()
         for detection in detections:
             result.segments.extend(detection.segments)
diff --git a/analytics/analytics/detectors/threshold_detector.py b/analytics/analytics/detectors/threshold_detector.py
index 252ca01..4edcd4b 100644
--- a/analytics/analytics/detectors/threshold_detector.py
+++ b/analytics/analytics/detectors/threshold_detector.py
@@ -9,7 +9,7 @@ from analytic_types.detector_typing import DetectionResult
 from analytic_types.segment import Segment
 from detectors import Detector
 from time import time
-from utils import convert_sec_to_ms, convert_pd_timestamp_to_ms
+import utils
 
 logger = log.getLogger('THRESHOLD_DETECTOR')
 
@@ -42,7 +42,7 @@ class ThresholdDetector(Detector):
         segments = []
         for index, row in dataframe.iterrows():
             current_value = row['value']
-            current_timestamp = convert_pd_timestamp_to_ms(row['timestamp'])
+            current_timestamp = utils.convert_pd_timestamp_to_ms(row['timestamp'])
             segment = Segment(current_timestamp, current_timestamp)
             # TODO: merge segments
             if pd.isnull(current_value):
@@ -67,7 +67,7 @@ class ThresholdDetector(Detector):
 
             segments.append(segment)
 
         last_entry = dataframe.iloc[-1]
-        last_detection_time = convert_pd_timestamp_to_ms(last_entry['timestamp'])
+        last_detection_time = utils.convert_pd_timestamp_to_ms(last_entry['timestamp'])
         return DetectionResult(cache, segments, last_detection_time)
@@ -77,3 +77,12 @@
 
     def get_window_size(self, cache: Optional[ModelCache]) -> int:
         return self.WINDOW_SIZE
+
+    def concat_detection_results(self, detections: List[DetectionResult], time_step: int) -> DetectionResult:
+        result = DetectionResult()
+        for detection in detections:
+            result.segments.extend(detection.segments)
+            result.last_detection_time = detection.last_detection_time
+            result.cache = detection.cache
+        result.segments = utils.merge_intersecting_segments(result.segments, time_step)
+        return result
diff --git a/analytics/analytics/utils/common.py b/analytics/analytics/utils/common.py
index 3979ae3..ea8df3f 100644
--- a/analytics/analytics/utils/common.py
+++ b/analytics/analytics/utils/common.py
@@ -11,6 +11,7 @@ import utils
 import logging
 from itertools import islice
 from collections import deque
+from analytic_types import TimeSeries
 from analytic_types.segment import Segment
 
 SHIFT_FACTOR = 0.05
@@ -128,7 +129,7 @@ def close_filtering(pattern_list: List[int], win_size: int) -> List[Tuple[int, i
             s.append([pattern_list[i]])
     return s
 
-def merge_intersecting_segments(segments: List[Segment]) -> List[Segment]:
+def merge_intersecting_segments(segments: List[Segment], time_step: int) -> List[Segment]:
     '''
     Find intersecting segments in the segments list and merge them.
     '''
@@ -137,7 +138,7 @@ def merge_intersecting_segments(segments: List[Segment]) -> List[Segment]:
     segments = sorted(segments, key = lambda segment: segment.from_timestamp)
     previous_segment = segments[0]
     for i in range(1, len(segments)):
-        if segments[i].from_timestamp <= previous_segment.to_timestamp:
+        if segments[i].from_timestamp <= previous_segment.to_timestamp + time_step:
             segments[i].from_timestamp = min(previous_segment.from_timestamp, segments[i].from_timestamp)
             segments[i].to_timestamp = max(previous_segment.to_timestamp, segments[i].to_timestamp)
             segments[i - 1] = None
@@ -145,6 +146,11 @@
     segments = [x for x in segments if x is not None]
     return segments
 
+def find_interval(data: TimeSeries) -> int:
+    if len(data) < 2:
+        raise ValueError('Can\'t find interval: length of data must be at least 2')
+    return int(data[1][0] - data[0][0])
+
 def get_start_and_end_of_segments(segments: List[List[int]]) -> List[Tuple[int, int]]:
     '''
     find start and end of segment: [1, 2, 3, 4] -> [1, 4]
diff --git a/analytics/tests/test_utils.py b/analytics/tests/test_utils.py
index 5fd7145..e1a81a4 100644
--- a/analytics/tests/test_utils.py
+++ b/analytics/tests/test_utils.py
@@ -306,44 +306,58 @@ class TestUtils(unittest.TestCase):
         test_cases = [
             {
                 'index': [Segment(10, 20), Segment(30, 40)],
-                'result': [[10, 20], [30, 40]]
+                'result': [[10, 20], [30, 40]],
+                'step': 0,
             },
             {
                 'index': [Segment(10, 20), Segment(13, 23), Segment(15, 17), Segment(20, 40)],
-                'result': [[10, 40]]
+                'result': [[10, 40]],
+                'step': 0,
             },
             {
                 'index': [],
-                'result': []
+                'result': [],
+                'step': 0,
             },
             {
                 'index': [Segment(10, 20)],
-                'result': [[10, 20]]
+                'result': [[10, 20]],
+                'step': 0,
             },
             {
                 'index': [Segment(10, 20), Segment(13, 23), Segment(25, 30), Segment(35, 40)],
-                'result': [[10, 23], [25, 30], [35, 40]]
+                'result': [[10, 23], [25, 30], [35, 40]],
+                'step': 0,
             },
             {
                 'index': [Segment(10, 50), Segment(5, 40), Segment(15, 25), Segment(6, 50)],
-                'result': [[5, 50]]
+                'result': [[5, 50]],
+                'step': 0,
             },
             {
                 'index': [Segment(5, 10), Segment(10, 20), Segment(25, 50)],
-                'result': [[5, 20], [25, 50]]
+                'result': [[5, 20], [25, 50]],
+                'step': 0,
             },
             {
                 'index': [Segment(20, 40), Segment(10, 15), Segment(50, 60)],
-                'result': [[10, 15], [20, 40], [50, 60]]
+                'result': [[10, 15], [20, 40], [50, 60]],
+                'step': 0,
             },
             {
                 'index': [Segment(20, 40), Segment(10, 20), Segment(50, 60)],
-                'result': [[10, 40], [50, 60]]
+                'result': [[10, 40], [50, 60]],
+                'step': 0,
+            },
+            {
+                'index': [Segment(10, 10), Segment(20, 20), Segment(30, 30)],
+                'result': [[10, 30]],
+                'step': 10,
             },
         ]
 
         for case in test_cases:
-            utils_result = utils.merge_intersecting_segments(case['index'])
+            utils_result = utils.merge_intersecting_segments(case['index'], case['step'])
             for got, expected in zip(utils_result, case['result']):
                 self.assertEqual(got.from_timestamp, expected[0])
                 self.assertEqual(got.to_timestamp, expected[1])
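Note (not part of the commit): below is a minimal, self-contained sketch of the behavior this patch introduces. The Segment class is a simplified stand-in for the real one in analytic_types/segment.py, and the two functions mirror the patched code in analytics/analytics/utils/common.py; the last lines reproduce the new test case from test_utils.py. Passing time_step = find_interval(data) means detections that are adjacent at the sampling resolution are now reported as a single segment, which is what the new ThresholdDetector.concat_detection_results relies on.

    from typing import List, Tuple

    # Simplified stand-in for analytic_types.segment.Segment (illustration only).
    class Segment:
        def __init__(self, from_timestamp: int, to_timestamp: int):
            self.from_timestamp = from_timestamp
            self.to_timestamp = to_timestamp

    # Mirrors the new utils.find_interval: the step is the distance between
    # the first two timestamps, assuming a regularly sampled series.
    def find_interval(data: List[Tuple[int, float]]) -> int:
        if len(data) < 2:
            raise ValueError('Can\'t find interval: length of data must be at least 2')
        return int(data[1][0] - data[0][0])

    # Mirrors the patched utils.merge_intersecting_segments: segments are merged
    # not only when they overlap, but also when the gap between them is at most
    # time_step.
    def merge_intersecting_segments(segments: List[Segment], time_step: int) -> List[Segment]:
        if len(segments) < 2:
            return segments
        segments = sorted(segments, key=lambda segment: segment.from_timestamp)
        previous_segment = segments[0]
        for i in range(1, len(segments)):
            if segments[i].from_timestamp <= previous_segment.to_timestamp + time_step:
                segments[i].from_timestamp = min(previous_segment.from_timestamp, segments[i].from_timestamp)
                segments[i].to_timestamp = max(previous_segment.to_timestamp, segments[i].to_timestamp)
                segments[i - 1] = None
            previous_segment = segments[i]
        return [s for s in segments if s is not None]

    # Point-segments spaced exactly one time step apart collapse into one,
    # matching the new test case in test_utils.py.
    data = [(0, 1.0), (10, 1.5), (20, 0.9)]
    step = find_interval(data)  # -> 10
    merged = merge_intersecting_segments([Segment(10, 10), Segment(20, 20), Segment(30, 30)], step)
    print([(s.from_timestamp, s.to_timestamp) for s in merged])  # [(10, 30)]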