diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py
index 7cddfe1..2414e2f 100644
--- a/analytics/analytics/analytic_unit_worker.py
+++ b/analytics/analytics/analytic_unit_worker.py
@@ -57,14 +57,7 @@ class AnalyticUnitWorker:
             'lastDetectionTime': None
         }
 
-        chunks = []
-        # XXX: get_chunks(data, chunk_size) == get_intersected_chunks(data, 0, chunk_size)
-        if self._detector.is_detection_intersected():
-            chunks = get_intersected_chunks(data, chunk_intersection, chunk_size)
-        else:
-            chunks = get_chunks(data, chunk_size)
-
-        for chunk in chunks:
+        for chunk in get_intersected_chunks(data, chunk_intersection, chunk_size):
             await asyncio.sleep(0)
             chunk_dataframe = prepare_data(chunk)
             detected = self._detector.detect(chunk_dataframe, cache)
@@ -93,7 +86,7 @@ class AnalyticUnitWorker:
             self.__append_detection_result(detection_result, detected)
 
         detection_result['segments'] = self._detector.get_intersections(detection_result['segments'])
-        
+
         if detection_result['lastDetectionTime'] is None:
             return None
         else:
diff --git a/analytics/analytics/detectors/anomaly_detector.py b/analytics/analytics/detectors/anomaly_detector.py
index 704907a..400ea3a 100644
--- a/analytics/analytics/detectors/anomaly_detector.py
+++ b/analytics/analytics/detectors/anomaly_detector.py
@@ -7,7 +7,6 @@ from analytic_types.data_bucket import DataBucket
 from detectors import Detector
 from models import ModelCache
 import utils
-from analytic_types import AnalyticUnitId
 
 MAX_DEPENDENCY_LEVEL = 100
 MIN_DEPENDENCY_FACTOR = 0.1
@@ -16,8 +15,7 @@ logger = logging.getLogger('ANOMALY_DETECTOR')
 
 class AnomalyDetector(Detector):
 
-    def __init__(self, analytic_unit_id: AnalyticUnitId):
-        self.analytic_unit_id = analytic_unit_id
+    def __init__(self, *args, **kwargs):
         self.bucket = DataBucket()
 
     def train(self, dataframe: pd.DataFrame, payload: Union[list, dict], cache: Optional[ModelCache]) -> ModelCache:
@@ -30,10 +28,7 @@ class AnomalyDetector(Detector):
 
     def detect(self, dataframe: pd.DataFrame, cache: Optional[ModelCache]) -> dict:
         data = dataframe['value']
-        alpha = cache['alpha']
-        confidence = cache['confidence']
-
-        last_value = None
+        last_values = None
         if cache is not None:
             last_values = cache.get('last_values')
 
@@ -60,23 +55,14 @@ class AnomalyDetector(Detector):
 
         }
 
     def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
-        if cache is None:
-            msg = f'consume_data get invalid cache {cache} for task {self.analytic_unit_id}'
-            logging.debug(msg)
-            raise ValueError(msg)
-
-        data_without_nan = data.dropna()
-
-        if len(data_without_nan) == 0:
-            return None
-
-        self.bucket.receive_data(data_without_nan)
-
-        if len(self.bucket.data) >= self.get_window_size(cache):
-            self.detect(self.bucket, cache)
+        self.detect(data, cache)
 
     def get_window_size(self, cache: Optional[ModelCache]) -> int:
+        '''
+        get the number of values that will affect the next value
+        '''
+
         if cache is None:
             raise ValueError('anomaly detector got None cache')
 
@@ -90,6 +76,3 @@ class AnomalyDetector(Detector):
         segments = utils.merge_intersecting_intervals(segments)
         segments = [{'from': segment[0], 'to': segment[1]} for segment in segments]
         return segments
-
-    def is_detection_intersected(self) -> bool:
-        return False
diff --git a/analytics/analytics/detectors/detector.py b/analytics/analytics/detectors/detector.py
index b0f891b..def7c3c 100644
--- a/analytics/analytics/detectors/detector.py
+++ b/analytics/analytics/detectors/detector.py
@@ -28,6 +28,3 @@ class Detector(ABC):
     @abstractmethod
     def get_intersections(self, segments: List[dict]) -> List[dict]:
         pass
-
-    def is_detection_intersected(self) -> bool:
-        return True
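
Note: the removed XXX comment records the invariant this cleanup relies on:
get_chunks(data, chunk_size) == get_intersected_chunks(data, 0, chunk_size).
With AnomalyDetector's override gone (the only override touched here), every
detector can take the intersected-chunk path, so the is_detection_intersected()
hook on Detector and the branch in the worker become dead code. A minimal
sketch of that equivalence, using hypothetical list-based stand-ins for the
real helpers in the analytics utils module (their actual implementations are
not shown in this diff):

    from typing import Iterator, List

    def get_intersected_chunks(data: List, intersection: int, chunk_size: int) -> Iterator[List]:
        # Hypothetical stand-in: yield chunk_size-long windows where
        # consecutive windows share `intersection` values.
        assert 0 <= intersection < chunk_size
        step = chunk_size - intersection
        for start in range(0, len(data), step):
            yield data[start:start + chunk_size]
            if start + chunk_size >= len(data):
                break

    def get_chunks(data: List, chunk_size: int) -> Iterator[List]:
        # Plain chunking is the degenerate case with zero intersection.
        yield from get_intersected_chunks(data, 0, chunk_size)

    data = list(range(10))
    assert list(get_chunks(data, 4)) == list(get_intersected_chunks(data, 0, 4))
    # both yield [0, 1, 2, 3], [4, 5, 6, 7], [8, 9]
    assert list(get_intersected_chunks(data, 1, 4)) == [
        [0, 1, 2, 3], [3, 4, 5, 6], [6, 7, 8, 9]]

With an intersection of 0 the windows tile the data exactly, which is why the
worker can drop the branch and always call get_intersected_chunks.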