// Analytics controller: sends tasks to the analytics service and applies
// their results to analytic units and their segments.

import { AnalyticsMessageMethod, AnalyticsMessage } from '../models/analytics_message_model';
import { AnalyticsTask, AnalyticsTaskType, AnalyticsTaskId } from '../models/analytics_task_model';
import * as Segment from '../models/segment_model';
import * as AnalyticUnit from '../models/analytic_unit_model';
import { AnalyticsService } from '../services/analytics_service';
import { queryByMetric } from '../services/grafana_service';
/** Payload of a TASK_RESULT message; its shape is not declared in this module. */
type TaskResult = any;

/** Callback that settles the promise returned by `runTask()` for a single task. */
export type TaskResolver = (taskResult: TaskResult) => void;

/** Resolvers of tasks that have been sent and are still waiting for a final result, keyed by task id. */
const taskResolvers = new Map<AnalyticsTaskId, TaskResolver>();

/** Connection to the analytics service; stays `undefined` until `init()` is called. */
let analyticsService: AnalyticsService = undefined;
/**
 * Handles a task result reported by the analytics service.
 *
 * Results whose status is neither 'SUCCESS' nor 'FAILED' are ignored. A final
 * result is handed to the resolver registered for its task id, and that
 * resolver is removed from the registry.
 *
 * @param taskResult payload of a TASK_RESULT message; `_id` and `status` are read from it
 * @throws Error when the result carries no `_id`, or when a final result
 *         arrives for a task id that has no registered resolver
 */
function onTaskResult(taskResult: TaskResult) {
  const id = taskResult._id;
  if(id === undefined) {
    throw new Error('id of task is undefined');
  }

  const status = taskResult.status;
  if(status !== 'SUCCESS' && status !== 'FAILED') {
    // Not a final status: keep the resolver registered and wait for the next result.
    return;
  }

  const resolver = taskResolvers.get(id);
  if(resolver === undefined) {
    throw new Error(`TaskResult [${id}] has no resolver`);
  }

  // Unregister first so the entry is released even if the resolver throws.
  taskResolvers.delete(id);
  resolver(taskResult);
}
/**
 * Entry point for messages coming from the analytics service.
 *
 * Only TASK_RESULT messages are understood; their payload is forwarded to
 * `onTaskResult()`. When the message carries a `requestId`, the same message
 * object is sent back with its payload replaced by `null`.
 *
 * @param message incoming message; its `payload` is overwritten when a reply is sent
 * @throws TypeError (as a rejected promise) when the method is not TASK_RESULT
 */
async function onMessage(message: AnalyticsMessage) {
  if(message.method !== AnalyticsMessageMethod.TASK_RESULT) {
    throw new TypeError('Unknown method ' + message.method);
  }
  onTaskResult(message.payload);

  // TODO: catch exception and send error in this case
  if(message.requestId === undefined) {
    return;
  }
  // No handler produces a response payload yet, so the reply payload is always null.
  message.payload = null;
  analyticsService.sendMessage(message);
}
/**
 * Creates the analytics service connection, with `onMessage` as its message
 * handler, and stores it in the module-level `analyticsService`.
 *
 * NOTE(review): a repeated call replaces the stored instance without closing
 * the previous one — confirm callers invoke this only once.
 */
export function init(): void {
  analyticsService = new AnalyticsService(onMessage);
}
/**
 * Closes the analytics service connection.
 *
 * Does nothing when `init()` has not been called yet, so it is safe to use in
 * shutdown paths that may run before initialization.
 */
export function terminate() {
  if(analyticsService === undefined) {
    return;
  }
  analyticsService.close();
}
/**
 * Sends a task to the analytics service and waits for its final result.
 *
 * The returned promise is resolved from `onTaskResult()`, which calls the
 * registered resolver for both 'SUCCESS' and 'FAILED' results — callers must
 * inspect the status of the resolved value themselves.
 *
 * @param task task to send; `task.id` is the key under which the resolver is registered
 * @returns the task result passed to the resolver
 * @throws (as a rejected promise) whatever `sendTask()` throws synchronously
 */
