Browse Source

Fix prediction (#118)

pull/1/head
rozetko 6 years ago committed by GitHub
parent
commit
e884015587
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      analytics/analytic_unit_manager.py
  2. 11
      analytics/analytic_unit_worker.py
  3. 25
      analytics/detectors/pattern_detector.py
  4. 6
      analytics/models/step_model.py
  5. 28
      server/src/controllers/analytics_controller.ts

8
analytics/analytic_unit_manager.py

@@ -30,14 +30,16 @@ def ensure_worker(analytic_unit_id, analytic_unit_type) -> AnalyticUnitWorker:
async def handle_analytic_task(task):
try:
payload = task['payload']
payload['data'] = pd.DataFrame(payload['data'], columns = ['timestamp', 'value'])
worker = ensure_worker(task['analyticUnitId'], payload['pattern'])
data = pd.DataFrame(payload['data'], columns=['timestamp', 'value'])
data['timestamp'] = pd.to_datetime(data['timestamp'])
result_payload = {}
if task['type'] == "LEARN":
await worker.do_learn(AnalyticUnitId, payload)
await worker.do_learn(payload['segments'], data)
elif task['type'] == "PREDICT":
result_payload = await worker.do_predict(AnalyticUnitId, payload)
result_payload = await worker.do_predict(data)
else:
raise ValueError('Unknown task type "%s"' % task['type'])
return {

11
analytics/analytic_unit_worker.py

@@ -1,6 +1,7 @@
import config
import detectors
import logging
import pandas as pd
logger = logging.getLogger('AnalyticUnitWorker')
@@ -12,16 +13,10 @@ class AnalyticUnitWorker:
self.analytic_unit_id = analytic_unit_id
self.detector = detector
async def do_learn(self, analytic_unit_id, payload) -> None:
pattern = payload['pattern']
segments = payload['segments']
data = payload['data'] # [time, value][]
async def do_learn(self, segments: list, data: pd.DataFrame) -> None:
await self.detector.train(data, segments)
async def do_predict(self, analytic_unit_id, payload):
pattern = payload['pattern']
data = payload['data'] # [time, value][]
async def do_predict(self, data: pd.DataFrame):
segments, last_prediction_time = await self.detector.predict(data)
return {
'segments': segments,

25
analytics/detectors/pattern_detector.py

@@ -36,24 +36,17 @@ class PatternDetector(Detector):
# TODO: save model after fit
return 0
async def predict(self, data):
async def predict(self, dataframe: pd.DataFrame):
predicted_indexes = await self.model.predict(dataframe)
start_index = self.data_prov.get_upper_bound(last_prediction_time)
start_index = max(0, start_index - window_size)
dataframe = self.data_prov.get_data_range(start_index)
predicted_indexes = self.model.predict(dataframe)
predicted_indexes = [(x, y) for (x, y) in predicted_indexes if x >= start_index and y >= start_index]
predicted_times = self.data_prov.inverse_transform_indexes(predicted_indexes)
segments = []
for time_value in predicted_times:
ts1 = int(time_value[0].timestamp() * 1000)
ts2 = int(time_value[1].timestamp() * 1000)
segments.append({
'start': min(ts1, ts2),
'finish': max(ts1, ts2)
})
# for time_value in predicted_times:
# ts1 = int(time_value[0].timestamp() * 1000)
# ts2 = int(time_value[1].timestamp() * 1000)
# segments.append({
# 'start': min(ts1, ts2),
# 'finish': max(ts1, ts2)
# })
last_dataframe_time = dataframe.iloc[-1]['timestamp']
last_prediction_time = int(last_dataframe_time.timestamp() * 1000)

6
analytics/models/step_model.py

@@ -6,7 +6,7 @@ from scipy.signal import argrelextrema
import utils
import numpy as np
import pickle
import pandas as pd
class StepModel(Model):
@@ -30,8 +30,8 @@ class StepModel(Model):
convolve_list = []
for segment in segments:
if segment['labeled']:
segment_from_index = utils.timestamp_to_index(dataframe, segment['from'])
segment_to_index = utils.timestamp_to_index(dataframe, segment['to'])
segment_from_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['from']))
segment_to_index = utils.timestamp_to_index(dataframe, pd.to_datetime(segment['to']))
segment_data = data[segment_from_index : segment_to_index + 1]
segment_min = min(segment_data)

28
server/src/controllers/analytics_controller.ts

@@ -28,7 +28,7 @@ function onTaskResult(taskResult: TaskResult) {
resolver(taskResult);
taskResolvers.delete(id);
} else {
throw new Error(`TaskResut [${id}] has no resolver`);
throw new Error(`TaskResult [${id}] has no resolver`);
}
}
}
@@ -124,7 +124,6 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) {
AnalyticUnit.setPredictionTime(id, lastPredictionTime)
]);
await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.READY);
} catch (err) {
let message = err.message || JSON.stringify(err);
await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.FAILED, message);
@@ -139,7 +138,7 @@ async function processLearningResult(taskResult: any): Promise<{
lastPredictionTime: number,
segments: Segment.Segment[]
}> {
if(taskResult.status !== 'SUCCESS') {
if(taskResult.status !== AnalyticUnit.AnalyticUnitStatus.SUCCESS) {
return Promise.reject(taskResult.error);
}
console.log(taskResult)
@@ -153,8 +152,8 @@
// }
return {
lastPredictionTime: +taskResult.lastPredictionTime,
segments: taskResult.segments.map(Segment.Segment.fromObject)
lastPredictionTime: 0,
segments: []
};
}
@@ -163,18 +162,27 @@ export async function runPredict(id: AnalyticUnit.AnalyticUnitId) {
let unit = await AnalyticUnit.findById(id);
let pattern = unit.type;
let segments = await Segment.findMany(id, { labeled: true });
if (segments.length < 2) {
throw new Error('Need at least 2 labeled segments');
}
let { from, to } = getQueryRangeForLearningBySegments(segments);
let data = await queryByMetric(unit.metric, unit.panelUrl, from, to);
if (data.length === 0) {
throw new Error('Empty data to predict on');
}
let task = new AnalyticsTask(
id,
AnalyticsTaskType.PREDICT,
{ pattern, lastPredictionTime: unit.lastPredictionTime }
{ pattern, lastPredictionTime: unit.lastPredictionTime, data }
);
let result = await runTask(task);
if(result.status === 'FAILED') {
if(result.status === AnalyticUnit.AnalyticUnitStatus.FAILED) {
return [];
}
// Merging segments
let segments = await Segment.findMany(id, { labeled: true });
if(segments.length > 0 && result.segments.length > 0) {
let lastOldSegment = segments[segments.length - 1];
let firstNewSegment = result.segments[0];
@@ -214,6 +222,6 @@ export async function updateSegments(
]);
// TODO: move setting status somehow "inside" learning
await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.PENDING);
runLearning(id);
runLearning(id).then(() => runPredict(id));
return { addedIds, removed };
}

Loading…
Cancel
Save