Browse Source

taskresolver type + metric._id

pull/1/head
Coin de Gamma 6 years ago
parent
commit
13df2b4349
  1. 4
      analytics/server.py
  2. 65
      server/src/controllers/analytics_controller.ts
  3. 16
      server/src/models/metric_model.ts

4
analytics/server.py

@ -45,7 +45,7 @@ async def handle_task(task: object):
res = await worker.do_task(task) res = await worker.do_task(task)
res['_id'] = task['_id'] res['_id'] = task['_id']
message = services.server_service.ServerMessage('TASK_RESULT', res) message = services.server_service.ServerMessage('TASK_RESULT', res)
await server_service.send_message(message) await server_service.send_message(message)
@ -56,7 +56,7 @@ async def handle_message(message: services.ServerMessage):
payload = None payload = None
if message.method == 'TASK': if message.method == 'TASK':
await handle_task(message.payload) await handle_task(message.payload)
def init_services(): def init_services():
logger.info("Starting services...") logger.info("Starting services...")

65
server/src/controllers/analytics_controller.ts

@ -5,19 +5,27 @@ import * as AnalyticUnit from '../models/analytic_unit_model';
import { AnalyticsService } from '../services/analytics_service'; import { AnalyticsService } from '../services/analytics_service';
const taskResolvers = new Map<AnalyticsTaskId, any>(); type TaskResult = any;
export type TaskResolver = (taskResult: TaskResult) => void;
const taskResolvers = new Map<AnalyticsTaskId, TaskResolver>();
let analyticsService: AnalyticsService = undefined; let analyticsService: AnalyticsService = undefined;
function onTaskResult(taskResult: any) { function onTaskResult(taskResult: TaskResult) {
let taskId = taskResult._id; let id = taskResult._id;
if(id === undefined) {
throw new Error('id of task is undefined');
}
let status = taskResult.status; let status = taskResult.status;
if(status === 'SUCCESS' || status === 'FAILED') { if(status === 'SUCCESS' || status === 'FAILED') {
if(taskId in taskResolvers) { if(id in taskResolvers) {
let resolver: any = taskResolvers.get(taskId); let resolver: any = taskResolvers.get(id);
resolver(taskResult); resolver(taskResult);
taskResolvers.delete(taskId); taskResolvers.delete(id);
} else {
throw new Error(`TaskResut [${id}] has no resolver`);
} }
} }
} }
@ -50,51 +58,52 @@ export function terminate() {
analyticsService.close(); analyticsService.close();
} }
async function runTask(task: AnalyticsTask): Promise<any> { async function runTask(task: AnalyticsTask): Promise<TaskResult> {
// let anomaly: AnalyticUnit.AnalyticUnit = await AnalyticUnit.findById(task.analyticUnitId);
// task.metric = { // task.metric = {
// datasource: anomaly.metric.datasource, // datasource: anomaly.metric.datasource,
// targets: anomaly.metric.targets.map(getTarget) // targets: anomaly.metric.targets.map(getTarget)
// }; // };
// task._taskId = nextTaskId++; return new Promise<TaskResult>((resolver: TaskResolver) => {
// await ; taskResolvers.set(task.id, resolver); // it will be resolved in onTaskResult()
analyticsService.sendTask(task); // we dont wait for result here
return new Promise<void>(resolve => { });
taskResolvers[task.id] = resolve;
})
.then(() => analyticsService.sendTask(task));
} }
export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { export async function runLearning(id: AnalyticUnit.AnalyticUnitId) {
let segments = await Segments.findMany(id, { labeled: true });
let segmentObjs = segments.map(s => s.toObject()); let previousLastPredictionTime: number = undefined;
let analyticUnit = await AnalyticUnit.findById(id); try {
if(analyticUnit.status === AnalyticUnit.AnalyticUnitStatus.LEARNING) { let segments = await Segments.findMany(id, { labeled: true });
throw new Error('Can`t starn learning when it`s already started [' + id + ']'); let segmentObjs = segments.map(s => s.toObject());
}
AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING); let analyticUnit = await AnalyticUnit.findById(id);
let previousLastPredictionTime = analyticUnit.lastPredictionTime; if(analyticUnit.status === AnalyticUnit.AnalyticUnitStatus.LEARNING) {
throw new Error('Can`t starn learning when it`s already started [' + id + ']');
}
AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING);
try {
let pattern = analyticUnit.type; let pattern = analyticUnit.type;
let task = new AnalyticsTask( let task = new AnalyticsTask(
id, AnalyticsTaskType.LEARN, { pattern, segments: segmentObjs } id, AnalyticsTaskType.LEARN, { pattern, segments: segmentObjs }
); );
let result = await runTask(task); let result = await runTask(task);
let { lastPredictionTime, segments } = await processLearningResult(result); let { lastPredictionTime, segments: predictedSegments } = await processLearningResult(result);
previousLastPredictionTime = analyticUnit.lastPredictionTime;
await Promise.all([ await Promise.all([
Segments.insertSegments(segments), Segments.insertSegments(predictedSegments),
AnalyticUnit.setPredictionTime(id, lastPredictionTime) AnalyticUnit.setPredictionTime(id, lastPredictionTime)
]); ]);
await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.READY); await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.READY);
} catch (err) { } catch (err) {
await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.FAILED, err); await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.FAILED, err);
await AnalyticUnit.setPredictionTime(id, previousLastPredictionTime); if(previousLastPredictionTime !== undefined) {
await AnalyticUnit.setPredictionTime(id, previousLastPredictionTime);
}
} }
} }

16
server/src/models/metric_model.ts

@ -1,5 +1,11 @@
export type MetricId = string;
export class Metric { export class Metric {
constructor(public datasource: string, public targets: any[]) { constructor(
public datasource: string,
public targets: any[],
public id?: MetricId
) {
if(datasource === undefined) { if(datasource === undefined) {
throw new Error('datasource is undefined'); throw new Error('datasource is undefined');
} }
@ -14,7 +20,8 @@ export class Metric {
public toObject() { public toObject() {
return { return {
datasource: this.datasource, datasource: this.datasource,
targets: this.targets targets: this.targets,
_id: this.id
}; };
} }
@ -24,9 +31,8 @@ export class Metric {
} }
return new Metric( return new Metric(
obj.datasource, obj.datasource,
obj.targets obj.targets,
obj._id
); );
} }
} }

Loading…
Cancel
Save