From cccbf1193bcac729ba4102bc6fd3b25212ded901 Mon Sep 17 00:00:00 2001
From: Alexey Velikiy
Date: Thu, 19 Jul 2018 20:30:36 +0300
Subject: [PATCH] asyncio usage (#88)

* asyncio integration (buggy)

PEAKS_DETECTION doesn't work
---
 analytics/analytic_unit_worker.py        | 42 ++++--------
 analytics/data_provider.py               |  5 ++-
 analytics/detectors/general_detector.py  |  6 +--
 .../detectors/pattern_detection_model.py |  6 +--
 analytics/detectors/peaks_detector.py    |  2 +-
 analytics/detectors/step_detector.py     |  2 +-
 analytics/jump_detector.py               |  2 +-
 analytics/server.py                      | 44 +++++++++++--------
 analytics/supervised_algorithm.py        |  2 +-
 9 files changed, 46 insertions(+), 65 deletions(-)

diff --git a/analytics/analytic_unit_worker.py b/analytics/analytic_unit_worker.py
index 200f274..4c6b528 100644
--- a/analytics/analytic_unit_worker.py
+++ b/analytics/analytic_unit_worker.py
@@ -1,8 +1,6 @@
 import config
 from detectors.general_detector import GeneralDetector
 from detectors.pattern_detection_model import PatternDetectionModel
-import queue
-import threading
 import json
 import logging
 import sys
@@ -15,42 +13,20 @@ logger = logging.getLogger('WORKER')
 class AnalyticUnitWorker(object):
 
     models_cache = {}
-    thread = None
-    queue = queue.Queue()
-
-    def start(self):
-        self.thread = threading.Thread(target=self.run)
-        self.thread.start()
-
-    def stop(self):
-        if self.thread:
-            self.queue.put(None)
-            self.thread.join()
-
-    def run(self):
-        while True:
-            task = self.queue.get()
-            if task['type'] == "stop":
-                break
-            self.do_task(task)
-            self.queue.task_done()
-
-    def add_task(self, task):
-        self.queue.put(task)
 
     # TODO: get task as an object built from json
-    def do_task(self, task):
+    async def do_task(self, task):
         try:
             type = task['type']
             analytic_unit_id = task['analyticUnitId']
             if type == "predict":
                 last_prediction_time = task['lastPredictionTime']
                 pattern = task['pattern']
-                result = self.do_predict(analytic_unit_id, last_prediction_time, pattern)
+                result = await self.do_predict(analytic_unit_id, last_prediction_time, pattern)
             elif type == "learn":
                 segments = task['segments']
                 pattern = task['pattern']
-                result = self.do_learn(analytic_unit_id, segments, pattern)
+                result = await self.do_learn(analytic_unit_id, segments, pattern)
             else:
                 result = {
                     'status': "failed",
@@ -69,10 +45,10 @@ class AnalyticUnitWorker(object):
             }
         return result
 
-    def do_learn(self, analytic_unit_id, segments, pattern):
+    async def do_learn(self, analytic_unit_id, segments, pattern):
         model = self.get_model(analytic_unit_id, pattern)
         model.synchronize_data()
-        last_prediction_time = model.learn(segments)
+        last_prediction_time = await model.learn(segments)
 
         # TODO: we should not do predict before labeling in all models, not just in drops
         if pattern == 'drop' and len(segments) == 0:
@@ -84,15 +60,15 @@
                 'lastPredictionTime': last_prediction_time
             }
         else:
-            result = self.do_predict(analytic_unit_id, last_prediction_time, pattern)
+            result = await self.do_predict(analytic_unit_id, last_prediction_time, pattern)
             result['task'] = 'learn'
         return result
 
-    def do_predict(self, analytic_unit_id, last_prediction_time, pattern):
+    async def do_predict(self, analytic_unit_id, last_prediction_time, pattern):
         model = self.get_model(analytic_unit_id, pattern)
         model.synchronize_data()
-        segments, last_prediction_time = model.predict(last_prediction_time)
+        segments, last_prediction_time = await model.predict(last_prediction_time)
         return {
             'task': "predict",
             'status': "success",
@@ -109,5 +85,3 @@ class AnalyticUnitWorker(object):
             model = PatternDetectionModel(analytic_unit_id, pattern_type)
             self.models_cache[analytic_unit_id] = model
         return self.models_cache[analytic_unit_id]
-
-
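(Not part of the patch: a minimal sketch of how the worker is driven once do_task() is a coroutine. Callers must await it, or schedule it with asyncio.ensure_future, instead of pushing into the removed queue. The task fields below are the ones do_task() reads; the unit id and pattern values are made up, and the snippet assumes the analytics package and its config are importable.)

    import asyncio
    from analytic_unit_worker import AnalyticUnitWorker

    async def main():
        worker = AnalyticUnitWorker()
        # Field names match what do_task() reads; values are illustrative.
        task = {
            'type': "learn",
            'analyticUnitId': "demo-unit",  # hypothetical id
            'segments': [],
            'pattern': "jump"
        }
        result = await worker.do_task(task)  # do_task is a coroutine now
        print(result['status'])

    asyncio.get_event_loop().run_until_complete(main())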
diff --git a/analytics/data_provider.py b/analytics/data_provider.py
index 5653084..bbe4491 100644
--- a/analytics/data_provider.py
+++ b/analytics/data_provider.py
@@ -7,6 +7,7 @@ import json
 from time import time
 from config import HASTIC_API_KEY
 
+
 MS_IN_WEEK = 604800000
 
 class DataProvider:
@@ -118,8 +119,8 @@ class DataProvider:
             'values': []
         }
 
-        after_time = int(time()*1000 - MS_IN_WEEK)
-        before_time = int(time()*1000)
+        after_time = int(time() * 1000 - MS_IN_WEEK)
+        before_time = int(time() * 1000)
         while True:
             params['q'] = self.custom_query(str(after_time) + 'ms', str(before_time) + 'ms')
             serie = self.__query_grafana(params)
diff --git a/analytics/detectors/general_detector.py b/analytics/detectors/general_detector.py
index 2e382bc..e3c7c01 100644
--- a/analytics/detectors/general_detector.py
+++ b/analytics/detectors/general_detector.py
@@ -56,7 +56,7 @@ class GeneralDetector:
         max_time = pd.to_datetime(max_time, unit='ms')
         return min_time, max_time
 
-    def learn(self, anomalies):
+    async def learn(self, anomalies):
         logger.info("Start to learn for anomaly_name='%s'" % self.anomaly_name)
 
         confidence = 0.02
@@ -86,7 +86,7 @@ class GeneralDetector:
         logger.info("Learning is finished for anomaly_name='%s'" % self.anomaly_name)
         return last_prediction_time
 
-    def predict(self, last_prediction_time):
+    async def predict(self, last_prediction_time):
         logger.info("Start to predict for anomaly type='%s'" % self.anomaly_name)
         last_prediction_time = pd.to_datetime(last_prediction_time, unit='ms')
 
@@ -105,7 +105,7 @@ class GeneralDetector:
 
             assert(len(predict_augmented) == chunk_finish - chunk_start)
 
-            predicted_current = self.model.predict(predict_augmented)
+            predicted_current = await self.model.predict(predict_augmented)
             predicted = pd.concat([predicted, predicted_current])
 
         predicted_anomalies = self.preprocessor.inverse_transform_anomalies(predicted)
diff --git a/analytics/detectors/pattern_detection_model.py b/analytics/detectors/pattern_detection_model.py
index 47fb125..c4aa9d8 100644
--- a/analytics/detectors/pattern_detection_model.py
+++ b/analytics/detectors/pattern_detection_model.py
@@ -52,7 +52,7 @@ class PatternDetectionModel:
         self.model = None
         self.__load_model(pattern_type)
 
-    def learn(self, segments):
+    async def learn(self, segments):
         self.model = self.__create_model(self.pattern_type)
         window_size = 200
 
@@ -64,7 +64,7 @@ class PatternDetectionModel:
         self.__save_model()
         return 0
 
-    def predict(self, last_prediction_time):
+    async def predict(self, last_prediction_time):
         if self.model is None:
             return [], last_prediction_time
 
@@ -75,7 +75,7 @@ class PatternDetectionModel:
         start_index = max(0, start_index - window_size)
 
         dataframe = self.data_prov.get_data_range(start_index)
-        predicted_indexes = self.model.predict(dataframe)
+        predicted_indexes = await self.model.predict(dataframe)
         predicted_indexes = [(x, y) for (x, y) in predicted_indexes if x >= start_index and y >= start_index]
 
         predicted_times = self.data_prov.inverse_transform_indexes(predicted_indexes)
diff --git a/analytics/detectors/peaks_detector.py b/analytics/detectors/peaks_detector.py
index cc4170f..c9a1610 100644
--- a/analytics/detectors/peaks_detector.py
+++ b/analytics/detectors/peaks_detector.py
@@ -11,7 +11,7 @@ class PeaksDetector:
     def fit(self, dataset, contamination=0.005):
         pass
 
-    def predict(self, dataframe):
+    async def predict(self, dataframe):
         array = dataframe['value'].as_matrix()
         window_size = 20
         # window = np.ones(101)
diff --git a/analytics/detectors/step_detector.py b/analytics/detectors/step_detector.py
index 4fbf8ef..a92234a 100644
--- a/analytics/detectors/step_detector.py
+++ b/analytics/detectors/step_detector.py
@@ -56,7 +56,7 @@ class StepDetector:
         else:
             self.convolve_max = 570000
 
-    def predict(self, dataframe):
+    async def predict(self, dataframe):
         data = dataframe['value']
         result = self.__predict(data)
 
diff --git a/analytics/jump_detector.py b/analytics/jump_detector.py
index 87c5770..22dab4f 100644
--- a/analytics/jump_detector.py
+++ b/analytics/jump_detector.py
@@ -76,7 +76,7 @@ class Jumpdetector:
             distribution.append(F)
         return distribution
 
-    def predict(self, dataframe):
+    async def predict(self, dataframe):
         data = dataframe['value']
         result = self.__predict(data)
 
diff --git a/analytics/server.py b/analytics/server.py
index 47fefe4..d058bd4 100644
--- a/analytics/server.py
+++ b/analytics/server.py
@@ -2,7 +2,9 @@ import config
 import json
 import logging
 import zmq
+import zmq.asyncio
 import sys
+import asyncio
 
 from analytic_unit_worker import AnalyticUnitWorker
 
@@ -22,45 +24,49 @@ ch.setFormatter(formatter)
 root.addHandler(ch)
 
 
-def handle_ping():
-    socket.send(b'pong')
+async def server_handle_loop():
+    while True:
+        received_bytes = await socket.recv()
+        text = received_bytes.decode('utf-8')
+
+        if text == 'ping':
+            asyncio.ensure_future(handle_ping())
+        else:
+            asyncio.ensure_future(handle_task(text))
+
+async def server_send_message(string):
+    await socket.send_string(string)
 
-def handle_task(text):
+async def handle_ping():
+    await socket.send(b'pong')
+
+async def handle_task(text):
     try:
         task = json.loads(text)
         logger.info("Command is OK")
-        socket.send_string(json.dumps({
+        await server_send_message(json.dumps({
             '_taskId': task['_taskId'],
             'task': task['type'],
             'analyticUnitId': task['analyticUnitId'],
             'status': "in progress"
         }))
-
-        res = worker.do_task(task)
+
+        res = await worker.do_task(task)
         res['_taskId'] = task['_taskId']
-        socket.send_string(json.dumps(res))
+        await server_send_message(json.dumps(res))
     except Exception as e:
         logger.error("Exception: '%s'" % str(e))
 
-
 if __name__ == "__main__":
+    loop = asyncio.get_event_loop()
     worker = AnalyticUnitWorker()
     logger.info("Worker was started")
 
    logger.info("Binding to %s ..." % config.ZEROMQ_CONNECTION_STRING)
-    context = zmq.Context()
+    context = zmq.asyncio.Context()
     socket = context.socket(zmq.PAIR)
     socket.bind(config.ZEROMQ_CONNECTION_STRING)
     logger.info("Ok")
-
-    while True:
-        received_bytes = socket.recv()
-        text = received_bytes.decode('utf-8')
-        if text == 'ping':
-            handle_ping()
-        else:
-            handle_task(text)
-
-
+    loop.run_until_complete(server_handle_loop())
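(Not part of the patch: the server above pairs zmq.asyncio's awaitable recv() with asyncio.ensure_future so a slow task handler cannot stall message receipt. A self-contained sketch of that pattern follows; the inproc endpoint and the dummy handler are made up for the demo.)

    import asyncio
    import zmq
    import zmq.asyncio

    context = zmq.asyncio.Context()

    server = context.socket(zmq.PAIR)
    server.bind('inproc://demo')           # arbitrary endpoint for the demo

    client = context.socket(zmq.PAIR)
    client.connect('inproc://demo')

    async def handle(text):
        # Stand-in for handle_task(): reply on the same PAIR socket.
        await server.send_string('pong' if text == 'ping' else 'unknown')

    async def recv_loop():
        while True:
            msg = await server.recv()      # awaitable recv keeps the loop responsive
            asyncio.ensure_future(handle(msg.decode('utf-8')))

    async def demo():
        asyncio.ensure_future(recv_loop())
        await client.send_string('ping')
        print(await client.recv_string())  # prints 'pong'

    asyncio.get_event_loop().run_until_complete(demo())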
diff --git a/analytics/supervised_algorithm.py b/analytics/supervised_algorithm.py
index 59da1a4..dbc7340 100644
--- a/analytics/supervised_algorithm.py
+++ b/analytics/supervised_algorithm.py
@@ -44,7 +44,7 @@ class supervised_algorithm(object):
         dataset = self.scaler.transform(dataset)
         self.clf.fit(dataset)
 
-    def predict(self, dataframe):
+    async def predict(self, dataframe):
         dataset = dataframe[self.good_features]
         dataset = self.scaler.transform(dataset)
         prediction = self.clf.predict(dataset)
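(A closing observation, not part of the patch: the commit message itself flags the integration as buggy, and the detectors' predict() bodies remain synchronous, CPU-bound code, so declaring them async does not by itself keep the event loop responsive. One conventional follow-up, sketched here only as a hypothetical, is to push the blocking work onto a thread pool with run_in_executor:)

    import asyncio
    import time
    from concurrent.futures import ThreadPoolExecutor

    executor = ThreadPoolExecutor(max_workers=2)

    def blocking_predict(data):
        time.sleep(0.1)              # stands in for the CPU-bound convolution work
        return [(0, len(data))]      # dummy segment list

    async def predict(data):
        loop = asyncio.get_event_loop()
        # Run the blocking body off the loop thread; await the resulting future.
        return await loop.run_in_executor(executor, blocking_predict, data)

    print(asyncio.get_event_loop().run_until_complete(predict([1, 2, 3])))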