|
|
|
@ -6,13 +6,15 @@ from analytic_types.data_bucket import DataBucket
|
|
|
|
|
from detectors import Detector |
|
|
|
|
from models import ModelCache |
|
|
|
|
import utils |
|
|
|
|
from analytic_types import AnalyticUnitId |
|
|
|
|
|
|
|
|
|
logger = logging.getLogger('ANOMALY_DETECTOR') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AnomalyDetector(Detector): |
|
|
|
|
|
|
|
|
|
def __init__(self, *args, **kwargs): |
|
|
|
|
def __init__(self, analytic_unit_id: AnalyticUnitId): |
|
|
|
|
self.analytic_unit_id = analytic_unit_id |
|
|
|
|
self.bucket = DataBucket() |
|
|
|
|
|
|
|
|
|
def train(self, dataframe: pd.DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache: |
|
|
|
@ -51,35 +53,31 @@ class AnomalyDetector(Detector):
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]: |
|
|
|
|
self.detect(data, cache) |
|
|
|
|
if cache is None: |
|
|
|
|
msg = f'consume_data get invalid cache {cache} for task {self.analytic_unit_id}' |
|
|
|
|
logging.debug(msg) |
|
|
|
|
raise ValueError(msg) |
|
|
|
|
|
|
|
|
|
data_without_nan = data.dropna() |
|
|
|
|
|
|
|
|
|
def __smooth_data(self, dataframe: pd.DataFrame) -> List[Tuple[int, float]]: |
|
|
|
|
''' |
|
|
|
|
smooth data using exponential smoothing/moving average/weighted_average |
|
|
|
|
''' |
|
|
|
|
|
|
|
|
|
def __get_confidence_window(self, smooth_data: pd.Series, condfidence: float) -> Tuple[pd.Series, pd.Series]: |
|
|
|
|
''' |
|
|
|
|
build confidence interval above and below smoothed data |
|
|
|
|
''' |
|
|
|
|
if len(data_without_nan) == 0: |
|
|
|
|
return None |
|
|
|
|
|
|
|
|
|
def __get_dependency_level(self, alpha: float) -> int: |
|
|
|
|
''' |
|
|
|
|
get the number of values that will affect the next value |
|
|
|
|
''' |
|
|
|
|
self.bucket.receive_data(data_without_nan) |
|
|
|
|
|
|
|
|
|
if len(self.bucket.data) >= self.get_window_size(cache): |
|
|
|
|
self.detect(self.bucket, cache) |
|
|
|
|
|
|
|
|
|
for level in range(1, 100): |
|
|
|
|
if (1 - alpha) ** level < 0.1: |
|
|
|
|
break |
|
|
|
|
return level |
|
|
|
|
|
|
|
|
|
def get_window_size(self, cache: Optional[ModelCache]) -> int: |
|
|
|
|
if cache is None: |
|
|
|
|
raise ValueError('anomaly detector got None cache') |
|
|
|
|
|
|
|
|
|
#TODO: calculate value based on `alpha` value from cache |
|
|
|
|
return 1 |
|
|
|
|
for level in range(1, 100): |
|
|
|
|
if (1 - cache['alpha']) ** level < 0.1: |
|
|
|
|
break |
|
|
|
|
|
|
|
|
|
return level |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_detection_intersected(self) -> bool: |
|
|
|
|