
One panel - one worker #62

Branch: pull/1/head
Coin de Gamma committed 6 years ago · commit 0902209234
7 changed files:

  1. analytics/Codestyle.md (6 lines changed)
  2. analytics/analytic_unit_manager.py (50 lines changed)
  3. analytics/analytic_unit_worker.py (45 lines changed)
  4. analytics/detectors/detector.py (4 lines changed)
  5. analytics/detectors/general_detector/general_detector.py (25 lines changed)
  6. analytics/detectors/pattern_detector.py (34 lines changed)
  7. analytics/server.py (8 lines changed)

analytics/Codestyle.md (6 lines changed)

@@ -1,6 +1,10 @@
+# Type hints
+Please use: https://www.python.org/dev/peps/pep-0484/
 # Line endings
-We use CRLF everywhere
+We use LF everywhere
 # Imports
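Since the codestyle now asks for PEP 484 type hints (and the new manager below uses them, including a type alias), a one-line illustration; the names here are invented for the example:

    from typing import Dict

    WorkerId = str  # a PEP 484 type alias

    def count_by_id(items: Dict[WorkerId, int]) -> int:
        return sum(items.values())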

analytics/analytic_unit_manager.py (50 lines changed)

@@ -0,0 +1,50 @@
+import logging
+import traceback
+from typing import Dict
+
+import detectors
+from analytic_unit_worker import AnalyticUnitWorker
+
+logger = logging.getLogger('AnalyticUnitManager')
+
+AnalyticUnitId = str
+analytic_workers: Dict[AnalyticUnitId, AnalyticUnitWorker] = {}
+
+
+def get_detector(analytic_unit_type) -> detectors.Detector:
+    if analytic_unit_type == 'GENERAL':
+        detector = detectors.GeneralDetector()
+    else:
+        detector = detectors.PatternDetector(analytic_unit_type)
+    return detector
+
+
+def ensure_worker(analytic_unit_id, analytic_unit_type) -> AnalyticUnitWorker:
+    if analytic_unit_id in analytic_workers:
+        # TODO: check that the type is the same
+        return analytic_workers[analytic_unit_id]
+    detector = get_detector(analytic_unit_type)
+    worker = AnalyticUnitWorker(analytic_unit_id, detector)
+    analytic_workers[analytic_unit_id] = worker
+    return worker
+
+
+async def handle_analytic_task(task):
+    try:
+        analytic_unit_id = task['analyticUnitId']
+        task_type = task['type']
+        worker = ensure_worker(analytic_unit_id, task_type)
+        payload = task['payload']
+        if task_type == 'PREDICT':
+            result_payload = await worker.do_predict(analytic_unit_id, payload)
+        elif task_type == 'LEARN':
+            result_payload = await worker.do_learn(analytic_unit_id, payload)
+        else:
+            raise ValueError('Unknown task type "%s"' % task_type)
+    except Exception as e:
+        error_text = traceback.format_exc()
+        logger.error("handle_analytic_task exception: '%s'" % error_text)
+        # TODO: move result to a class which renders to json for messaging to analytics
+        return {
+            'status': 'FAILED',
+            'error': str(e)
+        }
+    return {
+        'status': 'SUCCESS',
+        'payload': result_payload
+    }
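The core of this change is the lazy per-unit worker registry in ensure_worker. A self-contained toy reproduction of that shape (illustrative names only, not the project's API), runnable on its own:

    from typing import Dict

    class Worker:
        def __init__(self, unit_id: str) -> None:
            self.unit_id = unit_id

    _workers: Dict[str, Worker] = {}

    def ensure_worker(unit_id: str) -> Worker:
        # Create a worker the first time a unit is seen, then always reuse it.
        if unit_id not in _workers:
            _workers[unit_id] = Worker(unit_id)
        return _workers[unit_id]

    assert ensure_worker('panel-1') is ensure_worker('panel-1')  # one panel - one worker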

analytics/analytic_unit_worker.py (45 lines changed)

@@ -7,57 +7,20 @@ import traceback
 import time

 logger = logging.getLogger('AnalyticUnitWorker')


 class AnalyticUnitWorker:

-    def get_detector(self, analytic_unit_id, pattern_type):
-        if analytic_unit_id not in self.detectors_cache:
-            if pattern_type == 'GENERAL':
-                detector = detectors.GeneralDetector(analytic_unit_id)
-            else:
-                detector = detectors.PatternDetector(analytic_unit_id, pattern_type)
-            self.detectors_cache[analytic_unit_id] = detector
-        return self.detectors_cache[analytic_unit_id]
-
-    def __init__(self, detector: detectors.Detector):
-        pass
-
-    async def do_task(self, task):
-        try:
-            type = task['type']
-            analytic_unit_id = task['analyticUnitId']
-            payload = task['payload']
-            if type == "PREDICT":
-                result_payload = await self.do_predict(analytic_unit_id, payload)
-            elif type == "LEARN":
-                result_payload = await self.do_learn(analytic_unit_id, payload)
-            else:
-                raise ValueError('Unknown task type %s' % type)
-        except Exception as e:
-            #traceback.extract_stack()
-            error_text = traceback.format_exc()
-            logger.error("do_task Exception: '%s'" % error_text)
-            # TODO: move result to a class which renders to json for messaging to analytics
-            result = {
-                'status': "FAILED",
-                'error': str(e)
-            }
-        return {
-            'status': 'SUCCESS',
-            'payload': result_payload
-        }
+    def __init__(self, analytic_unit_id: str, detector: detectors.Detector):
+        self.analytic_unit_id = analytic_unit_id
+        self.detector = detector

     async def do_learn(self, analytic_unit_id, payload) -> None:
         pattern = payload['pattern']
         segments = payload['segments']
         data = payload['data'] # [time, value][]
-        detector = self.get_detector(analytic_unit_id, pattern)
-        await detector.learn(segments)
+        await self.detector.train(data, segments)

     async def do_predict(self, analytic_unit_id, payload):
         pattern = payload['pattern']
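The worker no longer resolves or caches detectors itself; it receives one detector at construction time and delegates to it. A toy sketch of this constructor-injection shape (FakeDetector and the names here are illustrative, not repo code):

    import asyncio

    class FakeDetector:
        async def train(self, data, segments):
            return 'trained on %d points' % len(data)

    class Worker:
        def __init__(self, unit_id, detector):
            # The manager decides which detector a unit gets; the worker just delegates.
            self.unit_id = unit_id
            self.detector = detector

        async def do_learn(self, payload):
            return await self.detector.train(payload['data'], payload['segments'])

    worker = Worker('panel-1', FakeDetector())
    print(asyncio.run(worker.do_learn({'data': [1, 2, 3], 'segments': []})))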

analytics/detectors/detector.py (4 lines changed)

@@ -5,9 +5,9 @@ from pandas import DataFrame
 class Detector(ABC):

     @abstractmethod
-    def fit(self, dataframe: DataFrame, segments: list):
+    async def train(self, dataframe: DataFrame, segments: list):
         pass

     @abstractmethod
-    def predict(self, dataframe: DataFrame) -> list:
+    async def predict(self, dataframe: DataFrame) -> list:
         pass
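The base interface is now async, so concrete detectors must implement train and predict as coroutines. A minimal hypothetical subclass against this interface (ThresholdDetector is invented for illustration and is not in the repo):

    import asyncio
    from abc import ABC, abstractmethod
    from pandas import DataFrame
    import pandas as pd

    class Detector(ABC):
        @abstractmethod
        async def train(self, dataframe: DataFrame, segments: list):
            pass

        @abstractmethod
        async def predict(self, dataframe: DataFrame) -> list:
            pass

    class ThresholdDetector(Detector):
        async def train(self, dataframe: DataFrame, segments: list):
            # Toy training: remember the largest value seen.
            self.threshold = dataframe['value'].max()

        async def predict(self, dataframe: DataFrame) -> list:
            # Flag rows at or above the learned threshold.
            return dataframe.index[dataframe['value'] >= self.threshold].tolist()

    detector = ThresholdDetector()
    asyncio.run(detector.train(pd.DataFrame({'value': [1, 5, 3]}), []))
    print(asyncio.run(detector.predict(pd.DataFrame({'value': [2, 9]}))))  # [1]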

analytics/detectors/general_detector/general_detector.py (25 lines changed)

@@ -1,4 +1,5 @@
 from detectors.general_detector.supervised_algorithm import SupervisedAlgorithm
+from detectors import Detector
 import utils
 # from grafana_data_provider import GrafanaDataProvider
 from data_preprocessor import data_preprocessor
@@ -15,15 +16,13 @@ NANOSECONDS_IN_MS = 1000000
 logger = logging.getLogger('analytic_toolset')


-class GeneralDetector:
+class GeneralDetector(Detector):

-    def __init__(self, anomaly_name):
-        self.anomaly_name = anomaly_name
+    def __init__(self):
         self.model = None
         self.__load_model()

-    async def learn(self, segments, data):
-        logger.info("Start to learn for anomaly_name='%s'" % self.anomaly_name)
+    async def train(self, segments, data):
         confidence = 0.02
         dataframe = data # make dataframe from array
@@ -40,7 +39,7 @@ class GeneralDetector:
             segments
         )

-        self.model = self.create_algorithm()
+        self.model = SupervisedAlgorithm()
         await self.model.fit(train_augmented, confidence)

         if len(segments) > 0:
             last_dataframe_time = dataframe.iloc[-1]['timestamp']
@@ -52,7 +51,7 @@ class GeneralDetector:
         logger.info("Learning is finished for anomaly_name='%s'" % self.anomaly_name)
         return last_prediction_time

-    async def predict(self, last_prediction_time):
+    async def predict(self, data):
         logger.info("Start to predict for anomaly type='%s'" % self.anomaly_name)
         last_prediction_time = pd.to_datetime(last_prediction_time, unit='ms')

@@ -83,15 +82,3 @@ class GeneralDetector:
         logger.info("Predicting is finished for anomaly type='%s'" % self.anomaly_name)
         return predicted_anomalies, last_prediction_time
-
-    def create_algorithm(self):
-        return SupervisedAlgorithm()
-
-    def __save_model(self):
-        pass
-        # TODO: use data_service to save anything
-
-    def __load_model(self):
-        pass
-        # TODO: use data_service to save anything
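One thing reviewers may want to flag: the context lines kept above still read self.anomaly_name, while this commit deletes the only assignment of that attribute from __init__. A toy reproduction of the resulting failure mode (not repo code):

    import asyncio

    class GeneralDetectorToy:
        def __init__(self):
            self.model = None  # anomaly_name is no longer set here

        async def train(self):
            # Mirrors the retained log lines that still read the deleted attribute.
            return "anomaly_name='%s'" % self.anomaly_name

    try:
        asyncio.run(GeneralDetectorToy().train())
    except AttributeError as e:
        print(e)  # 'GeneralDetectorToy' object has no attribute 'anomaly_name'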

analytics/detectors/pattern_detector.py (34 lines changed)

@@ -11,9 +11,10 @@ import config
 import pandas as pd

+from detectors import Detector
+
 logger = logging.getLogger('analytic_toolset')


 def resolve_model_by_pattern(pattern: str) -> models.Model:
@@ -28,30 +29,20 @@ def resolve_model_by_pattern(pattern: str) -> models.Model:
     raise ValueError('Unknown pattern "%s"' % pattern)


-class PatternDetector:
+class PatternDetector(Detector):

-    def __init__(self, analytic_unit_id, pattern_type):
-        self.analytic_unit_id = analytic_unit_id
+    def __init__(self, pattern_type):
         self.pattern_type = pattern_type
-        self.model = None
-        self.__load_model(pattern_type)
-
-    async def learn(self, segments, data):
         self.model = resolve_model_by_pattern(self.pattern_type)
-        window_size = 200
+        window_size = 100

+    async def train(self, dataframe: pd.DataFrame, segments: list):
         # TODO: pass only part of dataframe that has segments
-        self.model.fit(dataframe, segments, data)
+        self.model.fit(dataframe, data, segments)
         self.__save_model()
         return 0

-    async def predict(self, last_prediction_time, data):
-        if self.model is None:
-            return [], last_prediction_time
-        window_size = 100
-        last_prediction_time = pd.to_datetime(last_prediction_time, unit='ms')
+    async def predict(self, data):
         start_index = self.data_prov.get_upper_bound(last_prediction_time)
         start_index = max(0, start_index - window_size)
@@ -73,12 +64,3 @@ class PatternDetector:
         last_dataframe_time = dataframe.iloc[-1]['timestamp']
         last_prediction_time = int(last_dataframe_time.timestamp() * 1000)
         return segments, last_prediction_time
-        # return predicted_anomalies, last_prediction_time
-
-    def __save_model(self):
-        pass
-        # TODO: use data_service to save anything
-
-    def __load_model(self, pattern):
-        pass
-        # TODO: use data_service to save anything
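resolve_model_by_pattern is a plain string-to-model dispatch; a compact dict-based variant of the same idea (the model classes below are placeholders, not the repo's models module):

    class PeakModel:
        pass

    class JumpModel:
        pass

    MODELS_BY_PATTERN = {
        'PEAK': PeakModel,
        'JUMP': JumpModel,
    }

    def resolve_model_by_pattern(pattern: str):
        # Unknown patterns fail loudly, as in the diff above.
        if pattern not in MODELS_BY_PATTERN:
            raise ValueError('Unknown pattern "%s"' % pattern)
        return MODELS_BY_PATTERN[pattern]()

    print(type(resolve_model_by_pattern('PEAK')).__name__)  # PeakModel

Reviewers may also note that, as reconstructed above, the new train() still appears to reference data and self.__save_model(), and predict() still appears to reference last_prediction_time and window_size, none of which survive this change.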

analytics/server.py (8 lines changed)

@@ -5,13 +5,13 @@ import sys
 import asyncio

 import services

-from analytic_unit_worker import AnalyticUnitWorker
+from analytic_unit_manager import handle_analytic_task

 root = logging.getLogger()
 logger = logging.getLogger('SERVER')

-worker: AnalyticUnitWorker = None
-
 server_service: services.ServerService = None
 data_service: services.DataService = None
@@ -28,6 +28,8 @@ logging_handler.setFormatter(logging_formatter)
 root.addHandler(logging_handler)


 async def handle_task(task: object):
     try:
@@ -43,7 +45,7 @@ async def handle_task(task: object):
             message = services.server_service.ServerMessage('TASK_RESULT', task_result_payload)
             await server_service.send_message(message)

-    res = await worker.do_task(task)
+    res = await handle_analytic_task(task)
     res['_id'] = task['_id']

     message = services.server_service.ServerMessage('TASK_RESULT', res)
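With the global worker gone, the server only forwards tasks to the manager and relays the result. A self-contained sketch of that flow (the stub below stands in for analytic_unit_manager.handle_analytic_task):

    import asyncio

    async def handle_analytic_task(task):
        # Stub: the real function routes the task to a per-unit worker.
        return {'status': 'SUCCESS', 'payload': None}

    async def handle_task(task):
        res = await handle_analytic_task(task)
        res['_id'] = task['_id']
        return res  # the real server wraps this in a TASK_RESULT message

    print(asyncio.run(handle_task({'_id': 42})))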
