Browse Source

asyncio usage (#88)

* asyncio integration (buggy)

PEAKS_DETECTION doesnt works
pull/1/head
Alexey Velikiy 7 years ago committed by GitHub
parent
commit
cccbf1193b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 42
      analytics/analytic_unit_worker.py
  2. 5
      analytics/data_provider.py
  3. 6
      analytics/detectors/general_detector.py
  4. 6
      analytics/detectors/pattern_detection_model.py
  5. 2
      analytics/detectors/peaks_detector.py
  6. 2
      analytics/detectors/step_detector.py
  7. 2
      analytics/jump_detector.py
  8. 44
      analytics/server.py
  9. 2
      analytics/supervised_algorithm.py

42
analytics/analytic_unit_worker.py

@ -1,8 +1,6 @@
import config
from detectors.general_detector import GeneralDetector
from detectors.pattern_detection_model import PatternDetectionModel
import queue
import threading
import json
import logging
import sys
@ -15,42 +13,20 @@ logger = logging.getLogger('WORKER')
class AnalyticUnitWorker(object):
models_cache = {}
thread = None
queue = queue.Queue()
def start(self):
self.thread = threading.Thread(target=self.run)
self.thread.start()
def stop(self):
if self.thread:
self.queue.put(None)
self.thread.join()
def run(self):
while True:
task = self.queue.get()
if task['type'] == "stop":
break
self.do_task(task)
self.queue.task_done()
def add_task(self, task):
self.queue.put(task)
# TODO: get task as an object built from json
def do_task(self, task):
async def do_task(self, task):
try:
type = task['type']
analytic_unit_id = task['analyticUnitId']
if type == "predict":
last_prediction_time = task['lastPredictionTime']
pattern = task['pattern']
result = self.do_predict(analytic_unit_id, last_prediction_time, pattern)
result = await self.do_predict(analytic_unit_id, last_prediction_time, pattern)
elif type == "learn":
segments = task['segments']
pattern = task['pattern']
result = self.do_learn(analytic_unit_id, segments, pattern)
result = await self.do_learn(analytic_unit_id, segments, pattern)
else:
result = {
'status': "failed",
@ -69,10 +45,10 @@ class AnalyticUnitWorker(object):
}
return result
def do_learn(self, analytic_unit_id, segments, pattern):
async def do_learn(self, analytic_unit_id, segments, pattern):
model = self.get_model(analytic_unit_id, pattern)
model.synchronize_data()
last_prediction_time = model.learn(segments)
last_prediction_time = await model.learn(segments)
# TODO: we should not do predict before labeling in all models, not just in drops
if pattern == 'drop' and len(segments) == 0:
@ -84,15 +60,15 @@ class AnalyticUnitWorker(object):
'lastPredictionTime': last_prediction_time
}
else:
result = self.do_predict(analytic_unit_id, last_prediction_time, pattern)
result = await self.do_predict(analytic_unit_id, last_prediction_time, pattern)
result['task'] = 'learn'
return result
def do_predict(self, analytic_unit_id, last_prediction_time, pattern):
async def do_predict(self, analytic_unit_id, last_prediction_time, pattern):
model = self.get_model(analytic_unit_id, pattern)
model.synchronize_data()
segments, last_prediction_time = model.predict(last_prediction_time)
segments, last_prediction_time = await model.predict(last_prediction_time)
return {
'task': "predict",
'status': "success",
@ -109,5 +85,3 @@ class AnalyticUnitWorker(object):
model = PatternDetectionModel(analytic_unit_id, pattern_type)
self.models_cache[analytic_unit_id] = model
return self.models_cache[analytic_unit_id]

5
analytics/data_provider.py

@ -7,6 +7,7 @@ import json
from time import time
from config import HASTIC_API_KEY
MS_IN_WEEK = 604800000
class DataProvider:
@ -118,8 +119,8 @@ class DataProvider:
'values': []
}
after_time = int(time()*1000 - MS_IN_WEEK)
before_time = int(time()*1000)
after_time = int(time() * 1000 - MS_IN_WEEK)
before_time = int(time() * 1000)
while True:
params['q'] = self.custom_query(str(after_time) + 'ms', str(before_time) + 'ms')
serie = self.__query_grafana(params)

6
analytics/detectors/general_detector.py

