diff --git a/analytics/analytic_unit_worker.py b/analytics/analytic_unit_worker.py index 6d38b6c..65b39e5 100644 --- a/analytics/analytic_unit_worker.py +++ b/analytics/analytic_unit_worker.py @@ -7,84 +7,65 @@ import traceback import time -logger = logging.getLogger('WORKER') +logger = logging.getLogger('AnalyticUnitWorker') -class AnalyticUnitWorker(object): - detectors_cache = {} - # TODO: get task as an object built from json +class AnalyticUnitWorker: + + def get_detector(self, analytic_unit_id, pattern_type): + if analytic_unit_id not in self.detectors_cache: + if pattern_type == 'GENERAL': + detector = detectors.GeneralDetector(analytic_unit_id) + else: + detector = detectors.PatternDetector(analytic_unit_id, pattern_type) + self.detectors_cache[analytic_unit_id] = detector + return self.detectors_cache[analytic_unit_id] + + def __init__(self, detector: detectors.Detector): + pass + async def do_task(self, task): try: type = task['type'] analytic_unit_id = task['analyticUnitId'] payload = task['payload'] if type == "PREDICT": - result = await self.do_predict(analytic_unit_id, payload) + result_payload = await self.do_predict(analytic_unit_id, payload) elif type == "LEARN": - result = await self.do_learn(analytic_unit_id, payload) + result_payload = await self.do_learn(analytic_unit_id, payload) else: - result = { - 'status': "FAILED", - 'error': "unknown type " + str(type) - } + raise ValueError('Unknown task type %s' % type) + except Exception as e: #traceback.extract_stack() error_text = traceback.format_exc() logger.error("do_task Exception: '%s'" % error_text) # TODO: move result to a class which renders to json for messaging to analytics result = { - 'task': type, 'status': "FAILED", - 'analyticUnitId': analytic_unit_id, 'error': str(e) } - return result + return { + 'status': 'SUCCESS', + 'payload': result_payload + } - async def do_learn(self, analytic_unit_id, payload): + async def do_learn(self, analytic_unit_id, payload) -> None: pattern = payload['pattern'] segments = payload['segments'] data = payload['data'] # [time, value][] detector = self.get_detector(analytic_unit_id, pattern) - detector.synchronize_data() - last_prediction_time = await detector.learn(segments) - # TODO: we should not do predict before labeling in all models, not just in drops - - if pattern == 'DROP' and len(segments) == 0: - # TODO: move result to a class which renders to json for messaging to analytics - result = { - 'status': 'SUCCESS', - 'analyticUnitId': analytic_unit_id, - 'segments': [], - 'lastPredictionTime': last_prediction_time - } - else: - result = await self.do_predict(analytic_unit_id, last_prediction_time, pattern) - - result['task'] = 'LEARN' - return result + await detector.learn(segments) async def do_predict(self, analytic_unit_id, payload): pattern = payload['pattern'] - last_prediction_time = payload['lastPredictionTime'] + data = payload['data'] # [time, value][] detector = self.get_detector(analytic_unit_id, pattern) - detector.synchronize_data() - segments, last_prediction_time = await detector.predict(last_prediction_time) + segments, last_prediction_time = await detector.predict(data) return { - 'task': 'PREDICT', - 'status': 'SUCCESS', - 'analyticUnitId': analytic_unit_id, 'segments': segments, 'lastPredictionTime': last_prediction_time } - - def get_detector(self, analytic_unit_id, pattern_type): - if analytic_unit_id not in self.detectors_cache: - if pattern_type == 'GENERAL': - detector = detectors.GeneralDetector(analytic_unit_id) - else: - detector = detectors.PatternDetector(analytic_unit_id, pattern_type) - self.detectors_cache[analytic_unit_id] = detector - return self.detectors_cache[analytic_unit_id] diff --git a/analytics/detectors/__init__.py b/analytics/detectors/__init__.py index 145dc6d..ff8e6ab 100644 --- a/analytics/detectors/__init__.py +++ b/analytics/detectors/__init__.py @@ -1,3 +1,3 @@ +from detectors.detector import Detector from detectors.pattern_detector import PatternDetector -# TODO: do something with general detector from detectors.general_detector import GeneralDetector diff --git a/analytics/detectors/detector.py b/analytics/detectors/detector.py new file mode 100644 index 0000000..c801664 --- /dev/null +++ b/analytics/detectors/detector.py @@ -0,0 +1,13 @@ +from abc import ABC, abstractmethod +from pandas import DataFrame + + +class Detector(ABC): + + @abstractmethod + def fit(self, dataframe: DataFrame, segments: list): + pass + + @abstractmethod + def predict(self, dataframe: DataFrame) -> list: + pass diff --git a/analytics/models/model.py b/analytics/models/model.py index b6a8abc..05dbf1e 100644 --- a/analytics/models/model.py +++ b/analytics/models/model.py @@ -1,17 +1,8 @@ from abc import ABC, abstractmethod from pandas import DataFrame -import pickle + class Model(ABC): - - def __init__(self): - """ - Variables which are obtained as a result of fit() method - should be stored in self.state dict - in order to be saved in model file - """ - self.state = {} - self.segments = [] @abstractmethod def fit(self, dataframe: DataFrame, segments: list): @@ -20,11 +11,3 @@ class Model(ABC): @abstractmethod def predict(self, dataframe: DataFrame) -> list: pass - - def save(self, model_filename: str): - with open(model_filename, 'wb') as file: - pickle.dump(self.state, file) - - def load(self, model_filename: str): - with open(model_filename, 'rb') as f: - self.state = pickle.load(f)