Browse Source

Send data to detection in chunks #489 (#503)

* Add `get_data_chunks` generator to `utils/dataframe.py`
* Add chunks generator usage to `analytic_worker.py`
* Add tests to `tests/test_detector_chunks.py`
* Minor fixes (constants, etc)
pull/1/head
Evgeny Smyshlyaev 5 years ago committed by rozetko
parent
commit
21f39f2a60
  1. 2
      analytics/analytics/analytic_unit_manager.py
  2. 57
      analytics/analytics/analytic_unit_worker.py
  3. 6
      analytics/analytics/detectors/detector.py
  4. 68
      analytics/analytics/detectors/pattern_detector.py
  5. 9
      analytics/analytics/detectors/threshold_detector.py
  6. 1
      analytics/analytics/utils/__init__.py
  7. 30
      analytics/analytics/utils/dataframe.py
  8. 29
      analytics/tests/test_detector_chunks.py

2
analytics/analytics/analytic_unit_manager.py

@ -74,7 +74,7 @@ class AnalyticUnitManager:
data = prepare_data(payload['data'])
if task['type'] == 'PUSH':
# TODO: do it a better way
res = await worker.recieve_data(data, payload['cache'])
res = await worker.consume_data(data, payload['cache'])
if res:
res.update({ 'analyticUnitId': analytic_unit_id })
return res

57
analytics/analytics/analytic_unit_worker.py

@ -2,17 +2,21 @@ import config
import detectors
import logging
import pandas as pd
from typing import Optional, Union
from typing import Optional, Union, Generator
from models import ModelCache
import concurrent.futures
import asyncio
from utils import get_data_chunks
logger = logging.getLogger('AnalyticUnitWorker')
class AnalyticUnitWorker:
CHUNK_WINDOW_SIZE_FACTOR = 100
def __init__(self, analytic_unit_id: str, detector: detectors.Detector, executor: concurrent.futures.Executor):
self.analytic_unit_id = analytic_unit_id
self._detector = detector
@ -35,12 +39,55 @@ class AnalyticUnitWorker:
raise Exception('Timeout ({}s) exceeded while learning'.format(config.LEARNING_TIMEOUT))
async def do_detect(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> dict:
# TODO: return without await
return await self._detector.detect(data, cache)
if cache is None:
msg = f'{self.analytic_unit_id} detection got invalid cache, skip detection'
logger.error(msg)
raise ValueError(msg)
window_size = self._detector.get_window_size(cache)
detection_result = {
'cache': None,
'segments': [],
'lastDetectionTime': None
}
for chunk in get_data_chunks(data, window_size, window_size * self.CHUNK_WINDOW_SIZE_FACTOR):
await asyncio.sleep(0)
detected = self._detector.detect(chunk, cache)
self.__append_detection_result(detection_result, detected)
return detection_result
def cancel(self):
if self._training_future is not None:
self._training_future.cancel()
async def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]):
return self._detector.recieve_data(data, cache)
async def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]):
if cache is None:
msg = f'{self.analytic_unit_id} consume_data got invalid cache, skip detection'
logger.error(msg)
raise ValueError(msg)
window_size = self._detector.get_window_size(cache)
#TODO: make class DetectionResult
detection_result = {
'cache': None,
'segments': [],
'lastDetectionTime': None
}
#TODO: remove code duplication with do_detect
for chunk in get_data_chunks(data, window_size, window_size * self.CHUNK_WINDOW_SIZE_FACTOR):
await asyncio.sleep(0)
detected = self._detector.consume_data(chunk, cache)
self.__append_detection_result(detection_result, detected)
return detection_result
def __append_detection_result(self, detection_result: dict, new_chunk: dict):
if new_chunk is not None:
detection_result['cache'] = new_chunk['cache']
detection_result['lastDetectionTime'] = new_chunk['lastDetectionTime']
detection_result['segments'].extend(new_chunk['segments'])

6
analytics/analytics/detectors/detector.py

