diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py
index 7c57ad7..7b00849 100644
--- a/analytics/analytics/analytic_unit_worker.py
+++ b/analytics/analytics/analytic_unit_worker.py
@@ -2,7 +2,7 @@ import config
 import detectors
 import logging
 import pandas as pd
-from typing import Optional, Union, Generator
+from typing import Optional, Union
 from models import ModelCache
 import concurrent.futures
 import asyncio
@@ -35,83 +35,12 @@ class AnalyticUnitWorker:
             raise Exception('Timeout ({}s) exceeded while learning'.format(config.LEARNING_TIMEOUT))
 
     async def do_detect(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> dict:
-        if cache is None:
-            msg = f'{self.analytic_unit_id} detection got invalid cache, skip detection'
-            logger.error(msg)
-            raise ValueError(msg)
-
-        window_size = cache['WINDOW_SIZE']
-        chunks = self.__get_data_chunks(data, window_size)
-
-        detection_result = {
-            'cache': None,
-            'segments': [],
-            'lastDetectionTime': None
-        }
-
-        for chunk in chunks:
-            await asyncio.sleep(0)
-            detected = self._detector.detect(data, cache)
-            if detected is not None:
-                detection_result['cache'] = detected['cache']
-                detection_result['lastDetectionTime'] = detected['lastDetectionTime']
-                detection_result['segments'].extend(detected['segments'])
-
-        return detection_result
+        # TODO: return without await
+        return await self._detector.detect(data, cache)
 
     def cancel(self):
         if self._training_future is not None:
             self._training_future.cancel()
 
     async def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]):