@ -56,7 +56,7 @@ class GeneralDetector:
max_time = pd.to_datetime(max_time, unit='ms')
return min_time, max_time
def learn(self, anomalies):
async def learn(self, anomalies):
logger.info("Start to learn for anomaly_name='%s'" % self.anomaly_name)
confidence = 0.02
@ -86,7 +86,7 @@ class GeneralDetector:
logger.info("Learning is finished for anomaly_name='%s'" % self.anomaly_name)
return last_prediction_time
def predict(self, last_prediction_time):
async def predict(self, last_prediction_time):
logger.info("Start to predict for anomaly type='%s'" % self.anomaly_name)
last_prediction_time = pd.to_datetime(last_prediction_time, unit='ms')
@ -105,7 +105,7 @@ class GeneralDetector:
assert(len(predict_augmented) == chunk_finish - chunk_start)
predicted_current = self.model.predict(predict_augmented)
predicted_current = await self.model.predict(predict_augmented)
predicted = pd.concat([predicted, predicted_current])
predicted_anomalies = self.preprocessor.inverse_transform_anomalies(predicted)

6
analytics/detectors/pattern_detection_model.py

@ -52,7 +52,7 @@ class PatternDetectionModel:
self.model = None
self.__load_model(pattern_type)
def learn(self, segments):
async def learn(self, segments):
self.model = self.__create_model(self.pattern_type)
window_size = 200
@ -64,7 +64,7 @@ class PatternDetectionModel:
self.__save_model()
return 0
def predict(self, last_prediction_time):
async def predict(self, last_prediction_time):
if self.model is None:
return [], last_prediction_time
@ -75,7 +75,7 @@ class PatternDetectionModel:
start_index = max(0, start_index - window_size)
dataframe = self.data_prov.get_data_range(start_index)
predicted_indexes = self.model.predict(dataframe)
predicted_indexes = await self.model.predict(dataframe)
predicted_indexes = [(x, y) for (x, y) in predicted_indexes if x >= start_index and y >= start_index]
predicted_times = self.data_prov.inverse_transform_indexes(predicted_indexes)

2
analytics/detectors/peaks_detector.py

@ -11,7 +11,7 @@ class PeaksDetector:
def fit(self, dataset, contamination=0.005):
pass
def predict(self, dataframe):
async def predict(self, dataframe):
array = dataframe['value'].as_matrix()
window_size = 20
# window = np.ones(101)

2
analytics/detectors/step_detector.py

@ -56,7 +56,7 @@ class StepDetector:
else:
self.convolve_max = 570000
def predict(self, dataframe):
async def predict(self, dataframe):
data = dataframe['value']
result = self.__predict(data)

2
analytics/jump_detector.py

@ -76,7 +76,7 @@ class Jumpdetector:
distribution.append(F)
return distribution
def predict(self, dataframe):
async def predict(self, dataframe):
data = dataframe['value']
result = self.__predict(data)

44
analytics/server.py

@ -2,7 +2,9 @@ import config
import json
import logging
import zmq
import zmq.asyncio
import sys
import asyncio
from analytic_unit_worker import AnalyticUnitWorker
@ -22,45 +24,49 @@ ch.setFormatter(formatter)
root.addHandler(ch)
def handle_ping():
socket.send(b'pong')
async def server_handle_loop():
while True:
received_bytes = await socket.recv()
text = received_bytes.decode('utf-8')
if text == 'ping':
asyncio.ensure_future(handle_ping())
else:
asyncio.ensure_future(handle_task(text))
async def server_send_message(string):
await socket.send_string(string)
def handle_task(text):
async def handle_ping():
await socket.send(b'pong')
async def handle_task(text):
try:
task = json.loads(text)
logger.info("Command is OK")
socket.send_string(json.dumps({
await server_send_message(json.dumps({
'_taskId': task['_taskId'],
'task': task['type'],
'analyticUnitId': task['analyticUnitId'],
'status': "in progress"
}))
res = worker.do_task(task)
res = await worker.do_task(task)
res['_taskId'] = task['_taskId']
socket.send_string(json.dumps(res))
await server_send_message(json.dumps(res))
except Exception as e:
logger.error("Exception: '%s'" % str(e))
if __name__ == "__main__":
loop = asyncio.get_event_loop()
worker = AnalyticUnitWorker()
logger.info("Worker was started")
logger.info("Binding to %s ..." % config.ZEROMQ_CONNECTION_STRING)
context = zmq.Context()
context = zmq.asyncio.Context()
socket = context.socket(zmq.PAIR)
socket.bind(config.ZEROMQ_CONNECTION_STRING)
logger.info("Ok")
while True:
received_bytes = socket.recv()
text = received_bytes.decode('utf-8')
if text == 'ping':
handle_ping()
else:
handle_task(text)
loop.run_until_complete(server_handle_loop())

2
analytics/supervised_algorithm.py

@ -44,7 +44,7 @@ class supervised_algorithm(object):
dataset = self.scaler.transform(dataset)
self.clf.fit(dataset)
def predict(self, dataframe):
async def predict(self, dataframe):
dataset = dataframe[self.good_features]
dataset = self.scaler.transform(dataset)
prediction = self.clf.predict(dataset)

Loading…
Cancel
Save