diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py
index 71844cc..7b00849 100644
--- a/analytics/analytics/analytic_unit_worker.py
+++ b/analytics/analytics/analytic_unit_worker.py
@@ -35,7 +35,8 @@ class AnalyticUnitWorker:
             raise Exception('Timeout ({}s) exceeded while learning'.format(config.LEARNING_TIMEOUT))
 
     async def do_detect(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> dict:
-        return self._detector.detect(data, cache)
+        # TODO: return without await
+        return await self._detector.detect(data, cache)
 
     def cancel(self):
         if self._training_future is not None:
diff --git a/analytics/analytics/detectors/detector.py b/analytics/analytics/detectors/detector.py
index b0738a4..99de16b 100644
--- a/analytics/analytics/detectors/detector.py
+++ b/analytics/analytics/detectors/detector.py
@@ -14,7 +14,7 @@ class Detector(ABC):
         pass
 
     @abstractmethod
-    def detect(self, dataframe: DataFrame, cache: Optional[ModelCache]) -> dict:
+    async def detect(self, dataframe: DataFrame, cache: Optional[ModelCache]) -> dict:
         pass
 
     @abstractmethod
diff --git a/analytics/analytics/detectors/pattern_detector.py b/analytics/analytics/detectors/pattern_detector.py
index b84a348..b002dc7 100644
--- a/analytics/analytics/detectors/pattern_detector.py
+++ b/analytics/analytics/detectors/pattern_detector.py
@@ -1,10 +1,11 @@
 import models
+import asyncio
 
 import logging
 import config
 
 import pandas as pd
-from typing import Optional
+from typing import Optional, Generator
 
 from detectors import Detector
 from buckets import DataBucket
@@ -33,11 +34,12 @@ def resolve_model_by_pattern(pattern: str) -> models.Model:
 AnalyticUnitId = str
 
 class PatternDetector(Detector):
+    MIN_BUCKET_SIZE = 150
+
     def __init__(self, pattern_type: str, analytic_unit_id: AnalyticUnitId):
         self.analytic_unit_id = analytic_unit_id
         self.pattern_type = pattern_type
         self.model = resolve_model_by_pattern(self.pattern_type)
-        self.min_bucket_size = 150
         self.bucket = DataBucket()
 
     def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.ModelCache]) -> models.ModelCache:
@@ -49,12 +51,34 @@ class PatternDetector(Detector):
             'cache': new_cache
         }
 
-    def detect(self, dataframe: pd.DataFrame, cache: Optional[models.ModelCache]) -> dict:
+    async def detect(self, dataframe: pd.DataFrame, cache: Optional[models.ModelCache]) -> dict:
         logger.debug('Unit {} got {} data points for detection'.format(self.analytic_unit_id, len(dataframe)))
         # TODO: split and sleep (https://github.com/hastic/hastic-server/pull/124#discussion_r214085643)
-        detected = self.model.detect(dataframe, self.analytic_unit_id, cache)
-        segments = [{ 'from': segment[0], 'to': segment[1] } for segment in detected['segments']]
+
+        if not cache:
+            msg = f'{self.analytic_unit_id} detection got invalid cache {cache}, skip detection'
+            logger.error(msg)
+            raise ValueError(msg)
+
+        window_size = cache.get('WINDOW_SIZE')
+
+        if not window_size:
+            msg = f'{self.analytic_unit_id} detection got invalid window size {window_size}'
+            logger.error(msg)
+            raise ValueError(msg)
+
+        chunks = self.__get_data_chunks(dataframe, window_size)
+
+        segments = []
+        segment_parser = lambda segment: { 'from': segment[0], 'to': segment[1] }
+        for chunk in chunks:
+            await asyncio.sleep(0)
+            detected = self.model.detect(chunk, self.analytic_unit_id, cache)
+            for detected_segment in detected['segments']:
+                detected_segment = segment_parser(detected_segment)
+                if detected_segment not in segments:
+                    segments.append(detected_segment)
+
         newCache = detected['cache']
 
         last_dataframe_time = dataframe.iloc[-1]['timestamp']
@@ -76,7 +100,7 @@ class PatternDetector(Detector):
         if cache == None:
             logging.debug('Recieve_data cache is None for task {}'.format(self.analytic_unit_id))
             cache = {}
-        bucket_size = max(cache.get('WINDOW_SIZE', 0) * 3, self.min_bucket_size)
+        bucket_size = max(cache.get('WINDOW_SIZE', 0) * 3, self.MIN_BUCKET_SIZE)
 
         res = self.detect(self.bucket.data, cache)
@@ -88,3 +112,32 @@ class PatternDetector(Detector):
             return res
         else:
             return None
+
+    def __get_data_chunks(self, dataframe: pd.DataFrame, window_size: int) -> Generator[pd.DataFrame, None, None]:
+        """
+        Return a generator that yields dataframe chunks.
+        Chunks are `window_size * 100` points long; consecutive chunks overlap by `window_size` points,
+        e.g. a chunk size of 3 with overlap 1 turns [0, 1, 2, 3, 4, 5] into [0, 1, 2], [2, 3, 4], [4, 5].
+        """
+        chunk_size = window_size * 100
+        intersection = window_size
+
+        data_len = len(dataframe)
+
+        if data_len < chunk_size:
+            return (chunk for chunk in (dataframe,))
+
+        def slices():
+            nonintersected = chunk_size - intersection
+            mod = data_len % nonintersected
+            chunks_number = data_len // nonintersected
+
+            offset = 0
+            for i in range(chunks_number):
+                yield slice(offset, offset + nonintersected + 1)
+                offset += nonintersected
+
+            if mod > 0:
+                yield slice(offset, offset + mod)
+
+        return (dataframe[chunk_slice] for chunk_slice in slices())
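For reference, the chunking scheme that __get_data_chunks implements can be sketched in isolation: chunks are window_size * 100 points long, and consecutive chunks share window_size points, so a pattern sitting on a chunk boundary still fits entirely inside at least one chunk. A minimal standalone sketch of the same slicing logic follows; the name make_chunks is illustrative and not part of this change, and it works on any sliceable sequence (list or DataFrame):

    def make_chunks(data, window_size: int):
        # Same scheme as PatternDetector.__get_data_chunks:
        # chunk length is window_size * 100, consecutive chunks
        # overlap by window_size points.
        chunk_size = window_size * 100
        step = chunk_size - window_size
        if len(data) < chunk_size:
            yield data
            return
        offset = 0
        while offset + step < len(data):
            yield data[offset:offset + step + 1]
            offset += step
        tail = data[offset:]
        if len(tail) > 0:
            yield tail

With window_size = 1 this yields 100-point chunks stepping by 99 points, which matches the expected chunks in test_detector_chunks.py below.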
diff --git a/analytics/analytics/detectors/threshold_detector.py b/analytics/analytics/detectors/threshold_detector.py
index dc5320f..2790a1f 100644
--- a/analytics/analytics/detectors/threshold_detector.py
+++ b/analytics/analytics/detectors/threshold_detector.py
@@ -25,7 +25,7 @@ class ThresholdDetector(Detector):
             }
         }
 
-    def detect(self, dataframe: pd.DataFrame, cache: ModelCache) -> dict:
+    async def detect(self, dataframe: pd.DataFrame, cache: ModelCache) -> dict:
         if cache == None:
             raise 'Threshold detector error: cannot detect before learning'
         value = cache['value']
diff --git a/analytics/bin/server b/analytics/bin/server
index d84647f..996c9ec 100755
--- a/analytics/bin/server
+++ b/analytics/bin/server
@@ -100,7 +100,7 @@ async def app_loop():
 
 if __name__ == "__main__":
     loop = asyncio.get_event_loop()
-    loop.set_debug(True)
+    #loop.set_debug(True)
     logger.info("Ok")
     server_service, data_service, analytic_unit_manager = init_services()
     print('Analytics process is running') # we need to print to stdout and flush
diff --git a/analytics/tests/test_detector_chunks.py b/analytics/tests/test_detector_chunks.py
new file mode 100644
index 0000000..3bef5d5
--- /dev/null
+++ b/analytics/tests/test_detector_chunks.py
@@ -0,0 +1,22 @@
+import unittest
+from detectors.pattern_detector import PatternDetector
+
+
+def rlist(start, stop):
+    return [x for x in range(start, stop + 1)]
+
+
+class TestUtils(unittest.TestCase):
+
+    def test_chunks_generator(self):
+        window_size = 1
+
+        cases = [
+            ([], [[]]),
+            (rlist(0, 300), [rlist(0, 99), rlist(99, 198), rlist(198, 297), rlist(297, 300)])
+        ]
+
+        for data, expected_chunks in cases:
+            chunks = tuple(PatternDetector._PatternDetector__get_data_chunks(None, data, window_size))
+            self.assertSequenceEqual(chunks, expected_chunks)
+
+if __name__ == '__main__':
+    unittest.main()
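The per-chunk await asyncio.sleep(0) in PatternDetector.detect is what makes the async signatures above pay off: model.detect is synchronous and CPU-bound, so yielding to the event loop between chunks keeps the analytics process responsive to other requests while a long detection runs. A minimal sketch of that pattern, with illustrative names that are not part of this change:

    import asyncio

    async def detect_all(chunks):
        results = []
        for chunk in chunks:
            # Give control back to the event loop between CPU-bound
            # steps, as PatternDetector.detect does per chunk.
            await asyncio.sleep(0)
            results.append(max(chunk))  # stand-in for model.detect(chunk, ...)
        return results

    async def heartbeat():
        for _ in range(3):
            print('event loop is still responsive')
            await asyncio.sleep(0)

    async def main():
        detection = asyncio.ensure_future(detect_all([[1, 2], [3, 4], [5, 6]]))
        await heartbeat()  # interleaves with the detection chunks
        print(await detection)

    asyncio.get_event_loop().run_until_complete(main())

Without the sleep, detect_all would run to completion before heartbeat gets a turn; with it, the two coroutines alternate chunk by chunk.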