
Fix learning (#116)

rozetko authored 6 years ago, committed by Alexey Velikiy. Commit 6420f16c3d.
11 changed files:

  1. analytics/analytic_unit_manager.py (32 changed lines)
  2. analytics/analytic_unit_worker.py (3 changed lines)
  3. analytics/data_preprocessor.py (8 changed lines)
  4. analytics/detectors/general_detector/general_detector.py (6 changed lines)
  5. analytics/detectors/pattern_detector.py (6 changed lines)
  6. analytics/models/jump_model.py (7 changed lines)
  7. analytics/models/step_model.py (12 changed lines)
  8. analytics/server.py (4 changed lines)
  9. analytics/services/server_service.py (4 changed lines)
  10. analytics/utils/__init__.py (21 changed lines)
  11. server/src/controllers/analytics_controller.ts (71 changed lines)

analytics/analytic_unit_manager.py (32 changed lines)

@@ -1,5 +1,6 @@
 from typing import Dict
-import logging
+import pandas as pd
+import logging, traceback

 import detectors
 from analytic_unit_worker import AnalyticUnitWorker
@@ -9,7 +10,7 @@ logger = logging.getLogger('AnalyticUnitManager')
 analytic_unit_id = str
 analytic_workers: Dict[analytic_unit_id, AnalyticUnitWorker] = dict()

-def get_detector(self, analytic_unit_type) -> detectors.Detector:
+def get_detector(analytic_unit_type) -> detectors.Detector:
     if analytic_unit_type == 'GENERAL':
         detector = detectors.GeneralDetector()
     else:
@@ -27,25 +28,32 @@ def ensure_worker(analytic_unit_id, analytic_unit_type) -> AnalyticUnitWorker:
 async def handle_analytic_task(task):
     try:
-        worker = ensure_worker(task['analyticUnitId'], task['type'])
         payload = task['payload']
-        if type == "PREDICT":
+        payload['data'] = pd.DataFrame(payload['data'], columns = ['timestamp', 'value'])
+        worker = ensure_worker(task['analyticUnitId'], payload['pattern'])
+        result_payload = {}
+        print(task['type'])
+        if task['type'] == "PREDICT":
             result_payload = await worker.do_predict(analytic_unit_id, payload)
-        elif type == "LEARN":
-            result_payload = await worker.do_learn(analytic_unit_id, payload)
+            print(result_payload)
+        elif task['type'] == "LEARN":
+            await worker.do_learn(analytic_unit_id, payload)
         else:
-            raise ValueError('Unknown task type "%s"' % type)
+            raise ValueError('Unknown task type "%s"' % task['type'])
+        print(result_payload)
+        return {
+            'status': 'SUCCESS',
+            'payload': result_payload
+        }
     except Exception as e:
         error_text = traceback.format_exc()
         logger.error("handle_analytic_task exception: '%s'" % error_text)
         # TODO: move result to a class which renders to json for messaging to analytics
-        result = {
+        return {
            'status': "FAILED",
            'error': str(e)
        }
-        return {
-            'status': 'SUCCESS',
-            'payload': result_payload
-        }
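
Note on this hunk: the underlying bug was that the old dispatch compared the builtin `type` to a string (`if type == "PREDICT":`), which is never true, so both branches were skipped and every task fell into the ValueError branch. A minimal, runnable sketch of the corrected dispatch pattern, with a hypothetical `Worker` stub in place of the repo's `AnalyticUnitWorker`:

import asyncio
import traceback

class Worker:
    # Hypothetical stand-in for AnalyticUnitWorker, just for this sketch.
    async def do_predict(self, payload):
        return {'segments': [], 'lastPredictionTime': 0}

    async def do_learn(self, payload):
        pass

async def handle_task(task):
    worker = Worker()
    try:
        # The fix: read the discriminator from the task dict.
        # The old `type == "PREDICT"` compared the builtin `type` itself.
        if task['type'] == 'PREDICT':
            result = await worker.do_predict(task['payload'])
        elif task['type'] == 'LEARN':
            await worker.do_learn(task['payload'])
            result = {}
        else:
            raise ValueError('Unknown task type "%s"' % task['type'])
        return {'status': 'SUCCESS', 'payload': result}
    except Exception:
        return {'status': 'FAILED', 'error': traceback.format_exc()}

print(asyncio.run(handle_task({'type': 'LEARN', 'payload': {}})))
# {'status': 'SUCCESS', 'payload': {}}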

analytics/analytic_unit_worker.py (3 changed lines)

@@ -22,8 +22,7 @@ class AnalyticUnitWorker:
         pattern = payload['pattern']
         data = payload['data'] # [time, value][]
-        detector = self.get_detector(analytic_unit_id, pattern)
-        segments, last_prediction_time = await detector.predict(data)
+        segments, last_prediction_time = await self.detector.predict(data)
         return {
             'segments': segments,
             'lastPredictionTime': last_prediction_time
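
Note: the old code built a fresh detector inside the predict path, so the state trained by do_learn was lost; the fix predicts through the worker's own self.detector. A sketch of the intended lifecycle (hypothetical stubs, not the repo's classes):

import asyncio

class Detector:
    # Hypothetical minimal detector that keeps trained state.
    def __init__(self):
        self.model = None

    async def train(self, dataframe, segments):
        self.model = 'trained'

    async def predict(self, data):
        assert self.model is not None, 'predict called before train'
        return [], 0

class AnalyticUnitWorker:
    def __init__(self, detector):
        self.detector = detector  # one long-lived detector per analytic unit

    async def do_learn(self, payload):
        await self.detector.train(payload['data'], payload['segments'])

    async def do_predict(self, payload):
        # Same instance as in do_learn, so the trained model is reused.
        segments, last_time = await self.detector.predict(payload['data'])
        return {'segments': segments, 'lastPredictionTime': last_time}

async def demo():
    worker = AnalyticUnitWorker(Detector())
    await worker.do_learn({'data': None, 'segments': []})
    print(await worker.do_predict({'data': None}))

asyncio.run(demo())  # {'segments': [], 'lastPredictionTime': 0}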

analytics/data_preprocessor.py (8 changed lines)

@@ -74,8 +74,8 @@ class data_preprocessor:
         anomaly_index = None
         dataframe = self.data_provider.get_dataframe(None)
         for anomaly in anomalies:
-            start_time = pd.to_datetime(anomaly['start'], unit='ms')
-            finish_time = pd.to_datetime(anomaly['finish'], unit='ms')
+            start_time = pd.to_datetime(anomaly['from'], unit='ms')
+            finish_time = pd.to_datetime(anomaly['to'], unit='ms')
             current_index = (dataframe['timestamp'] >= start_time) & (dataframe['timestamp'] <= finish_time)
             if anomaly_index is not None:
                 anomaly_index = (anomaly_index | current_index)
@@ -97,12 +97,12 @@ class data_preprocessor:
             start = source_dataframe['timestamp'][start_frame_index]
             finish = source_dataframe['timestamp'][finish_frame_index]
             if cur_anomaly is None:
                 if len(anomalies) > 0 and start <= anomalies[len(anomalies) - 1]['finish']:
+                if len(anomalies) > 0 and start <= anomalies[len(anomalies) - 1]['to']:
                     cur_anomaly = anomalies[len(anomalies) - 1]
                     anomalies.pop()
                 else:
-                    cur_anomaly = {'start': start, 'finish': finish}
-            cur_anomaly['finish'] = finish
+                    cur_anomaly = {'from': start, 'to': finish}
+            cur_anomaly['to'] = finish
         elif cur_anomaly is not None:
             anomalies.append(cur_anomaly)
             cur_anomaly = None
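
Note: this file keeps the same masking logic and only renames the anomaly keys from start/finish to from/to, matching the rest of the commit. A runnable check of the boolean-mask construction on toy data (values are made up):

import pandas as pd

df = pd.DataFrame({
    'timestamp': pd.to_datetime([0, 1000, 2000, 3000], unit='ms'),
    'value': [1.0, 5.0, 5.5, 1.2],
})
anomaly = {'from': 900, 'to': 2100}  # epoch milliseconds, renamed keys

start_time = pd.to_datetime(anomaly['from'], unit='ms')
finish_time = pd.to_datetime(anomaly['to'], unit='ms')
mask = (df['timestamp'] >= start_time) & (df['timestamp'] <= finish_time)
print(df[mask])  # the rows at 1000 ms and 2000 ms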

analytics/detectors/general_detector/general_detector.py (6 changed lines)

@@ -1,7 +1,6 @@
 from detectors.general_detector.supervised_algorithm import SupervisedAlgorithm
 from detectors import Detector
 import utils
-# from grafana_data_provider import GrafanaDataProvider
 from data_preprocessor import data_preprocessor
 import pandas as pd
 import logging
@@ -20,12 +19,10 @@ class GeneralDetector(Detector):
     def __init__(self):
         self.model = None
         self.__load_model()

-    async def train(self, segments, data):
+    async def train(self, dataframe, segments):
         confidence = 0.02
-        dataframe = data # make dataframae from array
         start_index, stop_index = 0, len(dataframe)
         if len(segments) > 0:
             confidence = 0.0
@@ -47,7 +44,6 @@ class GeneralDetector(Detector):
         else:
             last_prediction_time = 0
         self.__save_model()
-        logger.info("Learning is finished for anomaly_name='%s'" % self.anomaly_name)
         return last_prediction_time

analytics/detectors/pattern_detector.py (6 changed lines)

@@ -1,8 +1,6 @@
 import models
 import utils
-# from grafana_data_provider import GrafanaDataProvider

 import logging
 from urllib.parse import urlparse
 import os.path
@@ -38,8 +36,8 @@ class PatternDetector(Detector):
     async def train(self, dataframe: pd.DataFrame, segments: list):
         # TODO: pass only part of dataframe that has segments
-        self.model.fit(dataframe, data, segments)
-        self.__save_model()
+        self.model.fit(dataframe, segments)
+        # TODO: save model after fit
         return 0

     async def predict(self, data):
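
Note: together with general_detector.py above, this settles on one training signature, train(dataframe, segments), and PatternDetector now forwards exactly that pair to model.fit. A sketch of the unified interface (hypothetical stubs):

import asyncio
import pandas as pd

class Model:
    # Hypothetical stub mirroring self.model.fit(dataframe, segments).
    def fit(self, dataframe: pd.DataFrame, segments: list) -> None:
        self.n_points = len(dataframe)

class PatternDetector:
    def __init__(self):
        self.model = Model()

    async def train(self, dataframe: pd.DataFrame, segments: list) -> int:
        self.model.fit(dataframe, segments)
        # per the TODO in the diff, persisting the model is deferred
        return 0

df = pd.DataFrame({'timestamp': [0, 1], 'value': [0.0, 1.0]})
print(asyncio.run(PatternDetector().train(df, [])))  # 0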

analytics/models/jump_model.py (7 changed lines)

@@ -32,9 +32,10 @@ class JumpModel(Model):
        convolve_list = []
        jump_height_list = []
        jump_length_list = []
+        print(segments)
        for segment in segments:
            if segment['labeled']:
-                segment_data = data[segment['start'] : segment['finish'] + 1].reset_index(drop=True)
+                segment_data = data.loc[segment['from'] : segment['to'] + 1].reset_index(drop=True)
                segment_min = min(segment_data)
                segment_max = max(segment_data)
                confidences.append(0.20 * (segment_max - segment_min))
@@ -60,7 +61,7 @@ class JumpModel(Model):
                cen_ind = utils.intersection_segment(flat_segment, segment_median) #finds all interseprions with median
                #cen_ind = utils.find_ind_median(segment_median, flat_segment)
                jump_center = cen_ind[0]
-                segment_cent_index = jump_center - 5 + segment['start']
+                segment_cent_index = jump_center - 5 + segment['from']
                self.ijumps.append(segment_cent_index)
                labeled_drop = data[segment_cent_index - WINDOW_SIZE : segment_cent_index + WINDOW_SIZE]
                labeled_min = min(labeled_drop)
@@ -136,4 +137,4 @@ class JumpModel(Model):
        for ijump in self.ijumps:
            segments.append(ijump)
-        return segments
+        return segments
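
Note: switching from data[a : b + 1] to data.loc[a : b + 1] changes slicing semantics. On a Series with the default integer index, plain slicing is positional and end-exclusive, while .loc is label-based and end-inclusive, so the + 1 now reaches one element further. A quick check:

import pandas as pd

s = pd.Series([10, 20, 30, 40, 50])  # default RangeIndex 0..4

print(list(s[1:3]))      # [20, 30]      positional, end-exclusive
print(list(s.loc[1:3]))  # [20, 30, 40]  label-based, end-inclusive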

analytics/models/step_model.py (12 changed lines)

@@ -26,21 +26,21 @@ class StepModel(Model):
        for i in range(0,len(dataframe['value'])):
            dataframe.loc[i, 'value'] = dataframe.loc[i, 'value'] - d_min
        data = dataframe['value']
-        new_data = []
-        for val in data:
-            new_data.append(val)
        confidences = []
        convolve_list = []
        for segment in segments:
            if segment['labeled']:
-                segment_data = new_data[segment['start'] : segment['finish'] + 1]
+                segment_from_index = utils.timestamp_to_index(dataframe, segment['from'])
+                segment_to_index = utils.timestamp_to_index(dataframe, segment['to'])
+                segment_data = data[segment_from_index : segment_to_index + 1]
                segment_min = min(segment_data)
                segment_max = max(segment_data)
                confidences.append( 0.4*(segment_max - segment_min))
                flat_segment = segment_data #.rolling(window=5).mean()
-                segment_min_index = flat_segment.index(min(flat_segment)) - 5 + segment['start']
+                segment_min_index = flat_segment.idxmin() - 5
                self.idrops.append(segment_min_index)
-                labeled_drop = new_data[segment_min_index - 240 : segment_min_index + 240]
+                labeled_drop = data[segment_min_index - 240 : segment_min_index + 240]
                convolve = scipy.signal.fftconvolve(labeled_drop, labeled_drop)
                convolve_list.append(max(convolve))
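
Note: two idioms change here. Segment bounds are now timestamps resolved to row positions via the new utils.timestamp_to_index (see its diff below), and the minimum is found with Series.idxmin() instead of list.index(min(...)). idxmin() returns the index label of the minimum, and a slice of a Series keeps the original labels, which is why the old + segment['start'] rebase is no longer needed. A quick comparison:

import pandas as pd

values = [3.0, 1.0, 2.0]
s = pd.Series(values)

print(values.index(min(values)))  # 1, position within the plain list
print(s.idxmin())                 # 1, index label of the minimum
print(s[1:].idxmin())             # still 1: the slice keeps original labels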

analytics/server.py (4 changed lines)

@@ -3,6 +3,7 @@ import json
 import logging
 import sys
 import asyncio
+import traceback

 import services
 from analytic_unit_manager import handle_analytic_task
@@ -52,7 +53,8 @@ async def handle_task(task: object):
         await server_service.send_message(message)
     except Exception as e:
-        logger.error("handle_task Exception: '%s'" % str(e))
+        error_text = traceback.format_exc()
+        logger.error("handle_task Exception: '%s'" % error_text)

 async def handle_message(message: services.ServerMessage):
     payload = None
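
Note: replacing str(e) with traceback.format_exc() here (and in server_service.py just below) turns a bare message into a full stack trace in the log. A runnable before/after:

import logging
import traceback

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger('demo')

try:
    {}['missing']
except Exception as e:
    logger.error("str(e) only: '%s'" % str(e))                # just: 'missing'
    logger.error("format_exc:\n%s" % traceback.format_exc())  # file, line, KeyError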

analytics/services/server_service.py (4 changed lines)

@@ -6,6 +6,7 @@ import zmq.asyncio
 import logging
 import json
 import asyncio
+import traceback

 logger = logging.getLogger('SERVER_SERVICE')
@@ -88,4 +89,5 @@ class ServerService:
             asyncio.ensure_future(self.on_message_handler(message))
         except Exception as e:
-            logger.error("__handle_message Exception: '%s'" % str(e))
+            error_text = traceback.format_exc()
+            logger.error("__handle_message Exception: '%s'" % error_text)

analytics/utils/__init__.py (21 changed lines)

@@ -1,10 +1,11 @@
 import numpy as np
+import pandas as pd

 def is_intersect(target_segment, segments):
     for segment in segments:
-        start = max(segment['start'], target_segment[0])
-        finish = min(segment['finish'], target_segment[1])
+        start = max(segment['from'], target_segment[0])
+        finish = min(segment['to'], target_segment[1])
         if start <= finish:
             return True
     return False
@@ -46,16 +47,16 @@ def find_steps(array, threshold):
 def anomalies_to_timestamp(anomalies):
     for anomaly in anomalies:
-        anomaly['start'] = int(anomaly['start'].timestamp() * 1000)
-        anomaly['finish'] = int(anomaly['finish'].timestamp() * 1000)
+        anomaly['from'] = int(anomaly['from'].timestamp() * 1000)
+        anomaly['to'] = int(anomaly['to'].timestamp() * 1000)
     return anomalies

 def segments_box(segments):
     max_time = 0
     min_time = float("inf")
     for segment in segments:
-        min_time = min(min_time, segment['start'])
-        max_time = max(max_time, segment['finish'])
+        min_time = min(min_time, segment['from'])
+        max_time = max(max_time, segment['to'])
     min_time = pd.to_datetime(min_time, unit='ms')
     max_time = pd.to_datetime(max_time, unit='ms')
     return min_time, max_time
@@ -156,3 +157,11 @@ def find_jump(data, height, lenght):
         if(data[i+x] > data[i] + height):
             j_list.append(i)
     return(j_list)
+
+def timestamp_to_index(dataframe, timestamp):
+    data = dataframe['timestamp']
+    for i in range(len(data)):
+        if data[i] >= timestamp:
+            return i
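
Note: the new timestamp_to_index helper returns the position of the first row at or after the given timestamp, and implicitly returns None when the timestamp is past the end of the data. Usage on toy data, plus a logarithmic alternative for a sorted column (an assumption on my part; the repo keeps the linear scan):

import pandas as pd

def timestamp_to_index(dataframe, timestamp):
    data = dataframe['timestamp']
    for i in range(len(data)):
        if data[i] >= timestamp:
            return i

df = pd.DataFrame({'timestamp': [100, 200, 300]})
print(timestamp_to_index(df, 150))  # 1, first row at or after 150
print(timestamp_to_index(df, 999))  # None, nothing qualifies

# Equivalent for a sorted column, O(log n) instead of O(n):
print(int(df['timestamp'].searchsorted(150)))  # 1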

server/src/controllers/analytics_controller.ts (71 changed lines)

@@ -142,14 +142,15 @@ async function processLearningResult(taskResult: any): Promise<{
   if(taskResult.status !== 'SUCCESS') {
     return Promise.reject(taskResult.error);
   }
-  if(taskResult.segments === undefined || !Array.isArray(taskResult.segments)) {
-    throw new Error('Missing segments is result or it is corrupted: ' + taskResult);
-  }
-  if(taskResult.lastPredictionTime === undefined || isNaN(+taskResult.lastPredictionTime)) {
-    throw new Error(
-      'Missing lastPredictionTime is result or it is corrupted: ' + taskResult.lastPredictionTime
-    );
-  }
+  console.log(taskResult)
+  // if(taskResult.segments === undefined || !Array.isArray(taskResult.segments)) {
+  //   throw new Error('Missing segments in result or it is corrupted: ' + taskResult);
+  // }
+  // if(taskResult.lastPredictionTime === undefined || isNaN(+taskResult.lastPredictionTime)) {
+  //   throw new Error(
+  //     'Missing lastPredictionTime is result or it is corrupted: ' + taskResult.lastPredictionTime
+  //   );
+  // }

   return {
     lastPredictionTime: +taskResult.lastPredictionTime,
@@ -159,34 +160,34 @@ async function processLearningResult(taskResult: any): Promise<{
 }

 export async function runPredict(id: AnalyticUnit.AnalyticUnitId) {
-  // let unit = await AnalyticUnit.findById(id);
-  // let pattern = unit.type;
-  // let task = {
-  //   type: 'PREDICT',
-  //   analyticUnitId: id,
-  //   pattern,
-  //   lastPredictionTime: unit.lastPredictionTime
-  // };
-  // let result = await runTask(task);
-  // if(result.status === 'FAILED') {
-  //   return [];
-  // }
-  // // Merging segments
-  // let segments = getLabeledSegments(id);
-  // if(segments.length > 0 && result.segments.length > 0) {
-  //   let lastOldSegment = segments[segments.length - 1];
-  //   let firstNewSegment = result.segments[0];
-  //   if(firstNewSegment.start <= lastOldSegment.finish) {
-  //     result.segments[0].start = lastOldSegment.start;
-  //     removeSegments(id, [lastOldSegment.id]);
-  //   }
-  // }
+  let unit = await AnalyticUnit.findById(id);
+  let pattern = unit.type;
+  let task = new AnalyticsTask(
+    id,
+    AnalyticsTaskType.PREDICT,
+    { pattern, lastPredictionTime: unit.lastPredictionTime }
+  );
+  let result = await runTask(task);
+  if(result.status === 'FAILED') {
+    return [];
+  }
+  // Merging segments
+  let segments = await Segment.findMany(id, { labeled: true });
+  if(segments.length > 0 && result.segments.length > 0) {
+    let lastOldSegment = segments[segments.length - 1];
+    let firstNewSegment = result.segments[0];
+    if(firstNewSegment.from <= lastOldSegment.to) {
+      result.segments[0].from = lastOldSegment.from;
+      Segment.removeSegments([lastOldSegment.id])
+    }
+  }
-  // insertSegments(id, result.segments, false);
-  // AnalyticUnit.setPredictionTime(id, result.lastPredictionTime);
-  // return result.segments;
+  Segment.insertSegments(result.segments);
+  AnalyticUnit.setPredictionTime(id, result.lastPredictionTime);
+  return result.segments;
 }

 export function isAnalyticReady(): boolean {
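
Note: the re-enabled merge step stitches the first newly predicted segment onto the last labeled one when they overlap, using the renamed from/to fields. The rule itself, as a small Python sketch on made-up segments (the controller applies the same rule against its Segment store):

def merge_overlap(labeled, predicted):
    # If the first predicted segment overlaps the last labeled one,
    # extend the prediction backwards and drop the labeled segment.
    if labeled and predicted and predicted[0]['from'] <= labeled[-1]['to']:
        predicted[0]['from'] = labeled[-1]['from']
        labeled.pop()
    return predicted

print(merge_overlap([{'from': 0, 'to': 10}], [{'from': 8, 'to': 20}]))
# [{'from': 0, 'to': 20}]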
