Browse Source

Non intersected chunks for consuming data #529 (#530)

pull/1/head
Evgeny Smyshlyaev 5 years ago committed by GitHub
parent
commit
4cef0545e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      analytics/analytics/analytic_unit_manager.py
  2. 32
      analytics/analytics/analytic_unit_worker.py
  3. 46
      analytics/analytics/utils/dataframe.py
  4. 2
      analytics/tests/test_dataset.py
  5. 38
      analytics/tests/test_detector_chunks.py
  6. 43
      analytics/tests/test_utils_dataframe.py

18
analytics/analytics/analytic_unit_manager.py

@ -1,12 +1,10 @@
from typing import Dict from typing import Dict
import pandas as pd
import numpy as np
import logging as log import logging as log
import traceback import traceback
from concurrent.futures import Executor, ThreadPoolExecutor from concurrent.futures import Executor, ThreadPoolExecutor
import detectors
from analytic_unit_worker import AnalyticUnitWorker from analytic_unit_worker import AnalyticUnitWorker
import detectors
from models import ModelCache from models import ModelCache
@ -25,18 +23,6 @@ def get_detector_by_type(
raise ValueError('Unknown detector type "%s"' % detector_type) raise ValueError('Unknown detector type "%s"' % detector_type)
def prepare_data(data: list) -> pd.DataFrame:
"""
Takes list
- converts it into pd.DataFrame,
- converts 'timestamp' column to pd.Datetime,
- subtracts min value from the dataset
"""
data = pd.DataFrame(data, columns=['timestamp', 'value'])
data['timestamp'] = pd.to_datetime(data['timestamp'], unit='ms')
data.fillna(value = np.nan, inplace = True)
return data
class AnalyticUnitManager: class AnalyticUnitManager:
@ -71,7 +57,7 @@ class AnalyticUnitManager:
payload = task['payload'] payload = task['payload']
worker = self.__ensure_worker(analytic_unit_id, payload['detector'], payload['analyticUnitType']) worker = self.__ensure_worker(analytic_unit_id, payload['detector'], payload['analyticUnitType'])
data = prepare_data(payload['data']) data = payload['data']
if task['type'] == 'PUSH': if task['type'] == 'PUSH':
# TODO: do it a better way # TODO: do it a better way
res = await worker.consume_data(data, payload['cache']) res = await worker.consume_data(data, payload['cache'])

32
analytics/analytics/analytic_unit_worker.py

@ -7,7 +7,7 @@ from models import ModelCache
import concurrent.futures import concurrent.futures
import asyncio import asyncio
from utils import get_data_chunks from utils import get_intersected_chunks, get_chunks, prepare_data
logger = logging.getLogger('AnalyticUnitWorker') logger = logging.getLogger('AnalyticUnitWorker')
@ -16,6 +16,10 @@ logger = logging.getLogger('AnalyticUnitWorker')
class AnalyticUnitWorker: class AnalyticUnitWorker:
CHUNK_WINDOW_SIZE_FACTOR = 100 CHUNK_WINDOW_SIZE_FACTOR = 100
CHUNK_INTERSECTION_FACTOR = 2
assert CHUNK_WINDOW_SIZE_FACTOR > CHUNK_INTERSECTION_FACTOR, \
'CHUNK_INTERSECTION_FACTOR should be less than CHUNK_WINDOW_SIZE_FACTOR'
def __init__(self, analytic_unit_id: str, detector: detectors.Detector, executor: concurrent.futures.Executor): def __init__(self, analytic_unit_id: str, detector: detectors.Detector, executor: concurrent.futures.Executor):
self.analytic_unit_id = analytic_unit_id self.analytic_unit_id = analytic_unit_id
@ -24,10 +28,13 @@ class AnalyticUnitWorker:
self._training_future: asyncio.Future = None self._training_future: asyncio.Future = None
async def do_train( async def do_train(
self, payload: Union[list, dict], data: pd.DataFrame, cache: Optional[ModelCache] self, payload: Union[list, dict], data: list, cache: Optional[ModelCache]
) -> Optional[ModelCache]: ) -> Optional[ModelCache]:
dataframe = prepare_data(data)
cfuture: concurrent.futures.Future = self._executor.submit( cfuture: concurrent.futures.Future = self._executor.submit(
self._detector.train, data, payload, cache self._detector.train, dataframe, payload, cache
) )
self._training_future = asyncio.wrap_future(cfuture) self._training_future = asyncio.wrap_future(cfuture)
try: try:
@ -38,13 +45,15 @@ class AnalyticUnitWorker:
except asyncio.TimeoutError: except asyncio.TimeoutError:
raise Exception('Timeout ({}s) exceeded while learning'.format(config.LEARNING_TIMEOUT)) raise Exception('Timeout ({}s) exceeded while learning'.format(config.LEARNING_TIMEOUT))
async def do_detect(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> dict: async def do_detect(self, data: list, cache: Optional[ModelCache]) -> dict:
if cache is None: if cache is None:
msg = f'{self.analytic_unit_id} detection got invalid cache, skip detection' msg = f'{self.analytic_unit_id} detection got invalid cache, skip detection'
logger.error(msg) logger.error(msg)
raise ValueError(msg) raise ValueError(msg)
window_size = self._detector.get_window_size(cache) window_size = self._detector.get_window_size(cache)
chunk_size = window_size * self.CHUNK_WINDOW_SIZE_FACTOR
chunk_intersection = window_size * self.CHUNK_INTERSECTION_FACTOR
detection_result = { detection_result = {
'cache': None, 'cache': None,
@ -52,9 +61,10 @@ class AnalyticUnitWorker:
'lastDetectionTime': None 'lastDetectionTime': None
} }
for chunk in get_data_chunks(data, window_size, window_size * self.CHUNK_WINDOW_SIZE_FACTOR): for chunk in get_intersected_chunks(data, chunk_intersection, chunk_size):
await asyncio.sleep(0) await asyncio.sleep(0)
detected = self._detector.detect(chunk, cache) chunk_dataframe = prepare_data(chunk)
detected = self._detector.detect(chunk_dataframe, cache)
self.__append_detection_result(detection_result, detected) self.__append_detection_result(detection_result, detected)
return detection_result return detection_result
@ -63,7 +73,7 @@ class AnalyticUnitWorker:
if self._training_future is not None: if self._training_future is not None:
self._training_future.cancel() self._training_future.cancel()
async def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]): async def consume_data(self, data: list, cache: Optional[ModelCache]):
if cache is None: if cache is None:
msg = f'{self.analytic_unit_id} consume_data got invalid cache, skip detection' msg = f'{self.analytic_unit_id} consume_data got invalid cache, skip detection'
logger.error(msg) logger.error(msg)
@ -78,10 +88,10 @@ class AnalyticUnitWorker:
'lastDetectionTime': None 'lastDetectionTime': None
} }
#TODO: remove code duplication with do_detect for chunk in get_chunks(data, window_size * self.CHUNK_WINDOW_SIZE_FACTOR):
for chunk in get_data_chunks(data, window_size, window_size * self.CHUNK_WINDOW_SIZE_FACTOR):
await asyncio.sleep(0) await asyncio.sleep(0)
detected = self._detector.consume_data(chunk, cache) chunk_dataframe = prepare_data(chunk)
detected = self._detector.consume_data(chunk_dataframe, cache)
self.__append_detection_result(detection_result, detected) self.__append_detection_result(detection_result, detected)
return detection_result return detection_result

