diff --git a/analytics/analytics/analytic_unit_manager.py b/analytics/analytics/analytic_unit_manager.py index b966826..f009bfb 100644 --- a/analytics/analytics/analytic_unit_manager.py +++ b/analytics/analytics/analytic_unit_manager.py @@ -2,56 +2,21 @@ from typing import Dict import pandas as pd import numpy as np import logging, traceback +from concurrent.futures import Executor, ThreadPoolExecutor import detectors from analytic_unit_worker import AnalyticUnitWorker + logger = logging.getLogger('AnalyticUnitManager') +WORKERS_EXECUTORS = 20 AnalyticUnitId = str -analytic_workers: Dict[AnalyticUnitId, AnalyticUnitWorker] = dict() def get_detector_by_type(analytic_unit_type) -> detectors.Detector: return detectors.PatternDetector(analytic_unit_type) -def ensure_worker(analytic_unit_id, analytic_unit_type) -> AnalyticUnitWorker: - if analytic_unit_id in analytic_workers: - # TODO: check that type is the same - return analytic_workers[analytic_unit_id] - detector = get_detector_by_type(analytic_unit_type) - worker = AnalyticUnitWorker(analytic_unit_id, detector) - analytic_workers[analytic_unit_id] = worker - return worker - -async def handle_analytic_task(task): - try: - payload = task['payload'] - - worker = ensure_worker(task['analyticUnitId'], payload['pattern']) - - data = prepare_data(payload['data']) - result_payload = {} - if task['type'] == 'LEARN': - result_payload = await worker.do_learn(payload['segments'], data, payload['cache']) - elif task['type'] == 'PREDICT': - result_payload = await worker.do_predict(data, payload['cache']) - else: - raise ValueError('Unknown task type "%s"' % task['type']) - return { - 'status': 'SUCCESS', - 'payload': result_payload - } - - except Exception as e: - error_text = traceback.format_exc() - logger.error("handle_analytic_task exception: '%s'" % error_text) - # TODO: move result to a class which renders to json for messaging to analytics - return { - 'status': 'FAILED', - 'error': str(e) - } - def prepare_data(data: list): """ Takes list @@ -66,3 +31,47 @@ def prepare_data(data: list): data['value'] = data['value'] - min(data['value']) return data + + +class AnalyticUnitManager: + + def __init__(self): + self.analytic_workers: Dict[AnalyticUnitId, AnalyticUnitWorker] = dict() + self.workers_executor = ThreadPoolExecutor(max_workers=WORKERS_EXECUTORS) + + def __ensure_worker(self, analytic_unit_id, analytic_unit_type) -> AnalyticUnitWorker: + if analytic_unit_id in self.analytic_workers: + # TODO: check that type is the same + return self.analytic_workers[analytic_unit_id] + detector = get_detector_by_type(analytic_unit_type) + worker = AnalyticUnitWorker(analytic_unit_id, detector, self.workers_executor) + self.analytic_workers[analytic_unit_id] = worker + return worker + + async def handle_analytic_task(self, task): + try: + payload = task['payload'] + worker = self.__ensure_worker(task['analyticUnitId'], payload['pattern']) + data = prepare_data(payload['data']) + result_payload = {} + if task['type'] == 'LEARN': + result_payload = await worker.do_learn(payload['segments'], data, payload['cache']) + elif task['type'] == 'PREDICT': + result_payload = await worker.do_predict(data, payload['cache']) + else: + raise ValueError('Unknown task type "%s"' % task['type']) + return { + 'status': 'SUCCESS', + 'payload': result_payload + } + + except Exception as e: + error_text = traceback.format_exc() + logger.error("handle_analytic_task exception: '%s'" % error_text) + # TODO: move result to a class which renders to json for messaging to analytics + return { + 'status': 'FAILED', + 'error': str(e) + } + + diff --git a/analytics/analytics/analytic_unit_worker.py b/analytics/analytics/analytic_unit_worker.py index 5584d5b..cc26efc 100644 --- a/analytics/analytics/analytic_unit_worker.py +++ b/analytics/analytics/analytic_unit_worker.py @@ -4,19 +4,26 @@ import logging import pandas as pd from typing import Optional from models import AnalyticUnitCache - +from concurrent.futures import Executor +import asyncio logger = logging.getLogger('AnalyticUnitWorker') class AnalyticUnitWorker: - def __init__(self, analytic_unit_id: str, detector: detectors.Detector): + def __init__(self, analytic_unit_id: str, detector: detectors.Detector, executor: Executor): self.analytic_unit_id = analytic_unit_id self.detector = detector + self.executor: Executor = executor - async def do_learn(self, segments: list, data: pd.DataFrame, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache: - return await self.detector.train(data, segments, cache) + async def do_learn( + self, segments: list, data: pd.DataFrame, cache: Optional[AnalyticUnitCache] + ) -> AnalyticUnitCache: + new_cache: AnalyticUnitCache = await asyncio.get_event_loop().run_in_executor( + self.executor, self.detector.train, data, segments, cache + ) + return new_cache async def do_predict(self, data: pd.DataFrame, cache: Optional[AnalyticUnitCache]) -> dict: - return await self.detector.predict(data, cache) + return self.detector.predict(data, cache) diff --git a/analytics/analytics/detectors/detector.py b/analytics/analytics/detectors/detector.py index 6e58653..e924145 100644 --- a/analytics/analytics/detectors/detector.py +++ b/analytics/analytics/detectors/detector.py @@ -7,9 +7,12 @@ from typing import Optional class Detector(ABC): @abstractmethod - async def train(self, dataframe: DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache: + def train(self, dataframe: DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache: + """ + Should be thread-safe with other detectors' train method + """ pass @abstractmethod - async def predict(self, dataframe: DataFrame, cache: Optional[AnalyticUnitCache]) -> dict: + def predict(self, dataframe: DataFrame, cache: Optional[AnalyticUnitCache]) -> dict: pass diff --git a/analytics/analytics/detectors/pattern_detector.py b/analytics/analytics/detectors/pattern_detector.py index 35bb72d..841fecf 100644 --- a/analytics/analytics/detectors/pattern_detector.py +++ b/analytics/analytics/detectors/pattern_detector.py @@ -35,14 +35,14 @@ class PatternDetector(Detector): self.model = resolve_model_by_pattern(self.pattern_type) window_size = 100 - async def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.AnalyticUnitCache]) -> models.AnalyticUnitCache: + def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.AnalyticUnitCache]) -> models.AnalyticUnitCache: # TODO: pass only part of dataframe that has segments new_cache = self.model.fit(dataframe, segments, cache) return { 'cache': new_cache } - async def predict(self, dataframe: pd.DataFrame, cache: Optional[models.AnalyticUnitCache]) -> dict: + def predict(self, dataframe: pd.DataFrame, cache: Optional[models.AnalyticUnitCache]) -> dict: # TODO: split and sleep (https://github.com/hastic/hastic-server/pull/124#discussion_r214085643) predicted = self.model.predict(dataframe, cache) diff --git a/analytics/bin/server b/analytics/bin/server index e164808..929436e 100644 --- a/analytics/bin/server +++ b/analytics/bin/server @@ -13,7 +13,7 @@ import asyncio import traceback import services -from analytic_unit_manager import handle_analytic_task +from analytic_unit_manager import AnalyticUnitManager root = logging.getLogger() @@ -22,6 +22,8 @@ logger = logging.getLogger('SERVER') server_service: services.ServerService = None data_service: services.DataService = None +analytic_unit_manager: AnalyticUnitManager = None + root.setLevel(logging.DEBUG) @@ -36,8 +38,6 @@ logging_handler.setFormatter(logging_formatter) root.addHandler(logging_handler) - - async def handle_task(task: object): try: @@ -53,7 +53,7 @@ async def handle_task(task: object): message = services.server_service.ServerMessage('TASK_RESULT', task_result_payload) await server_service.send_message(message) - res = await handle_analytic_task(task) + res = await analytic_unit_manager.handle_analytic_task(task) res['_id'] = task['_id'] message = services.server_service.ServerMessage('TASK_RESULT', res) @@ -68,7 +68,6 @@ async def handle_message(message: services.ServerMessage): if message.method == 'TASK': await handle_task(message.payload) - def init_services(): logger.info("Starting services...") logger.info("Server...") @@ -77,18 +76,20 @@ def init_services(): logger.info("Data service...") data_service = services.DataService(server_service) logger.info("Ok") + logger.info("Analytic unit manager...") + analytic_unit_manager = AnalyticUnitManager() + logger.info("Ok") - return server_service, data_service + return server_service, data_service, analytic_unit_manager async def app_loop(): await server_service.handle_loop() - # await asyncio.gather(server_service.handle_loop(), test_file_save()) if __name__ == "__main__": loop = asyncio.get_event_loop() logger.info("Ok") - server_service, data_service = init_services() + server_service, data_service, analytic_unit_manager = init_services() print('Analytics process is running') # we need to print to stdout and flush sys.stdout.flush() # because node.js expects it