From 67f203bee66f140ff3488b660acb5148a8510e07 Mon Sep 17 00:00:00 2001 From: Evgeny Smyshlyaev Date: Mon, 24 Jun 2019 16:56:13 +0300 Subject: [PATCH] Anomaly detector option for disabling upper lower bound #701 (#703) --- .../analytic_types/detector_typing.py | 5 - .../analytics/detectors/anomaly_detector.py | 93 ++++++++++++------- .../src/controllers/analytics_controller.ts | 22 +++-- .../anomaly_analytic_unit_model.ts | 14 ++- 4 files changed, 86 insertions(+), 48 deletions(-) diff --git a/analytics/analytics/analytic_types/detector_typing.py b/analytics/analytics/analytic_types/detector_typing.py index 0c9e278..04b4fd6 100644 --- a/analytics/analytics/analytic_types/detector_typing.py +++ b/analytics/analytics/analytic_types/detector_typing.py @@ -48,10 +48,5 @@ class AnomalyProcessingResult(): lower_bound: Optional[TimeSeries] = None, upper_bound: Optional[TimeSeries] = None, ): - if lower_bound is None: - lower_bound = [] self.lower_bound = lower_bound - - if upper_bound is None: - upper_bound = [] self.upper_bound = upper_bound diff --git a/analytics/analytics/detectors/anomaly_detector.py b/analytics/analytics/detectors/anomaly_detector.py index 05a86ae..5bf53a3 100644 --- a/analytics/analytics/detectors/anomaly_detector.py +++ b/analytics/analytics/detectors/anomaly_detector.py @@ -1,5 +1,8 @@ +from enum import Enum import logging import numpy as np +import operator +from collections import OrderedDict import pandas as pd import math from typing import Optional, Union, List, Tuple @@ -16,6 +19,10 @@ MIN_DEPENDENCY_FACTOR = 0.1 BASIC_ALPHA = 0.5 logger = logging.getLogger('ANOMALY_DETECTOR') +class Bound(Enum): + NONE = 'NONE' + UPPER = 'UPPER' + LOWER = 'LOWER' class AnomalyDetector(ProcessingDetector): @@ -25,13 +32,15 @@ class AnomalyDetector(ProcessingDetector): def train(self, dataframe: pd.DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache: segments = payload.get('segments') + disable_bound: str = payload.get('disableBound') or 'NONE' prepared_segments = [] time_step = utils.find_interval(dataframe) new_cache = { 'confidence': payload['confidence'], 'alpha': payload['alpha'], - 'timeStep': time_step + 'timeStep': time_step, + 'disableBound': disable_bound } if segments is not None: @@ -61,12 +70,17 @@ class AnomalyDetector(ProcessingDetector): data = dataframe['value'] time_step = cache['timeStep'] segments = cache.get('segments') + disable_bound: str = cache.get('disableBound') or 'NONE' smoothed_data = utils.exponential_smoothing(data, cache['alpha']) - # TODO: use class for cache to avoid using string literals - upper_bound = smoothed_data + cache['confidence'] - lower_bound = smoothed_data - cache['confidence'] + # TODO: use class for cache to avoid using string literals and Bound.TYPE.value + bounds = OrderedDict() + bounds[Bound.LOWER.value] = ( smoothed_data - cache['confidence'], operator.lt ) + bounds[Bound.UPPER.value] = ( smoothed_data + cache['confidence'], operator.gt ) + + if disable_bound != Bound.NONE.value: + del bounds[disable_bound] if segments is not None: @@ -84,21 +98,18 @@ class AnomalyDetector(ProcessingDetector): seasonality_offset = (abs(start_seasonal_segment - data_start_time) % seasonality) // time_step #TODO: upper and lower bounds for segment_data segment_data = pd.Series(segment['data']) - upper_bound = self.add_season_to_data( - upper_bound, segment_data, seasonality_offset, seasonality_index, True - ) - lower_bound = self.add_season_to_data( - lower_bound, segment_data, seasonality_offset, seasonality_index, False - ) - assert len(smoothed_data) == len(upper_bound) == len(lower_bound), \ - f'len smoothed {len(smoothed_data)} != len seasonality {len(upper_bound)}' - - # TODO: use class for cache to avoid using string literals + for bound_type, bound_data in bounds.items(): + bound_data, _ = bound_data + bounds[bound_type] = self.add_season_to_data(bound_data, segment_data, seasonality_offset, seasonality_index, bound_type) + assert len(smoothed_data) == len(bounds[bound_type]), \ + f'len smoothed {len(smoothed_data)} != len seasonality {len(bounds[bound_type])}' anomaly_indexes = [] for idx, val in enumerate(data.values): - if val > upper_bound.values[idx] or val < lower_bound.values[idx]: - anomaly_indexes.append(data.index[idx]) + for bound_type, bound_data in bounds.items(): + bound_data, comparator = bound_data + if comparator(val, bound_data.values[idx]): + anomaly_indexes.append(data.index[idx]) # TODO: use Segment in utils segments = utils.close_filtering(anomaly_indexes, 1) segments = utils.get_start_and_end_of_segments(segments) @@ -163,11 +174,18 @@ class AnomalyDetector(ProcessingDetector): # TODO: ModelCache -> ModelState (don't use string literals) def process_data(self, dataframe: pd.DataFrame, cache: ModelCache) -> AnomalyProcessingResult: segments = cache.get('segments') + disable_bound: str = cache.get('disableBound') or 'NONE' # TODO: exponential_smoothing should return dataframe with related timestamps - smoothed = utils.exponential_smoothing(dataframe['value'], cache['alpha']) - upper_bound = smoothed + cache['confidence'] - lower_bound = smoothed - cache['confidence'] + smoothed_data = utils.exponential_smoothing(dataframe['value'], cache['alpha']) + + bounds = OrderedDict() + bounds[Bound.LOWER.value] = smoothed_data - cache['confidence'] + bounds[Bound.UPPER.value] = smoothed_data + cache['confidence'] + + if disable_bound != Bound.NONE.value: + del bounds[disable_bound] + # TODO: remove duplication with detect() @@ -186,24 +204,21 @@ class AnomalyDetector(ProcessingDetector): start_seasonal_segment = segment['from'] + seasonality * season_count seasonality_offset = (abs(start_seasonal_segment - data_start_time) % seasonality) // time_step segment_data = pd.Series(segment['data']) - upper_bound = self.add_season_to_data( - upper_bound, segment_data, seasonality_offset, seasonality_index, True - ) - lower_bound = self.add_season_to_data( - lower_bound, segment_data, seasonality_offset, seasonality_index, False - ) - assert len(smoothed) == len(upper_bound) == len(lower_bound), \ - f'len smoothed {len(smoothed)} != len seasonality {len(upper_bound)}' + for bound_type, bound_data in bounds.items(): + bounds[bound_type] = self.add_season_to_data(bound_data, segment_data, seasonality_offset, seasonality_index, bound_type) + assert len(smoothed_data) == len(bounds[bound_type]), \ + f'len smoothed {len(smoothed_data)} != len seasonality {len(bounds[bound_type])}' # TODO: support multiple segments timestamps = utils.convert_series_to_timestamp_list(dataframe.timestamp) - lower_bound_timeseries = list(zip(timestamps, lower_bound.values.tolist())) - upper_bound_timeseries = list(zip(timestamps, upper_bound.values.tolist())) - result = AnomalyProcessingResult(lower_bound_timeseries, upper_bound_timeseries) + result_bounds = {} + for bound_type, bound_data in bounds.items(): + result_bounds[bound_type] = list(zip(timestamps, bound_data.values.tolist())) + result = AnomalyProcessingResult(lower_bound=result_bounds.get(Bound.LOWER.value), upper_bound=result_bounds.get(Bound.UPPER.value)) return result - def add_season_to_data(self, data: pd.Series, segment: pd.Series, offset: int, seasonality: int, addition: bool) -> pd.Series: + def add_season_to_data(self, data: pd.Series, segment: pd.Series, offset: int, seasonality: int, bound_type: Bound) -> pd.Series: #data - smoothed data to which seasonality will be added #if addition == True -> segment is added #if addition == False -> segment is subtracted @@ -213,12 +228,15 @@ class AnomalyDetector(ProcessingDetector): #TODO: add seasonality for non empty parts continue if (idx - offset) % seasonality == 0: - if addition: + if bound_type == Bound.UPPER.value: upper_segment_bound = self.get_bounds_for_segment(segment)[0] data = data.add(pd.Series(upper_segment_bound.values, index = segment.index + idx), fill_value = 0) - else: + elif bound_type == Bound.LOWER.value: lower_segment_bound = self.get_bounds_for_segment(segment)[1] data = data.add(pd.Series(lower_segment_bound.values * -1, index = segment.index + idx), fill_value = 0) + else: + raise ValueError(f'unknown {bound_type}') + return data[:len_smoothed_data] def concat_processing_results(self, processing_results: List[AnomalyProcessingResult]) -> Optional[AnomalyProcessingResult]: @@ -227,8 +245,13 @@ class AnomalyDetector(ProcessingDetector): united_result = AnomalyProcessingResult() for result in processing_results: - united_result.lower_bound.extend(result.lower_bound) - united_result.upper_bound.extend(result.upper_bound) + if result.lower_bound is not None: + if united_result.lower_bound is None: united_result.lower_bound = [] + united_result.lower_bound.extend(result.lower_bound) + + if result.upper_bound is not None: + if united_result.upper_bound is None: united_result.upper_bound = [] + united_result.upper_bound.extend(result.upper_bound) return united_result diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index 40051ca..a54be11 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -286,7 +286,8 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId, from?: number case AnalyticUnit.DetectorType.ANOMALY: taskPayload.anomaly = { alpha: (analyticUnit as AnomalyAnalyticUnit).alpha, - confidence: (analyticUnit as AnomalyAnalyticUnit).confidence + confidence: (analyticUnit as AnomalyAnalyticUnit).confidence, + disableBound: (analyticUnit as AnomalyAnalyticUnit).disableBound }; taskPayload.data = await getPayloadData(analyticUnit, from, to); @@ -669,11 +670,20 @@ export async function getHSR( if(result.status !== AnalyticUnit.AnalyticUnitStatus.SUCCESS) { throw new Error(`Data processing error: ${result.error}`); } - return { - hsr: data, - lowerBound: { values: result.payload.lowerBound, columns: data.columns }, - upperBound: { values: result.payload.upperBound, columns: data.columns } - }; + + let resultSeries = { + hsr: data + } + + if(result.payload.lowerBound !== undefined) { + resultSeries['lowerBound'] = { values: result.payload.lowerBound, columns: data.columns }; + } + + if(result.payload.upperBound !== undefined) { + resultSeries['upperBound'] = { values: result.payload.upperBound, columns: data.columns }; + } + + return resultSeries; } catch (err) { const message = err.message || JSON.stringify(err); await AnalyticUnit.setStatus(analyticUnit.id, AnalyticUnit.AnalyticUnitStatus.FAILED, message); diff --git a/server/src/models/analytic_units/anomaly_analytic_unit_model.ts b/server/src/models/analytic_units/anomaly_analytic_unit_model.ts index 5e74728..9f29fec 100644 --- a/server/src/models/analytic_units/anomaly_analytic_unit_model.ts +++ b/server/src/models/analytic_units/anomaly_analytic_unit_model.ts @@ -7,6 +7,12 @@ type SeasonalityPeriod = { unit: string, value: number } + +enum Bound { + NONE = 'NONE', + UPPER = 'UPPER', + LOWER = 'LOWER' +}; export class AnomalyAnalyticUnit extends AnalyticUnit { public learningAfterUpdateRequired = true; @@ -20,6 +26,7 @@ export class AnomalyAnalyticUnit extends AnalyticUnit { public confidence: number, public seasonality: number, //seasonality in ms private seasonalityPeriod: SeasonalityPeriod, + public disableBound: Bound, metric?: Metric, alert?: boolean, id?: AnalyticUnitId, @@ -57,7 +64,8 @@ export class AnomalyAnalyticUnit extends AnalyticUnit { alpha: this.alpha, confidence: this.confidence, seasonality: this.seasonality, - seasonalityPeriod: this.seasonalityPeriod + seasonalityPeriod: this.seasonalityPeriod, + disableBound: this.disableBound }; } @@ -68,7 +76,8 @@ export class AnomalyAnalyticUnit extends AnalyticUnit { alpha: this.alpha, confidence: this.confidence, seasonality: this.seasonality, - seasonalityPeriod: this.seasonalityPeriod + seasonalityPeriod: this.seasonalityPeriod, + disableBound: this.disableBound }; } @@ -88,6 +97,7 @@ export class AnomalyAnalyticUnit extends AnalyticUnit { obj.confidence, obj.seasonality, obj.seasonalityPeriod, + obj.disableBound, metric, obj.alert, obj._id,