-        if cache is None:
-            msg = f'{self.analytic_unit_id} detection got invalid cache, skip detection'
-            logger.error(msg)
-            raise ValueError(msg)
-
-        window_size = cache['WINDOW_SIZE']
-        chunks = self.__get_data_chunks(data, window_size)
-
-        detection_result = {
-            'cache': None,
-            'segments': [],
-            'lastDetectionTime': None
-        }
-
-        for chunk in chunks:
-            await asyncio.sleep(0)
-            detected = self._detector.recieve_data(data, cache)
-            if detected is not None:
-                detection_result['cache'] = detected['cache']
-                detection_result['lastDetectionTime'] = detected['lastDetectionTime']
-                detection_result['segments'].extend(detected['segments'])
-
-        return detection_result
-
-    def __get_data_chunks(self, dataframe: pd.DataFrame, window_size: int) -> Generator[pd.DataFrame, None, None]:
-        """
-        TODO: fix description
-        Return generator, that yields dataframe's chunks. Chunks have 3 WINDOW_SIZE length and 2 WINDOW_SIZE step.
-        Example: recieved dataframe: [0, 1, 2, 3, 4, 5], returned chunks [0, 1, 2], [2, 3, 4], [4, 5].
- """ - chunk_size = window_size * 100 - intersection = window_size - - data_len = len(dataframe) - - if data_len < chunk_size: - return (chunk for chunk in (dataframe,)) - - def slices(): - nonintersected = chunk_size - intersection - mod = data_len % nonintersected - chunks_number = data_len // nonintersected - - offset = 0 - for i in range(chunks_number): - yield slice(offset, offset + nonintersected + 1) - offset += nonintersected - - yield slice(offset, offset + mod) - - return (dataframe[chunk_slice] for chunk_slice in slices()) + return self._detector.recieve_data(data, cache) diff --git a/analytics/analytics/detectors/detector.py b/analytics/analytics/detectors/detector.py index b0738a4..99de16b 100644 --- a/analytics/analytics/detectors/detector.py +++ b/analytics/analytics/detectors/detector.py @@ -14,7 +14,7 @@ class Detector(ABC): pass @abstractmethod - def detect(self, dataframe: DataFrame, cache: Optional[ModelCache]) -> dict: + async def detect(self, dataframe: DataFrame, cache: Optional[ModelCache]) -> dict: pass @abstractmethod diff --git a/analytics/analytics/detectors/pattern_detector.py b/analytics/analytics/detectors/pattern_detector.py index 4949cb4..b002dc7 100644 --- a/analytics/analytics/detectors/pattern_detector.py +++ b/analytics/analytics/detectors/pattern_detector.py @@ -51,13 +51,32 @@ class PatternDetector(Detector): 'cache': new_cache } - def detect(self, dataframe: pd.DataFrame, cache: Optional[models.ModelCache]) -> dict: + async def detect(self, dataframe: pd.DataFrame, cache: Optional[models.ModelCache]) -> dict: logger.debug('Unit {} got {} data points for detection'.format(self.analytic_unit_id, len(dataframe))) # TODO: split and sleep (https://github.com/hastic/hastic-server/pull/124#discussion_r214085643) - - detected = self.model.detect(dataframe, self.analytic_unit_id, cache) - segments = [{ 'from': segment[0], 'to': segment[1] } for segment in detected['segments']] + if not cache: + msg = f'{self.analytic_unit_id} detection got invalid cache {cache}, skip detection' + logger.error(msg) + raise ValueError(msg) + + window_size = cache.get('WINDOW_SIZE') + + if not window_size: + msg = f'{self.analytic_unit_id} detection got invalid window size {window_size}' + + chunks = self.__get_data_chunks(dataframe, window_size) + + segments = [] + segment_parser = lambda segment: { 'from': segment[0], 'to': segment[1] } + for chunk in chunks: + await asyncio.sleep(0) + detected = self.model.detect(dataframe, self.analytic_unit_id, cache) + for detected_segment in detected['segments']: + detected_segment = segment_parser(detected_segment) + if detected_segment not in segments: + segments.append(detected_segment) + newCache = detected['cache'] last_dataframe_time = dataframe.iloc[-1]['timestamp'] @@ -79,7 +98,7 @@ class PatternDetector(Detector): if cache == None: logging.debug('Recieve_data cache is None for task {}'.format(self.analytic_unit_id)) cache = {} - bucket_size = max(cache.get('WINDOW_SIZE', 0) * 5, self.MIN_BUCKET_SIZE) + bucket_size = max(cache.get('WINDOW_SIZE', 0) * 3, self.MIN_BUCKET_SIZE) res = self.detect(self.bucket.data, cache) @@ -92,3 +111,30 @@ class PatternDetector(Detector): else: return None + def __get_data_chunks(self, dataframe: pd.DataFrame, window_size: int) -> Generator[pd.DataFrame, None, None]: + """ + TODO: fix description + Return generator, that yields dataframe's chunks. Chunks have 3 WINDOW_SIZE length and 2 WINDOW_SIZE step. + Example: recieved dataframe: [0, 1, 2, 3, 4, 5], returned chunks [0, 1, 2], [2, 3, 4], [4, 5]. 
+ """ + chunk_size = window_size * 100 + intersection = window_size + + data_len = len(dataframe) + + if data_len < chunk_size: + return (chunk for chunk in (dataframe,)) + + def slices(): + nonintersected = chunk_size - intersection + mod = data_len % nonintersected + chunks_number = data_len // nonintersected + + offset = 0 + for i in range(chunks_number): + yield slice(offset, offset + nonintersected + 1) + offset += nonintersected + + yield slice(offset, offset + mod) + + return (dataframe[chunk_slice] for chunk_slice in slices()) diff --git a/analytics/analytics/detectors/threshold_detector.py b/analytics/analytics/detectors/threshold_detector.py index dc5320f..2790a1f 100644 --- a/analytics/analytics/detectors/threshold_detector.py +++ b/analytics/analytics/detectors/threshold_detector.py @@ -25,7 +25,7 @@ class ThresholdDetector(Detector): } } - def detect(self, dataframe: pd.DataFrame, cache: ModelCache) -> dict: + async def detect(self, dataframe: pd.DataFrame, cache: ModelCache) -> dict: if cache == None: raise 'Threshold detector error: cannot detect before learning' value = cache['value']