diff --git a/analytics/analytics/analytic_unit_manager.py b/analytics/analytics/analytic_unit_manager.py index 116079e..2061df9 100644 --- a/analytics/analytics/analytic_unit_manager.py +++ b/analytics/analytics/analytic_unit_manager.py @@ -1,12 +1,10 @@ from typing import Dict -import pandas as pd -import numpy as np import logging as log import traceback from concurrent.futures import Executor, ThreadPoolExecutor -import detectors from analytic_unit_worker import AnalyticUnitWorker +import detectors from models import ModelCache @@ -25,18 +23,6 @@ def get_detector_by_type( raise ValueError('Unknown detector type "%s"' % detector_type) -def prepare_data(data: list) -> pd.DataFrame: - """ - Takes list - - converts it into pd.DataFrame, - - converts 'timestamp' column to pd.Datetime, - - subtracts min value from the dataset - """ - data = pd.DataFrame(data, columns=['timestamp', 'value']) - data['timestamp'] = pd.to_datetime(data['timestamp'], unit='ms') - data.fillna(value = np.nan, inplace = True) - return data - class AnalyticUnitManager: @@ -71,7 +57,7 @@ class AnalyticUnitManager: payload = task['payload'] worker = self.__ensure_worker(analytic_unit_id, payload['detector'], payload['analyticUnitType']) - data = prepare_data(payload['data']) + data = payload['data'] if task['type'] == 'PUSH': # TODO: do it a better way res = await worker.consume_data(data, payload['cache']) diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py index 85cea7f..e1451a8 100644 --- a/analytics/analytics/analytic_unit_worker.py +++ b/analytics/analytics/analytic_unit_worker.py @@ -7,7 +7,7 @@ from models import ModelCache import concurrent.futures import asyncio -from utils import get_data_chunks +from utils import get_intersected_chunks, get_chunks, prepare_data logger = logging.getLogger('AnalyticUnitWorker') @@ -16,6 +16,10 @@ logger = logging.getLogger('AnalyticUnitWorker') class AnalyticUnitWorker: CHUNK_WINDOW_SIZE_FACTOR = 100 + 
CHUNK_INTERSECTION_FACTOR = 2 + + assert CHUNK_WINDOW_SIZE_FACTOR > CHUNK_INTERSECTION_FACTOR, \ + 'CHUNK_INTERSECTION_FACTOR should be less than CHUNK_WINDOW_SIZE_FACTOR' def __init__(self, analytic_unit_id: str, detector: detectors.Detector, executor: concurrent.futures.Executor): self.analytic_unit_id = analytic_unit_id @@ -24,10 +28,13 @@ class AnalyticUnitWorker: self._training_future: asyncio.Future = None async def do_train( - self, payload: Union[list, dict], data: pd.DataFrame, cache: Optional[ModelCache] + self, payload: Union[list, dict], data: list, cache: Optional[ModelCache] ) -> Optional[ModelCache]: + + dataframe = prepare_data(data) + cfuture: concurrent.futures.Future = self._executor.submit( - self._detector.train, data, payload, cache + self._detector.train, dataframe, payload, cache ) self._training_future = asyncio.wrap_future(cfuture) try: @@ -38,13 +45,15 @@ class AnalyticUnitWorker: except asyncio.TimeoutError: raise Exception('Timeout ({}s) exceeded while learning'.format(config.LEARNING_TIMEOUT)) - async def do_detect(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> dict: + async def do_detect(self, data: list, cache: Optional[ModelCache]) -> dict: if cache is None: msg = f'{self.analytic_unit_id} detection got invalid cache, skip detection' logger.error(msg) raise ValueError(msg) - + window_size = self._detector.get_window_size(cache) + chunk_size = window_size * self.CHUNK_WINDOW_SIZE_FACTOR + chunk_intersection = window_size * self.CHUNK_INTERSECTION_FACTOR detection_result = { 'cache': None, @@ -52,9 +61,10 @@ class AnalyticUnitWorker: 'lastDetectionTime': None } - for chunk in get_data_chunks(data, window_size, window_size * self.CHUNK_WINDOW_SIZE_FACTOR): + for chunk in get_intersected_chunks(data, chunk_intersection, chunk_size): await asyncio.sleep(0) - detected = self._detector.detect(chunk, cache) + chunk_dataframe = prepare_data(chunk) + detected = self._detector.detect(chunk_dataframe, cache) 
self.__append_detection_result(detection_result, detected) return detection_result @@ -63,7 +73,7 @@ class AnalyticUnitWorker: if self._training_future is not None: self._training_future.cancel() - async def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]): + async def consume_data(self, data: list, cache: Optional[ModelCache]): if cache is None: msg = f'{self.analytic_unit_id} consume_data got invalid cache, skip detection' logger.error(msg) @@ -78,10 +88,10 @@ class AnalyticUnitWorker: 'lastDetectionTime': None } - #TODO: remove code duplication with do_detect - for chunk in get_data_chunks(data, window_size, window_size * self.CHUNK_WINDOW_SIZE_FACTOR): + for chunk in get_chunks(data, window_size * self.CHUNK_WINDOW_SIZE_FACTOR): await asyncio.sleep(0) - detected = self._detector.consume_data(chunk, cache) + chunk_dataframe = prepare_data(chunk) + detected = self._detector.consume_data(chunk_dataframe, cache) self.__append_detection_result(detection_result, detected) return detection_result diff --git a/analytics/analytics/utils/dataframe.py b/analytics/analytics/utils/dataframe.py index a425d11..35daa81 100644 --- a/analytics/analytics/utils/dataframe.py +++ b/analytics/analytics/utils/dataframe.py @@ -1,21 +1,35 @@ -from typing import Generator +from itertools import chain import pandas as pd +import numpy as np +from typing import Generator -def get_data_chunks(dataframe: pd.DataFrame, window_size: int, chunk_size: int) -> Generator[pd.DataFrame, None, None]: +def prepare_data(data: list) -> pd.DataFrame: + """ + Takes list + - converts it into pd.DataFrame, + - converts 'timestamp' column to pd.Datetime, + - normalizes missing values to np.nan + """ + data = pd.DataFrame(data, columns=['timestamp', 'value']) + data['timestamp'] = pd.to_datetime(data['timestamp'], unit='ms') + data.fillna(value = np.nan, inplace = True) + return data + +def get_intersected_chunks(data: list, intersection: int, chunk_size: int) -> Generator[list, None,
None]: """ Returns generator that splits dataframe on intersected segments. Intersection makes it able to detect pattern that present in dataframe on the border between chunks. - window_size - length of intersection. + intersection - length of intersection. chunk_size - length of chunk """ - data_len = len(dataframe) + data_len = len(data) if data_len <= chunk_size: - yield dataframe + yield data return - nonintersected = chunk_size - 2 * window_size + nonintersected = chunk_size - intersection offset = 0 while True: @@ -23,8 +37,24 @@ def get_data_chunks(dataframe: pd.DataFrame, window_size: int, chunk_size: int) if left_values == 0: break if left_values <= chunk_size: - yield dataframe[offset : data_len].reset_index() + yield data[offset : data_len] break else: - yield dataframe[offset: offset + chunk_size].reset_index() + yield data[offset: offset + chunk_size] offset += min(nonintersected, left_values) + +def get_chunks(data: list, chunk_size: int) -> Generator[list, None, None]: + """ + Returns generator that splits the data list into non-intersected segments. 
+ chunk_size - length of chunk + """ + + chunks_iterables = [iter(data)] * chunk_size + result_chunks = zip(*chunks_iterables) + partial_chunk_len = len(data) % chunk_size + + if partial_chunk_len != 0: + result_chunks = chain(result_chunks, [data[-partial_chunk_len:]]) + + for chunk in result_chunks: + yield list(chunk) diff --git a/analytics/tests/test_dataset.py b/analytics/tests/test_dataset.py index 50ce9de..cb2d4b4 100644 --- a/analytics/tests/test_dataset.py +++ b/analytics/tests/test_dataset.py @@ -1,7 +1,7 @@ import unittest import pandas as pd import numpy as np -from analytic_unit_manager import prepare_data +from utils import prepare_data import models import random import scipy.signal diff --git a/analytics/tests/test_detector_chunks.py b/analytics/tests/test_detector_chunks.py deleted file mode 100644 index e12c29d..0000000 --- a/analytics/tests/test_detector_chunks.py +++ /dev/null @@ -1,38 +0,0 @@ -import unittest -from utils import get_data_chunks -import pandas as pd - - -class TestUtils(unittest.TestCase): - - def test_chunks_generator(self): - window_size = 1 - chunk_window_size_factor = 4 - - cases = [ - (list(range(8)), [[0,1,2,3], [2,3,4,5], [4,5,6,7]]), - ([], [[]]), - (list(range(1)), [[0]]), - (list(range(4)), [[0,1,2,3]]), - (list(range(9)), [[0,1,2,3], [2,3,4,5], [4,5,6,7], [6,7,8]]) - ] - - for data, expected_chunks in cases: - data = [(x,x) for x in data] - data = pd.DataFrame(data, columns=['timestamp', 'value']) - - df_expected_chunks = [] - for chunk in expected_chunks: - chunk = [(x,x) for x in chunk] - df_expected_chunks.append(chunk) - df_expected_chunks = [pd.DataFrame(chunk, columns=['timestamp', 'value']) for chunk in df_expected_chunks] - chunks = tuple(get_data_chunks(data, window_size, window_size * chunk_window_size_factor)) - df_expected_chunks = [df.reset_index() for df in df_expected_chunks] - - zipped = zip(chunks, df_expected_chunks) - map(lambda a,b: self.assertTrue(a.equals(b)), zipped) - - - -if __name__ == 
'__main__': - unittest.main() diff --git a/analytics/tests/test_utils_dataframe.py b/analytics/tests/test_utils_dataframe.py new file mode 100644 index 0000000..2985d6f --- /dev/null +++ b/analytics/tests/test_utils_dataframe.py @@ -0,0 +1,43 @@ +import unittest +from utils import get_intersected_chunks, get_chunks +import pandas as pd + + +class TestUtils(unittest.TestCase): + + def test_chunks_generator(self): + intersection = 2 + chunk_size = 4 + + cases = [ + (list(range(8)), [[0,1,2,3], [2,3,4,5], [4,5,6,7]]), + ([], [[]]), + (list(range(1)), [[0]]), + (list(range(4)), [[0,1,2,3]]), + (list(range(9)), [[0,1,2,3], [2,3,4,5], [4,5,6,7], [6,7,8]]) + ] + + for tested, expected in cases: + tested_chunks = get_intersected_chunks(tested, intersection, chunk_size) + self.assertSequenceEqual(tuple(tested_chunks), expected) + + + def test_non_intersected_chunks(self): + chunk_size = 4 + + cases = [ + (tuple(range(12)), [[0,1,2,3], [4,5,6,7], [8,9,10,11]]), + (tuple(range(9)), [[0,1,2,3], [4,5,6,7], [8]]), + (tuple(range(10)), [[0,1,2,3], [4,5,6,7], [8,9]]), + (tuple(range(11)), [[0,1,2,3], [4,5,6,7], [8,9,10]]), + ([], []), + (tuple(range(1)), [[0]]), + (tuple(range(4)), [[0,1,2,3]]) + ] + + for tested, expected in cases: + tested_chunks = list(get_chunks(tested, chunk_size)) + self.assertSequenceEqual(tested_chunks, expected) + +if __name__ == '__main__': + unittest.main()