diff --git a/analytics/analytics/analytic_unit_manager.py b/analytics/analytics/analytic_unit_manager.py index 7167a7e..116079e 100644 --- a/analytics/analytics/analytic_unit_manager.py +++ b/analytics/analytics/analytic_unit_manager.py @@ -74,7 +74,7 @@ class AnalyticUnitManager: data = prepare_data(payload['data']) if task['type'] == 'PUSH': # TODO: do it a better way - res = await worker.recieve_data(data, payload['cache']) + res = await worker.consume_data(data, payload['cache']) if res: res.update({ 'analyticUnitId': analytic_unit_id }) return res diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py index 7b00849..85cea7f 100644 --- a/analytics/analytics/analytic_unit_worker.py +++ b/analytics/analytics/analytic_unit_worker.py @@ -2,17 +2,21 @@ import config import detectors import logging import pandas as pd -from typing import Optional, Union +from typing import Optional, Union, Generator from models import ModelCache import concurrent.futures import asyncio +from utils import get_data_chunks + logger = logging.getLogger('AnalyticUnitWorker') class AnalyticUnitWorker: + CHUNK_WINDOW_SIZE_FACTOR = 100 + def __init__(self, analytic_unit_id: str, detector: detectors.Detector, executor: concurrent.futures.Executor): self.analytic_unit_id = analytic_unit_id self._detector = detector @@ -35,12 +39,55 @@ class AnalyticUnitWorker: raise Exception('Timeout ({}s) exceeded while learning'.format(config.LEARNING_TIMEOUT)) async def do_detect(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> dict: - # TODO: return without await - return await self._detector.detect(data, cache) + if cache is None: + msg = f'{self.analytic_unit_id} detection got invalid cache, skip detection' + logger.error(msg) + raise ValueError(msg) + + window_size = self._detector.get_window_size(cache) + + detection_result = { + 'cache': None, + 'segments': [], + 'lastDetectionTime': None + } + + for chunk in get_data_chunks(data, window_size, window_size * self.CHUNK_WINDOW_SIZE_FACTOR): + await asyncio.sleep(0) + detected = self._detector.detect(chunk, cache) + self.__append_detection_result(detection_result, detected) + + return detection_result def cancel(self): if self._training_future is not None: self._training_future.cancel() - async def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]): - return self._detector.recieve_data(data, cache) + async def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]): + if cache is None: + msg = f'{self.analytic_unit_id} consume_data got invalid cache, skip detection' + logger.error(msg) + raise ValueError(msg) + + window_size = self._detector.get_window_size(cache) + + #TODO: make class DetectionResult + detection_result = { + 'cache': None, + 'segments': [], + 'lastDetectionTime': None + } + + #TODO: remove code duplication with do_detect + for chunk in get_data_chunks(data, window_size, window_size * self.CHUNK_WINDOW_SIZE_FACTOR): + await asyncio.sleep(0) + detected = self._detector.consume_data(chunk, cache) + self.__append_detection_result(detection_result, detected) + + return detection_result + + def __append_detection_result(self, detection_result: dict, new_chunk: dict): + if new_chunk is not None: + detection_result['cache'] = new_chunk['cache'] + detection_result['lastDetectionTime'] = new_chunk['lastDetectionTime'] + detection_result['segments'].extend(new_chunk['segments']) diff --git a/analytics/analytics/detectors/detector.py b/analytics/analytics/detectors/detector.py index 99de16b..c4c724e 100644 --- a/analytics/analytics/detectors/detector.py +++ b/analytics/analytics/detectors/detector.py @@ -18,5 +18,9 @@ class Detector(ABC): pass @abstractmethod - def recieve_data(self, data: DataFrame, cache: Optional[ModelCache]) -> Optional[dict]: + def consume_data(self, data: DataFrame, cache: Optional[ModelCache]) -> Optional[dict]: + pass + + @abstractmethod + def get_window_size(self, cache: Optional[ModelCache]) -> int: pass diff --git a/analytics/analytics/detectors/pattern_detector.py b/analytics/analytics/detectors/pattern_detector.py index b002dc7..3301b2e 100644 --- a/analytics/analytics/detectors/pattern_detector.py +++ b/analytics/analytics/detectors/pattern_detector.py @@ -35,6 +35,8 @@ AnalyticUnitId = str class PatternDetector(Detector): MIN_BUCKET_SIZE = 150 + BUCKET_WINDOW_SIZE_FACTOR = 5 + DEFAULT_WINDOW_SIZE = 1 def __init__(self, pattern_type: str, analytic_unit_id: AnalyticUnitId): self.analytic_unit_id = analytic_unit_id @@ -51,34 +53,14 @@ class PatternDetector(Detector): 'cache': new_cache } - async def detect(self, dataframe: pd.DataFrame, cache: Optional[models.ModelCache]) -> dict: + def detect(self, dataframe: pd.DataFrame, cache: Optional[models.ModelCache]) -> dict: logger.debug('Unit {} got {} data points for detection'.format(self.analytic_unit_id, len(dataframe))) # TODO: split and sleep (https://github.com/hastic/hastic-server/pull/124#discussion_r214085643) - if not cache: - msg = f'{self.analytic_unit_id} detection got invalid cache {cache}, skip detection' - logger.error(msg) - raise ValueError(msg) - - window_size = cache.get('WINDOW_SIZE') - - if not window_size: - msg = f'{self.analytic_unit_id} detection got invalid window size {window_size}' - - chunks = self.__get_data_chunks(dataframe, window_size) - - segments = [] - segment_parser = lambda segment: { 'from': segment[0], 'to': segment[1] } - for chunk in chunks: - await asyncio.sleep(0) - detected = self.model.detect(dataframe, self.analytic_unit_id, cache) - for detected_segment in detected['segments']: - detected_segment = segment_parser(detected_segment) - if detected_segment not in segments: - segments.append(detected_segment) + detected = self.model.detect(dataframe, self.analytic_unit_id, cache) + segments = [{ 'from': segment[0], 'to': segment[1] } for segment in detected['segments']] newCache = detected['cache'] - last_dataframe_time = dataframe.iloc[-1]['timestamp'] last_detection_time = convert_pd_timestamp_to_ms(last_dataframe_time) return { @@ -87,8 +69,8 @@ class PatternDetector(Detector): 'lastDetectionTime': last_detection_time } - def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]: - logging.debug('Start recieve_data for analytic unit {}'.format(self.analytic_unit_id)) + def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]: + logging.debug('Start consume_data for analytic unit {}'.format(self.analytic_unit_id)) data_without_nan = data.dropna() if len(data_without_nan) == 0: @@ -96,45 +78,21 @@ class PatternDetector(Detector): self.bucket.receive_data(data_without_nan) if cache == None: - logging.debug('Recieve_data cache is None for task {}'.format(self.analytic_unit_id)) + logging.debug('consume_data cache is None for task {}'.format(self.analytic_unit_id)) cache = {} - bucket_size = max(cache.get('WINDOW_SIZE', 0) * 3, self.MIN_BUCKET_SIZE) + bucket_size = max(cache.get('WINDOW_SIZE', 0) * self.BUCKET_WINDOW_SIZE_FACTOR, self.MIN_BUCKET_SIZE) res = self.detect(self.bucket.data, cache) if len(self.bucket.data) > bucket_size: excess_data = len(self.bucket.data) - bucket_size self.bucket.drop_data(excess_data) - logging.debug('End recieve_data for analytic unit: {} with res: {}'.format(self.analytic_unit_id, res)) + logging.debug('End consume_data for analytic unit: {} with res: {}'.format(self.analytic_unit_id, res)) if res: return res else: return None - def __get_data_chunks(self, dataframe: pd.DataFrame, window_size: int) -> Generator[pd.DataFrame, None, None]: - """ - TODO: fix description - Return generator, that yields dataframe's chunks. Chunks have 3 WINDOW_SIZE length and 2 WINDOW_SIZE step. - Example: recieved dataframe: [0, 1, 2, 3, 4, 5], returned chunks [0, 1, 2], [2, 3, 4], [4, 5]. - """ - chunk_size = window_size * 100 - intersection = window_size - - data_len = len(dataframe) - - if data_len < chunk_size: - return (chunk for chunk in (dataframe,)) - - def slices(): - nonintersected = chunk_size - intersection - mod = data_len % nonintersected - chunks_number = data_len // nonintersected - - offset = 0 - for i in range(chunks_number): - yield slice(offset, offset + nonintersected + 1) - offset += nonintersected - - yield slice(offset, offset + mod) - - return (dataframe[chunk_slice] for chunk_slice in slices()) + def get_window_size(self, cache: Optional[ModelCache]) -> int: + if cache is None: return self.DEFAULT_WINDOW_SIZE + return cache.get('WINDOW_SIZE', self.DEFAULT_WINDOW_SIZE) diff --git a/analytics/analytics/detectors/threshold_detector.py b/analytics/analytics/detectors/threshold_detector.py index 2790a1f..a90e841 100644 --- a/analytics/analytics/detectors/threshold_detector.py +++ b/analytics/analytics/detectors/threshold_detector.py @@ -14,6 +14,8 @@ logger = log.getLogger('THRESHOLD_DETECTOR') class ThresholdDetector(Detector): + WINDOW_SIZE = 3 + def __init__(self): pass @@ -25,7 +27,7 @@ class ThresholdDetector(Detector): } } - async def detect(self, dataframe: pd.DataFrame, cache: ModelCache) -> dict: + def detect(self, dataframe: pd.DataFrame, cache: ModelCache) -> dict: if cache == None: raise 'Threshold detector error: cannot detect before learning' value = cache['value'] @@ -68,6 +70,9 @@ class ThresholdDetector(Detector): 'lastDetectionTime': now } - def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]: + def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]: result = self.detect(data, cache) return result if result else None + + def get_window_size(self, cache: Optional[ModelCache]) -> int: + return self.WINDOW_SIZE diff --git a/analytics/analytics/utils/__init__.py b/analytics/analytics/utils/__init__.py index afdd013..851cfdd 100644 --- a/analytics/analytics/utils/__init__.py +++ b/analytics/analytics/utils/__init__.py @@ -1,3 +1,4 @@ from utils.common import * from utils.segments import * from utils.time import * +from utils.dataframe import * diff --git a/analytics/analytics/utils/dataframe.py b/analytics/analytics/utils/dataframe.py new file mode 100644 index 0000000..e4c0c59 --- /dev/null +++ b/analytics/analytics/utils/dataframe.py @@ -0,0 +1,30 @@ +from typing import Generator +import pandas as pd + +def get_data_chunks(dataframe: pd.DataFrame, window_size: int, chunk_size: int) -> Generator[pd.DataFrame, None, None]: + """ + Returns generator that splits dataframe on intersected segments. + Intersection makes it able to detect pattern that present in dataframe on the border between chunks. + window_size - length of intersection. + chunk_size - length of chunk + """ + + data_len = len(dataframe) + + if data_len <= chunk_size: + yield dataframe + return + + nonintersected = chunk_size - window_size + + offset = 0 + while True: + left_values = data_len - offset + if left_values == 0: + break + if left_values <= chunk_size: + yield dataframe[offset : data_len].reset_index() + break + else: + yield dataframe[offset: offset + chunk_size].reset_index() + offset += min(nonintersected, left_values) diff --git a/analytics/tests/test_detector_chunks.py b/analytics/tests/test_detector_chunks.py index 3bef5d5..cb4739d 100644 --- a/analytics/tests/test_detector_chunks.py +++ b/analytics/tests/test_detector_chunks.py @@ -1,21 +1,38 @@ import unittest -from detectors.pattern_detector import PatternDetector +from utils import get_data_chunks +import pandas as pd + -def rlist(start, stop): - return [x for x in range(start, stop + 1)] class TestUtils(unittest.TestCase): def test_chunks_generator(self): window_size = 1 + chunk_window_size_factor = 3 cases = [ + (list(range(7)), [[0,1,2], [2,3,4], [4,5,6]]), ([], [[]]), - (rlist(0, 300), [rlist(0,99),rlist(99,198),rlist(198,297),rlist(297,300)]) + (list(range(1)), [[0]]), + (list(range(3)), [[0,1,2]]), + (list(range(8)), [[0,1,2], [2,3,4], [4,5,6], [6,7]]), + (list(range(6)), [[0,1,2], [2,3,4], [4,5]]) ] for data, expected_chunks in cases: - chunks = tuple(PatternDetector._PatternDetector__get_data_chunks(None, data, window_size)) - self.assertSequenceEqual(chunks, expected_chunks) + data = [(x,x) for x in data] + data = pd.DataFrame(data, columns=['timestamp', 'value']) + + df_expected_chunks = [] + for chunk in expected_chunks: + chunk = [(x,x) for x in chunk] + df_expected_chunks.append(chunk) + df_expected_chunks = [pd.DataFrame(chunk, columns=['timestamp', 'value']) for chunk in df_expected_chunks] + chunks = tuple(get_data_chunks(data, window_size, window_size * chunk_window_size_factor)) + df_expected_chunks = [df.reset_index() for df in df_expected_chunks] + + zipped = zip(chunks, df_expected_chunks) + map(lambda a,b: self.assertTrue(a.equals(b)), zipped) + if __name__ == '__main__':