Browse Source

_taskId -> id and runTask++

pull/1/head
Coin de Gamma 6 years ago
parent
commit
6395bcc26e
  1. 8
      analytics/server.py
  2. 21
      server/src/controllers/analytics_controller.ts
  3. 12
      server/src/models/analytics_task_model.ts

8
analytics/server.py

@ -33,18 +33,18 @@ async def handle_task(task: object):
logger.info("Command is OK")
response_task_payload = {
'_taskId': task['_taskId'],
task_result_payload = {
'_id': task['_id'],
'task': task['type'],
'analyticUnitId': task['analyticUnitId'],
'status': "IN_PROGRESS"
}
message = services.server_service.ServerMessage('TASK_RESULT', response_task_payload)
message = services.server_service.ServerMessage('TASK_RESULT', task_result_payload)
await server_service.send_message(message)
res = await worker.do_task(task)
res['_taskId'] = task['_taskId']
res['_id'] = task['_id']
message = services.server_service.ServerMessage('TASK_RESULT', res)
await server_service.send_message(message)

21
server/src/controllers/analytics_controller.ts

@ -1,23 +1,23 @@
import { AnalyticsMessageMethod, AnalyticsMessage } from '../models/analytics_message_model'
import { AnalyticsTask, AnalyticsTaskType } from '../models/analytics_task_model';
import { AnalyticsTask, AnalyticsTaskType, AnalyticsTaskId } from '../models/analytics_task_model';
import * as Segments from '../models/segment_model';
import * as AnalyticUnit from '../models/analytic_unit_model';
import { AnalyticsService } from '../services/analytics_service';
const taskMap = new Map<string, any>();
const taskResolvers = new Map<AnalyticsTaskId, any>();
let analyticsService: AnalyticsService = undefined;
function onTaskResult(taskResult: any) {
let taskId = taskResult._taskId;
let taskId = taskResult._id;
let status = taskResult.status;
if(status === 'SUCCESS' || status === 'FAILED') {
if(taskId in taskMap) {
let resolver: any = taskMap.get(taskId);
if(taskId in taskResolvers) {
let resolver: any = taskResolvers.get(taskId);
resolver(taskResult);
taskMap.delete(taskId);
taskResolvers.delete(taskId);
}
}
}
@ -58,11 +58,12 @@ async function runTask(task: AnalyticsTask): Promise<any> {
// };
// task._taskId = nextTaskId++;
// await analyticsService.sendTask(task);
// await ;
// return new Promise<void>(resolve => {
// taskMap[task._taskId] = resolve;
// })
return new Promise<void>(resolve => {
taskResolvers[task.id] = resolve;
})
.then(() => analyticsService.sendTask(task));
}
export async function runLearning(id: AnalyticUnit.AnalyticUnitId) {

12
server/src/models/analytics_task_model.ts

@ -12,11 +12,12 @@ export enum AnalyticsTaskType {
};
export class AnalyticsTask {
constructor(
public analyticUnitId: AnalyticUnitId,
public type: AnalyticsTaskType,
public payload?: any,
public id?: AnalyticsTaskId
private _id?: AnalyticsTaskId
) {
if(analyticUnitId === undefined) {
throw new Error('analyticUnitId is undefined');
@ -24,9 +25,14 @@ export class AnalyticsTask {
if(type === undefined || type === null) {
throw new Error('type is undefined or null');
}
if(id === undefined) {
this.id = uid(UID_LENGTH);
}
public get id(): AnalyticsTaskId {
if(this._id === undefined) {
this._id = uid(UID_LENGTH);
}
return this._id;
}
public toObject() {

Loading…
Cancel
Save