diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index fb400a2..6890bcb 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -1,11 +1,11 @@ -import { Task } from '../models/task_model'; +import { AnalyticsMessageMethod, AnalyticsMessage } from '../models/analytics_message_model' +import { AnalyticsTask, AnalyticsTaskType } from '../models/analytics_task_model'; import * as Segments from '../models/segment_model'; import * as AnalyticUnit from '../models/analytic_unit_model'; -import { AnalyticsService, AnalyticsMessage } from '../services/analytics_service'; +import { AnalyticsService } from '../services/analytics_service'; const taskMap = new Map(); -let nextTaskId = 0; let analyticsService: AnalyticsService = undefined; @@ -26,7 +26,7 @@ async function onMessage(message: AnalyticsMessage) { let responsePayload = null; let resolvedMethod = false; - if(message.method === 'TASK_RESULT') { + if(message.method === AnalyticsMessageMethod.TASK_RESULT) { onTaskResult(message.payload); resolvedMethod = true; } @@ -50,7 +50,7 @@ export function terminate() { analyticsService.close(); } -async function runTask(task: Task): Promise { +async function runTask(task: AnalyticsTask): Promise { // let anomaly: AnalyticUnit.AnalyticUnit = await AnalyticUnit.findById(task.analyticUnitId); // task.metric = { // datasource: anomaly.metric.datasource, @@ -66,26 +66,59 @@ async function runTask(task: Task): Promise { } export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { - let segments = Segments.findMany(id, { labeled: true }); + let segments = await Segments.findMany(id, { labeled: true }); + let segmentObjs = segments.map(s => s.toObject()); + + let analyticUnit = await AnalyticUnit.findById(id); + if(analyticUnit.status === AnalyticUnit.AnalyticUnitStatus.LEARNING) { + throw new Error('Can`t starn learning when it`s already started [' + id + ']'); + } + AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING); - let unit = await AnalyticUnit.findById(id); - let pattern = unit.type; - let task = { - analyticUnitId: id, - type: 'LEARN', - pattern, - segments: segments - }; + let previousLastPredictionTime = analyticUnit.lastPredictionTime; + + try { + let pattern = analyticUnit.type; + let task = new AnalyticsTask( + id, AnalyticsTaskType.LEARN, { pattern, segments: segmentObjs } + ); + let result = await runTask(task); + let { lastPredictionTime, segments } = await processLearningResult(result); + + await Promise.all([ + Segments.insertSegments(segments), + AnalyticUnit.setPredictionTime(id, lastPredictionTime) + ]); + await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.READY); + + } catch (err) { + await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.FAILED, err); + await AnalyticUnit.setPredictionTime(id, previousLastPredictionTime); + } - let result = await runTask(task); +} - if (result.status === 'SUCCESS') { - AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.READY); - insertSegments(id, result.segments, false); - AnalyticUnit.setPredictionTime(id, result.lastPredictionTime); - } else { - AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.FAILED, result.error); +async function processLearningResult(taskResult: any): Promise<{ + lastPredictionTime: number, + segments: Segments.Segment[] +}> { + if(taskResult.status !== 'SUCCESS') { + return Promise.reject(taskResult.error); } + if(taskResult.segments === undefined || !Array.isArray(taskResult.segments)) { + throw new Error('Missing segments is result or it is corrupted: ' + taskResult); + } + if(taskResult.lastPredictionTime === undefined || isNaN(+taskResult.lastPredictionTime)) { + throw new Error( + 'Missing lastPredictionTime is result or it is corrupted: ' + taskResult.lastPredictionTime + ); + } + + return { + lastPredictionTime: +taskResult.lastPredictionTime, + segments: taskResult.segments.map(Segments.Segment.fromObject) + }; + } export async function runPredict(id: AnalyticUnit.AnalyticUnitId) { diff --git a/server/src/models/analytics_message_model.ts b/server/src/models/analytics_message_model.ts new file mode 100644 index 0000000..12ebe2d --- /dev/null +++ b/server/src/models/analytics_message_model.ts @@ -0,0 +1,30 @@ +export enum AnalyticsMessageMethod { + TASK = 'TASK', + PING = 'PING', + TASK_RESULT = 'TASK_RESULT' +} + +export class AnalyticsMessage { + public constructor( + public method: AnalyticsMessageMethod, + public payload?: string, + public requestId?: number + ) { + + } + + public toObject() { + return { + method: this.method, + payload: this.payload, + requestId: this.requestId + }; + } + + static fromObject(obj: any): AnalyticsMessage { + if(obj.method === undefined) { + throw new Error('No method in obj:' + obj); + } + return new AnalyticsMessage(obj.method, obj.payload, obj.requestId); + } +} \ No newline at end of file diff --git a/server/src/models/task_model.ts b/server/src/models/analytics_task_model.ts similarity index 55% rename from server/src/models/task_model.ts rename to server/src/models/analytics_task_model.ts index bd01f5a..97bcd89 100644 --- a/server/src/models/task_model.ts +++ b/server/src/models/analytics_task_model.ts @@ -1,14 +1,18 @@ import { AnalyticUnitId } from "./analytic_unit_model"; -export type TaskId = string; -export enum TaskType { LEARN = 'LEARN' }; +export type AnalyticsTaskId = string; +export enum AnalyticsTaskType { + LEARN = 'LEARN', + PREDICT = 'PREDICT' +}; -export class Task { +export class AnalyticsTask { constructor( public analyticUnitId: AnalyticUnitId, - public type: TaskType, - public id?: TaskId + public type: AnalyticsTaskType, + public payload?: any, + public id?: AnalyticsTaskId ) { if(analyticUnitId === undefined) { throw new Error('analyticUnitId is undefined'); @@ -21,17 +25,18 @@ export class Task { public toObject() { return { _id: this.id, - analyticUnitId: this.analyticUnitId + analyticUnitId: this.analyticUnitId, + type: this.type }; } - static fromObject(obj: any): Task { + static fromObject(obj: any): AnalyticsTask { if(obj === undefined) { throw new Error('obj is undefined'); } - return new Task( + return new AnalyticsTask( obj.analyticUnitId, - obj.type as TaskType, + obj.type as AnalyticsTaskType, obj._id, ); } diff --git a/server/src/models/segment_model.ts b/server/src/models/segment_model.ts index d78435a..56c5330 100644 --- a/server/src/models/segment_model.ts +++ b/server/src/models/segment_model.ts @@ -12,7 +12,7 @@ export class Segment { public analyticUnitId: AnalyticUnitId, public from: number, public to: number, - public labeled: boolean, + public labeled: boolean = false, public id?: SegmentId ) { if(analyticUnitId === undefined) { @@ -30,9 +30,6 @@ export class Segment { if(isNaN(to)) { throw new Error('to is NaN'); } - if(labeled === undefined) { - throw new Error('labeled is undefined'); - } } public toObject() { diff --git a/server/src/services/analytics_service.ts b/server/src/services/analytics_service.ts index c38efea..bab4851 100644 --- a/server/src/services/analytics_service.ts +++ b/server/src/services/analytics_service.ts @@ -1,3 +1,4 @@ +import { AnalyticsMessageMethod, AnalyticsMessage } from '../models/analytics_message_model' import * as config from '../config'; const zmq = require('zeromq'); @@ -7,20 +8,6 @@ import * as fs from 'fs'; import * as path from 'path'; -export class AnalyticsMessage { - public constructor(public method: string, public payload?: string, public requestId?: number) { - - } - - static fromJSON(obj: any): AnalyticsMessage { - if(obj.method === undefined) { - throw new Error('No method in obj:' + obj); - } - return new AnalyticsMessage(obj.method, obj.payload, obj.requestId); - } -} - - export class AnalyticsService { private _requester: any; @@ -40,7 +27,7 @@ export class AnalyticsService { return Promise.reject("Analytics is not ready"); } let message = { - method: 'TASK', + method: AnalyticsMessageMethod.TASK, payload: taskObj } return this.sendMessage(message); @@ -48,7 +35,7 @@ export class AnalyticsService { public async sendMessage(message: AnalyticsMessage): Promise { let strMessage = JSON.stringify(message); - if(message.method === 'PING') { + if(message.method === AnalyticsMessageMethod.PING) { strMessage = 'PING'; } return new Promise((resolve, reject) => { @@ -200,7 +187,7 @@ export class AnalyticsService { console.error(text); throw new Error('Unexpected response'); } - this._onMessage(AnalyticsMessage.fromJSON(response)); + this._onMessage(AnalyticsMessage.fromObject(response)); } private async _runAlalyticsPinger() { @@ -214,7 +201,7 @@ export class AnalyticsService { } this._pingResponded = false; // TODO: set life limit for this ping - this.sendMessage({ method: 'PING' }); + this.sendMessage({ method: AnalyticsMessageMethod.PING }); }, config.ANLYTICS_PING_INTERVAL); }