
Workers for analytic units #203 (#265)

* rm async from analytic_unit_worker + some refactorings in manager

* AnalyticUnitManager

* workers for analytic units
Alexey Velikiy 6 years ago committed by GitHub
commit 7ec2616cb4
Changed files:
1. analytics/analytics/analytic_unit_manager.py (85 lines changed)
2. analytics/analytics/analytic_unit_worker.py (17 lines changed)
3. analytics/analytics/detectors/detector.py (7 lines changed)
4. analytics/analytics/detectors/pattern_detector.py (4 lines changed)
5. analytics/bin/server (17 lines changed)

analytics/analytics/analytic_unit_manager.py (85 lines changed)

@@ -2,56 +2,21 @@ from typing import Dict
 import pandas as pd
 import numpy as np
 import logging, traceback
+from concurrent.futures import Executor, ThreadPoolExecutor

 import detectors
 from analytic_unit_worker import AnalyticUnitWorker

 logger = logging.getLogger('AnalyticUnitManager')

+WORKERS_EXECUTORS = 20
+
 AnalyticUnitId = str
-analytic_workers: Dict[AnalyticUnitId, AnalyticUnitWorker] = dict()

 def get_detector_by_type(analytic_unit_type) -> detectors.Detector:
     return detectors.PatternDetector(analytic_unit_type)

-def ensure_worker(analytic_unit_id, analytic_unit_type) -> AnalyticUnitWorker:
-    if analytic_unit_id in analytic_workers:
-        # TODO: check that type is the same
-        return analytic_workers[analytic_unit_id]
-    detector = get_detector_by_type(analytic_unit_type)
-    worker = AnalyticUnitWorker(analytic_unit_id, detector)
-    analytic_workers[analytic_unit_id] = worker
-    return worker
-
-async def handle_analytic_task(task):
-    try:
-        payload = task['payload']
-        worker = ensure_worker(task['analyticUnitId'], payload['pattern'])
-        data = prepare_data(payload['data'])
-        result_payload = {}
-        if task['type'] == 'LEARN':
-            result_payload = await worker.do_learn(payload['segments'], data, payload['cache'])
-        elif task['type'] == 'PREDICT':
-            result_payload = await worker.do_predict(data, payload['cache'])
-        else:
-            raise ValueError('Unknown task type "%s"' % task['type'])
-        return {
-            'status': 'SUCCESS',
-            'payload': result_payload
-        }
-    except Exception as e:
-        error_text = traceback.format_exc()
-        logger.error("handle_analytic_task exception: '%s'" % error_text)
-        # TODO: move result to a class which renders to json for messaging to analytics
-        return {
-            'status': 'FAILED',
-            'error': str(e)
-        }

 def prepare_data(data: list):
     """
     Takes list

@@ -66,3 +31,47 @@ def prepare_data(data: list):
     data['value'] = data['value'] - min(data['value'])

     return data
+
+
+class AnalyticUnitManager:
+
+    def __init__(self):
+        self.analytic_workers: Dict[AnalyticUnitId, AnalyticUnitWorker] = dict()
+        self.workers_executor = ThreadPoolExecutor(max_workers=WORKERS_EXECUTORS)
+
+    def __ensure_worker(self, analytic_unit_id, analytic_unit_type) -> AnalyticUnitWorker:
+        if analytic_unit_id in self.analytic_workers:
+            # TODO: check that type is the same
+            return self.analytic_workers[analytic_unit_id]
+        detector = get_detector_by_type(analytic_unit_type)
+        worker = AnalyticUnitWorker(analytic_unit_id, detector, self.workers_executor)
+        self.analytic_workers[analytic_unit_id] = worker
+        return worker
+
+    async def handle_analytic_task(self, task):
+        try:
+            payload = task['payload']
+            worker = self.__ensure_worker(task['analyticUnitId'], payload['pattern'])
+            data = prepare_data(payload['data'])
+            result_payload = {}
+            if task['type'] == 'LEARN':
+                result_payload = await worker.do_learn(payload['segments'], data, payload['cache'])
+            elif task['type'] == 'PREDICT':
+                result_payload = await worker.do_predict(data, payload['cache'])
+            else:
+                raise ValueError('Unknown task type "%s"' % task['type'])
+            return {
+                'status': 'SUCCESS',
+                'payload': result_payload
+            }
+        except Exception as e:
+            error_text = traceback.format_exc()
+            logger.error("handle_analytic_task exception: '%s'" % error_text)
+            # TODO: move result to a class which renders to json for messaging to analytics
+            return {
+                'status': 'FAILED',
+                'error': str(e)
+            }
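With the worker registry and task dispatch moved into the class, usage from the server side reduces to one object and one coroutine. A minimal sketch of feeding the manager a LEARN task, assuming the analytics package is importable; the ids, the 'PEAK' pattern name, and the [timestamp, value] row format are illustrative assumptions, not confirmed by this diff:

import asyncio
from analytic_unit_manager import AnalyticUnitManager  # assumes analytics/analytics on PYTHONPATH

task = {
    'type': 'LEARN',
    'analyticUnitId': 'unit-1',    # illustrative id
    'payload': {
        'pattern': 'PEAK',         # assumed pattern type; must be one resolve_model_by_pattern knows
        'data': [[1523889000000, 1.0],   # assumed [timestamp, value] rows for prepare_data
                 [1523889000001, 5.0]],
        'segments': [],
        'cache': None,
    },
}

async def main():
    manager = AnalyticUnitManager()
    result = await manager.handle_analytic_task(task)
    print(result['status'])  # 'SUCCESS' or 'FAILED', per the return shapes above

asyncio.run(main())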

analytics/analytics/analytic_unit_worker.py (17 lines changed)

@@ -4,19 +4,26 @@ import logging
 import pandas as pd
 from typing import Optional
 from models import AnalyticUnitCache
+from concurrent.futures import Executor
+import asyncio

 logger = logging.getLogger('AnalyticUnitWorker')


 class AnalyticUnitWorker:

-    def __init__(self, analytic_unit_id: str, detector: detectors.Detector):
+    def __init__(self, analytic_unit_id: str, detector: detectors.Detector, executor: Executor):
         self.analytic_unit_id = analytic_unit_id
         self.detector = detector
+        self.executor: Executor = executor

-    async def do_learn(self, segments: list, data: pd.DataFrame, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache:
-        return await self.detector.train(data, segments, cache)
+    async def do_learn(
+        self, segments: list, data: pd.DataFrame, cache: Optional[AnalyticUnitCache]
+    ) -> AnalyticUnitCache:
+        new_cache: AnalyticUnitCache = await asyncio.get_event_loop().run_in_executor(
+            self.executor, self.detector.train, data, segments, cache
+        )
+        return new_cache

     async def do_predict(self, data: pd.DataFrame, cache: Optional[AnalyticUnitCache]) -> dict:
-        return await self.detector.predict(data, cache)
+        return self.detector.predict(data, cache)
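The substantive change here: do_learn no longer awaits a detector coroutine; it offloads the now-synchronous detector.train to the shared thread pool via run_in_executor, so one slow training job cannot block the event loop. A self-contained sketch of the same pattern; blocking_train is a hypothetical stand-in for detector.train:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=20)  # mirrors WORKERS_EXECUTORS in the manager

def blocking_train(data, segments, cache):
    # hypothetical stand-in for detector.train: synchronous and CPU-bound
    time.sleep(0.5)
    return {'cache': {'trained_on': len(data)}}

async def do_learn(data, segments, cache):
    loop = asyncio.get_running_loop()
    # the coroutine suspends here while a pool thread does the heavy work
    return await loop.run_in_executor(executor, blocking_train, data, segments, cache)

async def main():
    # two LEARN tasks now train concurrently on pool threads
    results = await asyncio.gather(
        do_learn([1, 2, 3], [], None),
        do_learn([4, 5], [], None),
    )
    print(results)

asyncio.run(main())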

analytics/analytics/detectors/detector.py (7 lines changed)

@@ -7,9 +7,12 @@ from typing import Optional

 class Detector(ABC):

     @abstractmethod
-    async def train(self, dataframe: DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache:
+    def train(self, dataframe: DataFrame, segments: list, cache: Optional[AnalyticUnitCache]) -> AnalyticUnitCache:
+        """
+        Should be thread-safe with other detectors' train method
+        """
         pass

     @abstractmethod
-    async def predict(self, dataframe: DataFrame, cache: Optional[AnalyticUnitCache]) -> dict:
+    def predict(self, dataframe: DataFrame, cache: Optional[AnalyticUnitCache]) -> dict:
         pass
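With train and predict now plain synchronous methods, a concrete detector implements them without async; the thread-safety note in the docstring is the contract that lets the manager run several train calls in its pool at once. A minimal hypothetical subclass sketch; ThresholdDetector and the plain-dict caches are illustrative, not part of this commit:

import pandas as pd
from abc import ABC, abstractmethod
from typing import Optional

class Detector(ABC):
    @abstractmethod
    def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[dict]) -> dict:
        """Should be thread-safe with other detectors' train method"""

    @abstractmethod
    def predict(self, dataframe: pd.DataFrame, cache: Optional[dict]) -> dict:
        pass

class ThresholdDetector(Detector):
    def train(self, dataframe, segments, cache):
        # no shared mutable state: everything learned lives in the returned cache
        return {'cache': {'threshold': float(dataframe['value'].max())}}

    def predict(self, dataframe, cache):
        threshold = (cache or {}).get('threshold', 0.0)
        hits = dataframe.index[dataframe['value'] > threshold].tolist()
        return {'segments': hits, 'cache': cache}

detector = ThresholdDetector()
df = pd.DataFrame({'value': [0.1, 0.5, 3.2]})
cache = detector.train(df, [], None)['cache']
print(detector.predict(df, cache))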

analytics/analytics/detectors/pattern_detector.py (4 lines changed)

@@ -35,14 +35,14 @@ class PatternDetector(Detector):
         self.model = resolve_model_by_pattern(self.pattern_type)
         window_size = 100

-    async def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.AnalyticUnitCache]) -> models.AnalyticUnitCache:
+    def train(self, dataframe: pd.DataFrame, segments: list, cache: Optional[models.AnalyticUnitCache]) -> models.AnalyticUnitCache:
         # TODO: pass only part of dataframe that has segments
         new_cache = self.model.fit(dataframe, segments, cache)
         return {
             'cache': new_cache
         }

-    async def predict(self, dataframe: pd.DataFrame, cache: Optional[models.AnalyticUnitCache]) -> dict:
+    def predict(self, dataframe: pd.DataFrame, cache: Optional[models.AnalyticUnitCache]) -> dict:
         # TODO: split and sleep (https://github.com/hastic/hastic-server/pull/124#discussion_r214085643)
         predicted = self.model.predict(dataframe, cache)

analytics/bin/server (17 lines changed)

@@ -13,7 +13,7 @@ import asyncio
 import traceback

 import services
-from analytic_unit_manager import handle_analytic_task
+from analytic_unit_manager import AnalyticUnitManager


 root = logging.getLogger()

@@ -22,6 +22,8 @@ logger = logging.getLogger('SERVER')
 server_service: services.ServerService = None
 data_service: services.DataService = None
+analytic_unit_manager: AnalyticUnitManager = None

 root.setLevel(logging.DEBUG)

@@ -36,8 +38,6 @@ logging_handler.setFormatter(logging_formatter)
 root.addHandler(logging_handler)

 async def handle_task(task: object):
     try:

@@ -53,7 +53,7 @@ async def handle_task(task: object):
         message = services.server_service.ServerMessage('TASK_RESULT', task_result_payload)
         await server_service.send_message(message)

-        res = await handle_analytic_task(task)
+        res = await analytic_unit_manager.handle_analytic_task(task)
         res['_id'] = task['_id']

         message = services.server_service.ServerMessage('TASK_RESULT', res)

@@ -68,7 +68,6 @@ async def handle_message(message: services.ServerMessage):
     if message.method == 'TASK':
         await handle_task(message.payload)

 def init_services():
     logger.info("Starting services...")
     logger.info("Server...")

@@ -77,18 +76,20 @@ def init_services():
     logger.info("Data service...")
     data_service = services.DataService(server_service)
     logger.info("Ok")
+    logger.info("Analytic unit manager...")
+    analytic_unit_manager = AnalyticUnitManager()
+    logger.info("Ok")

-    return server_service, data_service
+    return server_service, data_service, analytic_unit_manager

 async def app_loop():
     await server_service.handle_loop()
     # await asyncio.gather(server_service.handle_loop(), test_file_save())

 if __name__ == "__main__":
     loop = asyncio.get_event_loop()
     logger.info("Ok")
-    server_service, data_service = init_services()
+    server_service, data_service, analytic_unit_manager = init_services()
     print('Analytics process is running') # we need to print to stdout and flush
     sys.stdout.flush() # because node.js expects it
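The diff does not show how app_loop is ultimately scheduled after init_services; presumably something like loop.run_until_complete. A toy sketch of the startup shape, with FakeServerService and FakeAnalyticUnitManager as stand-ins for the real services:

import asyncio

class FakeServerService:
    # stand-in for services.ServerService: pretend to serve one message, then return
    async def handle_loop(self):
        await asyncio.sleep(0.1)
        print('server loop done')

class FakeAnalyticUnitManager:
    # stand-in for AnalyticUnitManager
    pass

def init_services():
    server_service = FakeServerService()
    analytic_unit_manager = FakeAnalyticUnitManager()
    return server_service, analytic_unit_manager

async def app_loop(server_service):
    await server_service.handle_loop()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    server_service, analytic_unit_manager = init_services()
    loop.run_until_complete(app_loop(server_service))  # assumed scheduling; not shown in the diff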
