import { AnalyticsTask } from '../models/analytics_task_model'; import { AnalyticsMessageMethod, AnalyticsMessage } from '../models/analytics_message_model'; import * as config from '../config'; import * as zmq from 'zeromq'; import * as childProcess from 'child_process' import * as fs from 'fs'; import * as path from 'path'; export class AnalyticsService { private _requester: any; private _ready: boolean = false; private _pingResponded = false; private _zmqConnectionString: string = null; private _ipcPath: string = null; private _analyticsPinger: NodeJS.Timer = null; private _isClosed = false; constructor(private _onMessage: (message: AnalyticsMessage) => void) { this._init(); } public async sendTask(task: AnalyticsTask): Promise { if(!this._ready) { return Promise.reject("Analytics is not ready"); } let message = new AnalyticsMessage( AnalyticsMessageMethod.TASK, task.toObject() ); return this.sendMessage(message); } public async sendMessage(message: AnalyticsMessage): Promise { let strMessage = JSON.stringify(message); return this.sendText(strMessage); } public async sendText(text: string): Promise { return new Promise((resolve, reject) => { this._requester.send(text, undefined, (err: any) => { if(err) { reject(err); } else { resolve(); } }); }); } public close() { this._isClosed = true; console.log('Terminating analytics service...'); clearInterval(this._analyticsPinger); if(this._ipcPath !== null) { console.log('Remove ipc path: ' + this._ipcPath); fs.unlinkSync(this._ipcPath); } this._requester.close(); console.log('Ok'); } public get ready(): boolean { return this._ready; } private async _init() { this._requester = zmq.socket('pair'); let productionMode = process.env.NODE_ENV !== 'development' && process.env.INSIDE_DOCKER !== 'true'; this._zmqConnectionString = `tcp://127.0.0.1:${config.ZMQ_DEV_PORT}`; // debug mode if(productionMode) { this._zmqConnectionString = config.ZMQ_CONNECTION_STRING; if(this._zmqConnectionString === null) { var createResult = await AnalyticsService.createIPCAddress(); this._zmqConnectionString = createResult.address; this._ipcPath = createResult.file; } } console.log("Binding to zmq... %s", this._zmqConnectionString); this._requester.connect(this._zmqConnectionString); this._requester.on("message", this._onAnalyticsMessage.bind(this)); console.log('Ok'); if(productionMode) { console.log('Creating analytics process...'); try { var cp = await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString); } catch(error) { console.error('Can`t run analytics process: %s', error); return; } console.log('Ok, pid: %s', cp.pid); } console.log('Start analytics pinger...'); this._runAlalyticsPinger(); console.log('Ok'); } /** * Spawns analytics process. Reads process stderr and fails if it isn`t empty. * No need to stop the process later. * * @returns Creaded child process * @throws Process start error or first exception during python start */ private static async _runAnalyticsProcess(zmqConnectionString: string): Promise { let cp: childProcess.ChildProcess; let cpOptions = { cwd: config.ANALYTICS_PATH, env: { ...process.env, ZMQ_CONNECTION_STRING: zmqConnectionString } }; if(fs.existsSync(path.join(config.ANALYTICS_PATH, 'dist/server/server'))) { console.log('dist/server/server'); cp = childProcess.spawn('dist/server/server', [], cpOptions); } else { console.log('python3 server.py'); // If compiled analytics script doesn't exist - fallback to regular python console.log(config.ANALYTICS_PATH); cp = childProcess.spawn('python3', ['server.py'], cpOptions); } if(cp.pid === undefined) { return new Promise((resolve, reject) => { cp.on('error', reject); }); } return new Promise((resolve, reject) => { var resolved = false; cp.stdout.on('data', () => { if(resolved) { return; } else { resolved = true; } resolve(cp); }); cp.stderr.on('data', function(data) { if(resolved) { return; } else { resolved = true; } reject(data); }); }); } private _onAnalyticsUp() { console.log('Analytics is up'); } private async _onAnalyticsDown() { console.log('Analytics is down'); if(process.env.NODE_ENV !== 'development' && process.env.INSIDE_DOCKER !== 'true') { await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString); } } private _onAnalyticsMessage(data: any) { let text = data.toString(); if(text === 'PONG') { this._pingResponded = true; if(!this._ready) { this._ready = true; this._onAnalyticsUp(); } return; } let response; try { response = JSON.parse(text); } catch (e) { console.error("Can`t parse response from analytics as json:"); console.error(text); throw new Error('Unexpected response'); } this._onMessage(AnalyticsMessage.fromObject(response)); } private async _runAlalyticsPinger() { this._analyticsPinger = setInterval(() => { if(this._isClosed) { return; } if(!this._pingResponded && this._ready) { this._ready = false; this._onAnalyticsDown(); } this._pingResponded = false; // TODO: set life limit for this ping this.sendText('PING'); }, config.ANLYTICS_PING_INTERVAL); } private static async createIPCAddress(): Promise<{ address: string, file: string }> { let filename = `${process.pid}.ipc`; let p = path.join(config.ZMQ_IPC_PATH, filename); fs.writeFileSync(p, ''); return Promise.resolve({ address: 'ipc://' + p, file: p }); } }