diff --git a/analytics/.vscode/settings.json b/analytics/.vscode/settings.json
index 194e1c0..88626fc 100644
--- a/analytics/.vscode/settings.json
+++ b/analytics/.vscode/settings.json
@@ -3,5 +3,6 @@
     "python.linting.enabled": false,
     "terminal.integrated.shell.windows": "C:\\WINDOWS\\System32\\WindowsPowerShell\\v1.0\\powershell.exe",
     "editor.tabSize": 4,
-    "editor.insertSpaces": true
+    "editor.insertSpaces": true,
+    "files.eol": "\n"
 }
\ No newline at end of file
diff --git a/analytics/Codestyle.md b/analytics/Codestyle.md
index 0333e4c..0d724e3 100644
--- a/analytics/Codestyle.md
+++ b/analytics/Codestyle.md
@@ -1,19 +1,23 @@
-# Imports
-
-You import local files first, than spesific liba and then standart libs.
-So you import from something very scecific to something very common.
-It allows you to pay attention on most important things from beginning.
-
-```
-
-from data_provider import DataProvider
-from anomaly_model import AnomalyModel
-from pattern_detection_model import PatternDetectionModel
-
-import numpy as np
-
-from scipy.signal import argrelextrema
-
-import pickle
-
+# Line endings
+
+We use LF everywhere
+
+# Imports
+
+You import local files first, then specific third-party libs and then standard libs.
+So you import from something very specific to something very common.
+It allows you to pay attention to the most important things from the beginning.
+
+```
+
+from data_provider import DataProvider
+from anomaly_model import AnomalyModel
+from pattern_detection_model import PatternDetectionModel
+
+import numpy as np
+
+from scipy.signal import argrelextrema
+
+import pickle
+
 ```
\ No newline at end of file
diff --git a/analytics/config.py b/analytics/config.py
index 9b8511b..0fdda4b 100644
--- a/analytics/config.py
+++ b/analytics/config.py
@@ -1,37 +1,37 @@
-import os
-import json
-
-
-PARENT_FOLDER = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
-DATA_FOLDER = os.path.join(PARENT_FOLDER, 'data')
-CONFIG_FILE = os.path.join(PARENT_FOLDER, 'config.json')
-
-
-config_exists = os.path.isfile(CONFIG_FILE)
-if config_exists:
-    with open(CONFIG_FILE) as f:
-        config = json.load(f)
-
-
-def get_config_field(field, default_val = None):
-
-    if field in os.environ:
-        return os.environ[field]
-
-    if config_exists and field in config:
-        return config[field]
-
-    if default_val is not None:
-        return default_val
-
-    raise Exception('Please configure {}'.format(field))
-
-
-
-DATASET_FOLDER = os.path.join(DATA_FOLDER, 'datasets')
-ANALYTIC_UNITS_FOLDER = os.path.join(DATA_FOLDER, 'analytic_units')
-MODELS_FOLDER = os.path.join(DATA_FOLDER, 'models')
-METRICS_FOLDER = os.path.join(DATA_FOLDER, 'metrics')
-
-HASTIC_API_KEY = get_config_field('HASTIC_API_KEY')
-ZEROMQ_CONNECTION_STRING = get_config_field('ZEROMQ_CONNECTION_STRING', 'tcp://*:8002')
+import os
+import json
+
+
+PARENT_FOLDER = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
+DATA_FOLDER = os.path.join(PARENT_FOLDER, 'data')
+CONFIG_FILE = os.path.join(PARENT_FOLDER, 'config.json')
+
+
+config_exists = os.path.isfile(CONFIG_FILE)
+if config_exists:
+    with open(CONFIG_FILE) as f:
+        config = json.load(f)
+
+
+def get_config_field(field, default_val = None):
+
+    if field in os.environ:
+        return os.environ[field]
+
+    if config_exists and field in config:
+        return config[field]
+
+    if default_val is not None:
+        return default_val
+
+    raise Exception('Please configure {}'.format(field))
+
+
+
+DATASET_FOLDER = os.path.join(DATA_FOLDER, 'datasets')
+ANALYTIC_UNITS_FOLDER = os.path.join(DATA_FOLDER, 'analytic_units')
+MODELS_FOLDER = os.path.join(DATA_FOLDER, 'models') +METRICS_FOLDER = os.path.join(DATA_FOLDER, 'metrics') + +HASTIC_API_KEY = get_config_field('HASTIC_API_KEY') +ZEROMQ_CONNECTION_STRING = get_config_field('ZEROMQ_CONNECTION_STRING', 'tcp://*:8002') diff --git a/analytics/detectors/__init__.py b/analytics/detectors/__init__.py index a7ce05f..ed4522f 100644 --- a/analytics/detectors/__init__.py +++ b/analytics/detectors/__init__.py @@ -1,5 +1,5 @@ -from detectors.general_detector import GeneralDetector -from detectors.pattern_detection_model import PatternDetectionModel -from detectors.peaks_detector import PeaksDetector -from detectors.step_detector import StepDetector -from detectors.jump_detector import Jumpdetector +from detectors.general_detector import GeneralDetector +from detectors.pattern_detection_model import PatternDetectionModel +from detectors.peaks_detector import PeaksDetector +from detectors.step_detector import StepDetector +from detectors.jump_detector import Jumpdetector diff --git a/analytics/detectors/jump_detector.py b/analytics/detectors/jump_detector.py index 22dab4f..d2127da 100644 --- a/analytics/detectors/jump_detector.py +++ b/analytics/detectors/jump_detector.py @@ -1,138 +1,138 @@ -import numpy as np -import pickle -import scipy.signal -from scipy.fftpack import fft -from scipy.signal import argrelextrema -import math - -def is_intersect(target_segment, segments): - for segment in segments: - start = max(segment['start'], target_segment[0]) - finish = min(segment['finish'], target_segment[1]) - if start <= finish: - return True - return False - -def exponential_smoothing(series, alpha): - result = [series[0]] - for n in range(1, len(series)): - result.append(alpha * series[n] + (1 - alpha) * result[n-1]) - return result - -class Jumpdetector: - - def __init__(self, pattern): - self.pattern = pattern - self.segments = [] - self.confidence = 1.5 - self.convolve_max = 120 - - def fit(self, dataframe, segments): - data = dataframe['value'] - confidences = [] - convolve_list = [] - for segment in segments: - if segment['labeled']: - segment_data = data[segment['start'] : segment['finish'] + 1] - segment_min = min(segment_data) - segment_max = max(segment_data) - confidences.append(0.20 * (segment_max - segment_min)) - flat_segment = segment_data.rolling(window=5).mean() #сглаживаем сегмент - # в идеале нужно посмотреть гистограмму сегмента и выбрать среднее значение, - # далее от него брать + -120 - segment_summ = 0 - for val in flat_segment: - segment_summ += val - segment_mid = segment_summ / len(flat_segment) #посчитать нормально среднее значение/медиану - for ind in range(1, len(flat_segment) - 1): - if flat_segment[ind + 1] > segment_mid and flat_segment[ind - 1] < segment_mid: - flat_mid_index = ind # найти пересечение средней и графика, получить его индекс - segment_mid_index = flat_mid_index - 5 - labeled_drop = data[segment_mid_index - 120 : segment_mid_index + 120] - labeled_min = min(labeled_drop) - for value in labeled_drop: # обрезаем - value = value - labeled_min - labeled_max = max(labeled_drop) - for value in labeled_drop: # нормируем - value = value / labeled_max - convolve = scipy.signal.fftconvolve(labeled_drop, labeled_drop) - convolve_list.append(max(convolve)) # сворачиваем паттерн - # плюс надо впихнуть сюда логистическую сигмоиду и поиск альфы - - if len(confidences) > 0: - self.confidence = min(confidences) - else: - self.confidence = 1.5 - - if len(convolve_list) > 0: - self.convolve_max = max(convolve_list) - else: - self.convolve_max = 
120 # макс метрика свертки равна отступу(120), вау!
-
-    def logistic_sigmoid(x1, x2, alpha, height):
-        distribution = []
-        for i in range(x, y):
-            F = 1 * height / (1 + math.exp(-i * alpha))
-            distribution.append(F)
-        return distribution
-
-    async def predict(self, dataframe):
-        data = dataframe['value']
-
-        result = self.__predict(data)
-        result.sort()
-
-        if len(self.segments) > 0:
-            result = [segment for segment in result if not is_intersect(segment, self.segments)]
-        return result
-
-    def __predict(self, data):
-        window_size = 24
-        all_max_flatten_data = data.rolling(window=window_size).mean()
-        extrema_list = []
-        # добавить все пересечения экспоненты со сглаженным графиком
-        #
-        for i in exponential_smoothing(data + self.confidence, 0.02):
-            extrema_list.append(i)
-
-        segments = []
-        for i in all_mins:
-            if all_max_flatten_data[i] > extrema_list[i]:
-                segments.append(i - window_size)
-
-        return [(x - 1, x + 1) for x in self.__filter_prediction(segments, all_max_flatten_data)]
-
-    def __filter_prediction(self, segments, all_max_flatten_data):
-        delete_list = []
-        variance_error = int(0.004 * len(all_max_flatten_data))
-        if variance_error > 200:
-            variance_error = 200
-        for i in range(1, len(segments)):
-            if segments[i] < segments[i - 1] + variance_error:
-                delete_list.append(segments[i])
-        for item in delete_list:
-            segments.remove(item)
-
-        # изменить секонд делит лист, сделать для свертки с сигмоидой
-        delete_list = []
-        pattern_data = all_max_flatten_data[segments[0] - 120 : segments[0] + 120]
-        for segment in segments:
-            convol_data = all_max_flatten_data[segment - 120 : segment + 120]
-            conv = scipy.signal.fftconvolve(pattern_data, convol_data)
-            if max(conv) > self.convolve_max * 1.1 or max(conv) < self.convolve_max * 0.9:
-                delete_list.append(segment)
-        for item in delete_list:
-            segments.remove(item)
-
-        return segments
-
-    def save(self, model_filename):
-        with open(model_filename, 'wb') as file:
-            pickle.dump((self.confidence, self.convolve_max), file)
-
-    def load(self, model_filename):
-        try:
-            with open(model_filename, 'rb') as file:
-                (self.confidence, self.convolve_max) = pickle.load(file)
-        except:
-            pass
+import numpy as np
+import pickle
+import scipy.signal
+from scipy.fftpack import fft
+from scipy.signal import argrelextrema
+import math
+
+def is_intersect(target_segment, segments):
+    for segment in segments:
+        start = max(segment['start'], target_segment[0])
+        finish = min(segment['finish'], target_segment[1])
+        if start <= finish:
+            return True
+    return False
+
+def exponential_smoothing(series, alpha):
+    result = [series[0]]
+    for n in range(1, len(series)):
+        result.append(alpha * series[n] + (1 - alpha) * result[n-1])
+    return result
+
+class Jumpdetector:
+
+    def __init__(self, pattern):
+        self.pattern = pattern
+        self.segments = []
+        self.confidence = 1.5
+        self.convolve_max = 120
+
+    def fit(self, dataframe, segments):
+        data = dataframe['value']
+        confidences = []
+        convolve_list = []
+        for segment in segments:
+            if segment['labeled']:
+                segment_data = data[segment['start'] : segment['finish'] + 1]
+                segment_min = min(segment_data)
+                segment_max = max(segment_data)
+                confidences.append(0.20 * (segment_max - segment_min))
+                flat_segment = segment_data.rolling(window=5).mean()  # smooth the segment
+                # ideally we should look at the segment histogram and pick the mean value,
+                # then take +/-120 points around it
+                segment_summ = 0
+                for val in flat_segment:
+                    segment_summ += val
+                segment_mid = segment_summ / len(flat_segment)  # TODO: compute the mean/median properly
+                for ind in range(1, len(flat_segment) - 1):
+                    if flat_segment[ind + 1] > segment_mid and flat_segment[ind - 1] < segment_mid:
+                        flat_mid_index = ind  # index where the smoothed segment crosses its mean
+                segment_mid_index = flat_mid_index - 5
+                labeled_drop = data[segment_mid_index - 120 : segment_mid_index + 120]
+                labeled_min = min(labeled_drop)
+                # shift the segment so its minimum is zero
+                labeled_drop = labeled_drop - labeled_min
+                labeled_max = max(labeled_drop)
+                # normalize the segment to the [0, 1] range
+                labeled_drop = labeled_drop / labeled_max
+                convolve = scipy.signal.fftconvolve(labeled_drop, labeled_drop)
+                convolve_list.append(max(convolve))  # convolve the pattern with itself
+        # TODO: also fit a logistic sigmoid here and search for alpha
+
+        if len(confidences) > 0:
+            self.confidence = min(confidences)
+        else:
+            self.confidence = 1.5
+
+        if len(convolve_list) > 0:
+            self.convolve_max = max(convolve_list)
+        else:
+            self.convolve_max = 120  # the max convolution metric equals the margin (120)
+
+    def logistic_sigmoid(self, x1, x2, alpha, height):
+        distribution = []
+        for i in range(x1, x2):
+            F = 1 * height / (1 + math.exp(-i * alpha))
+            distribution.append(F)
+        return distribution
+
+    async def predict(self, dataframe):
+        data = dataframe['value']
+
+        result = self.__predict(data)
+        result.sort()
+
+        if len(self.segments) > 0:
+            result = [segment for segment in result if not is_intersect(segment, self.segments)]
+        return result
+
+    def __predict(self, data):
+        window_size = 24
+        all_max_flatten_data = data.rolling(window=window_size).mean()
+        extrema_list = []
+        # TODO: add every intersection of the exponential smoothing with the smoothed series
+        all_mins = argrelextrema(np.array(all_max_flatten_data), np.less)[0]  # assumption: candidate points are local minima of the smoothed data
+        for i in exponential_smoothing(data + self.confidence, 0.02):
+            extrema_list.append(i)
+
+        segments = []
+        for i in all_mins:
+            if all_max_flatten_data[i] > extrema_list[i]:
+                segments.append(i - window_size)
+
+        return [(x - 1, x + 1) for x in self.__filter_prediction(segments, all_max_flatten_data)]
+
+    def __filter_prediction(self, segments, all_max_flatten_data):
+        delete_list = []
+        variance_error = int(0.004 * len(all_max_flatten_data))
+        if variance_error > 200:
+            variance_error = 200
+        for i in range(1, len(segments)):
+            if segments[i] < segments[i - 1] + variance_error:
+                delete_list.append(segments[i])
+        for item in delete_list:
+            segments.remove(item)
+
+        # TODO: rework this second delete list to use the convolution with the sigmoid
+        delete_list = []
+        pattern_data = all_max_flatten_data[segments[0] - 120 : segments[0] + 120]
+        for segment in segments:
+            convol_data = all_max_flatten_data[segment - 120 : segment + 120]
+            conv = scipy.signal.fftconvolve(pattern_data, convol_data)
+            if max(conv) > self.convolve_max * 1.1 or max(conv) < self.convolve_max * 0.9:
+                delete_list.append(segment)
+        for item in delete_list:
+            segments.remove(item)
+
+        return segments
+
+    def save(self, model_filename):
+        with open(model_filename, 'wb') as file:
+            pickle.dump((self.confidence, self.convolve_max), file)
+
+    def load(self, model_filename):
+        try:
+            with open(model_filename, 'rb') as file:
+                (self.confidence, self.convolve_max) = pickle.load(file)
+        except Exception:
+            pass
diff --git a/analytics/server.py b/analytics/server.py
index 059b5d4..75a0fa0 100644
--- a/analytics/server.py
+++ b/analytics/server.py
@@ -1,64 +1,64 @@
-import config
-import json
-import logging
-import sys
-import asyncio
-
-import services
-from analytic_unit_worker import AnalyticUnitWorker
-
-
-
-root = logging.getLogger()
-logger = logging.getLogger('SERVER')
-
-worker = None
-server_service = None
-data_service = None
-
-root.setLevel(logging.DEBUG) - -ch = logging.StreamHandler(sys.stdout) -ch.setLevel(logging.DEBUG) -formatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s") -ch.setFormatter(formatter) -root.addHandler(ch) - - -async def handle_task(text): - try: - task = json.loads(text) - logger.info("Command is OK") - - await server_service.send_message(json.dumps({ - '_taskId': task['_taskId'], - 'task': task['type'], - 'analyticUnitId': task['analyticUnitId'], - 'status': "in progress" - })) - - res = await worker.do_task(task) - res['_taskId'] = task['_taskId'] - await server_service.send_message(json.dumps(res)) - - except Exception as e: - logger.error("Exception: '%s'" % str(e)) - -def init_services(): - logger.info("Starting services...") - logger.info("Server...") - server_service = services.ServerService(handle_task) - logger.info("Ok") - logger.info("Data service...") - data_service = services.DataService(server_service) - logger.info("Ok") - - return server_service, data_service - -if __name__ == "__main__": - loop = asyncio.get_event_loop() - logger.info("Starting worker...") - worker = AnalyticUnitWorker() - logger.info("Ok") - server_service, data_service = init_services() - loop.run_until_complete(server_service.handle_loop()) +import config +import json +import logging +import sys +import asyncio + +import services +from analytic_unit_worker import AnalyticUnitWorker + + + +root = logging.getLogger() +logger = logging.getLogger('SERVER') + +worker = None +server_service = None +data_service = None + +root.setLevel(logging.DEBUG) + +ch = logging.StreamHandler(sys.stdout) +ch.setLevel(logging.DEBUG) +formatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s") +ch.setFormatter(formatter) +root.addHandler(ch) + + +async def handle_task(text): + try: + task = json.loads(text) + logger.info("Command is OK") + + await server_service.send_message(json.dumps({ + '_taskId': task['_taskId'], + 'task': task['type'], + 'analyticUnitId': task['analyticUnitId'], + 'status': "in progress" + })) + + res = await worker.do_task(task) + res['_taskId'] = task['_taskId'] + await server_service.send_message(json.dumps(res)) + + except Exception as e: + logger.error("Exception: '%s'" % str(e)) + +def init_services(): + logger.info("Starting services...") + logger.info("Server...") + server_service = services.ServerService(handle_task) + logger.info("Ok") + logger.info("Data service...") + data_service = services.DataService(server_service) + logger.info("Ok") + + return server_service, data_service + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + logger.info("Starting worker...") + worker = AnalyticUnitWorker() + logger.info("Ok") + server_service, data_service = init_services() + loop.run_until_complete(server_service.handle_loop()) diff --git a/analytics/services/__init__.py b/analytics/services/__init__.py index 5cf4c4c..d7fb414 100644 --- a/analytics/services/__init__.py +++ b/analytics/services/__init__.py @@ -1,2 +1,2 @@ -from services.server_service import ServerService -from services.data_service import DataService +from services.server_service import ServerService +from services.data_service import DataService diff --git a/analytics/services/data_service.py b/analytics/services/data_service.py index 1310418..a136b88 100644 --- a/analytics/services/data_service.py +++ b/analytics/services/data_service.py @@ -1,9 +1,9 @@ -class DataService: - def __init__(self, server_service): - self.server_service = 
server_service
-
-    async def safe_file(filename, content):
-        pass
-
-    async def load_file(filename, content):
+class DataService:
+    def __init__(self, server_service):
+        self.server_service = server_service
+
+    async def safe_file(self, filename, content):
+        pass
+
+    async def load_file(self, filename, content):
         pass
\ No newline at end of file
diff --git a/analytics/services/server_service.py b/analytics/services/server_service.py
index 41c3543..b88c250 100644
--- a/analytics/services/server_service.py
+++ b/analytics/services/server_service.py
@@ -1,42 +1,42 @@
-import config
-
-import zmq
-import zmq.asyncio
-import logging
-
-import asyncio
-
-logger = logging.getLogger('SERVER_SERVICE')
-
-
-class ServerService:
-
-    def __init__(self, on_message_handler):
-        self.on_message_handler = on_message_handler
-
-        logger.info("Binding to %s ..." % config.ZEROMQ_CONNECTION_STRING)
-        self.context = zmq.asyncio.Context()
-        self.socket = self.context.socket(zmq.PAIR)
-        self.socket.bind(config.ZEROMQ_CONNECTION_STRING)
-
-    async def handle_loop(self):
-        while True:
-            received_bytes = await self.socket.recv()
-            text = received_bytes.decode('utf-8')
-
-            if text == 'ping':
-                asyncio.ensure_future(self.__handle_ping())
-            else:
-                asyncio.ensure_future(self.__handle_message(text))
-
-    async def send_message(self, string):
-        await self.socket.send_string(string)
-
-    async def __handle_ping(self):
-        await self.socket.send(b'pong')
-
-    async def __handle_message(self, text):
-        try:
-            asyncio.ensure_future(self.on_message_handler(text))
-        except Exception as e:
-            logger.error("Exception: '%s'" % str(e))
+import config
+
+import zmq
+import zmq.asyncio
+import logging
+
+import asyncio
+
+logger = logging.getLogger('SERVER_SERVICE')
+
+
+class ServerService:
+
+    def __init__(self, on_message_handler):
+        self.on_message_handler = on_message_handler
+
+        logger.info("Binding to %s ..." % config.ZEROMQ_CONNECTION_STRING)
+        self.context = zmq.asyncio.Context()
+        self.socket = self.context.socket(zmq.PAIR)
+        self.socket.bind(config.ZEROMQ_CONNECTION_STRING)
+
+    async def handle_loop(self):
+        while True:
+            received_bytes = await self.socket.recv()
+            text = received_bytes.decode('utf-8')
+
+            if text == 'ping':
+                asyncio.ensure_future(self.__handle_ping())
+            else:
+                asyncio.ensure_future(self.__handle_message(text))
+
+    async def send_message(self, string):
+        await self.socket.send_string(string)
+
+    async def __handle_ping(self):
+        await self.socket.send(b'pong')
+
+    async def __handle_message(self, text):
+        try:
+            asyncio.ensure_future(self.on_message_handler(text))
+        except Exception as e:
+            logger.error("Exception: '%s'" % str(e))
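
For reference, here is a minimal sketch of how a client could exercise the ZeroMQ protocol that `ServerService` implements above: a literal `ping` is answered with `pong`, and any other message is parsed as a JSON task and dispatched to `handle_task` in `server.py`, which first reports an "in progress" status and later the worker result. This is illustrative only and not part of the diff: the `tcp://localhost:8002` endpoint mirrors the default bind address `tcp://*:8002` from `config.py`, the `'learn'` task type and `'test-unit'` id are hypothetical placeholders, and blocking `pyzmq` calls are used instead of the asyncio variants.

```python
import json
import zmq

# Assumed endpoint: the analytics process binds tcp://*:8002 by default,
# so a client on the same host would connect to localhost.
connection_string = 'tcp://localhost:8002'

context = zmq.Context()
socket = context.socket(zmq.PAIR)  # ServerService uses a PAIR socket
socket.connect(connection_string)

# Liveness check: ServerService answers a literal 'ping' with 'pong'.
socket.send_string('ping')
print(socket.recv().decode('utf-8'))  # -> pong

# Anything else is parsed as JSON by handle_task(); it reads
# '_taskId', 'type' and 'analyticUnitId' from the task object.
task = {
    '_taskId': 1,
    'type': 'learn',               # hypothetical task type
    'analyticUnitId': 'test-unit'  # hypothetical unit id
}
socket.send_string(json.dumps(task))

# The first reply is the 'in progress' status message,
# the next one carries the worker's result for this _taskId.
print(socket.recv().decode('utf-8'))
print(socket.recv().decode('utf-8'))
```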