diff --git a/analytics/server.py b/analytics/server.py index 7548bcc..580f1a9 100644 --- a/analytics/server.py +++ b/analytics/server.py @@ -33,18 +33,18 @@ async def handle_task(task: object): logger.info("Command is OK") - response_task_payload = { - '_taskId': task['_taskId'], + task_result_payload = { + '_id': task['_id'], 'task': task['type'], 'analyticUnitId': task['analyticUnitId'], 'status': "IN_PROGRESS" } - message = services.server_service.ServerMessage('TASK_RESULT', response_task_payload) + message = services.server_service.ServerMessage('TASK_RESULT', task_result_payload) await server_service.send_message(message) res = await worker.do_task(task) - res['_taskId'] = task['_taskId'] + res['_id'] = task['_id'] message = services.server_service.ServerMessage('TASK_RESULT', res) await server_service.send_message(message) diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index 6890bcb..9e7f618 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -1,23 +1,23 @@ import { AnalyticsMessageMethod, AnalyticsMessage } from '../models/analytics_message_model' -import { AnalyticsTask, AnalyticsTaskType } from '../models/analytics_task_model'; +import { AnalyticsTask, AnalyticsTaskType, AnalyticsTaskId } from '../models/analytics_task_model'; import * as Segments from '../models/segment_model'; import * as AnalyticUnit from '../models/analytic_unit_model'; import { AnalyticsService } from '../services/analytics_service'; -const taskMap = new Map(); +const taskResolvers = new Map(); let analyticsService: AnalyticsService = undefined; function onTaskResult(taskResult: any) { - let taskId = taskResult._taskId; + let taskId = taskResult._id; let status = taskResult.status; if(status === 'SUCCESS' || status === 'FAILED') { - if(taskId in taskMap) { - let resolver: any = taskMap.get(taskId); + if(taskId in taskResolvers) { + let resolver: any = taskResolvers.get(taskId); resolver(taskResult); - taskMap.delete(taskId); + taskResolvers.delete(taskId); } } } @@ -58,11 +58,12 @@ async function runTask(task: AnalyticsTask): Promise { // }; // task._taskId = nextTaskId++; - // await analyticsService.sendTask(task); + // await ; - // return new Promise(resolve => { - // taskMap[task._taskId] = resolve; - // }) + return new Promise(resolve => { + taskResolvers[task.id] = resolve; + }) + .then(() => analyticsService.sendTask(task)); } export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { diff --git a/server/src/models/analytics_task_model.ts b/server/src/models/analytics_task_model.ts index 85af4a0..8f0f84f 100644 --- a/server/src/models/analytics_task_model.ts +++ b/server/src/models/analytics_task_model.ts @@ -12,11 +12,12 @@ export enum AnalyticsTaskType { }; export class AnalyticsTask { + constructor( public analyticUnitId: AnalyticUnitId, public type: AnalyticsTaskType, public payload?: any, - public id?: AnalyticsTaskId + private _id?: AnalyticsTaskId ) { if(analyticUnitId === undefined) { throw new Error('analyticUnitId is undefined'); @@ -24,9 +25,14 @@ export class AnalyticsTask { if(type === undefined || type === null) { throw new Error('type is undefined or null'); } - if(id === undefined) { - this.id = uid(UID_LENGTH); + + } + + public get id(): AnalyticsTaskId { + if(this._id === undefined) { + this._id = uid(UID_LENGTH); } + return this._id; } public toObject() {