From 081b9607cb19fe34119d361fc5dde64f3b6d5ecc Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Sun, 8 Jul 2018 21:55:36 +0300 Subject: [PATCH] Use anomaly IDs #42 --- analytics/pattern_detection_model.py | 12 +++--- analytics/server.py | 2 +- analytics/worker.py | 42 ++++++++++--------- .../src/controllers/analytics_controller.ts | 2 +- 4 files changed, 31 insertions(+), 27 deletions(-) diff --git a/analytics/pattern_detection_model.py b/analytics/pattern_detection_model.py index dd69d58..ae7a269 100644 --- a/analytics/pattern_detection_model.py +++ b/analytics/pattern_detection_model.py @@ -24,8 +24,8 @@ def segments_box(segments): class PatternDetectionModel: - def __init__(self, predictor_id, pattern): - self.predictor_id = predictor_id + def __init__(self, analytic_unit_id, pattern): + self.analytic_unit_id = analytic_unit_id self.pattern = pattern self.__load_anomaly_config() @@ -101,16 +101,16 @@ class PatternDetectionModel: return StepDetector(pattern) def __load_anomaly_config(self): - with open(os.path.join(config.ANOMALIES_FOLDER, self.predictor_id + ".json"), 'r') as config_file: + with open(os.path.join(config.ANOMALIES_FOLDER, self.analytic_unit_id + ".json"), 'r') as config_file: self.anomaly_config = json.load(config_file) def __save_model(self): - logger.info("Save model '%s'" % self.predictor_id) - model_filename = os.path.join(config.MODELS_FOLDER, self.predictor_id + ".m") + logger.info("Save model '%s'" % self.analytic_unit_id) + model_filename = os.path.join(config.MODELS_FOLDER, self.analytic_unit_id + ".m") self.model.save(model_filename) def __load_model(self, pattern): - logger.info("Load model '%s'" % self.predictor_id) + logger.info("Load model '%s'" % self.analytic_unit_id) model_filename = os.path.join(config.MODELS_FOLDER, self.pattern + ".m") if os.path.exists(model_filename): self.model = self.__create_model(pattern) diff --git a/analytics/server.py b/analytics/server.py index 289778c..3416ee2 100644 --- a/analytics/server.py +++ b/analytics/server.py @@ -28,7 +28,7 @@ def handleTask(text): socket.send_string(json.dumps({ 'task': task['type'], - 'predictor_id': task['predictor_id'], + 'analyticUnitId': task['analyticUnitId'], '_taskId': task['_taskId'], 'status': "in progress" })) diff --git a/analytics/worker.py b/analytics/worker.py index 5df63ad..6741e5e 100644 --- a/analytics/worker.py +++ b/analytics/worker.py @@ -38,18 +38,19 @@ class Worker(object): def add_task(self, task): self.queue.put(task) + # TODO: get task as an object built from json def do_task(self, task): try: type = task['type'] - predictor_id = task['predictor_id'] + analytic_unit_id = task['analyticUnitId'] if type == "predict": - last_prediction_time = task['last_prediction_time'] + last_prediction_time = task['lastPredictionTime'] pattern = task['pattern'] - result = self.do_predict(predictor_id, last_prediction_time, pattern) + result = self.do_predict(analytic_unit_id, last_prediction_time, pattern) elif type == "learn": segments = task['segments'] pattern = task['pattern'] - result = self.do_learn(predictor_id, segments, pattern) + result = self.do_learn(analytic_unit_id, segments, pattern) else: result = { 'status': "failed", @@ -59,51 +60,54 @@ class Worker(object): #traceback.extract_stack() error_text = traceback.format_exc() logger.error("Exception: '%s'" % error_text) + # TODO: move result to a class which renders to json for messaging to analytics result = { 'task': type, 'status': "failed", - 'predictor_id': predictor_id, + 'analyticUnitId': analytic_unit_id, 'error': str(e) } return result - def do_learn(self, predictor_id, segments, pattern): - model = self.get_model(predictor_id, pattern) + def do_learn(self, analytic_unit_id, segments, pattern): + model = self.get_model(analytic_unit_id, pattern) model.synchronize_data() last_prediction_time = model.learn(segments) # TODO: we should not do predict before labeling in all models, not just in drops + if pattern == 'drops' and len(segments) == 0: + # TODO: move result to a class which renders to json for messaging to analytics result = { 'status': 'success', - 'predictor_id': predictor_id, + 'analyticUnitId': analytic_unit_id, 'segments': [], - 'last_prediction_time': last_prediction_time + 'lastPredictionTime': last_prediction_time } else: - result = self.do_predict(predictor_id, last_prediction_time, pattern) + result = self.do_predict(analytic_unit_id, last_prediction_time, pattern) result['task'] = 'learn' return result - def do_predict(self, predictor_id, last_prediction_time, pattern): - model = self.get_model(predictor_id, pattern) + def do_predict(self, analytic_unit_id, last_prediction_time, pattern): + model = self.get_model(analytic_unit_id, pattern) model.synchronize_data() segments, last_prediction_time = model.predict(last_prediction_time) return { 'task': "predict", 'status': "success", - 'analyticUnitId': predictor_id, + 'analyticUnitId': analytic_unit_id, 'segments': segments, 'lastPredictionTime': last_prediction_time } - def get_model(self, predictor_id, pattern): - if predictor_id not in self.models_cache: + def get_model(self, analytic_unit_id, pattern): + if analytic_unit_id not in self.models_cache: if pattern.find('general') != -1: - model = AnomalyModel(predictor_id) + model = AnomalyModel(analytic_unit_id) else: - model = PatternDetectionModel(predictor_id, pattern) - self.models_cache[predictor_id] = model - return self.models_cache[predictor_id] + model = PatternDetectionModel(analytic_unit_id, pattern) + self.models_cache[analytic_unit_id] = model + return self.models_cache[analytic_unit_id] diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index 1b29294..2ae9111 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -64,7 +64,7 @@ export async function runPredict(id: AnalyticUnit.AnalyticUnitId) { let pattern = unit.type; let task = { type: 'predict', - predictor_id: id, + analyticUnitId: id, pattern, lastPredictionTime: unit.lastPredictionTime };