diff --git a/analytics/Codestyle.md b/analytics/Codestyle.md index 0d724e3..cf0e4fb 100644 --- a/analytics/Codestyle.md +++ b/analytics/Codestyle.md @@ -1,6 +1,10 @@ +# Type hints + +Please use: https://www.python.org/dev/peps/pep-0484/ + # Line endings -We use CRLS everywhere +We use LF everywhere # Imports diff --git a/analytics/analytic_unit_manager.py b/analytics/analytic_unit_manager.py new file mode 100644 index 0000000..68302f0 --- /dev/null +++ b/analytics/analytic_unit_manager.py @@ -0,0 +1,55 @@ +import logging +import traceback + +from typing import Dict + +import detectors +from analytic_unit_worker import AnalyticUnitWorker + +logger = logging.getLogger('AnalyticUnitManager') + +analytic_unit_id = str +analytic_workers: Dict[analytic_unit_id, AnalyticUnitWorker] = {} + +def get_detector(analytic_unit_type) -> detectors.Detector: + if analytic_unit_type == 'GENERAL': + detector = detectors.GeneralDetector() + else: + detector = detectors.PatternDetector(analytic_unit_type) + return detector + +def ensure_worker(analytic_unit_id, analytic_unit_type) -> AnalyticUnitWorker: + if analytic_unit_id in analytic_workers: + # TODO: check that type is the same + return analytic_workers[analytic_unit_id] + detector = get_detector(analytic_unit_type) + worker = AnalyticUnitWorker(analytic_unit_id, detector) + analytic_workers[analytic_unit_id] = worker + return worker + +async def handle_analytic_task(task): + try: + worker = ensure_worker(task['analyticUnitId'], task['type']) + payload = task['payload'] + if task['type'] == "PREDICT": + result_payload = await worker.do_predict(task['analyticUnitId'], payload) + elif task['type'] == "LEARN": + result_payload = await worker.do_learn(task['analyticUnitId'], payload) + else: + raise ValueError('Unknown task type "%s"' % task['type']) + + except Exception as e: + #traceback.extract_stack() + error_text = traceback.format_exc() + logger.error("do_task Exception: '%s'" % error_text) + # TODO: move result to a class which renders to json for messaging to analytics + result = { + 'status': "FAILED", + 'error': str(e) + } + return { + 
'status': 'SUCCESS', + 'payload': result_payload + } + + diff --git a/analytics/analytic_unit_worker.py b/analytics/analytic_unit_worker.py index 65b39e5..a0c1f9d 100644 --- a/analytics/analytic_unit_worker.py +++ b/analytics/analytic_unit_worker.py @@ -7,57 +7,20 @@ import traceback import time - logger = logging.getLogger('AnalyticUnitWorker') class AnalyticUnitWorker: - def get_detector(self, analytic_unit_id, pattern_type): - if analytic_unit_id not in self.detectors_cache: - if pattern_type == 'GENERAL': - detector = detectors.GeneralDetector(analytic_unit_id) - else: - detector = detectors.PatternDetector(analytic_unit_id, pattern_type) - self.detectors_cache[analytic_unit_id] = detector - return self.detectors_cache[analytic_unit_id] - - def __init__(self, detector: detectors.Detector): - pass - - async def do_task(self, task): - try: - type = task['type'] - analytic_unit_id = task['analyticUnitId'] - payload = task['payload'] - if type == "PREDICT": - result_payload = await self.do_predict(analytic_unit_id, payload) - elif type == "LEARN": - result_payload = await self.do_learn(analytic_unit_id, payload) - else: - raise ValueError('Unknown task type %s' % type) - - except Exception as e: - #traceback.extract_stack() - error_text = traceback.format_exc() - logger.error("do_task Exception: '%s'" % error_text) - # TODO: move result to a class which renders to json for messaging to analytics - result = { - 'status': "FAILED", - 'error': str(e) - } - return { - 'status': 'SUCCESS', - 'payload': result_payload - } + def __init__(self, analytic_unit_id: str, detector: detectors.Detector): + self.analytic_unit_id = analytic_unit_id + self.detector = detector async def do_learn(self, analytic_unit_id, payload) -> None: pattern = payload['pattern'] segments = payload['segments'] data = payload['data'] # [time, value][] - - detector = self.get_detector(analytic_unit_id, pattern) - await detector.learn(segments) + await self.detector.train(data, segments) async def 
do_predict(self, analytic_unit_id, payload): pattern = payload['pattern'] diff --git a/analytics/detectors/detector.py b/analytics/detectors/detector.py index c801664..5489fd6 100644 --- a/analytics/detectors/detector.py +++ b/analytics/detectors/detector.py @@ -5,9 +5,9 @@ from pandas import DataFrame class Detector(ABC): @abstractmethod - def fit(self, dataframe: DataFrame, segments: list): + async def train(self, dataframe: DataFrame, segments: list): pass @abstractmethod - def predict(self, dataframe: DataFrame) -> list: + async def predict(self, dataframe: DataFrame) -> list: pass diff --git a/analytics/detectors/general_detector/general_detector.py b/analytics/detectors/general_detector/general_detector.py index 086e1f7..5225ef7 100644 --- a/analytics/detectors/general_detector/general_detector.py +++ b/analytics/detectors/general_detector/general_detector.py @@ -1,4 +1,5 @@ from detectors.general_detector.supervised_algorithm import SupervisedAlgorithm +from detectors import Detector import utils # from grafana_data_provider import GrafanaDataProvider from data_preprocessor import data_preprocessor @@ -15,15 +16,13 @@ NANOSECONDS_IN_MS = 1000000 logger = logging.getLogger('analytic_toolset') -class GeneralDetector: +class GeneralDetector(Detector): - def __init__(self, anomaly_name): - self.anomaly_name = anomaly_name + def __init__(self): self.model = None self.__load_model() - async def learn(self, segments, data): - logger.info("Start to learn for anomaly_name='%s'" % self.anomaly_name) + async def train(self, segments, data): confidence = 0.02 dataframe = data # make dataframae from array @@ -40,7 +39,7 @@ class GeneralDetector: segments ) - self.model = self.create_algorithm() + self.model = SupervisedAlgorithm() await self.model.fit(train_augmented, confidence) if len(segments) > 0: last_dataframe_time = dataframe.iloc[-1]['timestamp'] @@ -52,7 +51,7 @@ class GeneralDetector: logger.info("Learning is finished for anomaly_name='%s'" % self.anomaly_name) 
return last_prediction_time - async def predict(self, last_prediction_time): + async def predict(self, data): logger.info("Start to predict for anomaly type='%s'" % self.anomaly_name) last_prediction_time = pd.to_datetime(last_prediction_time, unit='ms') @@ -83,15 +82,3 @@ class GeneralDetector: logger.info("Predicting is finished for anomaly type='%s'" % self.anomaly_name) return predicted_anomalies, last_prediction_time - - - def create_algorithm(self): - return SupervisedAlgorithm() - - def __save_model(self): - pass - # TODO: use data_service to save anything - - def __load_model(self): - pass - # TODO: use data_service to save anything diff --git a/analytics/detectors/pattern_detector.py b/analytics/detectors/pattern_detector.py index affae3e..c1e0be4 100644 --- a/analytics/detectors/pattern_detector.py +++ b/analytics/detectors/pattern_detector.py @@ -11,9 +11,10 @@ import config import pandas as pd +from detectors import Detector -logger = logging.getLogger('analytic_toolset') +logger = logging.getLogger('analytic_toolset') def resolve_model_by_pattern(pattern: str) -> models.Model: @@ -28,30 +29,19 @@ def resolve_model_by_pattern(pattern: str) -> models.Model: raise ValueError('Unknown pattern "%s"' % pattern) -class PatternDetector: +class PatternDetector(Detector): - def __init__(self, analytic_unit_id, pattern_type): - self.analytic_unit_id = analytic_unit_id + def __init__(self, pattern_type): self.pattern_type = pattern_type - - self.model = None - self.__load_model(pattern_type) - - async def learn(self, segments, data): self.model = resolve_model_by_pattern(self.pattern_type) - window_size = 200 + window_size = 100 + async def train(self, dataframe: pd.DataFrame, segments: list): # TODO: pass only part of dataframe that has segments - self.model.fit(dataframe, segments, data) + self.model.fit(dataframe, segments) - self.__save_model() return 0 - async def predict(self, last_prediction_time, data): - if self.model is None: - return [], 
last_prediction_time - - window_size = 100 - last_prediction_time = pd.to_datetime(last_prediction_time, unit='ms') + async def predict(self, data): start_index = self.data_prov.get_upper_bound(last_prediction_time) start_index = max(0, start_index - window_size) @@ -73,12 +64,3 @@ class PatternDetector: last_dataframe_time = dataframe.iloc[-1]['timestamp'] last_prediction_time = int(last_dataframe_time.timestamp() * 1000) return segments, last_prediction_time - # return predicted_anomalies, last_prediction_time - - def __save_model(self): - pass - # TODO: use data_service to save anything - - def __load_model(self, pattern): - pass - # TODO: use data_service to save anything diff --git a/analytics/server.py b/analytics/server.py index 20db571..8cc62d4 100644 --- a/analytics/server.py +++ b/analytics/server.py @@ -5,13 +5,13 @@ import sys import asyncio import services -from analytic_unit_worker import AnalyticUnitWorker +from analytic_unit_manager import handle_analytic_task root = logging.getLogger() logger = logging.getLogger('SERVER') -worker: AnalyticUnitWorker = None + server_service: services.ServerService = None data_service: services.DataService = None @@ -28,6 +28,8 @@ logging_handler.setFormatter(logging_formatter) root.addHandler(logging_handler) + + async def handle_task(task: object): try: @@ -43,7 +45,7 @@ async def handle_task(task: object): message = services.server_service.ServerMessage('TASK_RESULT', task_result_payload) await server_service.send_message(message) - res = await worker.do_task(task) + res = await handle_analytic_task(task) res['_id'] = task['_id'] message = services.server_service.ServerMessage('TASK_RESULT', res)