From 2a2fd54d3e8ed2dac1941cab0438f7b926f29e57 Mon Sep 17 00:00:00 2001 From: rozetko Date: Fri, 15 Mar 2019 20:05:10 +0300 Subject: [PATCH] Add tasks to queue when analytics is not ready #468 --- .../src/controllers/analytics_controller.ts | 15 ++++----------- server/src/index.ts | 4 ++-- server/src/services/analytics_service.ts | 19 +++++++++++++++++-- 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index 5b553da..6a14285 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -99,9 +99,6 @@ export function terminate() { async function runTask(task: AnalyticsTask): Promise { return new Promise((resolver: TaskResolver) => { - if(!analyticsService.ready) { - throw new Error(`Can't send task, analytics is not ready`); - } taskResolvers.set(task.id, resolver); // it will be resolved in onTaskResult() analyticsService.sendTask(task); // we dont wait for result here }); @@ -192,10 +189,6 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { throw new Error('Can`t start learning when it`s already started [' + id + ']'); } - if(!isAnalyticReady()) { - throw new Error('Analytics is not ready'); - } - let oldCache = await AnalyticUnitCache.findById(id); if(oldCache !== null) { oldCache = oldCache.data; @@ -247,10 +240,6 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId) { let previousLastDetectionTime: number = undefined; try { - if(!isAnalyticReady()) { - throw new Error('Analytics is not ready'); - } - let unit = await AnalyticUnit.findById(id); previousLastDetectionTime = unit.lastDetectionTime; let analyticUnitType = unit.type; @@ -365,6 +354,10 @@ async function processDetectionResult(analyticUnitId: AnalyticUnit.AnalyticUnitI } +export function getQueueLength() { + return analyticsService.queueLength; +} + export function isAnalyticReady(): boolean { return analyticsService.ready; } diff --git a/server/src/index.ts b/server/src/index.ts index bce00de..970400f 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -43,7 +43,6 @@ rootRouter.use('/threshold', thresholdRouter.routes(), thresholdRouter.allowedMe rootRouter.get('/', async (ctx) => { const activeWebhooks = await AnalyticsController.getActiveWebhooks(); - const activeWebhooksAmount = activeWebhooks.length; ctx.response.body = { server: 'OK', @@ -55,7 +54,8 @@ rootRouter.get('/', async (ctx) => { zmqConectionString: ZMQ_CONNECTION_STRING, serverPort: HASTIC_PORT, git: GIT_INFO, - activeWebhooksAmount + activeWebhooks: activeWebhooks.length, + queueLength: AnalyticsController.getQueueLength() }; }); diff --git a/server/src/services/analytics_service.ts b/server/src/services/analytics_service.ts index c352379..dcb6fbd 100644 --- a/server/src/services/analytics_service.ts +++ b/server/src/services/analytics_service.ts @@ -9,6 +9,7 @@ import * as zmq from 'zeromq'; import * as childProcess from 'child_process' import * as fs from 'fs'; import * as path from 'path'; +import * as _ from 'lodash'; export class AnalyticsService { @@ -23,6 +24,7 @@ export class AnalyticsService { private _isClosed = false; private _productionMode = false; private _inDocker = false; + private _queue: AnalyticsTask[] = []; constructor(private _onMessage: (message: AnalyticsMessage) => void) { this._productionMode = config.PRODUCTION_MODE; @@ -30,9 +32,15 @@ export class AnalyticsService { this._init(); } - public async sendTask(task: AnalyticsTask): Promise { + public async sendTask(task: AnalyticsTask, fromQueue = false): Promise { if(!this._ready) { - throw new Error('Analytics is not ready'); + console.log('Analytics is not ready'); + if(!fromQueue) { + // TODO: add to db? + this._queue.push(task); + console.log('Adding task to queue'); + } + return; } let method = task.type === AnalyticsTaskType.PUSH ? AnalyticsMessageMethod.DATA : AnalyticsMessageMethod.TASK @@ -167,6 +175,9 @@ export class AnalyticsService { private _onAnalyticsUp() { const msg = 'Analytics is up'; + for(let i in _.range(this._queue.length)) { + this.sendTask(this._queue.shift(), true); + } console.log(msg); //this._alertService.sendMsg(msg, WebhookType.RECOVERY); } @@ -225,4 +236,8 @@ export class AnalyticsService { return filename; } + public get queueLength() { + return this._queue.length; + } + }