Browse Source

Add tasks to queue when analytics is not ready #468

pull/1/head
rozetko 6 years ago
parent
commit
2a2fd54d3e
  1. 15
      server/src/controllers/analytics_controller.ts
  2. 4
      server/src/index.ts
  3. 19
      server/src/services/analytics_service.ts

15
server/src/controllers/analytics_controller.ts

@ -99,9 +99,6 @@ export function terminate() {
async function runTask(task: AnalyticsTask): Promise<TaskResult> { async function runTask(task: AnalyticsTask): Promise<TaskResult> {
return new Promise<TaskResult>((resolver: TaskResolver) => { return new Promise<TaskResult>((resolver: TaskResolver) => {
if(!analyticsService.ready) {
throw new Error(`Can't send task, analytics is not ready`);
}
taskResolvers.set(task.id, resolver); // it will be resolved in onTaskResult() taskResolvers.set(task.id, resolver); // it will be resolved in onTaskResult()
analyticsService.sendTask(task); // we dont wait for result here analyticsService.sendTask(task); // we dont wait for result here
}); });
@ -192,10 +189,6 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) {
throw new Error('Can`t start learning when it`s already started [' + id + ']'); throw new Error('Can`t start learning when it`s already started [' + id + ']');
} }
if(!isAnalyticReady()) {
throw new Error('Analytics is not ready');
}
let oldCache = await AnalyticUnitCache.findById(id); let oldCache = await AnalyticUnitCache.findById(id);
if(oldCache !== null) { if(oldCache !== null) {
oldCache = oldCache.data; oldCache = oldCache.data;
@ -247,10 +240,6 @@ export async function runDetect(id: AnalyticUnit.AnalyticUnitId) {
let previousLastDetectionTime: number = undefined; let previousLastDetectionTime: number = undefined;
try { try {
if(!isAnalyticReady()) {
throw new Error('Analytics is not ready');
}
let unit = await AnalyticUnit.findById(id); let unit = await AnalyticUnit.findById(id);
previousLastDetectionTime = unit.lastDetectionTime; previousLastDetectionTime = unit.lastDetectionTime;
let analyticUnitType = unit.type; let analyticUnitType = unit.type;
@ -365,6 +354,10 @@ async function processDetectionResult(analyticUnitId: AnalyticUnit.AnalyticUnitI
} }
export function getQueueLength() {
return analyticsService.queueLength;
}
export function isAnalyticReady(): boolean { export function isAnalyticReady(): boolean {
return analyticsService.ready; return analyticsService.ready;
} }

4
server/src/index.ts

@ -43,7 +43,6 @@ rootRouter.use('/threshold', thresholdRouter.routes(), thresholdRouter.allowedMe
rootRouter.get('/', async (ctx) => { rootRouter.get('/', async (ctx) => {
const activeWebhooks = await AnalyticsController.getActiveWebhooks(); const activeWebhooks = await AnalyticsController.getActiveWebhooks();
const activeWebhooksAmount = activeWebhooks.length;
ctx.response.body = { ctx.response.body = {
server: 'OK', server: 'OK',
@ -55,7 +54,8 @@ rootRouter.get('/', async (ctx) => {
zmqConectionString: ZMQ_CONNECTION_STRING, zmqConectionString: ZMQ_CONNECTION_STRING,
serverPort: HASTIC_PORT, serverPort: HASTIC_PORT,
git: GIT_INFO, git: GIT_INFO,
activeWebhooksAmount activeWebhooks: activeWebhooks.length,
queueLength: AnalyticsController.getQueueLength()
}; };
}); });

19
server/src/services/analytics_service.ts

@ -9,6 +9,7 @@ import * as zmq from 'zeromq';
import * as childProcess from 'child_process' import * as childProcess from 'child_process'
import * as fs from 'fs'; import * as fs from 'fs';
import * as path from 'path'; import * as path from 'path';
import * as _ from 'lodash';
export class AnalyticsService { export class AnalyticsService {
@ -23,6 +24,7 @@ export class AnalyticsService {
private _isClosed = false; private _isClosed = false;
private _productionMode = false; private _productionMode = false;
private _inDocker = false; private _inDocker = false;
private _queue: AnalyticsTask[] = [];
constructor(private _onMessage: (message: AnalyticsMessage) => void) { constructor(private _onMessage: (message: AnalyticsMessage) => void) {
this._productionMode = config.PRODUCTION_MODE; this._productionMode = config.PRODUCTION_MODE;
@ -30,9 +32,15 @@ export class AnalyticsService {
this._init(); this._init();
} }
public async sendTask(task: AnalyticsTask): Promise<void> { public async sendTask(task: AnalyticsTask, fromQueue = false): Promise<void> {
if(!this._ready) { if(!this._ready) {
throw new Error('Analytics is not ready'); console.log('Analytics is not ready');
if(!fromQueue) {
// TODO: add to db?
this._queue.push(task);
console.log('Adding task to queue');
}
return;
} }
let method = task.type === AnalyticsTaskType.PUSH ? let method = task.type === AnalyticsTaskType.PUSH ?
AnalyticsMessageMethod.DATA : AnalyticsMessageMethod.TASK AnalyticsMessageMethod.DATA : AnalyticsMessageMethod.TASK
@ -167,6 +175,9 @@ export class AnalyticsService {
private _onAnalyticsUp() { private _onAnalyticsUp() {
const msg = 'Analytics is up'; const msg = 'Analytics is up';
for(let i in _.range(this._queue.length)) {
this.sendTask(this._queue.shift(), true);
}
console.log(msg); console.log(msg);
//this._alertService.sendMsg(msg, WebhookType.RECOVERY); //this._alertService.sendMsg(msg, WebhookType.RECOVERY);
} }
@ -225,4 +236,8 @@ export class AnalyticsService {
return filename; return filename;
} }
public get queueLength() {
return this._queue.length;
}
} }

Loading…
Cancel
Save