diff --git a/analytics/server.py b/analytics/server.py index 6636546..87b063b 100644 --- a/analytics/server.py +++ b/analytics/server.py @@ -58,10 +58,8 @@ if __name__ == "__main__": while True: received_bytes = socket.recv() text = received_bytes.decode('utf-8') - logger.info('Got message %s' % text) if text == 'ping': handle_ping() - logger.info('Sent pong') else: handle_task(text) diff --git a/server/src/config.ts b/server/src/config.ts index d6fd28a..6aefcb9 100644 --- a/server/src/config.ts +++ b/server/src/config.ts @@ -18,6 +18,7 @@ export const SEGMENTS_PATH = path.join(DATA_PATH, 'segments'); export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000'); export const ZEROMQ_CONNECTION_STRING = getConfigField('ZEROMQ_CONNECTION_STRING', 'tcp://127.0.0.1:8002'); +export const ANLYTICS_PING_INTERVAL = 500; // ms function getConfigField(field, defaultVal?) { diff --git a/server/src/services/analytics_service.ts b/server/src/services/analytics_service.ts index d9752e0..0b2e44d 100644 --- a/server/src/services/analytics_service.ts +++ b/server/src/services/analytics_service.ts @@ -1,4 +1,4 @@ -import { ANALYTICS_PATH, ZEROMQ_CONNECTION_STRING } from '../config' +import { ANALYTICS_PATH, ZEROMQ_CONNECTION_STRING, ANLYTICS_PING_INTERVAL } from '../config' const zmq = require('zeromq'); @@ -11,10 +11,11 @@ export class AnalyticsService { private _requester: any; private _ready: boolean = false; + private _pingResponded = false; constructor(private _onResponse: (response: any) => void) { - this._initConnection(); + this._init(); } public async sendTask(msgObj: any): Promise { @@ -44,7 +45,7 @@ export class AnalyticsService { public get ready(): boolean { return this._ready; } - private async _initConnection() { + private async _init() { this._requester = zmq.socket('pair'); if(process.env.NODE_ENV !== 'development') { @@ -53,26 +54,13 @@ export class AnalyticsService { console.log("Binding to zmq...: %s", ZEROMQ_CONNECTION_STRING); this._requester.connect(ZEROMQ_CONNECTION_STRING); - console.log('Ok'); - - console.log('Sending ping to analytics...'); - await this._connectToAnalytics(); - console.log('Ok') - this._requester.on("message", this._onAnalyticsMessage.bind(this)); + console.log('Ok'); - this._ready = true; - - } + console.log('Start analytics pinger...'); + this._runAlalyticsPinger(); + console.log('Ok'); - private async _connectToAnalytics() { - this.sendMessage('ping'); // we don`t await here - return new Promise(resolve => { - this._requester.once('message', (message) => { - console.log('Got message from analytics: ' + message); - resolve(); - }) - }); } private _runAnalyticsProcess() { @@ -88,10 +76,48 @@ export class AnalyticsService { console.log('ok'); } - private _onAnalyticsMessage(data) { - console.log(`analytics message: ${data}`); - let response = JSON.parse(data); + private _onAnalyticsUp() { + console.log('Analytics is up'); + } + + private _onAnalyticsDown() { + console.log('Analytics is down'); + if(process.env.NODE_ENV !== 'development') { + this._runAnalyticsProcess(); + } + } + + private _onAnalyticsMessage(text: any, error) { + if(text.toString() === 'pong') { + this._pingResponded = true; + if(!this._ready) { + this._ready = true; + this._onAnalyticsUp(); + } + return; + } + console.log(`analytics message: "${text}"`); + let response; + try { + response = JSON.parse(text); + } catch (e) { + console.error("Can`t parse response from analytics as json:"); + console.error(text); + throw new Error('Unexpected response'); + } this._onResponse(response); } + private async _runAlalyticsPinger() { + setInterval(() => { + if(!this._pingResponded && this._ready) { + this._ready = false; + this._onAnalyticsDown(); + } + this._pingResponded = false; + // TODO: set life limit for this ping + this.sendMessage('ping'); + }, ANLYTICS_PING_INTERVAL); + } + }