diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py index 7b00849..7c57ad7 100644 --- a/analytics/analytics/analytic_unit_worker.py +++ b/analytics/analytics/analytic_unit_worker.py @@ -2,7 +2,7 @@ import config import detectors import logging import pandas as pd -from typing import Optional, Union +from typing import Optional, Union, Generator from models import ModelCache import concurrent.futures import asyncio @@ -35,12 +35,83 @@ class AnalyticUnitWorker: raise Exception('Timeout ({}s) exceeded while learning'.format(config.LEARNING_TIMEOUT)) async def do_detect(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> dict: - # TODO: return without await - return await self._detector.detect(data, cache) + if cache is None: + msg = f'{self.analytic_unit_id} detection got invalid cache, skip detection' + logger.error(msg) + raise ValueError(msg) + + window_size = cache['WINDOW_SIZE'] + chunks = self.__get_data_chunks(data, window_size) + + detection_result = { + 'cache': None, + 'segments': [], + 'lastDetectionTime': None + } + + for chunk in chunks: + await asyncio.sleep(0) + detected = self._detector.detect(data, cache) + if detected is not None: + detection_result['cache'] = detected['cache'] + detection_result['lastDetectionTime'] = detected['lastDetectionTime'] + detection_result['segments'].extend(detected['segments']) + + return detection_result def cancel(self): if self._training_future is not None: self._training_future.cancel() async def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]): - return self._detector.recieve_data(data, cache) + if cache is None: + msg = f'{self.analytic_unit_id} detection got invalid cache, skip detection' + logger.error(msg) + raise ValueError(msg) + + window_size = cache['WINDOW_SIZE'] + chunks = self.__get_data_chunks(data, window_size) + + detection_result = { + 'cache': None, + 'segments': [], + 'lastDetectionTime': None + } + + for chunk in chunks: + await asyncio.sleep(0) + detected = self._detector.recieve_data(data, cache) + if detected is not None: + detection_result['cache'] = detected['cache'] + detection_result['lastDetectionTime'] = detected['lastDetectionTime'] + detection_result['segments'].extend(detected['segments']) + + return detection_result + + def __get_data_chunks(self, dataframe: pd.DataFrame, window_size: int) -> Generator[pd.DataFrame, None, None]: + """ + TODO: fix description + Return generator, that yields dataframe's chunks. Chunks have 3 WINDOW_SIZE length and 2 WINDOW_SIZE step. + Example: recieved dataframe: [0, 1, 2, 3, 4, 5], returned chunks [0, 1, 2], [2, 3, 4], [4, 5]. + """ + chunk_size = window_size * 100 + intersection = window_size + + data_len = len(dataframe) + + if data_len < chunk_size: + return (chunk for chunk in (dataframe,)) + + def slices(): + nonintersected = chunk_size - intersection + mod = data_len % nonintersected + chunks_number = data_len // nonintersected + + offset = 0 + for i in range(chunks_number): + yield slice(offset, offset + nonintersected + 1) + offset += nonintersected + + yield slice(offset, offset + mod) + + return (dataframe[chunk_slice] for chunk_slice in slices()) diff --git a/analytics/analytics/detectors/detector.py b/analytics/analytics/detectors/detector.py index 99de16b..b0738a4 100644 --- a/analytics/analytics/detectors/detector.py +++ b/analytics/analytics/detectors/detector.py @@ -14,7 +14,7 @@ class Detector(ABC): pass @abstractmethod - async def detect(self, dataframe: DataFrame, cache: Optional[ModelCache]) -> dict: + def detect(self, dataframe: DataFrame, cache: Optional[ModelCache]) -> dict: pass @abstractmethod diff --git a/analytics/analytics/detectors/pattern_detector.py b/analytics/analytics/detectors/pattern_detector.py index b002dc7..4949cb4 100644 --- a/analytics/analytics/detectors/pattern_detector.py +++ b/analytics/analytics/detectors/pattern_detector.py @@ -51,32 +51,13 @@ class PatternDetector(Detector): 'cache': new_cache } - async def detect(self, dataframe: pd.DataFrame, cache: Optional[models.ModelCache]) -> dict: + def detect(self, dataframe: pd.DataFrame, cache: Optional[models.ModelCache]) -> dict: logger.debug('Unit {} got {} data points for detection'.format(self.analytic_unit_id, len(dataframe))) # TODO: split and sleep (https://github.com/hastic/hastic-server/pull/124#discussion_r214085643) + + detected = self.model.detect(dataframe, self.analytic_unit_id, cache) - if not cache: - msg = f'{self.analytic_unit_id} detection got invalid cache {cache}, skip detection' - logger.error(msg) - raise ValueError(msg) - - window_size = cache.get('WINDOW_SIZE') - - if not window_size: - msg = f'{self.analytic_unit_id} detection got invalid window size {window_size}' - - chunks = self.__get_data_chunks(dataframe, window_size) - - segments = [] - segment_parser = lambda segment: { 'from': segment[0], 'to': segment[1] } - for chunk in chunks: - await asyncio.sleep(0) - detected = self.model.detect(dataframe, self.analytic_unit_id, cache) - for detected_segment in detected['segments']: - detected_segment = segment_parser(detected_segment) - if detected_segment not in segments: - segments.append(detected_segment) - + segments = [{ 'from': segment[0], 'to': segment[1] } for segment in detected['segments']] newCache = detected['cache'] last_dataframe_time = dataframe.iloc[-1]['timestamp'] @@ -98,7 +79,7 @@ class PatternDetector(Detector): if cache == None: logging.debug('Recieve_data cache is None for task {}'.format(self.analytic_unit_id)) cache = {} - bucket_size = max(cache.get('WINDOW_SIZE', 0) * 3, self.MIN_BUCKET_SIZE) + bucket_size = max(cache.get('WINDOW_SIZE', 0) * 5, self.MIN_BUCKET_SIZE) res = self.detect(self.bucket.data, cache) @@ -111,30 +92,3 @@ class PatternDetector(Detector): else: return None - def __get_data_chunks(self, dataframe: pd.DataFrame, window_size: int) -> Generator[pd.DataFrame, None, None]: - """ - TODO: fix description - Return generator, that yields dataframe's chunks. Chunks have 3 WINDOW_SIZE length and 2 WINDOW_SIZE step. - Example: recieved dataframe: [0, 1, 2, 3, 4, 5], returned chunks [0, 1, 2], [2, 3, 4], [4, 5]. - """ - chunk_size = window_size * 100 - intersection = window_size - - data_len = len(dataframe) - - if data_len < chunk_size: - return (chunk for chunk in (dataframe,)) - - def slices(): - nonintersected = chunk_size - intersection - mod = data_len % nonintersected - chunks_number = data_len // nonintersected - - offset = 0 - for i in range(chunks_number): - yield slice(offset, offset + nonintersected + 1) - offset += nonintersected - - yield slice(offset, offset + mod) - - return (dataframe[chunk_slice] for chunk_slice in slices()) diff --git a/analytics/analytics/detectors/threshold_detector.py b/analytics/analytics/detectors/threshold_detector.py index 2790a1f..dc5320f 100644 --- a/analytics/analytics/detectors/threshold_detector.py +++ b/analytics/analytics/detectors/threshold_detector.py @@ -25,7 +25,7 @@ class ThresholdDetector(Detector): } } - async def detect(self, dataframe: pd.DataFrame, cache: ModelCache) -> dict: + def detect(self, dataframe: pd.DataFrame, cache: ModelCache) -> dict: if cache == None: raise 'Threshold detector error: cannot detect before learning' value = cache['value']