diff --git a/analytics/analytics/analytic_types/detector_typing.py b/analytics/analytics/analytic_types/detector_typing.py
index b339035..df916f0 100644
--- a/analytics/analytics/analytic_types/detector_typing.py
+++ b/analytics/analytics/analytic_types/detector_typing.py
@@ -1,8 +1,9 @@
 from analytic_types import ModelCache
 from analytic_types.segment import Segment
 
-from typing import List, Optional
+from typing import List, Optional, Tuple
 
+import utils.meta
 
 class DetectionResult:
@@ -27,3 +28,14 @@ class DetectionResult:
             'segments': list(map(lambda segment: segment.to_json(), self.segments)),
             'lastDetectionTime': self.last_detection_time
         }
+
+@utils.meta.JSONClass
+class ProcessingResult():
+
+    def __init__(
+        self,
+        data: Optional[List[Tuple[int, int]]] = None
+    ):
+        if data is None:
+            data = []
+        self.data = data
diff --git a/analytics/analytics/analytic_unit_manager.py b/analytics/analytics/analytic_unit_manager.py
index f4613e0..b1ba0be 100644
--- a/analytics/analytics/analytic_unit_manager.py
+++ b/analytics/analytics/analytic_unit_manager.py
@@ -19,7 +19,7 @@ def get_detector_by_type(
     elif detector_type == 'threshold':
         return detectors.ThresholdDetector()
     elif detector_type == 'anomaly':
-        return detectors.AnomalyDetector()
+        return detectors.AnomalyDetector(analytic_unit_id)
 
     raise ValueError('Unknown detector type "%s"' % detector_type)
@@ -69,10 +69,14 @@ class AnalyticUnitManager:
                return await worker.do_train(payload['segments'], data, payload['cache'])
            elif 'threshold' in payload:
                return await worker.do_train(payload['threshold'], data, payload['cache'])
+           elif 'anomaly' in payload:
+               return await worker.do_train(payload['anomaly'], data, payload['cache'])
            else:
                raise ValueError('No segments or threshold in LEARN payload')
        elif task['type'] == 'DETECT':
            return await worker.do_detect(data, payload['cache'])
+       elif task['type'] == 'PROCESS':
+           return await worker.process_data(data, payload['cache'])
 
        raise ValueError('Unknown task type "%s"' % task['type'])
diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py
index f0e838d..c33635b 100644
--- a/analytics/analytics/analytic_unit_worker.py
+++ b/analytics/analytics/analytic_unit_worker.py
@@ -2,14 +2,14 @@ import config
 import detectors
 import logging
 import pandas as pd
-from typing import Optional, Union, Generator, List
+from typing import Optional, Union, Generator, List, Tuple
 import concurrent.futures
 import asyncio
 import utils
 from utils import get_intersected_chunks, get_chunks, prepare_data
 
 from analytic_types import ModelCache
-from analytic_types.detector_typing import DetectionResult
+from analytic_types.detector_typing import DetectionResult, ProcessingResult
 
 logger = logging.getLogger('AnalyticUnitWorker')
@@ -52,41 +52,63 @@ class AnalyticUnitWorker:
         chunk_size = window_size * self.CHUNK_WINDOW_SIZE_FACTOR
         chunk_intersection = window_size * self.CHUNK_INTERSECTION_FACTOR
 
-        detection_result = DetectionResult()
+        detections: List[DetectionResult] = []
+        chunks = []
+        # XXX: get_chunks(data, chunk_size) == get_intersected_chunks(data, 0, chunk_size)
+        if self._detector.is_detection_intersected():
+            chunks = get_intersected_chunks(data, chunk_intersection, chunk_size)
+        else:
+            chunks = get_chunks(data, chunk_size)
 
-        for chunk in get_intersected_chunks(data, chunk_intersection, chunk_size):
+        for chunk in chunks:
             await asyncio.sleep(0)
             chunk_dataframe = prepare_data(chunk)
-            detected = self._detector.detect(chunk_dataframe, cache)
-            self.__append_detection_result(detection_result, detected)
-        detection_result.segments = self._detector.merge_segments(detection_result.segments)
+            detected: DetectionResult = self._detector.detect(chunk_dataframe, cache)
+            detections.append(detected)
+
+        if len(detections) == 0:
+            raise RuntimeError(f'do_detect for {self.analytic_unit_id} got empty detection results')
+
+        detection_result = self._detector.concat_detection_results(detections)
         return detection_result.to_json()
 
     def cancel(self):
         if self._training_future is not None:
             self._training_future.cancel()
 
-    async def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
+    async def consume_data(self, data: list, cache: Optional[ModelCache]) -> Optional[dict]:
         window_size = self._detector.get_window_size(cache)
 
-        detection_result = DetectionResult()
+        detections: List[DetectionResult] = []
 
         for chunk in get_chunks(data, window_size * self.CHUNK_WINDOW_SIZE_FACTOR):
             await asyncio.sleep(0)
             chunk_dataframe = prepare_data(chunk)
             detected = self._detector.consume_data(chunk_dataframe, cache)
-            self.__append_detection_result(detection_result, detected)
-
-        detection_result.segments = self._detector.merge_segments(detection_result.segments)
+            if detected is not None:
+                detections.append(detected)
 
-        if detection_result.last_detection_time is None:
+        if len(detections) == 0:
             return None
         else:
+            detection_result = self._detector.concat_detection_results(detections)
             return detection_result.to_json()
 
-    # TODO: move result concatenation to Detectors
-    def __append_detection_result(self, detection_result: DetectionResult, new_chunk: DetectionResult):
-        if new_chunk is not None:
-            detection_result.cache = new_chunk.cache
-            detection_result.last_detection_time = new_chunk.last_detection_time
-            detection_result.segments.extend(new_chunk.segments)
+    async def process_data(self, data: list, cache: ModelCache) -> dict:
+        assert isinstance(self._detector, detectors.ProcessingDetector), f'{self.analytic_unit_id} detector is not ProcessingDetector, can`t process data'
+        assert cache is not None, f'{self.analytic_unit_id} got empty cache for processing data'
+
+        processed_chunks: List[ProcessingResult] = []
+        window_size = self._detector.get_window_size(cache)
+        for chunk in get_chunks(data, window_size * self.CHUNK_WINDOW_SIZE_FACTOR):
+            await asyncio.sleep(0)
+            chunk_dataframe = prepare_data(chunk)
+            processed: ProcessingResult = self._detector.process_data(chunk_dataframe, cache)
+            if processed is not None:
+                processed_chunks.append(processed)
+
+        if len(processed_chunks) == 0:
+            raise RuntimeError(f'process_data for {self.analytic_unit_id} got empty processing results')
+
+        result = self._detector.concat_processing_results(processed_chunks)
+        return result.to_json()
diff --git a/analytics/analytics/detectors/__init__.py b/analytics/analytics/detectors/__init__.py
index 9dba17e..370f0f2 100644
--- a/analytics/analytics/detectors/__init__.py
+++ b/analytics/analytics/detectors/__init__.py
@@ -1,4 +1,4 @@
-from detectors.detector import Detector
+from detectors.detector import Detector, ProcessingDetector
 from detectors.pattern_detector import PatternDetector
 from detectors.threshold_detector import ThresholdDetector
 from detectors.anomaly_detector import AnomalyDetector
diff --git a/analytics/analytics/detectors/anomaly_detector.py b/analytics/analytics/detectors/anomaly_detector.py
index 1c7f7bf..32f0617 100644
--- a/analytics/analytics/detectors/anomaly_detector.py
+++ b/analytics/analytics/detectors/anomaly_detector.py
@@ -1,12 +1,13 @@
 import logging
+import numpy as np
 import pandas as pd
 from typing import Optional, Union, List, Tuple
 
 from analytic_types import AnalyticUnitId, ModelCache
-from analytic_types.detector_typing import DetectionResult
+from analytic_types.detector_typing import DetectionResult, ProcessingResult
 from analytic_types.data_bucket import DataBucket
 from analytic_types.segment import Segment
-from detectors import Detector
+from detectors import Detector, ProcessingDetector
 import utils
 
 MAX_DEPENDENCY_LEVEL = 100
@@ -14,9 +15,10 @@ MIN_DEPENDENCY_FACTOR = 0.1
 
 logger = logging.getLogger('ANOMALY_DETECTOR')
 
-class AnomalyDetector(Detector):
+class AnomalyDetector(ProcessingDetector):
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, analytic_unit_id: AnalyticUnitId):
+        self.analytic_unit_id = analytic_unit_id
         self.bucket = DataBucket()
 
     def train(self, dataframe: pd.DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache:
@@ -27,15 +29,16 @@ class AnomalyDetector(Detector):
             }
         }
 
+    # TODO: ModelCache -> ModelState
     def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> DetectionResult:
         data = dataframe['value']
-        last_values = None
+        last_value = None
         if cache is not None:
-            last_values = cache.get('last_values')
+            last_value = cache.get('last_value')
 
-        smothed_data = utils.exponential_smoothing(data, cache['alpha'])
-        upper_bound = smothed_data + cache['confidence']
-        lower_bound = smothed_data - cache['confidence']
+        smoothed_data = utils.exponential_smoothing(data, cache['alpha'], last_value)
+        upper_bound = smoothed_data + cache['confidence']
+        lower_bound = smoothed_data - cache['confidence']
 
         anomaly_indexes = []
         for idx, val in enumerate(data.values):
@@ -48,13 +51,34 @@ class AnomalyDetector(Detector):
             utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][segment[0]]),
             utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][segment[1]]),
         ) for segment in segments]
+
         last_dataframe_time = dataframe.iloc[-1]['timestamp']
         last_detection_time = utils.convert_pd_timestamp_to_ms(last_dataframe_time)
+        # TODO: ['lastValue'] -> .last_value
+        cache['lastValue'] = smoothed_data.values[-1]
+
         return DetectionResult(cache, segments, last_detection_time)
 
     def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[DetectionResult]:
-        self.detect(data, cache)
+        if cache is None:
+            msg = f'consume_data got invalid cache {cache} for task {self.analytic_unit_id}'
+            logging.debug(msg)
+            raise ValueError(msg)
+
+        data_without_nan = data.dropna()
+
+        if len(data_without_nan) == 0:
+            return None
+
+        self.bucket.receive_data(data_without_nan)
+        if len(self.bucket.data) >= self.get_window_size(cache):
+            return self.detect(self.bucket, cache)
+
+        return None
+
+    def is_detection_intersected(self) -> bool:
+        return False
 
     def get_window_size(self, cache: Optional[ModelCache]) -> int:
         '''
@@ -63,12 +87,30 @@ class AnomalyDetector(Detector):
 
         if cache is None:
             raise ValueError('anomaly detector got None cache')
-        
+
         for level in range(1, MAX_DEPENDENCY_LEVEL):
             if (1 - cache['alpha']) ** level < MIN_DEPENDENCY_FACTOR:
                 break
         return level
 
+    def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult:
+        result = DetectionResult()
+        for detection in detections:
+            result.segments.extend(detection.segments)
+            result.last_detection_time = detection.last_detection_time
+            result.cache = detection.cache
+        result.segments = utils.merge_intersecting_segments(result.segments)
+        return result
+
+    # TODO: ModelCache -> ModelState
+    def process_data(self, data: pd.DataFrame, cache: ModelCache) -> ProcessingResult:
+        # TODO: exponential_smoothing should return dataframe with related timestamps
+        smoothed = utils.exponential_smoothing(data['value'], cache['alpha'], cache.get('lastValue'))
+        timestamps = utils.convert_series_to_timestamp_list(data.timestamp)
+        smoothed_dataset = list(zip(timestamps, smoothed.values.tolist()))
+        result = ProcessingResult(smoothed_dataset)
+        return result
+
     def merge_segments(self, segments: List[Segment]) -> List[Segment]:
         segments = utils.merge_intersecting_segments(segments)
         return segments
diff --git a/analytics/analytics/detectors/detector.py b/analytics/analytics/detectors/detector.py
index 82034a9..e8a2344 100644
--- a/analytics/analytics/detectors/detector.py
+++ b/analytics/analytics/detectors/detector.py
@@ -3,7 +3,7 @@ from pandas import DataFrame
 from typing import Optional, Union, List
 
 from analytic_types import ModelCache
-from analytic_types.detector_typing import DetectionResult
+from analytic_types.detector_typing import DetectionResult, ProcessingResult
 from analytic_types.segment import Segment
 
 
@@ -28,5 +28,32 @@ class Detector(ABC):
     def get_window_size(self, cache: Optional[ModelCache]) -> int:
         pass
 
+    def is_detection_intersected(self) -> bool:
+        return True
+
+    def concat_detection_results(self, detections: List[DetectionResult]) -> DetectionResult:
+        result = DetectionResult()
+        for detection in detections:
+            result.segments.extend(detection.segments)
+            result.last_detection_time = detection.last_detection_time
+            result.cache = detection.cache
+        return result
+
     def merge_segments(self, segments: List[Segment]) -> List[Segment]:
         return segments
+
+
+class ProcessingDetector(Detector):
+
+    @abstractmethod
+    def process_data(self, data, cache: Optional[ModelCache]) -> ProcessingResult:
+        pass
+
+    def concat_processing_results(self, processing_results: List[ProcessingResult]) -> Optional[ProcessingResult]:
+        if len(processing_results) == 0:
+            return None
+
+        united_result = ProcessingResult()
+        for result in processing_results:
+            united_result.data.extend(result.data)
+
+        return united_result
diff --git a/analytics/analytics/detectors/threshold_detector.py b/analytics/analytics/detectors/threshold_detector.py
index 186ffa0..252ca01 100644
--- a/analytics/analytics/detectors/threshold_detector.py
+++ b/analytics/analytics/detectors/threshold_detector.py
@@ -44,7 +44,6 @@ class ThresholdDetector(Detector):
             current_value = row['value']
             current_timestamp = convert_pd_timestamp_to_ms(row['timestamp'])
             segment = Segment(current_timestamp, current_timestamp)
-            # TODO: merge segments
 
             if pd.isnull(current_value):
                 if condition == 'NO_DATA':
diff --git a/analytics/analytics/utils/time.py b/analytics/analytics/utils/time.py
index 958685a..14710b0 100644
--- a/analytics/analytics/utils/time.py
+++ b/analytics/analytics/utils/time.py
@@ -1,4 +1,5 @@
 import pandas as pd
+from typing import List
 
 def convert_sec_to_ms(sec) -> int:
     return int(sec) * 1000
@@ -6,3 +7,7 @@ def convert_sec_to_ms(sec) -> int:
 def convert_pd_timestamp_to_ms(timestamp: pd.Timestamp) -> int:
     # TODO: convert from nanoseconds to millisecond in a better way: not by dividing by 10^6
     return int(timestamp.value) / 1000000
+
+def convert_series_to_timestamp_list(series: pd.Series) -> List[int]:
+    timestamps = map(lambda value: convert_pd_timestamp_to_ms(value), series)
+    return list(timestamps)
diff --git a/analytics/tests/test_detectors.py b/analytics/tests/test_detectors.py
index ea499b9..78acbdb 100644
--- a/analytics/tests/test_detectors.py
+++ b/analytics/tests/test_detectors.py
@@ -42,7 +42,7 @@ class TestAnomalyDetector(unittest.TestCase):
             'confidence': 2,
             'alpha': 0.1,
         }
-        detector = anomaly_detector.AnomalyDetector()
+        detector = anomaly_detector.AnomalyDetector('test_id')
         detect_result = detector.detect(dataframe, cache)
         result = [{ 'from': 1523889000005.0, 'to': 1523889000005.0 }]
         self.assertEqual(result, detect_result.to_json()['segments'])
diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts
index 5db3654..65fb3db 100644
--- a/server/src/controllers/analytics_controller.ts
+++ b/server/src/controllers/analytics_controller.ts
@@ -17,6 +17,7 @@ import { queryByMetric, GrafanaUnavailable, DatasourceUnavailable } from 'grafan
 import * as _ from 'lodash';
 
 import { WebhookType } from '../services/notification_service';
+import { AnomalyAnalyticUnit } from '../models/analytic_units/anomaly_analytic_unit_model';
 
 const SECONDS_IN_MINUTE = 60;
@@ -105,7 +106,7 @@ export function terminate() {
   alertService.stopAlerting();
 }
 
-async function runTask(task: AnalyticsTask): Promise<TaskResult> {
+export async function runTask(task: AnalyticsTask): Promise<TaskResult> {
   return new Promise((resolver: TaskResolver) => {
     taskResolvers.set(task.id, resolver); // it will be resolved in onTaskResult()
     analyticsService.sendTask(task); // we dont wait for result here
@@ -126,7 +127,10 @@ async function getQueryRange(
     return getQueryRangeForLearningBySegments(segments);
   }
 
-  if(detectorType === AnalyticUnit.DetectorType.THRESHOLD) {
+  if(
+    detectorType === AnalyticUnit.DetectorType.THRESHOLD ||
+    detectorType === AnalyticUnit.DetectorType.ANOMALY
+  ) {
     const now = Date.now();
     return {
       from: now - 5 * SECONDS_IN_MINUTE * 1000,
@@ -215,23 +219,34 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId, from?: number
   const detector = analyticUnit.detectorType;
   let taskPayload: any = { detector, analyticUnitType, cache: oldCache };
 
-  if(detector === AnalyticUnit.DetectorType.PATTERN) {
-    let segments = await Segment.findMany(id, { labeled: true });
-    if(segments.length === 0) {
-      throw new Error('Need at least 1 labeled segment');
-    }
-
-    let segmentObjs = segments.map(s => s.toObject());
-
-    let deletedSegments = await Segment.findMany(id, { deleted: true });
-    let deletedSegmentsObjs = deletedSegments.map(s => s.toObject());
-    segmentObjs = _.concat(segmentObjs, deletedSegmentsObjs);
-    taskPayload.segments = segmentObjs;
-  } else if(detector === AnalyticUnit.DetectorType.THRESHOLD) {
-    taskPayload.threshold = {
-      value: (analyticUnit as ThresholdAnalyticUnit).value,
-      condition: (analyticUnit as ThresholdAnalyticUnit).condition
-    };
+  switch(detector) {
+    case AnalyticUnit.DetectorType.PATTERN:
+      let segments = await Segment.findMany(id, { labeled: true });
+      if(segments.length === 0) {
+        throw new Error('Need at least 1 labeled segment');
+      }
+
+      let segmentObjs = segments.map(s => s.toObject());
+
+      let deletedSegments = await Segment.findMany(id, { deleted: true });
+      let deletedSegmentsObjs = deletedSegments.map(s => s.toObject());
+      segmentObjs = _.concat(segmentObjs, deletedSegmentsObjs);
+      taskPayload.segments = segmentObjs;
+      break;
+    case AnalyticUnit.DetectorType.THRESHOLD:
+      taskPayload.threshold = {
+        value: (analyticUnit as ThresholdAnalyticUnit).value,
+        condition: (analyticUnit as ThresholdAnalyticUnit).condition
+      };
+      break;
+    case AnalyticUnit.DetectorType.ANOMALY:
+      taskPayload.anomaly = {
+        alpha: (analyticUnit as AnomalyAnalyticUnit).alpha,
+        confidence: (analyticUnit as AnomalyAnalyticUnit).confidence
+      };
+      break;
+    default:
+      throw new Error(`Unknown type of detector: ${detector}`);
   }
 
   let range: TimeRange;
@@ -558,3 +573,46 @@ async function runDetectionOnExtendedSpan(
   await Detection.insertSpan(detection);
   return detection;
 }
+
+export async function getHSR(analyticUnit: AnalyticUnit.AnalyticUnit, from: number, to: number) {
+  try {
+    const grafanaUrl = getGrafanaUrl(analyticUnit.grafanaUrl);
+    const data = await queryByMetric(analyticUnit.metric, grafanaUrl, from, to, HASTIC_API_KEY);
+
+    if(analyticUnit.detectorType !== AnalyticUnit.DetectorType.ANOMALY) {
+      return data;
+    } else {
+      let cache = await AnalyticUnitCache.findById(analyticUnit.id);
+      if(
+        cache === null ||
+        cache.data.alpha !== (analyticUnit as AnalyticUnit.AnomalyAnalyticUnit).alpha ||
+        cache.data.confidence !== (analyticUnit as AnalyticUnit.AnomalyAnalyticUnit).confidence
+      ) {
+        await runLearning(analyticUnit.id, from, to);
+        cache = await AnalyticUnitCache.findById(analyticUnit.id);
+      }
+
+      cache = cache.data;
+
+      const analyticUnitType = analyticUnit.type;
+      const detector = analyticUnit.detectorType;
+      const payload = {
+        data: data.values,
+        analyticUnitType,
+        detector,
+        cache
+      };
+
+      const processingTask = new AnalyticsTask(analyticUnit.id, AnalyticsTaskType.PROCESS, payload);
+      const result = await runTask(processingTask);
+      if(result.status !== AnalyticUnit.AnalyticUnitStatus.SUCCESS) {
+        throw new Error(`Data processing error: ${result.error}`);
+      }
+      return { values: result.payload.data, columns: data.columns }
+    }
+  } catch (err) {
+    const message = err.message || JSON.stringify(err);
+    await AnalyticUnit.setStatus(analyticUnit.id, AnalyticUnit.AnalyticUnitStatus.FAILED, message);
+    throw new Error(message);
+  }
+}
diff --git a/server/src/models/analytic_units/anomaly_analytic_unit_model.ts b/server/src/models/analytic_units/anomaly_analytic_unit_model.ts
index 1084660..0d66c2c 100644
--- a/server/src/models/analytic_units/anomaly_analytic_unit_model.ts
+++ b/server/src/models/analytic_units/anomaly_analytic_unit_model.ts
@@ -35,7 +35,7 @@ export class AnomalyAnalyticUnit extends AnalyticUnit {
       error,
       labeledColor,
       deletedColor,
-      DetectorType.THRESHOLD,
+      DetectorType.ANOMALY,
       visible
     );
   }
diff --git a/server/src/models/analytic_units/index.ts b/server/src/models/analytic_units/index.ts
index fe50170..cab3cd0 100644
--- a/server/src/models/analytic_units/index.ts
+++ b/server/src/models/analytic_units/index.ts
@@ -3,6 +3,7 @@ import { AnalyticUnitId, AnalyticUnitStatus, DetectorType, ANALYTIC_UNIT_TYPES }
 import { AnalyticUnit } from './analytic_unit_model';
 import { PatternAnalyticUnit } from './pattern_analytic_unit_model';
 import { ThresholdAnalyticUnit } from './threshold_analytic_unit_model';
+import { AnomalyAnalyticUnit } from './anomaly_analytic_unit_model';
 import {
   findById,
   findMany,
@@ -17,7 +18,7 @@ import {
 
 
 export {
-  AnalyticUnit, PatternAnalyticUnit, ThresholdAnalyticUnit,
+  AnalyticUnit, PatternAnalyticUnit, ThresholdAnalyticUnit, AnomalyAnalyticUnit,
   AnalyticUnitId, AnalyticUnitStatus, DetectorType, ANALYTIC_UNIT_TYPES,
   createAnalyticUnitFromObject,
   findById, findMany,
diff --git a/server/src/models/analytics_task_model.ts b/server/src/models/analytics_task_model.ts
index 4444561..ec47f21 100644
--- a/server/src/models/analytics_task_model.ts
+++ b/server/src/models/analytics_task_model.ts
@@ -10,7 +10,8 @@ export enum AnalyticsTaskType {
   LEARN = 'LEARN',
   DETECT = 'DETECT',
   CANCEL = 'CANCEL',
-  PUSH = 'PUSH'
+  PUSH = 'PUSH',
+  PROCESS = 'PROCESS'
 };
 
 export class AnalyticsTask {
diff --git a/server/src/routes/data_router.ts b/server/src/routes/data_router.ts
index e0107c3..e900179 100644
--- a/server/src/routes/data_router.ts
+++ b/server/src/routes/data_router.ts
@@ -1,8 +1,5 @@
 import * as AnalyticUnit from '../models/analytic_units';
 
-import { HASTIC_API_KEY } from '../config';
-import { getGrafanaUrl } from '../utils/grafana';
-
-import { queryByMetric } from 'grafana-datasource-kit';
+import * as AnalyticsController from '../controllers/analytics_controller';
 
 import * as Router from 'koa-router';
@@ -46,8 +43,7 @@ async function query(ctx: Router.IRouterContext) {
     throw new Error(`can't find analytic unit ${analyticUnitId}`);
   }
 
-  const grafanaUrl = getGrafanaUrl(analyticUnit.grafanaUrl);
-  const results = await queryByMetric(analyticUnit.metric, grafanaUrl, from, to, HASTIC_API_KEY);
+  const results = await AnalyticsController.getHSR(analyticUnit, from, to);
 
   ctx.response.body = { results };
 }
diff --git a/server/src/services/alert_service.ts b/server/src/services/alert_service.ts
index 371383e..746429f 100644
--- a/server/src/services/alert_service.ts
+++ b/server/src/services/alert_service.ts
@@ -134,6 +134,7 @@ export class AlertService {
 
     alertsType[AnalyticUnit.DetectorType.THRESHOLD] = ThresholdAlert;
     alertsType[AnalyticUnit.DetectorType.PATTERN] = PatternAlert;
+    alertsType[AnalyticUnit.DetectorType.ANOMALY] = Alert;
 
     this._alerts[analyticUnit.id] = new alertsType[detector](analyticUnit);
   }
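
Note: this patch calls utils.exponential_smoothing(series, alpha, last_value) with a new third argument in both detect() and process_data(), but the smoothing helper itself is not part of the diff. For reference, a minimal sketch of what such a seeded implementation could look like; the function body below is an assumption for illustration, not the code that lives in analytics/utils:

    import pandas as pd
    from typing import Optional

    def exponential_smoothing(series: pd.Series, alpha: float, last_value: Optional[float] = None) -> pd.Series:
        # Assumed recurrence: s[t] = alpha * x[t] + (1 - alpha) * s[t - 1].
        # last_value (the previous chunk's final smoothed point, stored in the
        # cache as 'lastValue') seeds the recurrence, keeping the smoothed line
        # continuous across the chunk boundaries produced by get_chunks().
        result = []
        prev = series.values[0] if last_value is None else last_value
        for value in series.values:
            prev = alpha * value + (1 - alpha) * prev
            result.append(prev)
        # Preserve the original index so upper_bound/lower_bound arithmetic
        # in detect() stays aligned with the source series.
        return pd.Series(result, index=series.index)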