|
|
|
from typing import Dict
|
|
|
|
import pandas as pd
|
|
|
|
import logging, traceback
|
|
|
|
|
|
|
|
import detectors
|
|
|
|
from analytic_unit_worker import AnalyticUnitWorker
|
|
|
|
|
|
|
|
logger = logging.getLogger('AnalyticUnitManager')
|
|
|
|
|
|
|
|
AnalyticUnitId = str
|
|
|
|
analytic_workers: Dict[AnalyticUnitId, AnalyticUnitWorker] = dict()
|
|
|
|
|
|
|
|
|
|
|
|
def get_detector_by_type(analytic_unit_type) -> detectors.Detector:
|
|
|
|
return detectors.PatternDetector(analytic_unit_type)
|
|
|
|
|
|
|
|
def ensure_worker(analytic_unit_id, analytic_unit_type) -> AnalyticUnitWorker:
|
|
|
|
if analytic_unit_id in analytic_workers:
|
|
|
|
# TODO: check that type is the same
|
|
|
|
return analytic_workers[analytic_unit_id]
|
|
|
|
detector = get_detector_by_type(analytic_unit_type)
|
|
|
|
worker = AnalyticUnitWorker(analytic_unit_id, detector)
|
|
|
|
analytic_workers[analytic_unit_id] = worker
|
|
|
|
return worker
|
|
|
|
|
|
|
|
async def handle_analytic_task(task):
|
|
|
|
try:
|
|
|
|
payload = task['payload']
|
|
|
|
|
|
|
|
worker = ensure_worker(task['analyticUnitId'], payload['pattern'])
|
|
|
|
|
|
|
|
data = prepare_data(payload['data'])
|
|
|
|
result_payload = {}
|
|
|
|
if task['type'] == 'LEARN':
|
|
|
|
result_payload = await worker.do_learn(payload['segments'], data, payload['cache'])
|
|
|
|
elif task['type'] == 'PREDICT':
|
|
|
|
result_payload = await worker.do_predict(data, payload['cache'])
|
|
|
|
else:
|
|
|
|
raise ValueError('Unknown task type "%s"' % task['type'])
|
|
|
|
return {
|
|
|
|
'status': 'SUCCESS',
|
|
|
|
'payload': result_payload
|
|
|
|
}
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
error_text = traceback.format_exc()
|
|
|
|
logger.error("handle_analytic_task exception: '%s'" % error_text)
|
|
|
|
# TODO: move result to a class which renders to json for messaging to analytics
|
|
|
|
return {
|
|
|
|
'status': 'FAILED',
|
|
|
|
'error': str(e)
|
|
|
|
}
|
|
|
|
|
|
|
|
def prepare_data(data: list):
|
|
|
|
"""
|
|
|
|
Takes list
|
|
|
|
- converts it into pd.DataFrame,
|
|
|
|
- converts 'timestamp' column to pd.Datetime,
|
|
|
|
- subtracts min value from dataset
|
|
|
|
"""
|
|
|
|
data = pd.DataFrame(data, columns=['timestamp', 'value'])
|
|
|
|
|
|
|
|
data['timestamp'] = pd.to_datetime(data['timestamp'], unit='ms')
|
|
|
|
data['value'] = data['value'] - min(data['value'])
|
|
|
|
|
|
|
|
return data
|