46
analytics/analytics/utils/dataframe.py

@ -1,21 +1,35 @@
from typing import Generator from itertools import chain
import pandas as pd import pandas as pd
import numpy as np
from typing import Generator
def get_data_chunks(dataframe: pd.DataFrame, window_size: int, chunk_size: int) -> Generator[pd.DataFrame, None, None]: def prepare_data(data: list) -> pd.DataFrame:
"""
Takes list
- converts it into pd.DataFrame,
- converts 'timestamp' column to pd.Datetime,
- subtracts min value from the dataset
"""
data = pd.DataFrame(data, columns=['timestamp', 'value'])
data['timestamp'] = pd.to_datetime(data['timestamp'], unit='ms')
data.fillna(value = np.nan, inplace = True)
return data
def get_intersected_chunks(data: list, intersection: int, chunk_size: int) -> Generator[list, None, None]:
""" """
Returns generator that splits dataframe on intersected segments. Returns generator that splits dataframe on intersected segments.
Intersection makes it able to detect pattern that present in dataframe on the border between chunks. Intersection makes it able to detect pattern that present in dataframe on the border between chunks.
window_size - length of intersection. intersection - length of intersection.
chunk_size - length of chunk chunk_size - length of chunk
""" """
data_len = len(dataframe) data_len = len(data)
if data_len <= chunk_size: if data_len <= chunk_size:
yield dataframe yield data
return return
nonintersected = chunk_size - 2 * window_size nonintersected = chunk_size - intersection
offset = 0 offset = 0
while True: while True:
@ -23,8 +37,24 @@ def get_data_chunks(dataframe: pd.DataFrame, window_size: int, chunk_size: int)
if left_values == 0: if left_values == 0:
break break
if left_values <= chunk_size: if left_values <= chunk_size:
yield dataframe[offset : data_len].reset_index() yield data[offset : data_len]
break break
else: else:
yield dataframe[offset: offset + chunk_size].reset_index() yield data[offset: offset + chunk_size]
offset += min(nonintersected, left_values) offset += min(nonintersected, left_values)
def get_chunks(data: list, chunk_size: int) -> Generator[list, None, None]:
"""
Returns generator that splits dataframe on non-intersected segments.
chunk_size - length of chunk
"""
chunks_iterables = [iter(data)] * chunk_size
result_chunks = zip(*chunks_iterables)
partial_chunk_len = len(data) % chunk_size
if partial_chunk_len != 0:
result_chunks = chain(result_chunks, [data[-partial_chunk_len:]])
for chunk in result_chunks:
yield list(chunk)

