
LF eol in all files & add doc about that & setting to settings.json

pull/1/head
Alexey Velikiy 7 years ago
parent
commit
72e8c4520e
  1. analytics/.vscode/settings.json (3 lines changed)
  2. analytics/Codestyle.md (40 lines changed)
  3. analytics/config.py (74 lines changed)
  4. analytics/detectors/__init__.py (10 lines changed)
  5. analytics/detectors/jump_detector.py (276 lines changed)
  6. analytics/server.py (128 lines changed)
  7. analytics/services/__init__.py (4 lines changed)
  8. analytics/services/data_service.py (16 lines changed)
  9. analytics/services/server_service.py (84 lines changed)

analytics/.vscode/settings.json (vendored): 3 lines changed

@@ -3,5 +3,6 @@
"python.linting.enabled": false,
"terminal.integrated.shell.windows": "C:\\WINDOWS\\System32\\WindowsPowerShell\\v1.0\\powershell.exe",
"editor.tabSize": 4,
"editor.insertSpaces": true
"editor.insertSpaces": true,
"files.eol": "\n"
}

analytics/Codestyle.md: 40 lines changed

@@ -1,19 +1,23 @@
# Line endings
We use LF everywhere.

# Imports
Import local files first, then specific libraries, and then standard libraries,
so imports go from the most specific to the most common.
This keeps the most important things in front of you from the beginning.
```
from data_provider import DataProvider
from anomaly_model import AnomalyModel
from pattern_detection_model import PatternDetectionModel
import numpy as np
from scipy.signal import argrelextrema
import pickle
```
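Since the commit switches every file to LF and settings.json now pins files.eol to "\n", a minimal check script can confirm nothing with CRLF slips back in. This is a sketch, not part of the commit; the root path analytics/ and the plain byte scan are assumptions:

```
import os

# Hypothetical helper: walk a tree and list files that still contain CRLF.
# The root path 'analytics' is an assumption; adjust it to the repo layout.
# The check is a rough byte scan, so binary files may show up as false positives.
def find_crlf_files(root='analytics'):
    offenders = []
    for dirpath, _, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            with open(path, 'rb') as f:
                if b'\r\n' in f.read():
                    offenders.append(path)
    return offenders

if __name__ == '__main__':
    for path in find_crlf_files():
        print('CRLF found in', path)
```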

analytics/config.py: 74 lines changed

@@ -1,37 +1,37 @@
import os
import json

PARENT_FOLDER = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
DATA_FOLDER = os.path.join(PARENT_FOLDER, 'data')
CONFIG_FILE = os.path.join(PARENT_FOLDER, 'config.json')

config_exists = os.path.isfile(CONFIG_FILE)
if config_exists:
    with open(CONFIG_FILE) as f:
        config = json.load(f)

def get_config_field(field, default_val = None):
    if field in os.environ:
        return os.environ[field]
    if config_exists and field in config:
        return config[field]
    if default_val is not None:
        return default_val
    raise Exception('Please configure {}'.format(field))

DATASET_FOLDER = os.path.join(DATA_FOLDER, 'datasets')
ANALYTIC_UNITS_FOLDER = os.path.join(DATA_FOLDER, 'analytic_units')
MODELS_FOLDER = os.path.join(DATA_FOLDER, 'models')
METRICS_FOLDER = os.path.join(DATA_FOLDER, 'metrics')

HASTIC_API_KEY = get_config_field('HASTIC_API_KEY')
ZEROMQ_CONNECTION_STRING = get_config_field('ZEROMQ_CONNECTION_STRING', 'tcp://*:8002')
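A short usage sketch, not part of the commit, showing the lookup order get_config_field implements: an environment variable wins, then config.json, then the supplied default, and a field with neither a value nor a default raises. The field names SOME_OPTIONAL_FIELD and SOME_REQUIRED_FIELD are placeholders:

```
import os

# config.py reads HASTIC_API_KEY at import time, so set it first (value is a placeholder).
os.environ['HASTIC_API_KEY'] = 'dummy-key'
os.environ['ZEROMQ_CONNECTION_STRING'] = 'tcp://*:9002'

import config

# An environment variable wins over config.json and the built-in default.
print(config.get_config_field('ZEROMQ_CONNECTION_STRING'))         # tcp://*:9002

# Falls back to the supplied default when the field is set nowhere.
print(config.get_config_field('SOME_OPTIONAL_FIELD', 'fallback'))   # fallback

# With no default and no configured value, get_config_field raises.
try:
    config.get_config_field('SOME_REQUIRED_FIELD')
except Exception as e:
    print(e)  # Please configure SOME_REQUIRED_FIELD
```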

analytics/detectors/__init__.py: 10 lines changed

@@ -1,5 +1,5 @@
from detectors.general_detector import GeneralDetector
from detectors.pattern_detection_model import PatternDetectionModel
from detectors.peaks_detector import PeaksDetector
from detectors.step_detector import StepDetector
from detectors.jump_detector import Jumpdetector

analytics/detectors/jump_detector.py: 276 lines changed

@@ -1,138 +1,138 @@
import numpy as np
import pickle
import scipy.signal
from scipy.fftpack import fft
from scipy.signal import argrelextrema
import math

def is_intersect(target_segment, segments):
    for segment in segments:
        start = max(segment['start'], target_segment[0])
        finish = min(segment['finish'], target_segment[1])
        if start <= finish:
            return True
    return False

def exponential_smoothing(series, alpha):
    result = [series[0]]
    for n in range(1, len(series)):
        result.append(alpha * series[n] + (1 - alpha) * result[n-1])
    return result

class Jumpdetector:

    def __init__(self, pattern):
        self.pattern = pattern
        self.segments = []
        self.confidence = 1.5
        self.convolve_max = 120

    def fit(self, dataframe, segments):
        data = dataframe['value']
        confidences = []
        convolve_list = []
        for segment in segments:
            if segment['labeled']:
                segment_data = data[segment['start'] : segment['finish'] + 1]
                segment_min = min(segment_data)
                segment_max = max(segment_data)
                confidences.append(0.20 * (segment_max - segment_min))
                flat_segment = segment_data.rolling(window=5).mean()  # smooth the segment
                # ideally, inspect the segment histogram, pick the mean value
                # and then take +-120 samples around it
                segment_summ = 0
                for val in flat_segment:
                    segment_summ += val
                segment_mid = segment_summ / len(flat_segment)  # TODO: compute a proper mean/median
                for ind in range(1, len(flat_segment) - 1):
                    if flat_segment.iloc[ind + 1] > segment_mid and flat_segment.iloc[ind - 1] < segment_mid:
                        flat_mid_index = ind  # index where the smoothed graph crosses its mean
                segment_mid_index = flat_mid_index - 5
                labeled_drop = data[segment_mid_index - 120 : segment_mid_index + 120]
                labeled_min = min(labeled_drop)
                labeled_drop = labeled_drop - labeled_min  # shift down by the minimum
                labeled_max = max(labeled_drop)
                labeled_drop = labeled_drop / labeled_max  # normalize
                convolve = scipy.signal.fftconvolve(labeled_drop, labeled_drop)
                convolve_list.append(max(convolve))  # convolve the pattern with itself
                # TODO: also plug in a logistic sigmoid here and a search for alpha

        if len(confidences) > 0:
            self.confidence = min(confidences)
        else:
            self.confidence = 1.5

        if len(convolve_list) > 0:
            self.convolve_max = max(convolve_list)
        else:
            self.convolve_max = 120  # the maximum of the convolution equals the margin (120)

    def logistic_sigmoid(self, x1, x2, alpha, height):
        distribution = []
        for i in range(x1, x2):
            F = 1 * height / (1 + math.exp(-i * alpha))
            distribution.append(F)
        return distribution

    async def predict(self, dataframe):
        data = dataframe['value']
        result = self.__predict(data)
        result.sort()
        if len(self.segments) > 0:
            result = [segment for segment in result if not is_intersect(segment, self.segments)]
        return result

    def __predict(self, data):
        window_size = 24
        all_max_flatten_data = data.rolling(window=window_size).mean()
        extrema_list = []
        # TODO: add all intersections of the exponential with the smoothed graph
        for i in exponential_smoothing(data + self.confidence, 0.02):
            extrema_list.append(i)
        # all_mins was referenced but never defined in the original file;
        # assuming local minima of the smoothed series via argrelextrema.
        all_mins = argrelextrema(np.array(all_max_flatten_data), np.less)[0]
        segments = []
        for i in all_mins:
            if all_max_flatten_data[i] > extrema_list[i]:
                segments.append(i - window_size)
        return [(x - 1, x + 1) for x in self.__filter_prediction(segments, all_max_flatten_data)]

    def __filter_prediction(self, segments, all_max_flatten_data):
        delete_list = []
        variance_error = int(0.004 * len(all_max_flatten_data))
        if variance_error > 200:
            variance_error = 200
        for i in range(1, len(segments)):
            if segments[i] < segments[i - 1] + variance_error:
                delete_list.append(segments[i])
        for item in delete_list:
            segments.remove(item)
        # TODO: rework this second delete list to use the convolution with the sigmoid
        delete_list = []
        pattern_data = all_max_flatten_data[segments[0] - 120 : segments[0] + 120]
        for segment in segments:
            convol_data = all_max_flatten_data[segment - 120 : segment + 120]
            conv = scipy.signal.fftconvolve(pattern_data, convol_data)
            if max(conv) > self.convolve_max * 1.1 or max(conv) < self.convolve_max * 0.9:
                delete_list.append(segment)
        for item in delete_list:
            segments.remove(item)
        return segments

    def save(self, model_filename):
        with open(model_filename, 'wb') as file:
            pickle.dump((self.confidence, self.convolve_max), file)

    def load(self, model_filename):
        try:
            with open(model_filename, 'rb') as file:
                (self.confidence, self.convolve_max) = pickle.load(file)
        except Exception:
            pass

analytics/server.py: 128 lines changed

@@ -1,64 +1,64 @@
import config
import json
import logging
import sys
import asyncio

import services
from analytic_unit_worker import AnalyticUnitWorker

root = logging.getLogger()
logger = logging.getLogger('SERVER')

worker = None
server_service = None
data_service = None

root.setLevel(logging.DEBUG)

ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG)

formatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s")
ch.setFormatter(formatter)
root.addHandler(ch)

async def handle_task(text):
    try:
        task = json.loads(text)
        logger.info("Command is OK")
        await server_service.send_message(json.dumps({
            '_taskId': task['_taskId'],
            'task': task['type'],
            'analyticUnitId': task['analyticUnitId'],
            'status': "in progress"
        }))
        res = await worker.do_task(task)
        res['_taskId'] = task['_taskId']
        await server_service.send_message(json.dumps(res))
    except Exception as e:
        logger.error("Exception: '%s'" % str(e))

def init_services():
    logger.info("Starting services...")
    logger.info("Server...")
    server_service = services.ServerService(handle_task)
    logger.info("Ok")
    logger.info("Data service...")
    data_service = services.DataService(server_service)
    logger.info("Ok")
    return server_service, data_service

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    logger.info("Starting worker...")
    worker = AnalyticUnitWorker()
    logger.info("Ok")
    server_service, data_service = init_services()
    loop.run_until_complete(server_service.handle_loop())

analytics/services/__init__.py: 4 lines changed

@@ -1,2 +1,2 @@
from services.server_service import ServerService
from services.data_service import DataService

analytics/services/data_service.py: 16 lines changed

@@ -1,9 +1,9 @@
class DataService:

    def __init__(self, server_service):
        self.server_service = server_service

    async def safe_file(self, filename, content):
        pass

    async def load_file(self, filename, content):
        pass

analytics/services/server_service.py: 84 lines changed

@@ -1,42 +1,42 @@
import config

import zmq
import zmq.asyncio
import logging
import asyncio

logger = logging.getLogger('SERVER_SERVICE')

class ServerService:

    def __init__(self, on_message_handler):
        self.on_message_handler = on_message_handler

        logger.info("Binding to %s ..." % config.ZEROMQ_CONNECTION_STRING)
        self.context = zmq.asyncio.Context()
        self.socket = self.context.socket(zmq.PAIR)
        self.socket.bind(config.ZEROMQ_CONNECTION_STRING)

    async def handle_loop(self):
        while True:
            received_bytes = await self.socket.recv()
            text = received_bytes.decode('utf-8')

            if text == 'ping':
                asyncio.ensure_future(self.__handle_ping())
            else:
                asyncio.ensure_future(self.__handle_message(text))

    async def send_message(self, string):
        await self.socket.send_string(string)

    async def __handle_ping(self):
        await self.socket.send(b'pong')

    async def __handle_message(self, text):
        try:
            asyncio.ensure_future(self.on_message_handler(text))
        except Exception as e:
            logger.error("Exception: '%s'" % str(e))
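For context, a minimal client sketch, not part of the commit, that exercises this PAIR socket: it connects to the default ZEROMQ_CONNECTION_STRING from config.py, checks the ping/pong health path, and submits a task shaped the way handle_task in server.py expects. The task type and ids are placeholders, not real task types:

```
import json
import zmq

# The analytics server binds tcp://*:8002 by default (config.py), so connect locally.
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect('tcp://localhost:8002')

# Health check: the server answers 'ping' with 'pong'.
socket.send(b'ping')
print(socket.recv().decode('utf-8'))  # pong

# Submit a task; the field names follow handle_task in server.py.
# 'SOME_TASK_TYPE' and the ids are placeholders for illustration only.
task = {
    '_taskId': '1',
    'type': 'SOME_TASK_TYPE',
    'analyticUnitId': 'unit-1',
}
socket.send_string(json.dumps(task))

# The server first acknowledges with an 'in progress' status message;
# a result message follows once AnalyticUnitWorker finishes the task.
print(socket.recv_string())
```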
