
Anomaly detector: option for disabling the upper or lower bound #701 (#703)

Evgeny Smyshlyaev authored 5 years ago, committed by rozetko
commit 67f203bee6
Changed files:

  1. analytics/analytics/analytic_types/detector_typing.py (5)
  2. analytics/analytics/detectors/anomaly_detector.py (87)
  3. server/src/controllers/analytics_controller.ts (22)
  4. server/src/models/analytic_units/anomaly_analytic_unit_model.ts (14)

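In short: the commit threads a new optional disableBound setting ('NONE' | 'UPPER' | 'LOWER') from the anomaly analytic unit on the server into the analytics training payload and detection cache, so a disabled bound is skipped during detection and omitted from the HSR response. A minimal sketch of the training payload shape after this change (field names taken from the diff below; the concrete values are illustrative):

    payload = {
        'confidence': 2.0,        # width of the band on each side of the smoothed data
        'alpha': 0.5,             # exponential smoothing factor
        'disableBound': 'UPPER',  # 'NONE' (default), 'UPPER' or 'LOWER'
        'segments': None          # optional labeled segments for seasonality
    }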
analytics/analytics/analytic_types/detector_typing.py (5)

@@ -48,10 +48,5 @@ class AnomalyProcessingResult():
         lower_bound: Optional[TimeSeries] = None,
         upper_bound: Optional[TimeSeries] = None,
     ):
-        if lower_bound is None:
-            lower_bound = []
         self.lower_bound = lower_bound
-        if upper_bound is None:
-            upper_bound = []
         self.upper_bound = upper_bound

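Dropping the None-to-[] coercion lets AnomalyProcessingResult distinguish a disabled bound (None) from a bound with no points ([]); the merge logic in concat_processing_results below is adjusted accordingly. A tiny illustration, assuming the class as shown above (values are made up):

    result = AnomalyProcessingResult(lower_bound=[(1552501000, 8.0)])
    assert result.upper_bound is None   # upper bound disabled, omitted downstream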
analytics/analytics/detectors/anomaly_detector.py (87)

@@ -1,5 +1,8 @@
+from enum import Enum
 import logging
 import numpy as np
+import operator
+from collections import OrderedDict
 import pandas as pd
 import math
 from typing import Optional, Union, List, Tuple

@@ -16,6 +19,10 @@ MIN_DEPENDENCY_FACTOR = 0.1
 BASIC_ALPHA = 0.5
 logger = logging.getLogger('ANOMALY_DETECTOR')

+class Bound(Enum):
+    NONE = 'NONE'
+    UPPER = 'UPPER'
+    LOWER = 'LOWER'

 class AnomalyDetector(ProcessingDetector):
@@ -25,13 +32,15 @@ class AnomalyDetector(ProcessingDetector):
     def train(self, dataframe: pd.DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache:
         segments = payload.get('segments')
+        disable_bound: str = payload.get('disableBound') or 'NONE'
         prepared_segments = []
         time_step = utils.find_interval(dataframe)

         new_cache = {
             'confidence': payload['confidence'],
             'alpha': payload['alpha'],
-            'timeStep': time_step
+            'timeStep': time_step,
+            'disableBound': disable_bound
         }

         if segments is not None:
@@ -61,12 +70,17 @@ class AnomalyDetector(ProcessingDetector):
         data = dataframe['value']
         time_step = cache['timeStep']
         segments = cache.get('segments')
+        disable_bound: str = cache.get('disableBound') or 'NONE'

         smoothed_data = utils.exponential_smoothing(data, cache['alpha'])

-        # TODO: use class for cache to avoid using string literals
-        upper_bound = smoothed_data + cache['confidence']
-        lower_bound = smoothed_data - cache['confidence']
+        # TODO: use class for cache to avoid using string literals and Bound.TYPE.value
+        bounds = OrderedDict()
+        bounds[Bound.LOWER.value] = ( smoothed_data - cache['confidence'], operator.lt )
+        bounds[Bound.UPPER.value] = ( smoothed_data + cache['confidence'], operator.gt )
+        if disable_bound != Bound.NONE.value:
+            del bounds[disable_bound]

         if segments is not None:
@@ -84,20 +98,17 @@ class AnomalyDetector(ProcessingDetector):
                 seasonality_offset = (abs(start_seasonal_segment - data_start_time) % seasonality) // time_step
                 #TODO: upper and lower bounds for segment_data
                 segment_data = pd.Series(segment['data'])
-                upper_bound = self.add_season_to_data(
-                    upper_bound, segment_data, seasonality_offset, seasonality_index, True
-                )
-                lower_bound = self.add_season_to_data(
-                    lower_bound, segment_data, seasonality_offset, seasonality_index, False
-                )
-                assert len(smoothed_data) == len(upper_bound) == len(lower_bound), \
-                    f'len smoothed {len(smoothed_data)} != len seasonality {len(upper_bound)}'
+                for bound_type, bound_data in bounds.items():
+                    bound_data, _ = bound_data
+                    bounds[bound_type] = self.add_season_to_data(bound_data, segment_data, seasonality_offset, seasonality_index, bound_type)
+                    assert len(smoothed_data) == len(bounds[bound_type]), \
+                        f'len smoothed {len(smoothed_data)} != len seasonality {len(bounds[bound_type])}'

-        # TODO: use class for cache to avoid using string literals
         anomaly_indexes = []
         for idx, val in enumerate(data.values):
-            if val > upper_bound.values[idx] or val < lower_bound.values[idx]:
-                anomaly_indexes.append(data.index[idx])
+            for bound_type, bound_data in bounds.items():
+                bound_data, comparator = bound_data
+                if comparator(val, bound_data.values[idx]):
+                    anomaly_indexes.append(data.index[idx])
         # TODO: use Segment in utils
         segments = utils.close_filtering(anomaly_indexes, 1)
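The rewritten detection loop replaces the hard-coded two-sided check with a small comparator table: each enabled bound maps to a (series, operator) pair, so deleting a key from the OrderedDict is all it takes to disable that side. A self-contained sketch of the same pattern with toy numbers (not the project's code; plain lists stand in for pandas Series):

    import operator
    from collections import OrderedDict

    smoothed = [10.0, 10.0, 10.0]
    confidence = 2.0
    bounds = OrderedDict()
    bounds['LOWER'] = ([v - confidence for v in smoothed], operator.lt)
    bounds['UPPER'] = ([v + confidence for v in smoothed], operator.gt)
    del bounds['UPPER']   # as if disableBound == 'UPPER'

    data = [9.0, 7.5, 13.0]
    anomalies = [idx for idx, val in enumerate(data)
                 if any(cmp(val, series[idx]) for series, cmp in bounds.values())]
    assert anomalies == [1]   # 7.5 < 8.0 fires; 13.0 is ignored since the upper bound is off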
@@ -163,11 +174,18 @@ class AnomalyDetector(ProcessingDetector):
     # TODO: ModelCache -> ModelState (don't use string literals)
     def process_data(self, dataframe: pd.DataFrame, cache: ModelCache) -> AnomalyProcessingResult:
         segments = cache.get('segments')
+        disable_bound: str = cache.get('disableBound') or 'NONE'

         # TODO: exponential_smoothing should return dataframe with related timestamps
-        smoothed = utils.exponential_smoothing(dataframe['value'], cache['alpha'])
-        upper_bound = smoothed + cache['confidence']
-        lower_bound = smoothed - cache['confidence']
+        smoothed_data = utils.exponential_smoothing(dataframe['value'], cache['alpha'])
+
+        bounds = OrderedDict()
+        bounds[Bound.LOWER.value] = smoothed_data - cache['confidence']
+        bounds[Bound.UPPER.value] = smoothed_data + cache['confidence']
+        if disable_bound != Bound.NONE.value:
+            del bounds[disable_bound]

         # TODO: remove duplication with detect()
@@ -186,24 +204,21 @@ class AnomalyDetector(ProcessingDetector):
                 start_seasonal_segment = segment['from'] + seasonality * season_count
                 seasonality_offset = (abs(start_seasonal_segment - data_start_time) % seasonality) // time_step
                 segment_data = pd.Series(segment['data'])
-                upper_bound = self.add_season_to_data(
-                    upper_bound, segment_data, seasonality_offset, seasonality_index, True
-                )
-                lower_bound = self.add_season_to_data(
-                    lower_bound, segment_data, seasonality_offset, seasonality_index, False
-                )
-                assert len(smoothed) == len(upper_bound) == len(lower_bound), \
-                    f'len smoothed {len(smoothed)} != len seasonality {len(upper_bound)}'
+                for bound_type, bound_data in bounds.items():
+                    bounds[bound_type] = self.add_season_to_data(bound_data, segment_data, seasonality_offset, seasonality_index, bound_type)
+                    assert len(smoothed_data) == len(bounds[bound_type]), \
+                        f'len smoothed {len(smoothed_data)} != len seasonality {len(bounds[bound_type])}'

         # TODO: support multiple segments

         timestamps = utils.convert_series_to_timestamp_list(dataframe.timestamp)
-        lower_bound_timeseries = list(zip(timestamps, lower_bound.values.tolist()))
-        upper_bound_timeseries = list(zip(timestamps, upper_bound.values.tolist()))
-        result = AnomalyProcessingResult(lower_bound_timeseries, upper_bound_timeseries)
+        result_bounds = {}
+        for bound_type, bound_data in bounds.items():
+            result_bounds[bound_type] = list(zip(timestamps, bound_data.values.tolist()))
+        result = AnomalyProcessingResult(lower_bound=result_bounds.get(Bound.LOWER.value), upper_bound=result_bounds.get(Bound.UPPER.value))
         return result

-    def add_season_to_data(self, data: pd.Series, segment: pd.Series, offset: int, seasonality: int, addition: bool) -> pd.Series:
+    def add_season_to_data(self, data: pd.Series, segment: pd.Series, offset: int, seasonality: int, bound_type: Bound) -> pd.Series:
         #data - smoothed data to which seasonality will be added
         #if addition == True -> segment is added
         #if addition == False -> segment is subtracted
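Two things happen in this hunk: result assembly goes through result_bounds.get(...), which returns None for a deleted bound, and that None reaches AnomalyProcessingResult untouched thanks to the constructor change in detector_typing.py; and add_season_to_data now receives the bound name instead of a boolean (the two 'addition' comments above are left over from the old signature). The first mechanism in isolation (illustrative values):

    result_bounds = {'LOWER': [(1552501000, 8.0)]}   # 'UPPER' was deleted from bounds
    lower = result_bounds.get('LOWER')               # [(1552501000, 8.0)]
    upper = result_bounds.get('UPPER')               # None -> bound absent in the result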
@@ -213,12 +228,15 @@ class AnomalyDetector(ProcessingDetector):
                 #TODO: add seasonality for non empty parts
                 continue
             if (idx - offset) % seasonality == 0:
-                if addition:
+                if bound_type == Bound.UPPER.value:
                     upper_segment_bound = self.get_bounds_for_segment(segment)[0]
                     data = data.add(pd.Series(upper_segment_bound.values, index = segment.index + idx), fill_value = 0)
-                else:
+                elif bound_type == Bound.LOWER.value:
                     lower_segment_bound = self.get_bounds_for_segment(segment)[1]
                     data = data.add(pd.Series(lower_segment_bound.values * -1, index = segment.index + idx), fill_value = 0)
+                else:
+                    raise ValueError(f'unknown {bound_type}')
         return data[:len_smoothed_data]

     def concat_processing_results(self, processing_results: List[AnomalyProcessingResult]) -> Optional[AnomalyProcessingResult]:
@@ -227,7 +245,12 @@ class AnomalyDetector(ProcessingDetector):
         united_result = AnomalyProcessingResult()
         for result in processing_results:
-            united_result.lower_bound.extend(result.lower_bound)
-            united_result.upper_bound.extend(result.upper_bound)
+            if result.lower_bound is not None:
+                if united_result.lower_bound is None: united_result.lower_bound = []
+                united_result.lower_bound.extend(result.lower_bound)
+            if result.upper_bound is not None:
+                if united_result.upper_bound is None: united_result.upper_bound = []
+                united_result.upper_bound.extend(result.upper_bound)

         return united_result
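The merge now tolerates missing bounds: a bound is concatenated only when a chunk actually carries it, and it stays None if every chunk had it disabled. The same logic as a standalone helper for a single bound (a sketch, not the project's API):

    from typing import List, Optional, Tuple

    TimeSeries = List[Tuple[int, float]]

    def merge_bound(chunks: List[Optional[TimeSeries]]) -> Optional[TimeSeries]:
        united: Optional[TimeSeries] = None
        for chunk in chunks:
            if chunk is not None:
                united = (united or []) + chunk
        return united

    assert merge_bound([None, None]) is None
    assert merge_bound([[(1, 2.0)], None, [(3, 4.0)]]) == [(1, 2.0), (3, 4.0)]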

server/src/controllers/analytics_controller.ts (22)

@@ -286,7 +286,8 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId, from?: number
     case AnalyticUnit.DetectorType.ANOMALY:
       taskPayload.anomaly = {
         alpha: (analyticUnit as AnomalyAnalyticUnit).alpha,
-        confidence: (analyticUnit as AnomalyAnalyticUnit).confidence
+        confidence: (analyticUnit as AnomalyAnalyticUnit).confidence,
+        disableBound: (analyticUnit as AnomalyAnalyticUnit).disableBound
       };

       taskPayload.data = await getPayloadData(analyticUnit, from, to);
@@ -669,11 +670,20 @@ export async function getHSR(
     if(result.status !== AnalyticUnit.AnalyticUnitStatus.SUCCESS) {
       throw new Error(`Data processing error: ${result.error}`);
     }
-    return {
-      hsr: data,
-      lowerBound: { values: result.payload.lowerBound, columns: data.columns },
-      upperBound: { values: result.payload.upperBound, columns: data.columns }
-    };
+
+    let resultSeries = {
+      hsr: data
+    }
+
+    if(result.payload.lowerBound !== undefined) {
+      resultSeries['lowerBound'] = { values: result.payload.lowerBound, columns: data.columns };
+    }
+
+    if(result.payload.upperBound !== undefined) {
+      resultSeries['upperBound'] = { values: result.payload.upperBound, columns: data.columns };
+    }
+
+    return resultSeries;
   } catch (err) {
     const message = err.message || JSON.stringify(err);
     await AnalyticUnit.setStatus(analyticUnit.id, AnalyticUnit.AnalyticUnitStatus.FAILED, message);
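On the server side, getHSR now builds the response incrementally, so a disabled bound simply has no key instead of carrying undefined values. Roughly the shape the endpoint returns when the upper bound is disabled (a Python-flavoured sketch of the JSON; the exact column layout is an assumption, it actually comes from data.columns):

    hsr_response = {
        'hsr': '...raw series...',
        'lowerBound': {'values': [[1552501000, 8.0]], 'columns': ['timestamp', 'value']},
        # no 'upperBound' key: result.payload.upperBound was undefined
    }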

server/src/models/analytic_units/anomaly_analytic_unit_model.ts (14)

@@ -7,6 +7,12 @@ type SeasonalityPeriod = {
   unit: string,
   value: number
 }

+enum Bound {
+  NONE = 'NONE',
+  UPPER = 'UPPER',
+  LOWER = 'LOWER'
+};

 export class AnomalyAnalyticUnit extends AnalyticUnit {
   public learningAfterUpdateRequired = true;

@@ -20,6 +26,7 @@ export class AnomalyAnalyticUnit extends AnalyticUnit {
     public confidence: number,
     public seasonality: number, //seasonality in ms
     private seasonalityPeriod: SeasonalityPeriod,
+    public disableBound: Bound,
     metric?: Metric,
     alert?: boolean,
     id?: AnalyticUnitId,

@@ -57,7 +64,8 @@ export class AnomalyAnalyticUnit extends AnalyticUnit {
       alpha: this.alpha,
       confidence: this.confidence,
       seasonality: this.seasonality,
-      seasonalityPeriod: this.seasonalityPeriod
+      seasonalityPeriod: this.seasonalityPeriod,
+      disableBound: this.disableBound
     };
   }

@@ -68,7 +76,8 @@ export class AnomalyAnalyticUnit extends AnalyticUnit {
       alpha: this.alpha,
       confidence: this.confidence,
       seasonality: this.seasonality,
-      seasonalityPeriod: this.seasonalityPeriod
+      seasonalityPeriod: this.seasonalityPeriod,
+      disableBound: this.disableBound
     };
   }

@@ -88,6 +97,7 @@ export class AnomalyAnalyticUnit extends AnalyticUnit {
      obj.confidence,
      obj.seasonality,
      obj.seasonalityPeriod,
+     obj.disableBound,
      metric,
      obj.alert,
      obj._id,
