diff --git a/analytics/server.py b/analytics/server.py index 580f1a9..20db571 100644 --- a/analytics/server.py +++ b/analytics/server.py @@ -45,7 +45,7 @@ async def handle_task(task: object): res = await worker.do_task(task) res['_id'] = task['_id'] - + message = services.server_service.ServerMessage('TASK_RESULT', res) await server_service.send_message(message) @@ -56,7 +56,7 @@ async def handle_message(message: services.ServerMessage): payload = None if message.method == 'TASK': await handle_task(message.payload) - + def init_services(): logger.info("Starting services...") diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index 9e7f618..80a259e 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -5,19 +5,27 @@ import * as AnalyticUnit from '../models/analytic_unit_model'; import { AnalyticsService } from '../services/analytics_service'; -const taskResolvers = new Map(); +type TaskResult = any; +export type TaskResolver = (taskResult: TaskResult) => void; + +const taskResolvers = new Map(); let analyticsService: AnalyticsService = undefined; -function onTaskResult(taskResult: any) { - let taskId = taskResult._id; +function onTaskResult(taskResult: TaskResult) { + let id = taskResult._id; + if(id === undefined) { + throw new Error('id of task is undefined'); + } let status = taskResult.status; if(status === 'SUCCESS' || status === 'FAILED') { - if(taskId in taskResolvers) { - let resolver: any = taskResolvers.get(taskId); + if(id in taskResolvers) { + let resolver: any = taskResolvers.get(id); resolver(taskResult); - taskResolvers.delete(taskId); + taskResolvers.delete(id); + } else { + throw new Error(`TaskResut [${id}] has no resolver`); } } } @@ -50,51 +58,52 @@ export function terminate() { analyticsService.close(); } -async function runTask(task: AnalyticsTask): Promise { - // let anomaly: AnalyticUnit.AnalyticUnit = await AnalyticUnit.findById(task.analyticUnitId); +async function runTask(task: AnalyticsTask): Promise { // task.metric = { // datasource: anomaly.metric.datasource, // targets: anomaly.metric.targets.map(getTarget) // }; - - // task._taskId = nextTaskId++; - // await ; - - return new Promise(resolve => { - taskResolvers[task.id] = resolve; - }) - .then(() => analyticsService.sendTask(task)); + + return new Promise((resolver: TaskResolver) => { + taskResolvers.set(task.id, resolver); // it will be resolved in onTaskResult() + analyticsService.sendTask(task); // we dont wait for result here + }); } export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { - let segments = await Segments.findMany(id, { labeled: true }); - let segmentObjs = segments.map(s => s.toObject()); + + let previousLastPredictionTime: number = undefined; - let analyticUnit = await AnalyticUnit.findById(id); - if(analyticUnit.status === AnalyticUnit.AnalyticUnitStatus.LEARNING) { - throw new Error('Can`t starn learning when it`s already started [' + id + ']'); - } + try { + let segments = await Segments.findMany(id, { labeled: true }); + let segmentObjs = segments.map(s => s.toObject()); - AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING); - let previousLastPredictionTime = analyticUnit.lastPredictionTime; + let analyticUnit = await AnalyticUnit.findById(id); + if(analyticUnit.status === AnalyticUnit.AnalyticUnitStatus.LEARNING) { + throw new Error('Can`t starn learning when it`s already started [' + id + ']'); + } + + AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.LEARNING); - try { let pattern = analyticUnit.type; let task = new AnalyticsTask( id, AnalyticsTaskType.LEARN, { pattern, segments: segmentObjs } ); let result = await runTask(task); - let { lastPredictionTime, segments } = await processLearningResult(result); + let { lastPredictionTime, segments: predictedSegments } = await processLearningResult(result); + previousLastPredictionTime = analyticUnit.lastPredictionTime; await Promise.all([ - Segments.insertSegments(segments), + Segments.insertSegments(predictedSegments), AnalyticUnit.setPredictionTime(id, lastPredictionTime) ]); await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.READY); } catch (err) { await AnalyticUnit.setStatus(id, AnalyticUnit.AnalyticUnitStatus.FAILED, err); - await AnalyticUnit.setPredictionTime(id, previousLastPredictionTime); + if(previousLastPredictionTime !== undefined) { + await AnalyticUnit.setPredictionTime(id, previousLastPredictionTime); + } } } diff --git a/server/src/models/metric_model.ts b/server/src/models/metric_model.ts index 2d97b2d..406418b 100644 --- a/server/src/models/metric_model.ts +++ b/server/src/models/metric_model.ts @@ -1,5 +1,11 @@ +export type MetricId = string; + export class Metric { - constructor(public datasource: string, public targets: any[]) { + constructor( + public datasource: string, + public targets: any[], + public id?: MetricId + ) { if(datasource === undefined) { throw new Error('datasource is undefined'); } @@ -14,7 +20,8 @@ export class Metric { public toObject() { return { datasource: this.datasource, - targets: this.targets + targets: this.targets, + _id: this.id }; } @@ -24,9 +31,8 @@ export class Metric { } return new Metric( obj.datasource, - obj.targets + obj.targets, + obj._id ); } } - -