diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py index 2414e2f..7cddfe1 100644 --- a/analytics/analytics/analytic_unit_worker.py +++ b/analytics/analytics/analytic_unit_worker.py @@ -57,7 +57,14 @@ class AnalyticUnitWorker: 'lastDetectionTime': None } - for chunk in get_intersected_chunks(data, chunk_intersection, chunk_size): + chunks = [] + # XXX: get_chunks(data, chunk_size) == get_intersected_chunks(data, 0, chunk_size) + if self._detector.is_detection_intersected(): + chunks = get_intersected_chunks(data, chunk_intersection, chunk_size) + else: + chunks = get_chunks(data, chunk_size) + + for chunk in chunks: await asyncio.sleep(0) chunk_dataframe = prepare_data(chunk) detected = self._detector.detect(chunk_dataframe, cache) @@ -86,7 +93,7 @@ class AnalyticUnitWorker: self.__append_detection_result(detection_result, detected) detection_result['segments'] = self._detector.get_intersections(detection_result['segments']) - + if detection_result['lastDetectionTime'] is None: return None else: diff --git a/analytics/analytics/detectors/anomaly_detector.py b/analytics/analytics/detectors/anomaly_detector.py index 400ea3a..704907a 100644 --- a/analytics/analytics/detectors/anomaly_detector.py +++ b/analytics/analytics/detectors/anomaly_detector.py @@ -7,6 +7,7 @@ from analytic_types.data_bucket import DataBucket from detectors import Detector from models import ModelCache import utils +from analytic_types import AnalyticUnitId MAX_DEPENDENCY_LEVEL = 100 MIN_DEPENDENCY_FACTOR = 0.1 @@ -15,7 +16,8 @@ logger = logging.getLogger('ANOMALY_DETECTOR') class AnomalyDetector(Detector): - def __init__(self, *args, **kwargs): + def __init__(self, analytic_unit_id: AnalyticUnitId): + self.analytic_unit_id = analytic_unit_id self.bucket = DataBucket() def train(self, dataframe: pd.DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache: @@ -28,7 +30,10 @@ class AnomalyDetector(Detector): def 
detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> dict: data = dataframe['value'] - last_values = None + alpha = cache['alpha'] + confidence = cache['confidence'] + + last_values = None if cache is not None: last_values = cache.get('last_values') @@ -55,14 +60,23 @@ class AnomalyDetector(Detector): } def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]: - self.detect(data, cache) + if cache is None: + msg = f'consume_data get invalid cache {cache} for task {self.analytic_unit_id}' + logger.debug(msg) + raise ValueError(msg) + data_without_nan = data.dropna() - def get_window_size(self, cache: Optional[ModelCache]) -> int: - ''' - get the number of values that will affect the next value - ''' + if len(data_without_nan) == 0: + return None + + self.bucket.receive_data(data_without_nan) + if len(self.bucket.data) >= self.get_window_size(cache): + self.detect(self.bucket, cache) + + + def get_window_size(self, cache: Optional[ModelCache]) -> int: if cache is None: raise ValueError('anomaly detector got None cache') @@ -76,3 +90,6 @@ class AnomalyDetector(Detector): segments = utils.merge_intersecting_intervals(segments) segments = [{'from': segment[0], 'to': segment[1]} for segment in segments] return segments + + def is_detection_intersected(self) -> bool: + return False diff --git a/analytics/analytics/detectors/detector.py b/analytics/analytics/detectors/detector.py index def7c3c..b0f891b 100644 --- a/analytics/analytics/detectors/detector.py +++ b/analytics/analytics/detectors/detector.py @@ -28,3 +28,6 @@ class Detector(ABC): @abstractmethod def get_intersections(self, segments: List[dict]) -> List[dict]: pass + + def is_detection_intersected(self) -> bool: + return True