@ -18,5 +18,9 @@ class Detector(ABC):
pass
@abstractmethod
def recieve_data(self, data: DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
def consume_data(self, data: DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
pass
@abstractmethod
def get_window_size(self, cache: Optional[ModelCache]) -> int:
pass

68
analytics/analytics/detectors/pattern_detector.py

@ -35,6 +35,8 @@ AnalyticUnitId = str
class PatternDetector(Detector):
MIN_BUCKET_SIZE = 150
BUCKET_WINDOW_SIZE_FACTOR = 5
DEFAULT_WINDOW_SIZE = 1
def __init__(self, pattern_type: str, analytic_unit_id: AnalyticUnitId):
self.analytic_unit_id = analytic_unit_id
@ -51,34 +53,14 @@ class PatternDetector(Detector):
'cache': new_cache
}
async def detect(self, dataframe: pd.DataFrame, cache: Optional[models.ModelCache]) -> dict:
def detect(self, dataframe: pd.DataFrame, cache: Optional[models.ModelCache]) -> dict:
logger.debug('Unit {} got {} data points for detection'.format(self.analytic_unit_id, len(dataframe)))
# TODO: split and sleep (https://github.com/hastic/hastic-server/pull/124#discussion_r214085643)
if not cache:
msg = f'{self.analytic_unit_id} detection got invalid cache {cache}, skip detection'
logger.error(msg)
raise ValueError(msg)
window_size = cache.get('WINDOW_SIZE')
if not window_size:
msg = f'{self.analytic_unit_id} detection got invalid window size {window_size}'
chunks = self.__get_data_chunks(dataframe, window_size)
segments = []
segment_parser = lambda segment: { 'from': segment[0], 'to': segment[1] }
for chunk in chunks:
await asyncio.sleep(0)
detected = self.model.detect(dataframe, self.analytic_unit_id, cache)
for detected_segment in detected['segments']:
detected_segment = segment_parser(detected_segment)
if detected_segment not in segments:
segments.append(detected_segment)
detected = self.model.detect(dataframe, self.analytic_unit_id, cache)
segments = [{ 'from': segment[0], 'to': segment[1] } for segment in detected['segments']]
newCache = detected['cache']
last_dataframe_time = dataframe.iloc[-1]['timestamp']
last_detection_time = convert_pd_timestamp_to_ms(last_dataframe_time)
return {
@ -87,8 +69,8 @@ class PatternDetector(Detector):
'lastDetectionTime': last_detection_time
}
def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
logging.debug('Start recieve_data for analytic unit {}'.format(self.analytic_unit_id))
def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
logging.debug('Start consume_data for analytic unit {}'.format(self.analytic_unit_id))
data_without_nan = data.dropna()
if len(data_without_nan) == 0:
@ -96,45 +78,21 @@ class PatternDetector(Detector):
self.bucket.receive_data(data_without_nan)
if cache == None:
logging.debug('Recieve_data cache is None for task {}'.format(self.analytic_unit_id))
logging.debug('consume_data cache is None for task {}'.format(self.analytic_unit_id))
cache = {}
bucket_size = max(cache.get('WINDOW_SIZE', 0) * 3, self.MIN_BUCKET_SIZE)
bucket_size = max(cache.get('WINDOW_SIZE', 0) * self.BUCKET_WINDOW_SIZE_FACTOR, self.MIN_BUCKET_SIZE)
res = self.detect(self.bucket.data, cache)
if len(self.bucket.data) > bucket_size:
excess_data = len(self.bucket.data) - bucket_size
self.bucket.drop_data(excess_data)
logging.debug('End recieve_data for analytic unit: {} with res: {}'.format(self.analytic_unit_id, res))
logging.debug('End consume_data for analytic unit: {} with res: {}'.format(self.analytic_unit_id, res))
if res:
return res
else:
return None
def __get_data_chunks(self, dataframe: pd.DataFrame, window_size: int) -> Generator[pd.DataFrame, None, None]:
"""
TODO: fix description
Return generator, that yields dataframe's chunks. Chunks have 3 WINDOW_SIZE length and 2 WINDOW_SIZE step.
Example: recieved dataframe: [0, 1, 2, 3, 4, 5], returned chunks [0, 1, 2], [2, 3, 4], [4, 5].
"""
chunk_size = window_size * 100
intersection = window_size
data_len = len(dataframe)
if data_len < chunk_size:
return (chunk for chunk in (dataframe,))
def slices():
nonintersected = chunk_size - intersection
mod = data_len % nonintersected
chunks_number = data_len // nonintersected
offset = 0
for i in range(chunks_number):
yield slice(offset, offset + nonintersected + 1)
offset += nonintersected
yield slice(offset, offset + mod)
return (dataframe[chunk_slice] for chunk_slice in slices())
def get_window_size(self, cache: Optional[ModelCache]) -> int:
if cache is None: return self.DEFAULT_WINDOW_SIZE
return cache.get('WINDOW_SIZE', self.DEFAULT_WINDOW_SIZE)

9
analytics/analytics/detectors/threshold_detector.py

@ -14,6 +14,8 @@ logger = log.getLogger('THRESHOLD_DETECTOR')
class ThresholdDetector(Detector):
WINDOW_SIZE = 3
def __init__(self):
pass
@ -25,7 +27,7 @@ class ThresholdDetector(Detector):
}
}
async def detect(self, dataframe: pd.DataFrame, cache: ModelCache) -> dict:
def detect(self, dataframe: pd.DataFrame, cache: ModelCache) -> dict:
if cache == None:
raise 'Threshold detector error: cannot detect before learning'
value = cache['value']
@ -68,6 +70,9 @@ class ThresholdDetector(Detector):
'lastDetectionTime': now
}
def recieve_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
def consume_data(self, data: pd.DataFrame, cache: Optional[ModelCache]) -> Optional[dict]:
result = self.detect(data, cache)
return result if result else None
def get_window_size(self, cache: Optional[ModelCache]) -> int:
return self.WINDOW_SIZE