async function runTask(task: AnalyticsTask): Promise<TaskResult> {
  // task.metric = {
  // datasource: anomaly.metric.datasource,
  // targets: anomaly.metric.targets.map(getTarget)
  // };
  return new Promise<TaskResult>((resolver: TaskResolver) => {
    taskResolvers.set(task.id, resolver); // it will be resolved in onTaskResult()
    try {
      analyticsService.sendTask(task); // we dont wait for result here
    } catch (err) {
      // The task never left, so no result can arrive: drop the resolver
      // instead of leaving it in the registry forever.
      taskResolvers.delete(task.id);
      throw err;
    }
  });
}
/**
 * Runs a learning task for an analytic unit and stores its outcome.
 *
 * Steps: load the unit and its labeled segments, query the metric data, mark
 * the unit LEARNING, send a LEARN task, then insert the returned segments,
 * store the new prediction time and mark the unit READY.
 *
 * Failures are not propagated to the caller: any error marks the unit FAILED
 * (with the error attached) and, if the previous prediction time had already
 * been captured, writes that value back.
 *
 * NOTE(review): calling this for a unit that is already LEARNING throws inside
 * the `try`, so the handler below marks that unit FAILED while the first run
 * may still be in progress — confirm this is intended.
 *
 * @param id id of the analytic unit to learn
 */
export async function runLearning(id: AnalyticUnit.AnalyticUnitId) {
  let previousLastPredictionTime: number | undefined = undefined;

  try {
    let segments = await Segment.findMany(id, { labeled: true });
    let analyticUnit = await AnalyticUnit.findById(id);

    let segmentObjs = segments.map(s => s.toObject());

    let data = await queryByMetric(analyticUnit.metric, analyticUnit.panelUrl);
    if(data.length === 0) {
      throw new Error('Empty data to learn on');
    }

    if(analyticUnit.status === AnalyticUnit.AnalyticUnitStatus.LEARNING) {
      throw new Error('Can`t start learning when it`s already started [' + id + ']');
    }

    // Awaited so this write cannot race with the READY / FAILED writes below
    // and so a failure here is handled by the catch block.
    await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING);

    let pattern = analyticUnit.type;
    let task = new AnalyticsTask(
      id, AnalyticsTaskType.LEARN, { pattern, segments: segmentObjs }
    );
    let result = await runTask(task);
    let { lastPredictionTime, segments: predictedSegments } = await processLearningResult(result);

    // Captured only once a valid result exists; from here on a failure must
    // restore the old prediction time.
    previousLastPredictionTime = analyticUnit.lastPredictionTime;

    await Promise.all([
      Segment.insertSegments(predictedSegments),
      AnalyticUnit.setPredictionTime(id, lastPredictionTime)
    ]);
    await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.READY);
  } catch (err) {
    await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.FAILED, err);
    if(previousLastPredictionTime !== undefined) {
      await AnalyticUnit.setPredictionTime(id, previousLastPredictionTime);
    }
  }
}
/**
 * Validates the result of a LEARN task and converts it to typed values.
 *
 * @param taskResult raw task result; `status`, `error`, `segments` and
 *        `lastPredictionTime` are read from it
 * @returns the prediction time coerced to a number, and the segments converted
 *          with `Segment.Segment.fromObject`
 * @throws (as a rejected promise) `taskResult.error` when the status is not
 *         'SUCCESS'; an Error when `segments` is not an array or when
 *         `lastPredictionTime` is undefined or not numeric
 */
async function processLearningResult(taskResult: any): Promise<{
  lastPredictionTime: number,
  segments: Segment.Segment[]
}> {
  if(taskResult.status !== 'SUCCESS') {
    throw taskResult.error;
  }

  // Array.isArray() is false for undefined as well, so one check covers both cases.
  if(!Array.isArray(taskResult.segments)) {
    // Serialize explicitly: concatenating the object itself prints "[object Object]".
    throw new Error(
      'Missing segments in result or it is corrupted: ' + JSON.stringify(taskResult)
    );
  }

  if(taskResult.lastPredictionTime === undefined || isNaN(+taskResult.lastPredictionTime)) {
    throw new Error(
      'Missing lastPredictionTime in result or it is corrupted: ' + taskResult.lastPredictionTime
    );
  }

  return {
    lastPredictionTime: +taskResult.lastPredictionTime,
    // Explicit arrow: keeps `this` bound to the class and avoids passing
    // map()'s index and array arguments into fromObject().
    segments: taskResult.segments.map((segment: any) => Segment.Segment.fromObject(segment))
  };
}
/**
 * Placeholder for running a prediction task for an analytic unit.
 *
 * The implementation is currently disabled: the whole body is commented out,
 * so the function ignores `id` and resolves to `undefined`.
 *
 * @param id id of the analytic unit (unused while the body is disabled)
 */
