Browse Source

learning++ and better models

pull/1/head
Coin de Gamma 6 years ago
parent
commit
69c3de355b
  1. 73
      server/src/controllers/analytics_controller.ts
  2. 30
      server/src/models/analytics_message_model.ts
  3. 23
      server/src/models/analytics_task_model.ts
  4. 5
      server/src/models/segment_model.ts
  5. 23
      server/src/services/analytics_service.ts

73
server/src/controllers/analytics_controller.ts

@ -1,11 +1,11 @@
import { Task } from '../models/task_model'; import { AnalyticsMessageMethod, AnalyticsMessage } from '../models/analytics_message_model'
import { AnalyticsTask, AnalyticsTaskType } from '../models/analytics_task_model';
import * as Segments from '../models/segment_model'; import * as Segments from '../models/segment_model';
import * as AnalyticUnit from '../models/analytic_unit_model'; import * as AnalyticUnit from '../models/analytic_unit_model';
import { AnalyticsService, AnalyticsMessage } from '../services/analytics_service'; import { AnalyticsService } from '../services/analytics_service';
const taskMap = new Map<string, any>(); const taskMap = new Map<string, any>();
let nextTaskId = 0;
let analyticsService: AnalyticsService = undefined; let analyticsService: AnalyticsService = undefined;
@ -26,7 +26,7 @@ async function onMessage(message: AnalyticsMessage) {
let responsePayload = null; let responsePayload = null;
let resolvedMethod = false; let resolvedMethod = false;
if(message.method === 'TASK_RESULT') { if(message.method === AnalyticsMessageMethod.TASK_RESULT) {
onTaskResult(message.payload); onTaskResult(message.payload);
resolvedMethod = true; resolvedMethod = true;
} }
@ -50,7 +50,7 @@ export function terminate() {
analyticsService.close(); analyticsService.close();
} }
async function runTask(task: Task): Promise<any> { async function runTask(task: AnalyticsTask): Promise<any> {
// let anomaly: AnalyticUnit.AnalyticUnit = await AnalyticUnit.findById(task.analyticUnitId); // let anomaly: AnalyticUnit.AnalyticUnit = await AnalyticUnit.findById(task.analyticUnitId);
// task.metric = { // task.metric = {
// datasource: anomaly.metric.datasource, // datasource: anomaly.metric.datasource,
@ -66,26 +66,59 @@ async function runTask(task: Task): Promise<any> {
} }
export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { export async function runLearning(id: AnalyticUnit.AnalyticUnitId) {
let segments = Segments.findMany(id, { labeled: true }); let segments = await Segments.findMany(id, { labeled: true });
let segmentObjs = segments.map(s => s.toObject());
let analyticUnit = await AnalyticUnit.findById(id);
if(analyticUnit.status === AnalyticUnit.AnalyticUnitStatus.LEARNING) {
throw new Error('Can`t starn learning when it`s already started [' + id + ']');
}
AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING); AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING);
let unit = await AnalyticUnit.findById(id); let previousLastPredictionTime = analyticUnit.lastPredictionTime;
let pattern = unit.type;
let task = {
analyticUnitId: id,
type: 'LEARN',
pattern,
segments: segments
};
try {
let pattern = analyticUnit.type;
let task = new AnalyticsTask(
id, AnalyticsTaskType.LEARN, { pattern, segments: segmentObjs }
);
let result = await runTask(task); let result = await runTask(task);
let { lastPredictionTime, segments } = await processLearningResult(result);
await Promise.all([
Segments.insertSegments(segments),
AnalyticUnit.setPredictionTime(id, lastPredictionTime)
]);
await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.READY);
} catch (err) {
await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.FAILED, err);
await AnalyticUnit.setPredictionTime(id, previousLastPredictionTime);
}
if (result.status === 'SUCCESS') {
AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.READY);
insertSegments(id, result.segments, false);
AnalyticUnit.setPredictionTime(id, result.lastPredictionTime);
} else {
AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.FAILED, result.error);
} }
async function processLearningResult(taskResult: any): Promise<{
lastPredictionTime: number,
segments: Segments.Segment[]
}> {
if(taskResult.status !== 'SUCCESS') {
return Promise.reject(taskResult.error);
}
if(taskResult.segments === undefined || !Array.isArray(taskResult.segments)) {
throw new Error('Missing segments is result or it is corrupted: ' + taskResult);
}
if(taskResult.lastPredictionTime === undefined || isNaN(+taskResult.lastPredictionTime)) {
throw new Error(
'Missing lastPredictionTime is result or it is corrupted: ' + taskResult.lastPredictionTime
);
}
return {
lastPredictionTime: +taskResult.lastPredictionTime,
segments: taskResult.segments.map(Segments.Segment.fromObject)
};
} }
export async function runPredict(id: AnalyticUnit.AnalyticUnitId) { export async function runPredict(id: AnalyticUnit.AnalyticUnitId) {

30
server/src/models/analytics_message_model.ts

@ -0,0 +1,30 @@
export enum AnalyticsMessageMethod {
TASK = 'TASK',
PING = 'PING',
TASK_RESULT = 'TASK_RESULT'
}
export class AnalyticsMessage {
public constructor(
public method: AnalyticsMessageMethod,
public payload?: string,
public requestId?: number
) {
}
public toObject() {
return {
method: this.method,
payload: this.payload,
requestId: this.requestId
};
}
static fromObject(obj: any): AnalyticsMessage {
if(obj.method === undefined) {
throw new Error('No method in obj:' + obj);
}
return new AnalyticsMessage(obj.method, obj.payload, obj.requestId);
}
}

23
server/src/models/task_model.ts → server/src/models/analytics_task_model.ts

@ -1,14 +1,18 @@
import { AnalyticUnitId } from "./analytic_unit_model"; import { AnalyticUnitId } from "./analytic_unit_model";
export type TaskId = string; export type AnalyticsTaskId = string;
export enum TaskType { LEARN = 'LEARN' }; export enum AnalyticsTaskType {
LEARN = 'LEARN',
PREDICT = 'PREDICT'
};
export class Task { export class AnalyticsTask {
constructor( constructor(
public analyticUnitId: AnalyticUnitId, public analyticUnitId: AnalyticUnitId,
public type: TaskType, public type: AnalyticsTaskType,
public id?: TaskId public payload?: any,
public id?: AnalyticsTaskId
) { ) {
if(analyticUnitId === undefined) { if(analyticUnitId === undefined) {
throw new Error('analyticUnitId is undefined'); throw new Error('analyticUnitId is undefined');
@ -21,17 +25,18 @@ export class Task {
public toObject() { public toObject() {
return { return {
_id: this.id, _id: this.id,
analyticUnitId: this.analyticUnitId analyticUnitId: this.analyticUnitId,
type: this.type
}; };
} }
static fromObject(obj: any): Task { static fromObject(obj: any): AnalyticsTask {
if(obj === undefined) { if(obj === undefined) {
throw new Error('obj is undefined'); throw new Error('obj is undefined');
} }
return new Task( return new AnalyticsTask(
obj.analyticUnitId, obj.analyticUnitId,
obj.type as TaskType, obj.type as AnalyticsTaskType,
obj._id, obj._id,
); );
} }

5
server/src/models/segment_model.ts

@ -12,7 +12,7 @@ export class Segment {
public analyticUnitId: AnalyticUnitId, public analyticUnitId: AnalyticUnitId,
public from: number, public from: number,
public to: number, public to: number,
public labeled: boolean, public labeled: boolean = false,
public id?: SegmentId public id?: SegmentId
) { ) {
if(analyticUnitId === undefined) { if(analyticUnitId === undefined) {
@ -30,9 +30,6 @@ export class Segment {
if(isNaN(to)) { if(isNaN(to)) {
throw new Error('to is NaN'); throw new Error('to is NaN');
} }
if(labeled === undefined) {
throw new Error('labeled is undefined');
}
} }
public toObject() { public toObject() {

23
server/src/services/analytics_service.ts

@ -1,3 +1,4 @@
import { AnalyticsMessageMethod, AnalyticsMessage } from '../models/analytics_message_model'
import * as config from '../config'; import * as config from '../config';
const zmq = require('zeromq'); const zmq = require('zeromq');
@ -7,20 +8,6 @@ import * as fs from 'fs';
import * as path from 'path'; import * as path from 'path';
export class AnalyticsMessage {
public constructor(public method: string, public payload?: string, public requestId?: number) {
}
static fromJSON(obj: any): AnalyticsMessage {
if(obj.method === undefined) {
throw new Error('No method in obj:' + obj);
}
return new AnalyticsMessage(obj.method, obj.payload, obj.requestId);
}
}
export class AnalyticsService { export class AnalyticsService {
private _requester: any; private _requester: any;
@ -40,7 +27,7 @@ export class AnalyticsService {
return Promise.reject("Analytics is not ready"); return Promise.reject("Analytics is not ready");
} }
let message = { let message = {
method: 'TASK', method: AnalyticsMessageMethod.TASK,
payload: taskObj payload: taskObj
} }
return this.sendMessage(message); return this.sendMessage(message);
@ -48,7 +35,7 @@ export class AnalyticsService {
public async sendMessage(message: AnalyticsMessage): Promise<void> { public async sendMessage(message: AnalyticsMessage): Promise<void> {
let strMessage = JSON.stringify(message); let strMessage = JSON.stringify(message);
if(message.method === 'PING') { if(message.method === AnalyticsMessageMethod.PING) {
strMessage = 'PING'; strMessage = 'PING';
} }
return new Promise<void>((resolve, reject) => { return new Promise<void>((resolve, reject) => {
@ -200,7 +187,7 @@ export class AnalyticsService {
console.error(text); console.error(text);
throw new Error('Unexpected response'); throw new Error('Unexpected response');
} }
this._onMessage(AnalyticsMessage.fromJSON(response)); this._onMessage(AnalyticsMessage.fromObject(response));
} }
private async _runAlalyticsPinger() { private async _runAlalyticsPinger() {
@ -214,7 +201,7 @@ export class AnalyticsService {
} }
this._pingResponded = false; this._pingResponded = false;
// TODO: set life limit for this ping // TODO: set life limit for this ping
this.sendMessage({ method: 'PING' }); this.sendMessage({ method: AnalyticsMessageMethod.PING });
}, config.ANLYTICS_PING_INTERVAL); }, config.ANLYTICS_PING_INTERVAL);
} }

Loading…
Cancel
Save