From f4a5f2031325f050fa3bb05db2d65804f3c7256e Mon Sep 17 00:00:00 2001 From: amper43 Date: Thu, 25 Jul 2019 19:04:01 +0300 Subject: [PATCH] add support of threshold line --- .../analytic_types/detector_typing.py | 8 ++ .../analytics/detectors/threshold_detector.py | 24 ++++- .../src/controllers/analytics_controller.ts | 99 ++++++++++++------- 3 files changed, 94 insertions(+), 37 deletions(-) diff --git a/analytics/analytics/analytic_types/detector_typing.py b/analytics/analytics/analytic_types/detector_typing.py index 04b4fd6..5f90e62 100644 --- a/analytics/analytics/analytic_types/detector_typing.py +++ b/analytics/analytics/analytic_types/detector_typing.py @@ -50,3 +50,11 @@ class AnomalyProcessingResult(): ): self.lower_bound = lower_bound self.upper_bound = upper_bound +@utils.meta.JSONClass +class ThresholdProcessingResult(): + + def __init__( + self, + threshold: Optional[TimeSeries] = None, + ): + self.threshold = threshold diff --git a/analytics/analytics/detectors/threshold_detector.py b/analytics/analytics/detectors/threshold_detector.py index 1e3352f..e37392a 100644 --- a/analytics/analytics/detectors/threshold_detector.py +++ b/analytics/analytics/detectors/threshold_detector.py @@ -6,9 +6,9 @@ import numpy as np from typing import Optional, List from analytic_types import ModelCache -from analytic_types.detector_typing import DetectionResult +from analytic_types.detector_typing import DetectionResult, ThresholdProcessingResult from analytic_types.segment import Segment -from detectors import Detector +from detectors import ProcessingDetector from time import time import utils @@ -16,7 +16,7 @@ import utils logger = log.getLogger('THRESHOLD_DETECTOR') -class ThresholdDetector(Detector): +class ThresholdDetector(ProcessingDetector): WINDOW_SIZE = 3 @@ -89,3 +89,21 @@ class ThresholdDetector(Detector): result.cache = detection.cache result.segments = utils.merge_intersecting_segments(result.segments, time_step) return result + + def process_data(self, 
dataframe: pd.DataFrame, cache: ModelCache) -> ThresholdProcessingResult: + data = dataframe['value'] + value = cache['value'] + data.values[:] = value + timestamps = utils.convert_series_to_timestamp_list(dataframe.timestamp) + result_series = list(zip(timestamps, data.values.tolist())) + return ThresholdProcessingResult(result_series) + + def concat_processing_results(self, processing_results: List[ThresholdProcessingResult]) -> Optional[ThresholdProcessingResult]: + if len(processing_results) == 0: + return None + + united_result = ThresholdProcessingResult([]) + for result in processing_results: + united_result.threshold.extend(result.threshold) + + return united_result diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index ee3c482..d644045 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -631,47 +631,78 @@ export async function getHSR( try { const grafanaUrl = getGrafanaUrl(analyticUnit.grafanaUrl); const data = await queryByMetric(analyticUnit.metric, grafanaUrl, from, to, HASTIC_API_KEY); - - if(analyticUnit.detectorType !== AnalyticUnit.DetectorType.ANOMALY) { - return { hsr: data }; - } - let cache = await AnalyticUnitCache.findById(analyticUnit.id); - if( - cache === null || - cache.data.alpha !== (analyticUnit as AnalyticUnit.AnomalyAnalyticUnit).alpha || - cache.data.confidence !== (analyticUnit as AnalyticUnit.AnomalyAnalyticUnit).confidence - ) { - await runLearning(analyticUnit.id, from, to); - cache = await AnalyticUnitCache.findById(analyticUnit.id); + let resultSeries = { + hsr: data } - cache = cache.data; + if(analyticUnit.detectorType === AnalyticUnit.DetectorType.THRESHOLD) { + let cache = await AnalyticUnitCache.findById(analyticUnit.id); + if( + cache === null || + cache.data.value !== (analyticUnit as AnalyticUnit.ThresholdAnalyticUnit).value || + cache.data.condition !== (analyticUnit as 
AnalyticUnit.ThresholdAnalyticUnit).condition + ) { + await runLearning(analyticUnit.id, from, to); + cache = await AnalyticUnitCache.findById(analyticUnit.id); + } - const analyticUnitType = analyticUnit.type; - const detector = analyticUnit.detectorType; - const payload = { - data: data.values, - analyticUnitType, - detector, - cache - }; - - const processingTask = new AnalyticsTask(analyticUnit.id, AnalyticsTaskType.PROCESS, payload); - const result = await runTask(processingTask); - if(result.status !== AnalyticUnit.AnalyticUnitStatus.SUCCESS) { - throw new Error(`Data processing error: ${result.error}`); - } + cache = cache.data; - let resultSeries = { - hsr: data - } + const analyticUnitType = analyticUnit.type; + const detector = analyticUnit.detectorType; + const payload = { + data: data.values, + analyticUnitType, + detector, + cache + }; - if(result.payload.lowerBound !== undefined) { - resultSeries['lowerBound'] = { values: result.payload.lowerBound, columns: data.columns }; + const processingTask = new AnalyticsTask(analyticUnit.id, AnalyticsTaskType.PROCESS, payload); + const result = await runTask(processingTask); + if(result.status !== AnalyticUnit.AnalyticUnitStatus.SUCCESS) { + throw new Error(`Data processing error: ${result.error}`); + } + + if(result.payload.threshold !== undefined) { + resultSeries['threshold'] = { values: result.payload.threshold, columns: data.columns }; + } } - if(result.payload.upperBound !== undefined) { - resultSeries['upperBound'] = { values: result.payload.upperBound, columns: data.columns }; + if(analyticUnit.detectorType === AnalyticUnit.DetectorType.ANOMALY) { + let cache = await AnalyticUnitCache.findById(analyticUnit.id); + if( + cache === null || + cache.data.alpha !== (analyticUnit as AnalyticUnit.AnomalyAnalyticUnit).alpha || + cache.data.confidence !== (analyticUnit as AnalyticUnit.AnomalyAnalyticUnit).confidence + ) { + await runLearning(analyticUnit.id, from, to); + cache = await 
AnalyticUnitCache.findById(analyticUnit.id); + } + + cache = cache.data; + + const analyticUnitType = analyticUnit.type; + const detector = analyticUnit.detectorType; + const payload = { + data: data.values, + analyticUnitType, + detector, + cache + }; + + const processingTask = new AnalyticsTask(analyticUnit.id, AnalyticsTaskType.PROCESS, payload); + const result = await runTask(processingTask); + if(result.status !== AnalyticUnit.AnalyticUnitStatus.SUCCESS) { + throw new Error(`Data processing error: ${result.error}`); + } + + if(result.payload.lowerBound !== undefined) { + resultSeries['lowerBound'] = { values: result.payload.lowerBound, columns: data.columns }; + } + + if(result.payload.upperBound !== undefined) { + resultSeries['upperBound'] = { values: result.payload.upperBound, columns: data.columns }; + } } return resultSeries;