diff --git a/analytics/analytics/analytic_types/__init__.py b/analytics/analytics/analytic_types/__init__.py
index d6cb715..e42da50 100644
--- a/analytics/analytics/analytic_types/__init__.py
+++ b/analytics/analytics/analytic_types/__init__.py
@@ -10,12 +10,15 @@ like PatternDetectionCache, then it should not be here.
 """
 
 import pandas as pd
-from typing import Union, List
+from typing import Union, List, Tuple
 
 AnalyticUnitId = str
 
 ModelCache = dict
 
+# TODO: explicit timestamp / value
+TimeSeries = List[Tuple[int, int]]
+
 """
 Example:
 
diff --git a/analytics/analytics/analytic_types/detector_typing.py b/analytics/analytics/analytic_types/detector_typing.py
index df916f0..0c9e278 100644
--- a/analytics/analytics/analytic_types/detector_typing.py
+++ b/analytics/analytics/analytic_types/detector_typing.py
@@ -1,4 +1,4 @@
-from analytic_types import ModelCache
+from analytic_types import ModelCache, TimeSeries
 from analytic_types.segment import Segment
 
 from typing import List, Optional, Tuple
@@ -34,8 +34,24 @@ class ProcessingResult():
 
     def __init__(
         self,
-        data: Optional[List[Tuple[int, int]]] = None
+        data: Optional[TimeSeries] = None
     ):
         if data is None:
             data = []
         self.data = data
+
+@utils.meta.JSONClass
+class AnomalyProcessingResult():
+
+    def __init__(
+        self,
+        lower_bound: Optional[TimeSeries] = None,
+        upper_bound: Optional[TimeSeries] = None,
+    ):
+        if lower_bound is None:
+            lower_bound = []
+        self.lower_bound = lower_bound
+
+        if upper_bound is None:
+            upper_bound = []
+        self.upper_bound = upper_bound
diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py
index c33635b..45f535c 100644
--- a/analytics/analytics/analytic_unit_worker.py
+++ b/analytics/analytics/analytic_unit_worker.py
@@ -9,7 +9,7 @@ import utils
 from utils import get_intersected_chunks, get_chunks, prepare_data
 
 from analytic_types import ModelCache
-from analytic_types.detector_typing import DetectionResult, ProcessingResult
+from analytic_types.detector_typing import DetectionResult
 
 logger = logging.getLogger('AnalyticUnitWorker')
 
@@ -95,20 +95,22 @@ class AnalyticUnitWorker:
         return detection_result.to_json()
 
     async def process_data(self, data: list, cache: ModelCache) -> dict:
-        assert isinstance(self._detector, detectors.ProcessingDetector), f'{self.analytic_unit_id} detector is not ProcessingDetector, can`t process data'
+        assert isinstance(self._detector, detectors.ProcessingDetector), \
+            f'{self.analytic_unit_id} detector is not ProcessingDetector, cannot process data'
         assert cache is not None, f'{self.analytic_unit_id} got empty cache for processing data'
 
-        processed_chunks: List[ProcessingResult] = []
+        processed_chunks = []
         window_size = self._detector.get_window_size(cache)
         for chunk in get_chunks(data, window_size * self.CHUNK_WINDOW_SIZE_FACTOR):
            await asyncio.sleep(0)
            chunk_dataframe = prepare_data(chunk)
-            processed: ProcessingResult = self._detector.process_data(chunk_dataframe, cache)
+            processed = self._detector.process_data(chunk_dataframe, cache)
            if processed is not None:
                processed_chunks.append(processed)
 
         if len(processed_chunks) == 0:
             raise RuntimeError(f'process_data for {self.analytic_unit_id} got empty processing results')
 
+        # TODO: maybe we should process all chunks inside the detector?
         result = self._detector.concat_processing_results(processed_chunks)
         return result.to_json()
diff --git a/analytics/analytics/detectors/anomaly_detector.py b/analytics/analytics/detectors/anomaly_detector.py
index 863ac8d..d232c13 100644
--- a/analytics/analytics/detectors/anomaly_detector.py
+++ b/analytics/analytics/detectors/anomaly_detector.py
@@ -4,7 +4,7 @@ import pandas as pd
 from typing import Optional, Union, List, Tuple
 
 from analytic_types import AnalyticUnitId, ModelCache
-from analytic_types.detector_typing import DetectionResult, ProcessingResult
+from analytic_types.detector_typing import DetectionResult, AnomalyProcessingResult
 from analytic_types.data_bucket import DataBucket
 from analytic_types.segment import Segment
 from detectors import Detector, ProcessingDetector
@@ -167,13 +167,15 @@ class AnomalyDetector(ProcessingDetector):
         return result
 
     # TODO: ModelCache -> ModelState (don't use string literals)
-    def process_data(self, dataframe: pd.DataFrame, cache: ModelCache) -> ProcessingResult:
+    def process_data(self, dataframe: pd.DataFrame, cache: ModelCache) -> AnomalyProcessingResult:
         segments = cache.get('segments')
 
         # TODO: exponential_smoothing should return dataframe with related timestamps
         smoothed = utils.exponential_smoothing(dataframe['value'], cache['alpha'], cache.get('lastValue'))
 
         # TODO: remove duplication with detect()
+        upper_bound = dataframe['value'] + cache['confidence']
+        lower_bound = dataframe['value'] - cache['confidence']
         if segments is not None:
             seasonality = cache.get('seasonality')
             assert seasonality is not None and seasonality > 0, \
@@ -201,8 +203,9 @@ class AnomalyDetector(ProcessingDetector):
                 lower_bound = lower_seasonality_curve - cache['confidence']
 
         timestamps = utils.convert_series_to_timestamp_list(dataframe.timestamp)
-        smoothed_dataset = list(zip(timestamps, smoothed.values.tolist()))
-        result = ProcessingResult(smoothed_dataset)
+        upper_bound_timeseries = list(zip(timestamps, upper_bound.values.tolist()))
+        lower_bound_timeseries = list(zip(timestamps, lower_bound.values.tolist()))
+        result = AnomalyProcessingResult(lower_bound_timeseries, upper_bound_timeseries)
         return result
 
     def add_season_to_data(self, data: pd.Series, segment: pd.Series, offset: int, seasonality: int, addition: bool) -> pd.Series:
@@ -219,3 +222,14 @@ class AnomalyDetector(ProcessingDetector):
         else:
             data = data.add(pd.Series(segment.values * -1, index = segment.index + idx), fill_value = 0)
         return data[:len_smoothed_data]
+
+    def concat_processing_results(self, processing_results: List[AnomalyProcessingResult]) -> Optional[AnomalyProcessingResult]:
+        if len(processing_results) == 0:
+            return None
+
+        united_result = AnomalyProcessingResult()
+        for result in processing_results:
+            united_result.lower_bound.extend(result.lower_bound)
+            united_result.upper_bound.extend(result.upper_bound)
+
+        return united_result
diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts
index 6329f4c..7673f91 100644
--- a/server/src/controllers/analytics_controller.ts
+++ b/server/src/controllers/analytics_controller.ts
@@ -23,6 +23,9 @@ const SECONDS_IN_MINUTE = 60;
 
 type TaskResult = any;
 type DetectionResult = any;
+// TODO: move TableTimeSeries to grafana-datasource-kit
+// TODO: TableTimeSeries is a bad name
+type TableTimeSeries = { values: [number, number][], columns: string[] };
 // TODO: move type definitions somewhere
 type TimeRange = { from: number, to: number };
 export type TaskResolver = (taskResult: TaskResult) => void;
@@ -600,7 +603,15 @@ async function runDetectionOnExtendedSpan(
   return detection;
 }
 
-export async function getHSR(analyticUnit: AnalyticUnit.AnalyticUnit, from: number, to: number) {
+export async function getHSR(
+  analyticUnit: AnalyticUnit.AnalyticUnit,
+  from: number,
+  to: number
+): Promise<{
+  hsr: TableTimeSeries,
+  lowerBound?: TableTimeSeries,
+  upperBound?: TableTimeSeries
+}> {
   try {
     const grafanaUrl = getGrafanaUrl(analyticUnit.grafanaUrl);
     const data = await queryByMetric(analyticUnit.metric, grafanaUrl, from, to, HASTIC_API_KEY);
@@ -619,7 +630,7 @@ export async function getHSR(analyticUnit: AnalyticUnit.AnalyticUnit, from: numb
     }
     cache = cache.data;
-    
+
     const analyticUnitType = analyticUnit.type;
     const detector = analyticUnit.detectorType;
     const payload = {
@@ -634,7 +645,11 @@ export async function getHSR(analyticUnit: AnalyticUnit.AnalyticUnit, from: numb
     if(result.status !== AnalyticUnit.AnalyticUnitStatus.SUCCESS) {
      throw new Error(`Data processing error: ${result.error}`);
     }
-    return { hsr: data, smoothed: { values: result.payload.data, columns: data.columns } };
+    return {
+      hsr: data,
+      lowerBound: { values: result.payload.lowerBound, columns: data.columns },
+      upperBound: { values: result.payload.upperBound, columns: data.columns }
+    };
   } catch (err) {
     const message = err.message || JSON.stringify(err);
     await AnalyticUnit.setStatus(analyticUnit.id, AnalyticUnit.AnalyticUnitStatus.FAILED, message);
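
Note on the bounds computed in process_data(): outside of the seasonal branch they are just the raw values shifted up and down by the confidence band, and the (timestamp, value) pairs are zipped into the TimeSeries shape afterwards. A minimal sketch of that arithmetic, assuming a cache that only carries 'confidence' (the toy frame and values here are made up for illustration):

import pandas as pd

dataframe = pd.DataFrame({'timestamp': [0, 1, 2], 'value': [10.0, 12.0, 11.0]})
cache = {'confidence': 2.0}

# same arithmetic as the non-seasonal path of process_data()
upper_bound = dataframe['value'] + cache['confidence']
lower_bound = dataframe['value'] - cache['confidence']

# zip timestamps and values into the TimeSeries shape: List[Tuple[int, int]]
timestamps = dataframe['timestamp'].tolist()
upper_bound_timeseries = list(zip(timestamps, upper_bound.values.tolist()))
lower_bound_timeseries = list(zip(timestamps, lower_bound.values.tolist()))
print(upper_bound_timeseries)  # [(0, 12.0), (1, 14.0), (2, 13.0)]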
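
Because the worker feeds process_data() consecutive, non-overlapping chunks (it uses get_chunks, not get_intersected_chunks), concat_processing_results() only has to append the per-chunk bounds in order. A toy run of the same extend-based merge, with the class stripped of the @utils.meta.JSONClass decorator and made-up points:

from typing import List, Optional, Tuple

TimeSeries = List[Tuple[int, int]]

class AnomalyProcessingResult:
    def __init__(self, lower_bound: Optional[TimeSeries] = None, upper_bound: Optional[TimeSeries] = None):
        self.lower_bound = lower_bound if lower_bound is not None else []
        self.upper_bound = upper_bound if upper_bound is not None else []

# two results produced by consecutive calls to process_data()
chunk_a = AnomalyProcessingResult([(0, 8), (1, 10)], [(0, 12), (1, 14)])
chunk_b = AnomalyProcessingResult([(2, 9)], [(2, 13)])

# the same merge concat_processing_results() performs
united = AnomalyProcessingResult()
for chunk in [chunk_a, chunk_b]:
    united.lower_bound.extend(chunk.lower_bound)
    united.upper_bound.extend(chunk.upper_bound)

assert united.lower_bound == [(0, 8), (1, 10), (2, 9)]
assert united.upper_bound == [(0, 12), (1, 14), (2, 13)]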
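
The controller reads result.payload.lowerBound / result.payload.upperBound while the Python class stores lower_bound / upper_bound, so this hinges on @utils.meta.JSONClass camelCasing field names during serialization. A minimal sketch of a decorator with that behavior, purely as an assumption of what utils.meta does (the json_class name and implementation are illustrative, not the real API):

from typing import Any, Dict

def to_camel_case(snake: str) -> str:
    # 'lower_bound' -> 'lowerBound'
    head, *tail = snake.split('_')
    return head + ''.join(part.title() for part in tail)

def json_class(cls):
    # attach a to_json() that camelCases instance attribute names
    def to_json(self) -> Dict[str, Any]:
        return {to_camel_case(name): value for name, value in self.__dict__.items()}
    cls.to_json = to_json
    return cls

@json_class
class AnomalyProcessingResult:
    def __init__(self, lower_bound=None, upper_bound=None):
        self.lower_bound = lower_bound if lower_bound is not None else []
        self.upper_bound = upper_bound if upper_bound is not None else []

print(AnomalyProcessingResult([(0, 8)]).to_json())
# {'lowerBound': [(0, 8)], 'upperBound': []}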