From 16b85632c5d71e0deedcc81f7bfcadcc6437cd54 Mon Sep 17 00:00:00 2001 From: amper43 Date: Tue, 30 Apr 2019 21:57:40 +0300 Subject: [PATCH] fix --- .../analytics/detectors/anomaly_detector.py | 40 +++++++++---------- server/src/models/analytic_unit_model.ts | 6 +++ 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/analytics/analytics/detectors/anomaly_detector.py b/analytics/analytics/detectors/anomaly_detector.py index ec1b5dd..7cc4f42 100644 --- a/analytics/analytics/detectors/anomaly_detector.py +++ b/analytics/analytics/detectors/anomaly_detector.py @@ -6,13 +6,15 @@ from analytic_types.data_bucket import DataBucket from detectors import Detector from models import ModelCache import utils +from analytic_types import AnalyticUnitId logger = logging.getLogger('ANOMALY_DETECTOR') class AnomalyDetector(Detector): - def __init__(self, *args, **kwargs): + def __init__(self, analytic_unit_id: AnalyticUnitId): + self.analytic_unit_id = analytic_unit_id self.bucket = DataBucket() def train(self, dataframe: pd.DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache: @@ -51,35 +53,31 @@ class AnomalyDetector(Detector): } def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]: - self.detect(data, cache) + if cache is None: + msg = f'consume_data get invalid cache {cache} for task {self.analytic_unit_id}' + logging.debug(msg) + raise ValueError(msg) + data_without_nan = data.dropna() - def __smooth_data(self, dataframe: pd.DataFrame) -> List[Tuple[int, float]]: - ''' - smooth data using exponential smoothing/moving average/weighted_average - ''' - - def __get_confidence_window(self, smooth_data: pd.Series, condfidence: float) -> Tuple[pd.Series, pd.Series]: - ''' - build confidence interval above and below smoothed data - ''' + if len(data_without_nan) == 0: + return None - def __get_dependency_level(self, alpha: float) -> int: - ''' - get the number of values that will affect the next value - ''' + self.bucket.receive_data(data_without_nan) + + if len(self.bucket.data) >= self.get_window_size(cache): + self.detect(self.bucket, cache) - for level in range(1, 100): - if (1 - alpha) ** level < 0.1: - break - return level def get_window_size(self, cache: Optional[ModelCache]) -> int: if cache is None: raise ValueError('anomaly detector got None cache') - #TODO: calculate value based on `alpha` value from cache - return 1 + for level in range(1, 100): + if (1 - cache['alpha']) ** level < 0.1: + break + + return level def is_detection_intersected(self) -> bool: diff --git a/server/src/models/analytic_unit_model.ts b/server/src/models/analytic_unit_model.ts index 9b83d07..715437c 100644 --- a/server/src/models/analytic_unit_model.ts +++ b/server/src/models/analytic_unit_model.ts @@ -35,6 +35,12 @@ export const ANALYTIC_UNIT_TYPES = { name: 'Threshold', value: 'THRESHOLD' } + ], + anomaly: [ + { + name: 'Anomaly', + value: 'ANOMALY' + } ] };