From b1bd20f03008a0cb95a3af31dd20ed432702741a Mon Sep 17 00:00:00 2001
From: Evgeny Smyshlyaev
Date: Fri, 26 Jul 2019 17:38:47 +0300
Subject: [PATCH] Threshold line for HSR #727 (#729)

---
 .../analytic_types/detector_typing.py         | 11 --------
 .../analytics/detectors/anomaly_detector.py   | 26 ++++---------------
 analytics/analytics/detectors/detector.py     | 16 +++++++++++-
 .../analytics/detectors/threshold_detector.py | 26 ++++++++++++++++---
 .../src/controllers/analytics_controller.ts   | 22 ++++++++++------
 5 files changed, 57 insertions(+), 44 deletions(-)

diff --git a/analytics/analytics/analytic_types/detector_typing.py b/analytics/analytics/analytic_types/detector_typing.py
index 04b4fd6..5e3fd35 100644
--- a/analytics/analytics/analytic_types/detector_typing.py
+++ b/analytics/analytics/analytic_types/detector_typing.py
@@ -32,17 +32,6 @@ class DetectionResult:
 
 @utils.meta.JSONClass
 class ProcessingResult():
-    def __init__(
-        self,
-        data: Optional[TimeSeries] = None
-    ):
-        if data is None:
-            data = []
-        self.data = data
-
-@utils.meta.JSONClass
-class AnomalyProcessingResult():
-
     def __init__(
         self,
         lower_bound: Optional[TimeSeries] = None,
diff --git a/analytics/analytics/detectors/anomaly_detector.py b/analytics/analytics/detectors/anomaly_detector.py
index 04d7d5a..7ab342d 100644
--- a/analytics/analytics/detectors/anomaly_detector.py
+++ b/analytics/analytics/detectors/anomaly_detector.py
@@ -6,7 +6,7 @@ import math
 from typing import Optional, Union, List, Tuple
 
 from analytic_types import AnalyticUnitId, ModelCache
-from analytic_types.detector_typing import DetectionResult, AnomalyProcessingResult
+from analytic_types.detector_typing import DetectionResult, ProcessingResult
 from analytic_types.data_bucket import DataBucket
 from analytic_types.segment import Segment
 from detectors import Detector, ProcessingDetector
@@ -173,7 +173,7 @@ class AnomalyDetector(ProcessingDetector):
         return result
 
     # TODO: remove duplication with detect()
-    def process_data(self, dataframe: pd.DataFrame, cache: ModelCache) -> AnomalyProcessingResult:
+    def process_data(self, dataframe: pd.DataFrame, cache: ModelCache) -> ProcessingResult:
         segments = self.get_value_from_cache(cache, 'segments')
         alpha = self.get_value_from_cache(cache, 'alpha', required = True)
         confidence = self.get_value_from_cache(cache, 'confidence', required = True)
@@ -212,11 +212,11 @@ class AnomalyDetector(ProcessingDetector):
             upper_bound_timeseries = list(zip(timestamps, upper_bound.values.tolist()))
 
         if enable_bounds == Bound.ALL:
-            return AnomalyProcessingResult(lower_bound_timeseries, upper_bound_timeseries)
+            return ProcessingResult(lower_bound_timeseries, upper_bound_timeseries)
         elif enable_bounds == Bound.UPPER:
-            return AnomalyProcessingResult(upper_bound = upper_bound_timeseries)
+            return ProcessingResult(upper_bound = upper_bound_timeseries)
         elif enable_bounds == Bound.LOWER:
-            return AnomalyProcessingResult(lower_bound = lower_bound_timeseries)
+            return ProcessingResult(lower_bound = lower_bound_timeseries)
 
     def add_season_to_data(self, data: pd.Series, segment: pd.Series, offset: int, seasonality: int, bound_type: Bound) -> pd.Series:
         #data - smoothed data to which seasonality will be added
@@ -239,22 +239,6 @@ class AnomalyDetector(ProcessingDetector):
 
         return data[:len_smoothed_data]
 
-    def concat_processing_results(self, processing_results: List[AnomalyProcessingResult]) -> Optional[AnomalyProcessingResult]:
-        if len(processing_results) == 0:
-            return None
-
-        united_result = AnomalyProcessingResult()
-        for result in processing_results:
-            if result.lower_bound is not None:
-                if united_result.lower_bound is None: united_result.lower_bound = []
-                united_result.lower_bound.extend(result.lower_bound)
-
-            if result.upper_bound is not None:
-                if united_result.upper_bound is None: united_result.upper_bound = []
-                united_result.upper_bound.extend(result.upper_bound)
-
-        return united_result
-
     def get_bounds_for_segment(self, segment: pd.Series) -> Tuple[pd.Series, pd.Series]:
         '''
         segment is divided by the median to determine its top and bottom parts
diff --git a/analytics/analytics/detectors/detector.py b/analytics/analytics/detectors/detector.py
index 3920fda..8521890 100644
--- a/analytics/analytics/detectors/detector.py
+++ b/analytics/analytics/detectors/detector.py
@@ -53,14 +53,28 @@ class ProcessingDetector(Detector):
 
     @abstractmethod
    def process_data(self, data: TimeSeries, cache: Optional[ModelCache]) -> ProcessingResult:
+        '''
+        Process data to obtain additional time series that represent the detector's settings
+        '''
        pass
 
     def concat_processing_results(self, processing_results: List[ProcessingResult]) -> Optional[ProcessingResult]:
+        '''
+        Concatenate sequential ProcessingResults received after
+        splitting the dataset into chunks in the analytic worker
+        '''
+
         if len(processing_results) == 0:
             return None
 
         united_result = ProcessingResult()
         for result in processing_results:
-            united_result.data.extend(result.data)
+            if result.lower_bound is not None:
+                if united_result.lower_bound is None: united_result.lower_bound = []
+                united_result.lower_bound.extend(result.lower_bound)
+
+            if result.upper_bound is not None:
+                if united_result.upper_bound is None: united_result.upper_bound = []
+                united_result.upper_bound.extend(result.upper_bound)
 
         return united_result
diff --git a/analytics/analytics/detectors/threshold_detector.py b/analytics/analytics/detectors/threshold_detector.py
index 979e7f1..aa8c67b 100644
--- a/analytics/analytics/detectors/threshold_detector.py
+++ b/analytics/analytics/detectors/threshold_detector.py
@@ -6,9 +6,9 @@ import numpy as np
 from typing import Optional, List
 
 from analytic_types import ModelCache, AnalyticUnitId
-from analytic_types.detector_typing import DetectionResult
+from analytic_types.detector_typing import DetectionResult, ProcessingResult
 from analytic_types.segment import Segment
-from detectors import Detector
+from detectors import ProcessingDetector
 from time import time
 
 import utils
@@ -16,7 +16,7 @@ import utils
 logger = log.getLogger('THRESHOLD_DETECTOR')
 
 
-class ThresholdDetector(Detector):
+class ThresholdDetector(ProcessingDetector):
 
     WINDOW_SIZE = 3
 
@@ -89,3 +89,23 @@ class ThresholdDetector(Detector):
             result.cache = detection.cache
             result.segments = utils.merge_intersecting_segments(result.segments, time_step)
         return result
+
+    def process_data(self, dataframe: pd.DataFrame, cache: ModelCache) -> ProcessingResult:
+        data = dataframe['value']
+        value = self.get_value_from_cache(cache, 'value', required = True)
+        condition = self.get_value_from_cache(cache, 'condition', required = True)
+
+        if condition == 'NO_DATA':
+            return ProcessingResult()
+
+        data.values[:] = value
+        timestamps = utils.convert_series_to_timestamp_list(dataframe.timestamp)
+        result_series = list(zip(timestamps, data.values.tolist()))
+
+        if condition in ['>', '>=', '=']:
+            return ProcessingResult(upper_bound = result_series)
+
+        if condition in ['<', '<=']:
+            return ProcessingResult(lower_bound = result_series)
+
+        raise ValueError(f'{condition} condition not supported')
diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts
index ee3c482..e683169 100644
--- a/server/src/controllers/analytics_controller.ts
+++ b/server/src/controllers/analytics_controller.ts
@@ -30,6 +30,12 @@ type TableTimeSeries = { values: [number, number][], columns: string[] };
 type TimeRange = { from: number, to: number };
 export type TaskResolver = (taskResult: TaskResult) => void;
 
+type HSRResult = {
+  hsr: TableTimeSeries,
+  lowerBound?: TableTimeSeries,
+  upperBound?: TableTimeSeries
+}
+
 const taskResolvers = new Map();
 
 let analyticsService: AnalyticsService = undefined;
@@ -631,10 +637,14 @@ export async function getHSR(
   try {
     const grafanaUrl = getGrafanaUrl(analyticUnit.grafanaUrl);
     const data = await queryByMetric(analyticUnit.metric, grafanaUrl, from, to, HASTIC_API_KEY);
+    let resultSeries: HSRResult = {
+      hsr: data
+    }
 
-    if(analyticUnit.detectorType !== AnalyticUnit.DetectorType.ANOMALY) {
-      return { hsr: data };
+    if(analyticUnit.detectorType === AnalyticUnit.DetectorType.PATTERN) {
+      return resultSeries;
     }
+
     let cache = await AnalyticUnitCache.findById(analyticUnit.id);
     if(
       cache === null ||
@@ -662,16 +672,12 @@ export async function getHSR(
       throw new Error(`Data processing error: ${result.error}`);
     }
 
-    let resultSeries = {
-      hsr: data
-    }
-
     if(result.payload.lowerBound !== undefined) {
-      resultSeries['lowerBound'] = { values: result.payload.lowerBound, columns: data.columns };
+      resultSeries.lowerBound = { values: result.payload.lowerBound, columns: data.columns };
     }
 
     if(result.payload.upperBound !== undefined) {
-      resultSeries['upperBound'] = { values: result.payload.upperBound, columns: data.columns };
+      resultSeries.upperBound = { values: result.payload.upperBound, columns: data.columns };
     }
 
     return resultSeries;
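
Note for reviewers: the sketch below mirrors the new ThresholdDetector.process_data as a standalone function, so the threshold-line behavior can be tried without the analytics service. The helper name threshold_series and the millisecond-timestamp conversion are assumptions for illustration; the patch itself uses utils.convert_series_to_timestamp_list and the analytic unit's cache.

# Standalone sketch (not part of the patch) of the threshold-line logic:
# a constant series built from the unit's threshold value, returned as the
# upper or lower bound depending on the condition.
import pandas as pd

def threshold_series(dataframe: pd.DataFrame, value: float, condition: str) -> dict:
    if condition == 'NO_DATA':
        # nothing to draw for the NO_DATA condition
        return {'lower_bound': None, 'upper_bound': None}
    # Unix timestamps in milliseconds (assumed TimeSeries format)
    timestamps = (dataframe['timestamp'].astype('int64') // 10**6).tolist()
    series = list(zip(timestamps, [value] * len(timestamps)))
    if condition in ['>', '>=', '=']:
        return {'lower_bound': None, 'upper_bound': series}
    if condition in ['<', '<=']:
        return {'lower_bound': series, 'upper_bound': None}
    raise ValueError(f'{condition} condition not supported')

df = pd.DataFrame({
    'timestamp': pd.to_datetime(['2019-07-26 17:00', '2019-07-26 17:01']),
    'value': [0.5, 3.2],
})
print(threshold_series(df, 2.0, '>'))  # upper_bound: [(ts0, 2.0), (ts1, 2.0)]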
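Likewise, a minimal standalone sketch of the bound-merging rule that concat_processing_results now implements once in ProcessingDetector. The Result class and concat helper are illustrative stand-ins for ProcessingResult, assuming chunks arrive in time order from the analytic worker.

# Sketch of the shared merging rule: bounds are concatenated per chunk,
# and a bound stays None unless at least one chunk provided it.
from typing import List, Optional, Tuple

TimeSeries = List[Tuple[int, float]]

class Result:
    def __init__(self, lower: Optional[TimeSeries] = None, upper: Optional[TimeSeries] = None):
        self.lower_bound = lower
        self.upper_bound = upper

def concat(results: List[Result]) -> Optional[Result]:
    if not results:
        return None
    united = Result()
    for r in results:
        if r.lower_bound is not None:
            united.lower_bound = (united.lower_bound or []) + r.lower_bound
        if r.upper_bound is not None:
            united.upper_bound = (united.upper_bound or []) + r.upper_bound
    return united

chunks = [Result(upper=[(1, 2.0)]), Result(upper=[(2, 2.0)])]
merged = concat(chunks)
print(merged.upper_bound)  # [(1, 2.0), (2, 2.0)]; lower_bound stays None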