diff --git a/analytics/analytics/analytic_types/detector_typing.py b/analytics/analytics/analytic_types/detector_typing.py
index 0c9e278..2291875 100644
--- a/analytics/analytics/analytic_types/detector_typing.py
+++ b/analytics/analytics/analytic_types/detector_typing.py
@@ -11,7 +11,8 @@ class DetectionResult:
         self,
         cache: Optional[ModelCache] = None,
         segments: Optional[List[Segment]] = None,
-        last_detection_time: int = None
+        last_detection_time: int = None,
+        time_step: int = None
     ):
         if cache is None:
             cache = {}
@@ -20,13 +21,15 @@ class DetectionResult:
         self.cache = cache
         self.segments = segments
         self.last_detection_time = last_detection_time
+        self.time_step = time_step

     # TODO: use @utils.meta.JSONClass (now it can't serialize list of objects)
     def to_json(self):
         return {
             'cache': self.cache,
             'segments': list(map(lambda segment: segment.to_json(), self.segments)),
-            'lastDetectionTime': self.last_detection_time
+            'lastDetectionTime': self.last_detection_time,
+            'timeStep': self.time_step
         }

 @utils.meta.JSONClass
diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py
index 90d2eb7..7ed0aa1 100644
--- a/analytics/analytics/analytic_unit_worker.py
+++ b/analytics/analytics/analytic_unit_worker.py
@@ -69,8 +69,7 @@ class AnalyticUnitWorker:
         if len(detections) == 0:
             raise RuntimeError(f'do_detect for {self.analytic_unit_id} got empty detection results')

-        time_step = utils.find_interval(data)
-        detection_result = self._detector.concat_detection_results(detections, time_step)
+        detection_result = self._detector.concat_detection_results(detections)
         return detection_result.to_json()

     def cancel(self):
@@ -92,8 +91,7 @@ class AnalyticUnitWorker:
         if len(detections) == 0:
             return None
         else:
-            time_step = utils.find_interval(data)
-            detection_result = self._detector.concat_detection_results(detections, time_step)
+            detection_result = self._detector.concat_detection_results(detections)
             return detection_result.to_json()

     async def process_data(self, data: list, cache: ModelCache) -> dict:
diff --git a/analytics/analytics/detectors/anomaly_detector.py b/analytics/analytics/detectors/anomaly_detector.py
index f7951ef..ed207dd 100644
--- a/analytics/analytics/detectors/anomaly_detector.py
+++ b/analytics/analytics/detectors/anomaly_detector.py
@@ -66,6 +66,7 @@ class AnomalyDetector(ProcessingDetector):
         if cache is not None:
             last_value = cache.get('last_value')

+        time_step = utils.find_interval(dataframe)
         smoothed_data = utils.exponential_smoothing(data, cache['alpha'], last_value)

         # TODO: use class for cache to avoid using string literals
@@ -115,7 +116,7 @@ class AnomalyDetector(ProcessingDetector):
         # TODO: ['lastValue'] -> .last_value
         cache['lastValue'] = smoothed_data.values[-1]

-        return DetectionResult(cache, segments, last_detection_time)
+        return DetectionResult(cache, segments, last_detection_time, time_step)

     def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
         if cache is None:
@@ -131,7 +132,7 @@ class AnomalyDetector(ProcessingDetector):
         self.bucket.receive_data(data_without_nan)

         if len(self.bucket.data) >= self.get_window_size(cache):
-            return self.detect(self.bucket, cache)
+            return self.detect(self.bucket.data, cache)

         return None

@@ -155,8 +156,9 @@ class AnomalyDetector(ProcessingDetector):
             seasonality = cache['seasonality'] // cache['timeStep']
             return max(level, seasonality)

-    def concat_detection_results(self, detections: List[DetectionResult], time_step: int) -> DetectionResult:
+    def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult:
         result = DetectionResult()
+        time_step = detections[0].time_step
         for detection in detections:
             result.segments.extend(detection.segments)
             result.last_detection_time = detection.last_detection_time
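With `time_step` stored on each `DetectionResult`, `AnalyticUnitWorker` no longer measures the interval itself before concatenating; every detection carries and serializes it. For illustration, a payload in the shape the new `to_json` produces; the cache and segment contents below are invented and the segment fields are schematic:

```python
detection_json = {
    'cache': {'alpha': 0.5},
    'segments': [{'from': 1523889000000, 'to': 1523889060000}],  # schematic Segment.to_json() output
    'lastDetectionTime': 1523889060000,
    # new field: sampling interval of the analyzed data, in ms
    'timeStep': 60000,
}
```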
diff --git a/analytics/analytics/detectors/detector.py b/analytics/analytics/detectors/detector.py
index 40df09e..3a146b8 100644
--- a/analytics/analytics/detectors/detector.py
+++ b/analytics/analytics/detectors/detector.py
@@ -31,7 +31,7 @@ class Detector(ABC):
     def is_detection_intersected(self) -> bool:
         return True

-    def concat_detection_results(self, detections: List[DetectionResult], time_step: int) -> DetectionResult:
+    def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult:
         result = DetectionResult()
         for detection in detections:
             result.segments.extend(detection.segments)
diff --git a/analytics/analytics/detectors/pattern_detector.py b/analytics/analytics/detectors/pattern_detector.py
index 406e075..74ffc29 100644
--- a/analytics/analytics/detectors/pattern_detector.py
+++ b/analytics/analytics/detectors/pattern_detector.py
@@ -13,7 +13,7 @@ from utils import convert_pd_timestamp_to_ms
 from analytic_types import AnalyticUnitId, ModelCache
 from analytic_types.detector_typing import DetectionResult
 from analytic_types.segment import Segment
-
+import utils


 logger = logging.getLogger('PATTERN_DETECTOR')
@@ -77,13 +77,14 @@ class PatternDetector(Detector):
             logger.error(message)
             raise ValueError(message)

+        time_step = utils.find_interval(dataframe)
         detected = self.model.detect(dataframe, self.analytic_unit_id)
         segments = [Segment(segment[0], segment[1]) for segment in detected['segments']]
         new_cache = detected['cache'].to_json()

         last_dataframe_time = dataframe.iloc[-1]['timestamp']
         last_detection_time = convert_pd_timestamp_to_ms(last_dataframe_time)
-        return DetectionResult(new_cache, segments, last_detection_time)
+        return DetectionResult(new_cache, segments, last_detection_time, time_step)

     def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
         logging.debug('Start consume_data for analytic unit {}'.format(self.analytic_unit_id))
diff --git a/analytics/analytics/detectors/threshold_detector.py b/analytics/analytics/detectors/threshold_detector.py
index 4edcd4b..74fa13d 100644
--- a/analytics/analytics/detectors/threshold_detector.py
+++ b/analytics/analytics/detectors/threshold_detector.py
@@ -38,6 +38,7 @@ class ThresholdDetector(Detector):

         value = cache['value']
         condition = cache['condition']
+        time_step = utils.find_interval(dataframe)

         segments = []
         for index, row in dataframe.iterrows():
@@ -68,7 +69,7 @@ class ThresholdDetector(Detector):

         last_entry = dataframe.iloc[-1]
         last_detection_time = utils.convert_pd_timestamp_to_ms(last_entry['timestamp'])
-        return DetectionResult(cache, segments, last_detection_time)
+        return DetectionResult(cache, segments, last_detection_time, time_step)


     def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
@@ -78,8 +79,9 @@ class ThresholdDetector(Detector):
     def get_window_size(self, cache: Optional[ModelCache]) -> int:
         return self.WINDOW_SIZE

-    def concat_detection_results(self, detections: List[DetectionResult], time_step: int) -> DetectionResult:
+    def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult:
         result = DetectionResult()
+        time_step = detections[0].time_step
         for detection in detections:
             result.segments.extend(detection.segments)
             result.last_detection_time = detection.last_detection_time
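The overrides now recover `time_step` from the detections themselves rather than from a parameter. A minimal standalone sketch of that contract with a simplified `DetectionResult` (the real class lives in `detector_typing.py`): it assumes `detections` is non-empty, which the worker guarantees by raising or returning early on empty results. The visible hunks only bind `time_step` to a local, presumably for merging segments further down; the sketch stores it on the result to make the data flow explicit.

```python
from typing import List, Optional

class DetectionResult:
    # simplified stand-in for analytic_types.detector_typing.DetectionResult
    def __init__(self, segments: Optional[list] = None,
                 last_detection_time: Optional[int] = None,
                 time_step: Optional[int] = None):
        self.segments = segments if segments is not None else []
        self.last_detection_time = last_detection_time
        self.time_step = time_step

def concat_detection_results(detections: List[DetectionResult]) -> DetectionResult:
    result = DetectionResult()
    # the interval travels with the detections; all chunks of one
    # detection run come from the same data, hence share one time_step
    result.time_step = detections[0].time_step
    for detection in detections:
        result.segments.extend(detection.segments)
        result.last_detection_time = detection.last_detection_time
    return result

chunks = [
    DetectionResult([(0, 1)], last_detection_time=1000, time_step=500),
    DetectionResult([(2, 3)], last_detection_time=2000, time_step=500),
]
merged = concat_detection_results(chunks)
assert merged.time_step == 500 and merged.last_detection_time == 2000
```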
diff --git a/analytics/analytics/utils/common.py b/analytics/analytics/utils/common.py
index ea8df3f..5bd8c4e 100644
--- a/analytics/analytics/utils/common.py
+++ b/analytics/analytics/utils/common.py
@@ -146,10 +146,11 @@ def merge_intersecting_segments(segments: List[Segment], time_step: int) -> List[Segment]:
     segments = [x for x in segments if x is not None]
     return segments

-def find_interval(data: TimeSeries) -> int:
-    if len(data) < 2:
+def find_interval(dataframe: pd.DataFrame) -> int:
+    if len(dataframe) < 2:
         raise ValueError('Can`t find interval: length of data must be at least 2')
-    return int(data[1][0] - data[0][0])
+    delta = utils.convert_pd_timestamp_to_ms(dataframe.timestamp[1]) - utils.convert_pd_timestamp_to_ms(dataframe.timestamp[0])
+    return delta

 def get_start_and_end_of_segments(segments: List[List[int]]) -> List[Tuple[int, int]]:
    '''
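The helper now takes the raw dataframe instead of a `TimeSeries` list of `(timestamp, value)` pairs. A self-contained check of the new behavior, where `to_ms` is a local stand-in for `utils.convert_pd_timestamp_to_ms` (assumed here to return Unix milliseconds):

```python
import pandas as pd

def to_ms(timestamp: pd.Timestamp) -> int:
    # stand-in for utils.convert_pd_timestamp_to_ms;
    # Timestamp.value is nanoseconds since the epoch
    return int(timestamp.value) // 1_000_000

def find_interval(dataframe: pd.DataFrame) -> int:
    if len(dataframe) < 2:
        raise ValueError('Can`t find interval: length of data must be at least 2')
    return to_ms(dataframe.timestamp[1]) - to_ms(dataframe.timestamp[0])

df = pd.DataFrame({
    'timestamp': pd.to_datetime([1523889000000, 1523889060000], unit='ms'),
    'value': [0.0, 1.0],
})
assert find_interval(df) == 60000  # points 60 s apart -> 60000 ms
```

Like the patched version, this reads only the first two rows, so it assumes the series is evenly spaced.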