export async function runPredict(id: AnalyticUnit.AnalyticUnitId): Promise<void> {
  // let unit = await AnalyticUnit.findById(id);
  // let pattern = unit.type;
  // let task = {
  //   type: 'PREDICT',
  //   analyticUnitId: id,
  //   pattern,
  //   lastPredictionTime: unit.lastPredictionTime
  // };
  // let result = await runTask(task);
  // if(result.status === 'FAILED') {
  //   return [];
  // }
  // // Merging segments
  // let segments = getLabeledSegments(id);
  // if(segments.length > 0 && result.segments.length > 0) {
  //   let lastOldSegment = segments[segments.length - 1];
  //   let firstNewSegment = result.segments[0];
  //   if(firstNewSegment.start <= lastOldSegment.finish) {
  //     result.segments[0].start = lastOldSegment.start;
  //     removeSegments(id, [lastOldSegment.id]);
  //   }
  // }
  // insertSegments(id, result.segments, false);
  // AnalyticUnit.setPredictionTime(id, result.lastPredictionTime);
  // return result.segments;
}
/**
 * Reports whether the analytics service is ready.
 *
 * @returns `false` when `init()` has not been called yet; otherwise the
 *          service's `ready` flag
 */
export function isAnalyticReady(): boolean {
  if(analyticsService === undefined) {
    // Not initialized yet: report "not ready" instead of throwing a TypeError.
    return false;
  }
  return analyticsService.ready;
}
/**
 * Builds an analytic unit from a plain object and stores it.
 *
 * When the object has a top-level `datasource`, it is copied into
 * `obj.metric.datasource` before conversion; the input object is mutated.
 *
 * NOTE(review): `obj.metric` is assumed to exist whenever `datasource` is
 * given — otherwise the assignment throws a TypeError; confirm against callers.
 *
 * @param obj plain-object description of the analytic unit
 * @returns the id returned by `AnalyticUnit.create`
 */
export async function createAnalyticUnitFromObject(obj: any): Promise<AnalyticUnit.AnalyticUnitId> {
  if(obj.datasource !== undefined) {
    obj.metric.datasource = obj.datasource;
  }
  const unit: AnalyticUnit.AnalyticUnit = AnalyticUnit.AnalyticUnit.fromObject(obj);
  return AnalyticUnit.create(unit);
}
/**
 * Inserts and removes segments of an analytic unit, then restarts learning.
 *
 * Insertion and removal run in parallel. Afterwards the unit is marked PENDING
 * and learning is started in the background; this function does not wait for
 * learning to finish.
 *
 * @param id id of the analytic unit whose segments changed
 * @param segmentsToInsert segments to add
 * @param removedIds ids of segments to remove
 * @returns the results of `Segment.insertSegments` (`addedIds`) and
 *          `Segment.removeSegments` (`removed`)
 */
export async function updateSegments(
  id: AnalyticUnit.AnalyticUnitId,
  segmentsToInsert: Segment.Segment[],
  removedIds: Segment.SegmentId[]
) {
  let [addedIds, removed] = await Promise.all([
    Segment.insertSegments(segmentsToInsert),
    Segment.removeSegments(removedIds)
  ]);

  // TODO: move setting status somehow "inside" learning
  await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.PENDING);

  // Deliberately not awaited. runLearning() records its own failures, but its
  // error handler can itself reject; log that instead of leaving an
  // unhandled promise rejection.
  runLearning(id).catch(err => {
    console.error(`Learning for analytic unit [${id}] failed unexpectedly:`, err);
  });

  return { addedIds, removed };
}