You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

79 lines
2.8 KiB

import logging
import pandas as pd
from typing import Optional, Union, List, Tuple
from analytic_types import AnalyticUnitId
from analytic_types.data_bucket import DataBucket
from detectors import Detector
from models import ModelCache
import utils
MAX_DEPENDENCY_LEVEL = 100
MIN_DEPENDENCY_FACTOR = 0.1
logger = logging.getLogger('ANOMALY_DETECTOR')
class AnomalyDetector(Detector):
    """Detect anomalies as excursions outside an exponentially smoothed band.

    The trained cache holds two values: ``alpha`` (exponential-smoothing
    factor) and ``confidence`` (half-width of the allowed band around the
    smoothed series). Any point whose value leaves that band is anomalous.
    """

    def __init__(self, *args, **kwargs):
        # Bucket for accumulating incoming data chunks.
        # NOTE(review): *args/**kwargs are accepted but ignored, and
        # super().__init__ is not called — presumably Detector has no
        # required initialization; confirm against the base class.
        self.bucket = DataBucket()

    def train(self, dataframe: pd.DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache:
        """Build a model cache from *payload*.

        The dataframe and previous cache are unused: "training" only
        captures the user-supplied ``confidence`` and ``alpha``.

        Raises:
            KeyError: if ``payload`` lacks ``confidence`` or ``alpha``.
        """
        return {
            'cache': {
                'confidence': payload['confidence'],
                'alpha': payload['alpha']
            }
        }

    def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> dict:
        """Return anomaly segments of *dataframe* as millisecond timestamp pairs.

        Args:
            dataframe: must contain ``value`` and ``timestamp`` columns.
            cache: trained cache with ``alpha`` and ``confidence``.

        Returns:
            dict with the (unchanged) ``cache``, the detected ``segments``
            as ``(from_ms, to_ms)`` tuples, and ``lastDetectionTime``.

        Raises:
            ValueError: if *cache* is ``None``.
        """
        if cache is None:
            # Fail fast with a clear message (matches get_window_size)
            # instead of an opaque TypeError on cache['alpha'] below.
            raise ValueError('anomaly detector got None cache')
        data = dataframe['value']
        # Smoothed baseline; the confidence value gives the band half-width.
        smoothed_data = utils.exponential_smoothing(data, cache['alpha'])
        upper_bound = smoothed_data + cache['confidence']
        lower_bound = smoothed_data - cache['confidence']
        # Indexes of points that escape the band in either direction.
        anomaly_indexes = [
            data.index[idx]
            for idx, val in enumerate(data.values)
            if val > upper_bound.values[idx] or val < lower_bound.values[idx]
        ]
        # Cluster neighbouring anomalous points (max gap 1) and convert the
        # clusters to (start, end) index pairs, then to millisecond spans.
        segments = utils.close_filtering(anomaly_indexes, 1)
        segments = utils.get_start_and_end_of_segments(segments)
        segments = [
            (
                utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][segment[0]]),
                utils.convert_pd_timestamp_to_ms(dataframe['timestamp'][segment[1]]),
            )
            for segment in segments
        ]
        last_dataframe_time = dataframe.iloc[-1]['timestamp']
        last_detection_time = utils.convert_pd_timestamp_to_ms(last_dataframe_time)
        return {
            'cache': cache,
            'segments': segments,
            'lastDetectionTime': last_detection_time
        }

    def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
        """Run detection on an incoming data chunk and return its result.

        Previously the detection result was silently discarded, making the
        declared ``Optional[dict]`` return always ``None``; now the result
        is propagated to the caller.
        """
        return self.detect(data, cache)

    def get_window_size(self, cache: Optional[ModelCache]) -> int:
        """Return the number of past values that still affect the next value.

        Finds the smallest ``level`` where ``(1 - alpha) ** level`` drops
        below MIN_DEPENDENCY_FACTOR; if the factor never drops below the
        threshold, the loop exhausts and MAX_DEPENDENCY_LEVEL - 1 is used.

        Raises:
            ValueError: if *cache* is ``None``.
        """
        if cache is None:
            raise ValueError('anomaly detector got None cache')
        for level in range(1, MAX_DEPENDENCY_LEVEL):
            if (1 - cache['alpha']) ** level < MIN_DEPENDENCY_FACTOR:
                break
        return level

    def get_intersections(self, segments: List[dict]) -> List[dict]:
        """Merge overlapping ``{'from': ..., 'to': ...}`` segments."""
        intervals = [[segment['from'], segment['to']] for segment in segments]
        merged = utils.merge_intersecting_intervals(intervals)
        return [{'from': start, 'to': end} for start, end in merged]