from typing import Dict

import pandas as pd
import numpy as np
import logging as log
import traceback
from concurrent.futures import Executor, ThreadPoolExecutor

import detectors
from analytic_unit_worker import AnalyticUnitWorker
from models import ModelCache

logger = log.getLogger('AnalyticUnitManager')

WORKERS_EXECUTORS = 20

AnalyticUnitId = str


def get_detector_by_type(detector_type: str, analytic_unit_type: str) -> detectors.Detector:
    if detector_type == 'pattern':
        return detectors.PatternDetector(analytic_unit_type)
    elif detector_type == 'threshold':
        return detectors.ThresholdDetector()
    raise ValueError('Unknown detector type "%s"' % detector_type)
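
# Illustrative usage (not part of the original module): the detector type and
# analytic unit type strings come from the task payload, e.g.
#   detector = get_detector_by_type('pattern', payload['analyticUnitType'])
# 'pattern' and 'threshold' are the only detector types handled above; any other
# value raises ValueError.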


def prepare_data(data: list) -> pd.DataFrame:
    """
    Takes a list of [timestamp, value] pairs and:
    - converts it into a pd.DataFrame,
    - converts the 'timestamp' column to pd.Datetime (from milliseconds),
    - normalizes missing values to np.nan
    """
    data = pd.DataFrame(data, columns=['timestamp', 'value'])
    data['timestamp'] = pd.to_datetime(data['timestamp'], unit='ms')
    data.fillna(value=np.nan, inplace=True)
    return data
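
# Example (illustrative): prepare_data([[1523889000000, 10.0], [1523889060000, None]])
# yields a DataFrame with a datetime64 'timestamp' column and a float 'value' column
# in which None has been normalized to np.nan.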


class AnalyticUnitManager:

    def __init__(self):
        self.analytic_workers: Dict[AnalyticUnitId, AnalyticUnitWorker] = dict()
        self.workers_executor = ThreadPoolExecutor(max_workers=WORKERS_EXECUTORS)

    def __ensure_worker(
        self,
        analytic_unit_id: AnalyticUnitId,
        detector_type: str,
        analytic_unit_type: str
    ) -> AnalyticUnitWorker:
        if analytic_unit_id in self.analytic_workers:
            # TODO: check that type is the same
            return self.analytic_workers[analytic_unit_id]
        detector = get_detector_by_type(detector_type, analytic_unit_type)
        worker = AnalyticUnitWorker(analytic_unit_id, detector, self.workers_executor)
        self.analytic_workers[analytic_unit_id] = worker
        return worker
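
    # __ensure_worker lazily creates one AnalyticUnitWorker per analytic unit and
    # caches it in self.analytic_workers; all workers share a single
    # ThreadPoolExecutor (WORKERS_EXECUTORS threads), so concurrent detector work
    # stays bounded no matter how many analytic units exist.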

    async def __handle_analytic_task(self, task) -> dict:
        """
        Returns the result payload, or None for CANCEL tasks
        """
        analytic_unit_id: AnalyticUnitId = task['analyticUnitId']
        if task['type'] == 'CANCEL':
            if analytic_unit_id in self.analytic_workers:
                self.analytic_workers[analytic_unit_id].cancel()
            return

        payload = task['payload']
        worker = self.__ensure_worker(analytic_unit_id, payload['detector'], payload['analyticUnitType'])
        data = prepare_data(payload['data'])
        if task['type'] == 'PUSH':
            # TODO: do it a better way
            res = await worker.recieve_data(data, payload['cache'])
            res.update({ 'analyticUnitId': analytic_unit_id })
            return res
        elif task['type'] == 'LEARN':
            if 'segments' in payload:
                return await worker.do_train(payload['segments'], data, payload['cache'])
            elif 'threshold' in payload:
                return await worker.do_train(payload['threshold'], data, payload['cache'])
            else:
                raise ValueError('No segments or threshold in LEARN payload')
        elif task['type'] == 'DETECT':
            return await worker.do_detect(data, payload['cache'])

        raise ValueError('Unknown task type "%s"' % task['type'])
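
    # Expected task shape (illustrative, field names taken from the code above):
    #   {
    #     'analyticUnitId': '...',
    #     'type': 'PUSH' | 'LEARN' | 'DETECT' | 'CANCEL',
    #     'payload': {
    #       'detector': 'pattern' | 'threshold',
    #       'analyticUnitType': '...',
    #       'data': [[timestamp_ms, value], ...],
    #       'cache': ...,          # ModelCache-like object or None (assumed)
    #       'segments': [...],     # LEARN with a pattern detector
    #       'threshold': {...}     # LEARN with a threshold detector
    #     }
    #   }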

    async def handle_analytic_task(self, task):
        try:
            result_payload = await self.__handle_analytic_task(task)
            result_message = {
                'status': 'SUCCESS',
                'payload': result_payload
            }
            return result_message
        except Exception as e:
            error_text = traceback.format_exc()
            logger.error(error_text)
            # TODO: move result to a class which renders to json for messaging to analytics
            return {
                'status': 'FAILED',
                'error': str(e)
            }
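
# Illustrative usage (assumed caller, not part of this module): a dispatcher on the
# server side would typically do something like
#   manager = AnalyticUnitManager()
#   result = await manager.handle_analytic_task(task)
# and send the resulting {'status': ..., 'payload'/'error': ...} dict back over its
# messaging channel.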