From fb2f350e950f74347664687534231f40a5310df2 Mon Sep 17 00:00:00 2001 From: Evgeny Smyshlyaev Date: Thu, 11 Apr 2019 00:34:54 +0300 Subject: [PATCH] Dataframe for detection less than two window size (#532) 2*WINDOW_SIZE checks --- analytics/analytics/analytic_unit_worker.py | 18 +++------ .../analytics/detectors/pattern_detector.py | 40 ++++++++++++++++--- .../analytics/detectors/threshold_detector.py | 4 +- analytics/tests/test_detectors.py | 30 ++++++++++++++ 4 files changed, 72 insertions(+), 20 deletions(-) create mode 100644 analytics/tests/test_detectors.py diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py index e1451a8..3501334 100644 --- a/analytics/analytics/analytic_unit_worker.py +++ b/analytics/analytics/analytic_unit_worker.py @@ -45,11 +45,7 @@ class AnalyticUnitWorker: except asyncio.TimeoutError: raise Exception('Timeout ({}s) exceeded while learning'.format(config.LEARNING_TIMEOUT)) - async def do_detect(self, data: list, cache: Optional[ModelCache]) -> dict: - if cache is None: - msg = f'{self.analytic_unit_id} detection got invalid cache, skip detection' - logger.error(msg) - raise ValueError(msg) + async def do_detect(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> dict: window_size = self._detector.get_window_size(cache) chunk_size = window_size * self.CHUNK_WINDOW_SIZE_FACTOR @@ -73,12 +69,7 @@ class AnalyticUnitWorker: if self._training_future is not None: self._training_future.cancel() - async def consume_data(self, data: list, cache: Optional[ModelCache]): - if cache is None: - msg = f'{self.analytic_unit_id} consume_data got invalid cache, skip detection' - logger.error(msg) - raise ValueError(msg) - + async def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]: window_size = self._detector.get_window_size(cache) #TODO: make class DetectionResult @@ -94,7 +85,10 @@ class AnalyticUnitWorker: detected = self._detector.consume_data(chunk_dataframe, cache) self.__append_detection_result(detection_result, detected) - return detection_result + if detection_result['lastDetectionTime'] is None: + return None + else: + return detection_result def __append_detection_result(self, detection_result: dict, new_chunk: dict): if new_chunk is not None: diff --git a/analytics/analytics/detectors/pattern_detector.py b/analytics/analytics/detectors/pattern_detector.py index 3301b2e..3223075 100644 --- a/analytics/analytics/detectors/pattern_detector.py +++ b/analytics/analytics/detectors/pattern_detector.py @@ -57,6 +57,22 @@ class PatternDetector(Detector): logger.debug('Unit {} got {} data points for detection'.format(self.analytic_unit_id, len(dataframe))) # TODO: split and sleep (https://github.com/hastic/hastic-server/pull/124#discussion_r214085643) + if cache is None or cache == {}: + msg = f'{self.analytic_unit_id} detection got invalid cache, skip detection' + logger.error(msg) + raise ValueError(msg) + + window_size = cache.get('WINDOW_SIZE') + if window_size is None: + message = '{} got cache without WINDOW_SIZE for detection'.format(self.analytic_unit_id) + logger.error(message) + raise ValueError(message) + + if len(dataframe) < window_size * 2: + message = f'{self.analytic_unit_id} skip detection: data length: {len(dataframe)} less than WINDOW_SIZE: {window_size}' + logger.error(message) + raise ValueError(message) + detected = self.model.detect(dataframe, self.analytic_unit_id, cache) segments = [{ 'from': segment[0], 'to': segment[1] } for segment in detected['segments']] @@ -71,23 +87,35 @@ class PatternDetector(Detector): def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]: logging.debug('Start consume_data for analytic unit {}'.format(self.analytic_unit_id)) + + if cache is None or cache == {}: + logging.debug(f'consume_data get invalid cache {cache} for task {self.analytic_unit_id}, skip') + return None + data_without_nan = data.dropna() if len(data_without_nan) == 0: return None self.bucket.receive_data(data_without_nan) - if cache == None: - logging.debug('consume_data cache is None for task {}'.format(self.analytic_unit_id)) - cache = {} - bucket_size = max(cache.get('WINDOW_SIZE', 0) * self.BUCKET_WINDOW_SIZE_FACTOR, self.MIN_BUCKET_SIZE) + + window_size = cache['WINDOW_SIZE'] + + bucket_len = len(self.bucket.data) + if bucket_len < window_size * 2: + msg = f'{self.analytic_unit_id} bucket data {bucket_len} less than two window size {window_size * 2}, skip run detection from consume_data' + logger.debug(msg) + return None res = self.detect(self.bucket.data, cache) - if len(self.bucket.data) > bucket_size: - excess_data = len(self.bucket.data) - bucket_size + bucket_size = max(window_size * self.BUCKET_WINDOW_SIZE_FACTOR, self.MIN_BUCKET_SIZE) + if bucket_len > bucket_size: + excess_data = bucket_len - bucket_size self.bucket.drop_data(excess_data) + logging.debug('End consume_data for analytic unit: {} with res: {}'.format(self.analytic_unit_id, res)) + if res: return res else: diff --git a/analytics/analytics/detectors/threshold_detector.py b/analytics/analytics/detectors/threshold_detector.py index a90e841..8d92130 100644 --- a/analytics/analytics/detectors/threshold_detector.py +++ b/analytics/analytics/detectors/threshold_detector.py @@ -28,8 +28,8 @@ class ThresholdDetector(Detector): } def detect(self, dataframe: pd.DataFrame, cache: ModelCache) -> dict: - if cache == None: - raise 'Threshold detector error: cannot detect before learning' + if cache is None or cache == {}: + raise ValueError('Threshold detector error: cannot detect before learning') value = cache['value'] condition = cache['condition'] diff --git a/analytics/tests/test_detectors.py b/analytics/tests/test_detectors.py new file mode 100644 index 0000000..0f42643 --- /dev/null +++ b/analytics/tests/test_detectors.py @@ -0,0 +1,30 @@ +import unittest +import pandas as pd + +from detectors import pattern_detector, threshold_detector + +class TestPatternDetector(unittest.TestCase): + + def test_small_dataframe(self): + + data = [[0,1], [1,2]] + dataframe = pd.DataFrame(data, columns=['timestamp', 'values']) + cache = {'WINDOW_SIZE': 10} + + detector = pattern_detector.PatternDetector('GENERAL', 'test_id') + + with self.assertRaises(ValueError): + detector.detect(dataframe, cache) + + +class TestThresholdDetector(unittest.TestCase): + + def test_invalid_cache(self): + + detector = threshold_detector.ThresholdDetector() + + with self.assertRaises(ValueError): + detector.detect([], None) + + with self.assertRaises(ValueError): + detector.detect([], {})