diff --git a/analytics/analytic_unit_manager.py b/analytics/analytic_unit_manager.py index 2c52ce4..3da52a0 100644 --- a/analytics/analytic_unit_manager.py +++ b/analytics/analytic_unit_manager.py @@ -30,14 +30,16 @@ def ensure_worker(analytic_unit_id, analytic_unit_type) -> AnalyticUnitWorker: async def handle_analytic_task(task): try: payload = task['payload'] - payload['data'] = pd.DataFrame(payload['data'], columns = ['timestamp', 'value']) + worker = ensure_worker(task['analyticUnitId'], payload['pattern']) + data = pd.DataFrame(payload['data'], columns=['timestamp', 'value']) + data['timestamp'] = pd.to_datetime(data['timestamp']) result_payload = {} if task['type'] == "LEARN": - await worker.do_learn(AnalyticUnitId, payload) + await worker.do_learn(payload['segments'], data) elif task['type'] == "PREDICT": - result_payload = await worker.do_predict(AnalyticUnitId, payload) + result_payload = await worker.do_predict(data) else: raise ValueError('Unknown task type "%s"' % task['type']) return { diff --git a/analytics/analytic_unit_worker.py b/analytics/analytic_unit_worker.py index e4bf380..86add4f 100644 --- a/analytics/analytic_unit_worker.py +++ b/analytics/analytic_unit_worker.py @@ -1,6 +1,7 @@ import config import detectors import logging +import pandas as pd logger = logging.getLogger('AnalyticUnitWorker') @@ -12,16 +13,10 @@ class AnalyticUnitWorker: self.analytic_unit_id = analytic_unit_id self.detector = detector - async def do_learn(self, analytic_unit_id, payload) -> None: - pattern = payload['pattern'] - segments = payload['segments'] - data = payload['data'] # [time, value][] + async def do_learn(self, segments: list, data: pd.DataFrame) -> None: await self.detector.train(data, segments) - async def do_predict(self, analytic_unit_id, payload): - pattern = payload['pattern'] - data = payload['data'] # [time, value][] - + async def do_predict(self, data: pd.DataFrame): segments, last_prediction_time = await self.detector.predict(data) return { 'segments': segments, diff --git a/analytics/detectors/pattern_detector.py b/analytics/detectors/pattern_detector.py index 787c46d..2ec407b 100644 --- a/analytics/detectors/pattern_detector.py +++ b/analytics/detectors/pattern_detector.py @@ -36,24 +36,17 @@ class PatternDetector(Detector): # TODO: save model after fit return 0 - async def predict(self, data): + async def predict(self, dataframe: pd.DataFrame): + predicted_indexes = await self.model.predict(dataframe) - start_index = self.data_prov.get_upper_bound(last_prediction_time) - start_index = max(0, start_index - window_size) - dataframe = self.data_prov.get_data_range(start_index) - - predicted_indexes = self.model.predict(dataframe) - predicted_indexes = [(x, y) for (x, y) in predicted_indexes if x >= start_index and y >= start_index] - - predicted_times = self.data_prov.inverse_transform_indexes(predicted_indexes) segments = [] - for time_value in predicted_times: - ts1 = int(time_value[0].timestamp() * 1000) - ts2 = int(time_value[1].timestamp() * 1000) - segments.append({ - 'start': min(ts1, ts2), - 'finish': max(ts1, ts2) - }) + # for time_value in predicted_times: + # ts1 = int(time_value[0].timestamp() * 1000) + # ts2 = int(time_value[1].timestamp() * 1000) + # segments.append({ + # 'start': min(ts1, ts2), + # 'finish': max(ts1, ts2) + # }) last_dataframe_time = dataframe.iloc[-1]['timestamp'] last_prediction_time = int(last_dataframe_time.timestamp() * 1000) diff --git a/analytics/models/step_model.py b/analytics/models/step_model.py index 78c8320..cff3b56 100644 --- a/analytics/models/step_model.py +++ b/analytics/models/step_model.py @@ -6,7 +6,7 @@ from scipy.signal import argrelextrema import utils import numpy as np -import pickle +import pandas as pd class StepModel(Model): @@ -30,8 +30,8 @@ class StepModel(Model): convolve_list = [] for segment in segments: if segment['labeled']: - segment_from_index = utils.timestamp_to_index(dataframe, segment['from']) - segment_to_index = utils.timestamp_to_index(dataframe, segment['to']) + segment_from_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['from'])) + segment_to_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['to'])) segment_data = data[segment_from_index : segment_to_index + 1] segment_min = min(segment_data) diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index 537a62b..e390bc5 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -28,7 +28,7 @@ function onTaskResult(taskResult: TaskResult) { resolver(taskResult); taskResolvers.delete(id); } else { - throw new Error(`TaskResut [${id}] has no resolver`); + throw new Error(`TaskResult [${id}] has no resolver`); } } } @@ -124,7 +124,6 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { AnalyticUnit.setPredictionTime(id, lastPredictionTime) ]); await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.READY); - } catch (err) { let message = err.message || JSON.stringify(err); await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.FAILED, message); @@ -139,7 +138,7 @@ async function processLearningResult(taskResult: any): Promise<{ lastPredictionTime: number, segments: Segment.Segment[] }> { - if(taskResult.status !== 'SUCCESS') { + if(taskResult.status !== AnalyticUnit.AnalyticUnitStatus.SUCCESS) { return Promise.reject(taskResult.error); } console.log(taskResult) @@ -153,8 +152,8 @@ async function processLearningResult(taskResult: any): Promise<{ // } return { - lastPredictionTime: +taskResult.lastPredictionTime, - segments: taskResult.segments.map(Segment.Segment.fromObject) + lastPredictionTime: 0, + segments: [] }; } @@ -163,18 +162,27 @@ export async function runPredict(id: AnalyticUnit.AnalyticUnitId) { let unit = await AnalyticUnit.findById(id); let pattern = unit.type; + let segments = await Segment.findMany(id, { labeled: true }); + if (segments.length < 2) { + throw new Error('Need at least 2 labeled segments'); + } + + let { from, to } = getQueryRangeForLearningBySegments(segments); + let data = await queryByMetric(unit.metric, unit.panelUrl, from, to); + if (data.length === 0) { + throw new Error('Empty data to predict on'); + } + let task = new AnalyticsTask( id, AnalyticsTaskType.PREDICT, - { pattern, lastPredictionTime: unit.lastPredictionTime } + { pattern, lastPredictionTime: unit.lastPredictionTime, data } ); let result = await runTask(task); - - if(result.status === 'FAILED') { + if(result.status === AnalyticUnit.AnalyticUnitStatus.FAILED) { return []; } // Merging segments - let segments = await Segment.findMany(id, { labeled: true }); if(segments.length > 0 && result.segments.length > 0) { let lastOldSegment = segments[segments.length - 1]; let firstNewSegment = result.segments[0]; @@ -214,6 +222,6 @@ export async function updateSegments( ]); // TODO: move setting status somehow "inside" learning await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.PENDING); - runLearning(id); + runLearning(id).then(() => runPredict(id)); return { addedIds, removed }; }