1
analytics/analytics/utils/__init__.py

@ -1,3 +1,4 @@
from utils.common import *
from utils.segments import *
from utils.time import *
from utils.dataframe import *

30
analytics/analytics/utils/dataframe.py

@ -0,0 +1,30 @@
from typing import Generator
import pandas as pd
def get_data_chunks(dataframe: pd.DataFrame, window_size: int, chunk_size: int) -> Generator[pd.DataFrame, None, None]:
"""
Returns generator that splits dataframe on intersected segments.
Intersection makes it able to detect pattern that present in dataframe on the border between chunks.
window_size - length of intersection.
chunk_size - length of chunk
"""
data_len = len(dataframe)
if data_len <= chunk_size:
yield dataframe
return
nonintersected = chunk_size - window_size
offset = 0
while True:
left_values = data_len - offset
if left_values == 0:
break
if left_values <= chunk_size:
yield dataframe[offset : data_len].reset_index()
break
else:
yield dataframe[offset: offset + chunk_size].reset_index()
offset += min(nonintersected, left_values)

29
analytics/tests/test_detector_chunks.py

@ -1,21 +1,38 @@
import unittest
from detectors.pattern_detector import PatternDetector
from utils import get_data_chunks
import pandas as pd
def rlist(start, stop):
return [x for x in range(start, stop + 1)]
class TestUtils(unittest.TestCase):
def test_chunks_generator(self):
window_size = 1
chunk_window_size_factor = 3
cases = [
(list(range(7)), [[0,1,2], [2,3,4], [4,5,6]]),
([], [[]]),
(rlist(0, 300), [rlist(0,99),rlist(99,198),rlist(198,297),rlist(297,300)])
(list(range(1)), [[0]]),
(list(range(3)), [[0,1,2]]),
(list(range(8)), [[0,1,2], [2,3,4], [4,5,6], [6,7]]),
(list(range(6)), [[0,1,2], [2,3,4], [4,5]])
]
for data, expected_chunks in cases:
chunks = tuple(PatternDetector._PatternDetector__get_data_chunks(None, data, window_size))
self.assertSequenceEqual(chunks, expected_chunks)
data = [(x,x) for x in data]
data = pd.DataFrame(data, columns=['timestamp', 'value'])
df_expected_chunks = []
for chunk in expected_chunks:
chunk = [(x,x) for x in chunk]
df_expected_chunks.append(chunk)
df_expected_chunks = [pd.DataFrame(chunk, columns=['timestamp', 'value']) for chunk in df_expected_chunks]
chunks = tuple(get_data_chunks(data, window_size, window_size * chunk_window_size_factor))
df_expected_chunks = [df.reset_index() for df in df_expected_chunks]
zipped = zip(chunks, df_expected_chunks)
map(lambda a,b: self.assertTrue(a.equals(b)), zipped)
if __name__ == '__main__':

Loading…
Cancel
Save