
Smoothing for anomaly detector #607 (#608)

Alexandr Velikiy authored 6 years ago; committed by Evgeny Smyshlyaev
commit d58f08c42d
4 changed files:

  1. analytics/analytics/analytic_unit_manager.py (2 changed lines)
  2. analytics/analytics/detectors/__init__.py (1 changed line)
  3. analytics/analytics/detectors/anomaly_detector.py (78 changed lines)
  4. analytics/analytics/detectors/detector.py (2 changed lines)

analytics/analytics/analytic_unit_manager.py (2 changed lines)

@@ -19,6 +19,8 @@ def get_detector_by_type(
         return detectors.PatternDetector(analytic_unit_type, analytic_unit_id)
     elif detector_type == 'threshold':
         return detectors.ThresholdDetector()
+    elif detector_type == 'anomaly':
+        return detectors.AnomalyDetector()
     raise ValueError('Unknown detector type "%s"' % detector_type)
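
For reference, a hypothetical call into the updated dispatcher; the full parameter list of get_detector_by_type is truncated in the hunk header above, so the single-argument call is an assumption:

# Hypothetical usage (the real signature may take more arguments):
detector = get_detector_by_type('anomaly')
assert isinstance(detector, detectors.AnomalyDetector)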

analytics/analytics/detectors/__init__.py (1 changed line)

@@ -1,3 +1,4 @@
 from detectors.detector import Detector
 from detectors.pattern_detector import PatternDetector
 from detectors.threshold_detector import ThresholdDetector
+from detectors.anomaly_detector import AnomalyDetector

analytics/analytics/detectors/anomaly_detector.py (78 changed lines, new file)

@@ -0,0 +1,78 @@
import logging
import pandas as pd
from typing import Optional, Union, List, Tuple

from analytic_types.data_bucket import DataBucket
from detectors import Detector
from models import ModelCache
import utils

logger = logging.getLogger('ANOMALY_DETECTOR')


class AnomalyDetector(Detector):

    def __init__(self, *args, **kwargs):
        self.bucket = DataBucket()
    def train(self, dataframe: pd.DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache:
        return {
            'cache': {
                'confidence': payload['confidence'],
                'alpha': payload['alpha']
            }
        }
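
    # Example with hypothetical values: train(df, {'confidence': 2.0, 'alpha': 0.5}, None)
    # returns {'cache': {'confidence': 2.0, 'alpha': 0.5}}; the dataframe itself
    # is not inspected at this stage.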
    def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> dict:
        data = dataframe['value']
        last_values = None
        if cache is not None:
            # 'last_values' is not written by train() yet, so fall back to None
            last_values = cache.get('last_values')
        # TODO: detection code here
        smoothed_data = utils.exponential_smoothing(data, cache['alpha'])
        upper_bound = utils.exponential_smoothing(data + cache['confidence'], cache['alpha'])
        lower_bound = utils.exponential_smoothing(data - cache['confidence'], cache['alpha'])
        # collect the indices of points that escape the confidence interval
        segments = []
        for idx, val in enumerate(data.values):
            if val > upper_bound[idx] or val < lower_bound[idx]:
                segments.append(idx)
        last_detection_time = dataframe['timestamp'].iloc[-1]
        return {
            'cache': cache,
            'segments': segments,
            'lastDetectionTime': last_detection_time
        }
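
    # Worked example, assuming the smoothing is seeded with the series' first
    # value: with alpha = 0.5 and confidence = 2.0 (hypothetical values), data
    # [1, 1, 10] smooths to [1, 1, 5.5] and the upper bound (smoothed data + 2.0)
    # to [3, 3, 7.5], so only index 2 (value 10 > 7.5) is reported as anomalous.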
    def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
        # return the detection result to match the declared Optional[dict] type
        return self.detect(data, cache)
    def __smooth_data(self, dataframe: pd.DataFrame) -> List[Tuple[int, float]]:
        '''
        Smooth data using exponential smoothing / moving average / weighted average.
        '''
        # TODO: not implemented yet
        pass

    def __get_confidence_window(self, smooth_data: pd.Series, confidence: float) -> Tuple[pd.Series, pd.Series]:
        '''
        Build a confidence interval above and below the smoothed data.
        '''
        # TODO: not implemented yet
        pass

    def __get_dependency_level(self, alpha: float) -> int:
        '''
        Get the number of past values that still affect the next smoothed value.
        '''
        for level in range(1, 100):
            if (1 - alpha) ** level < 0.1:
                break
        return level
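
    # For example, with alpha = 0.5: (1 - 0.5) ** 4 = 0.0625 is the first power
    # below 0.1, so the level is 4; a smaller alpha keeps more history and
    # yields a higher level.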
    def get_window_size(self, cache: Optional[ModelCache]) -> int:
        if cache is None:
            raise ValueError('anomaly detector got None cache')
        # TODO: calculate the value based on the `alpha` value from the cache
        return 1
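
The new detector leans on utils.exponential_smoothing, which is not part of this diff. A minimal sketch of what such a helper could look like, assuming the smoothed series is seeded with the first observation (the real helper in analytics/utils may differ):

import pandas as pd

def exponential_smoothing(series: pd.Series, alpha: float) -> list:
    # s[0] = x[0]; s[t] = alpha * x[t] + (1 - alpha) * s[t - 1]
    result = [series.values[0]]
    for value in series.values[1:]:
        result.append(alpha * value + (1 - alpha) * result[-1])
    return result

Under this assumption the bounds come back as plain lists, so the positional upper_bound[idx] / lower_bound[idx] lookups in the detect() loop above work as written.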

analytics/analytics/detectors/detector.py (2 changed lines)

@@ -14,7 +14,7 @@ class Detector(ABC):
         pass

     @abstractmethod
-    async def detect(self, dataframe: DataFrame, cache: Optional[ModelCache]) -> dict:
+    def detect(self, dataframe: DataFrame, cache: Optional[ModelCache]) -> dict:
         pass

     @abstractmethod
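
Dropping async makes the abstract detect hook synchronous, matching the plain def detect(...) the new AnomalyDetector implements. A hypothetical call site under the new contract:

result = detector.detect(dataframe, cache)  # called directly, no await
segments = result['segments']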