2
analytics/tests/test_dataset.py

@ -1,7 +1,7 @@
import unittest import unittest
import pandas as pd import pandas as pd
import numpy as np import numpy as np
from analytic_unit_manager import prepare_data from utils import prepare_data
import models import models
import random import random
import scipy.signal import scipy.signal

38
analytics/tests/test_detector_chunks.py

@ -1,38 +0,0 @@
import unittest
from utils import get_data_chunks
import pandas as pd
class TestUtils(unittest.TestCase):
def test_chunks_generator(self):
window_size = 1
chunk_window_size_factor = 4
cases = [
(list(range(8)), [[0,1,2,3], [2,3,4,5], [4,5,6,7]]),
([], [[]]),
(list(range(1)), [[0]]),
(list(range(4)), [[0,1,2,3]]),
(list(range(9)), [[0,1,2,3], [2,3,4,5], [4,5,6,7], [6,7,8]])
]
for data, expected_chunks in cases:
data = [(x,x) for x in data]
data = pd.DataFrame(data, columns=['timestamp', 'value'])
df_expected_chunks = []
for chunk in expected_chunks:
chunk = [(x,x) for x in chunk]
df_expected_chunks.append(chunk)
df_expected_chunks = [pd.DataFrame(chunk, columns=['timestamp', 'value']) for chunk in df_expected_chunks]
chunks = tuple(get_data_chunks(data, window_size, window_size * chunk_window_size_factor))
df_expected_chunks = [df.reset_index() for df in df_expected_chunks]
zipped = zip(chunks, df_expected_chunks)
map(lambda a,b: self.assertTrue(a.equals(b)), zipped)
if __name__ == '__main__':
unittest.main()

43
analytics/tests/test_utils_dataframe.py

@ -0,0 +1,43 @@
import unittest
from utils import get_intersected_chunks, get_chunks
import pandas as pd
class TestUtils(unittest.TestCase):
def test_chunks_generator(self):
intersection = 2
chunk_size = 4
cases = [
(list(range(8)), [[0,1,2,3], [2,3,4,5], [4,5,6,7]]),
([], [[]]),
(list(range(1)), [[0]]),
(list(range(4)), [[0,1,2,3]]),
(list(range(9)), [[0,1,2,3], [2,3,4,5], [4,5,6,7], [6,7,8]])
]
for tested, expected in cases:
tested_chunks = get_intersected_chunks(tested, intersection, chunk_size)
self.assertSequenceEqual(tuple(tested_chunks), expected)
def test_non_intersected_chunks(self):
chunk_size = 4
cases = [
(tuple(range(12)), [[0,1,2,3], [4,5,6,7], [8,9,10,11]]),
(tuple(range(9)), [[0,1,2,3], [4,5,6,7], [8]]),
(tuple(range(10)), [[0,1,2,3], [4,5,6,7], [8,9]]),
(tuple(range(11)), [[0,1,2,3], [4,5,6,7], [8,9,10]]),
([], []),
(tuple(range(1)), [[0]]),
(tuple(range(4)), [[0,1,2,3]])
]
for tested, expected in cases:
tested_chunks = list(get_chunks(tested, chunk_size))
self.assertSequenceEqual(tested_chunks, expected)
if __name__ == '__main__':
unittest.main()
Loading…
Cancel
Save