From 6420f16c3d0b2a2aee9e9e07a5dd3a1c6f639896 Mon Sep 17 00:00:00 2001
From: rozetko
Date: Mon, 27 Aug 2018 20:11:53 +0300
Subject: [PATCH] Fix learning (#116)

---
 analytics/analytic_unit_manager.py           | 29 ++++----
 analytics/analytic_unit_worker.py            |  3 +-
 analytics/data_preprocessor.py               | 10 +--
 .../general_detector/general_detector.py     |  6 +-
 analytics/detectors/pattern_detector.py      |  6 +-
 analytics/models/jump_model.py               |  9 ++-
 analytics/models/step_model.py               | 12 ++--
 analytics/server.py                          |  4 +-
 analytics/services/server_service.py         |  4 +-
 analytics/utils/__init__.py                  | 21 ++++--
 .../src/controllers/analytics_controller.ts  | 70 ++++++++--------
 11 files changed, 94 insertions(+), 80 deletions(-)

diff --git a/analytics/analytic_unit_manager.py b/analytics/analytic_unit_manager.py
index 252ca1c..99208d3 100644
--- a/analytics/analytic_unit_manager.py
+++ b/analytics/analytic_unit_manager.py
@@ -1,5 +1,6 @@
 from typing import Dict
-import logging
+import pandas as pd
+import logging, traceback
 
 import detectors
 from analytic_unit_worker import AnalyticUnitWorker
@@ -9,7 +10,7 @@ logger = logging.getLogger('AnalyticUnitManager')
 analytic_unit_id = str
 analytic_workers: Dict[analytic_unit_id, AnalyticUnitWorker] = dict()
 
-def get_detector(self, analytic_unit_type) -> detectors.Detector:
+def get_detector(analytic_unit_type) -> detectors.Detector:
     if analytic_unit_type == 'GENERAL':
         detector = detectors.GeneralDetector()
     else:
@@ -27,25 +28,29 @@ def ensure_worker(analytic_unit_id, analytic_unit_type) -> AnalyticUnitWorker:
 
 async def handle_analytic_task(task):
     try:
-        worker = ensure_worker(task['analyticUnitId'], task['type'])
         payload = task['payload']
-        if type == "PREDICT":
+        payload['data'] = pd.DataFrame(payload['data'], columns = ['timestamp', 'value'])
+        worker = ensure_worker(task['analyticUnitId'], payload['pattern'])
+
+        result_payload = {}
+        if task['type'] == "PREDICT":
             result_payload = await worker.do_predict(analytic_unit_id, payload)
-        elif type == "LEARN":
-            result_payload = await worker.do_learn(analytic_unit_id, payload)
+        elif task['type'] == "LEARN":
+            await worker.do_learn(analytic_unit_id, payload)
         else:
-            raise ValueError('Unknown task type "%s"' % type)
+            raise ValueError('Unknown task type "%s"' % task['type'])
+        return {
+            'status': 'SUCCESS',
+            'payload': result_payload
+        }
     except Exception as e:
+        error_text = traceback.format_exc()
         logger.error("handle_analytic_task exception: '%s'" % error_text)
         # TODO: move result to a class which renders to json for messaging to analytics
-        result = {
+        return {
             'status': "FAILED",
             'error': str(e)
         }
-    return {
-        'status': 'SUCCESS',
-        'payload': result_payload
-    }

diff --git a/analytics/analytic_unit_worker.py b/analytics/analytic_unit_worker.py
index a1309c4..e4bf380 100644
--- a/analytics/analytic_unit_worker.py
+++ b/analytics/analytic_unit_worker.py
@@ -22,8 +22,7 @@ class AnalyticUnitWorker:
         pattern = payload['pattern']
         data = payload['data'] # [time, value][]
 
-        detector = self.get_detector(analytic_unit_id, pattern)
-        segments, last_prediction_time = await detector.predict(data)
+        segments, last_prediction_time = await self.detector.predict(data)
         return {
             'segments': segments,
             'lastPredictionTime': last_prediction_time

diff --git a/analytics/data_preprocessor.py b/analytics/data_preprocessor.py
index b6d5b9c..49fedaa 100644
--- a/analytics/data_preprocessor.py
+++ b/analytics/data_preprocessor.py
@@ -74,8 +74,8 @@ class data_preprocessor:
         anomaly_index = None
        dataframe = self.data_provider.get_dataframe(None)
         for anomaly in anomalies:
-            start_time = pd.to_datetime(anomaly['start'], unit='ms')
-            finish_time = pd.to_datetime(anomaly['finish'], unit='ms')
+            start_time = pd.to_datetime(anomaly['from'], unit='ms')
+            finish_time = pd.to_datetime(anomaly['to'], unit='ms')
             current_index = (dataframe['timestamp'] >= start_time) & (dataframe['timestamp'] <= finish_time)
             if anomaly_index is not None:
                 anomaly_index = (anomaly_index | current_index)
@@ -97,12 +97,12 @@ class data_preprocessor:
             start = source_dataframe['timestamp'][start_frame_index]
             finish = source_dataframe['timestamp'][finish_frame_index]
             if cur_anomaly is None:
-                if len(anomalies) > 0 and start <= anomalies[len(anomalies) - 1]['finish']:
+                if len(anomalies) > 0 and start <= anomalies[len(anomalies) - 1]['to']:
                     cur_anomaly = anomalies[len(anomalies) - 1]
                     anomalies.pop()
                 else:
-                    cur_anomaly = {'start': start, 'finish': finish}
-            cur_anomaly['finish'] = finish
+                    cur_anomaly = {'from': start, 'to': finish}
+            cur_anomaly['to'] = finish
         elif cur_anomaly is not None:
             anomalies.append(cur_anomaly)
             cur_anomaly = None

diff --git a/analytics/detectors/general_detector/general_detector.py b/analytics/detectors/general_detector/general_detector.py
index 5225ef7..ab9adaf 100644
--- a/analytics/detectors/general_detector/general_detector.py
+++ b/analytics/detectors/general_detector/general_detector.py
@@ -1,7 +1,6 @@
 from detectors.general_detector.supervised_algorithm import SupervisedAlgorithm
 from detectors import Detector
 import utils
-# from grafana_data_provider import GrafanaDataProvider
 from data_preprocessor import data_preprocessor
 import pandas as pd
 import logging
@@ -20,12 +19,10 @@ class GeneralDetector(Detector):
 
     def __init__(self):
         self.model = None
-        self.__load_model()
 
-    async def train(self, segments, data):
+    async def train(self, dataframe, segments):
         confidence = 0.02
-        dataframe = data # make dataframae from array
         start_index, stop_index = 0, len(dataframe)
         if len(segments) > 0:
             confidence = 0.0
@@ -47,7 +44,6 @@ class GeneralDetector(Detector):
         else:
             last_prediction_time = 0
 
-        self.__save_model()
         logger.info("Learning is finished for anomaly_name='%s'" % self.anomaly_name)
         return last_prediction_time

diff --git a/analytics/detectors/pattern_detector.py b/analytics/detectors/pattern_detector.py
index c1e0be4..6c5b730 100644
--- a/analytics/detectors/pattern_detector.py
+++ b/analytics/detectors/pattern_detector.py
@@ -1,8 +1,6 @@
 import models
 import utils
 
-# from grafana_data_provider import GrafanaDataProvider
-
 import logging
 from urllib.parse import urlparse
 import os.path
@@ -38,8 +36,8 @@ class PatternDetector(Detector):
 
     async def train(self, dataframe: pd.DataFrame, segments: list):
         # TODO: pass only part of dataframe that has segments
-        self.model.fit(dataframe, data, segments)
-        self.__save_model()
+        self.model.fit(dataframe, segments)
+        # TODO: save model after fit
         return 0
 
     async def predict(self, data):

diff --git a/analytics/models/jump_model.py b/analytics/models/jump_model.py
index c8ec265..e0dd95b 100644
--- a/analytics/models/jump_model.py
+++ b/analytics/models/jump_model.py
@@ -32,9 +32,12 @@
         convolve_list = []
         jump_height_list = []
         jump_length_list = []
         for segment in segments:
             if segment['labeled']:
-                segment_data = data[segment['start'] : segment['finish'] + 1].reset_index(drop=True)
+                # convert segment timestamps to dataframe indices (as in step_model)
+                segment_from_index = utils.timestamp_to_index(dataframe, segment['from'])
+                segment_to_index = utils.timestamp_to_index(dataframe, segment['to'])
+                segment_data = data[segment_from_index : segment_to_index + 1].reset_index(drop=True)
                 segment_min = min(segment_data)
                 segment_max = max(segment_data)
                 confidences.append(0.20 * (segment_max - segment_min))
@@ -60,7 +63,7 @@
                 cen_ind = utils.intersection_segment(flat_segment, segment_median) #finds all interseprions with median
                 #cen_ind = utils.find_ind_median(segment_median, flat_segment)
                 jump_center = cen_ind[0]
-                segment_cent_index = jump_center - 5 + segment['start']
+                segment_cent_index = jump_center - 5 + segment_from_index
                 self.ijumps.append(segment_cent_index)
                 labeled_drop = data[segment_cent_index - WINDOW_SIZE : segment_cent_index + WINDOW_SIZE]
                 labeled_min = min(labeled_drop)
@@ -136,4 +139,4 @@
 
         for ijump in self.ijumps:
             segments.append(ijump)
-        return segments
\ No newline at end of file
+        return segments

diff --git a/analytics/models/step_model.py b/analytics/models/step_model.py
index 828c717..78c8320 100644
--- a/analytics/models/step_model.py
+++ b/analytics/models/step_model.py
@@ -26,21 +26,21 @@ class StepModel(Model):
         for i in range(0,len(dataframe['value'])):
             dataframe.loc[i, 'value'] = dataframe.loc[i, 'value'] - d_min
         data = dataframe['value']
-        new_data = []
-        for val in data:
-            new_data.append(val)
         confidences = []
         convolve_list = []
         for segment in segments:
             if segment['labeled']:
-                segment_data = new_data[segment['start'] : segment['finish'] + 1]
+                segment_from_index = utils.timestamp_to_index(dataframe, segment['from'])
+                segment_to_index = utils.timestamp_to_index(dataframe, segment['to'])
+
+                segment_data = data[segment_from_index : segment_to_index + 1]
                 segment_min = min(segment_data)
                 segment_max = max(segment_data)
                 confidences.append( 0.4*(segment_max - segment_min))
                 flat_segment = segment_data #.rolling(window=5).mean()
-                segment_min_index = flat_segment.index(min(flat_segment)) - 5 + segment['start']
+                segment_min_index = flat_segment.idxmin() - 5
                 self.idrops.append(segment_min_index)
-                labeled_drop = new_data[segment_min_index - 240 : segment_min_index + 240]
+                labeled_drop = data[segment_min_index - 240 : segment_min_index + 240]
                 convolve = scipy.signal.fftconvolve(labeled_drop, labeled_drop)
                 convolve_list.append(max(convolve))

diff --git a/analytics/server.py b/analytics/server.py
index cd6a258..2b49700 100644
--- a/analytics/server.py
+++ b/analytics/server.py
@@ -3,6 +3,7 @@ import json
 import logging
 import sys
 import asyncio
+import traceback
 
 import services
 from analytic_unit_manager import handle_analytic_task
@@ -52,7 +53,8 @@ async def handle_task(task: object):
         await server_service.send_message(message)
 
     except Exception as e:
-        logger.error("handle_task Exception: '%s'" % str(e))
+        error_text = traceback.format_exc()
+        logger.error("handle_task Exception: '%s'" % error_text)
 
 async def handle_message(message: services.ServerMessage):
     payload = None

diff --git a/analytics/services/server_service.py b/analytics/services/server_service.py
index 82caa40..9cd2911 100644
--- a/analytics/services/server_service.py
+++ b/analytics/services/server_service.py
@@ -6,6 +6,7 @@ import zmq.asyncio
 import logging
 import json
 import asyncio
+import traceback
 
 logger = logging.getLogger('SERVER_SERVICE')
@@ -88,4 +89,5 @@ class ServerService:
                 asyncio.ensure_future(self.on_message_handler(message))
         except Exception as e:
-            logger.error("__handle_message Exception: '%s'" % str(e))
+            error_text = traceback.format_exc()
+            logger.error("__handle_message Exception: '%s'" % error_text)

diff --git a/analytics/utils/__init__.py b/analytics/utils/__init__.py
index ef8ee74..2b3d857 100644
--- a/analytics/utils/__init__.py
+++ b/analytics/utils/__init__.py
@@ -1,10 +1,11 @@
 import numpy as np
+import pandas as pd
 
 def is_intersect(target_segment, segments):
     for segment in segments:
-        start = max(segment['start'], target_segment[0])
-        finish = min(segment['finish'], target_segment[1])
+        start = max(segment['from'], target_segment[0])
+        finish = min(segment['to'], target_segment[1])
         if start <= finish:
             return True
     return False
@@ -46,16 +47,16 @@ def find_steps(array, threshold):
 
 def anomalies_to_timestamp(anomalies):
     for anomaly in anomalies:
-        anomaly['start'] = int(anomaly['start'].timestamp() * 1000)
-        anomaly['finish'] = int(anomaly['finish'].timestamp() * 1000)
+        anomaly['from'] = int(anomaly['from'].timestamp() * 1000)
+        anomaly['to'] = int(anomaly['to'].timestamp() * 1000)
     return anomalies
 
 def segments_box(segments):
     max_time = 0
     min_time = float("inf")
     for segment in segments:
-        min_time = min(min_time, segment['start'])
-        max_time = max(max_time, segment['finish'])
+        min_time = min(min_time, segment['from'])
+        max_time = max(max_time, segment['to'])
     min_time = pd.to_datetime(min_time, unit='ms')
     max_time = pd.to_datetime(max_time, unit='ms')
     return min_time, max_time
@@ -156,3 +157,11 @@ def find_jump(data, height, lenght):
         if(data[i+x] > data[i] + height):
             j_list.append(i)
     return(j_list)
+
+
+def timestamp_to_index(dataframe, timestamp):
+    data = dataframe['timestamp']
+
+    for i in range(len(data)):
+        if data[i] >= timestamp:
+            return i

diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts
index d72967c..537a62b 100644
--- a/server/src/controllers/analytics_controller.ts
+++ b/server/src/controllers/analytics_controller.ts
@@ -142,14 +142,14 @@ async function processLearningResult(taskResult: any): Promise<{
   if(taskResult.status !== 'SUCCESS') {
     return Promise.reject(taskResult.error);
   }
-  if(taskResult.segments === undefined || !Array.isArray(taskResult.segments)) {
-    throw new Error('Missing segments is result or it is corrupted: ' + taskResult);
-  }
-  if(taskResult.lastPredictionTime === undefined || isNaN(+taskResult.lastPredictionTime)) {
-    throw new Error(
-      'Missing lastPredictionTime is result or it is corrupted: ' + taskResult.lastPredictionTime
-    );
-  }
+  // if(taskResult.segments === undefined || !Array.isArray(taskResult.segments)) {
+  //   throw new Error('Missing segments in result or it is corrupted: ' + taskResult);
+  // }
+  // if(taskResult.lastPredictionTime === undefined || isNaN(+taskResult.lastPredictionTime)) {
+  //   throw new Error(
+  //     'Missing lastPredictionTime in result or it is corrupted: ' + taskResult.lastPredictionTime
+  //   );
+  // }
 
   return {
     lastPredictionTime: +taskResult.lastPredictionTime,
@@ -159,34 +159,34 @@ async function processLearningResult(taskResult: any): Promise<{
 }
 
 export async function runPredict(id: AnalyticUnit.AnalyticUnitId) {
-  // let unit = await AnalyticUnit.findById(id);
-  // let pattern = unit.type;
-  // let task = {
-  //   type: 'PREDICT',
-  //   analyticUnitId: id,
-  //   pattern,
-  //   lastPredictionTime: unit.lastPredictionTime
-  // };
-  // let result = await runTask(task);
-
-  // if(result.status === 'FAILED') {
-  //   return [];
-  // }
-  // // Merging segments
-  // let segments = getLabeledSegments(id);
-  // if(segments.length > 0 && result.segments.length > 0) {
-  //   let lastOldSegment = segments[segments.length - 1];
-  //   let firstNewSegment = result.segments[0];
-
-  //   if(firstNewSegment.start <= lastOldSegment.finish) {
-  //     result.segments[0].start = lastOldSegment.start;
-  //     removeSegments(id, [lastOldSegment.id]);
-  //   }
-  // }
+  let unit = await AnalyticUnit.findById(id);
+  let pattern = unit.type;
+
+  let task = new AnalyticsTask(
+    id,
+    AnalyticsTaskType.PREDICT,
+    { pattern, lastPredictionTime: unit.lastPredictionTime }
+  );
+  let result = await runTask(task);
+
+  if(result.status === 'FAILED') {
+    return [];
+  }
+  // Merging segments
+  let segments = await Segment.findMany(id, { labeled: true });
+  if(segments.length > 0 && result.segments.length > 0) {
+    let lastOldSegment = segments[segments.length - 1];
+    let firstNewSegment = result.segments[0];
+
+    if(firstNewSegment.from <= lastOldSegment.to) {
+      result.segments[0].from = lastOldSegment.from;
+      Segment.removeSegments([lastOldSegment.id]);
+    }
+  }
 
-  // insertSegments(id, result.segments, false);
-  // AnalyticUnit.setPredictionTime(id, result.lastPredictionTime);
-  // return result.segments;
+  Segment.insertSegments(result.segments);
+  AnalyticUnit.setPredictionTime(id, result.lastPredictionTime);
+  return result.segments;
 }
 
 export function isAnalyticReady(): boolean {
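
Note for reviewers: this patch switches segment boundaries from index-based 'start'/'finish'
keys to millisecond-timestamp 'from'/'to' keys, and the models translate timestamps back to
row indices with the new utils.timestamp_to_index helper. A minimal standalone sketch of that
round trip; the toy data and the segment values are hypothetical, not taken from the repo:

    import pandas as pd

    def timestamp_to_index(dataframe, timestamp):
        # First row whose timestamp is >= the given one
        # (same linear scan as in analytics/utils/__init__.py above).
        data = dataframe['timestamp']
        for i in range(len(data)):
            if data[i] >= timestamp:
                return i

    # Task payload data arrives as [timestamp, value] pairs and is wrapped
    # in a DataFrame by handle_analytic_task.
    raw = [[1535391111000, 0.0], [1535391112000, 5.0], [1535391113000, 1.0]]
    df = pd.DataFrame(raw, columns=['timestamp', 'value'])

    segment = {'from': 1535391112000, 'to': 1535391113000}  # hypothetical labeled segment
    start = timestamp_to_index(df, segment['from'])  # -> 1
    end = timestamp_to_index(df, segment['to'])      # -> 2
    print(df['value'][start : end + 1])              # the values inside the segment

The scan is O(n) per lookup, which seems acceptable at these data sizes; note it returns
None when the timestamp lies past the end of the frame, a case the callers above do not
guard against.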