Browse Source

Server/Analytics - Client/Server switch #75 & #74 & #72

pull/1/head
Alexey Velikiy 6 years ago
parent
commit
02160209e2
  1. 2
      analytics/server.py
  2. 1
      server/src/config.ts
  3. 72
      server/src/services/analytics_service.ts

2
analytics/server.py

@ -58,10 +58,8 @@ if __name__ == "__main__":
while True: while True:
received_bytes = socket.recv() received_bytes = socket.recv()
text = received_bytes.decode('utf-8') text = received_bytes.decode('utf-8')
logger.info('Got message %s' % text)
if text == 'ping': if text == 'ping':
handle_ping() handle_ping()
logger.info('Sent pong')
else: else:
handle_task(text) handle_task(text)

1
server/src/config.ts

@ -18,6 +18,7 @@ export const SEGMENTS_PATH = path.join(DATA_PATH, 'segments');
export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000'); export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000');
export const ZEROMQ_CONNECTION_STRING = getConfigField('ZEROMQ_CONNECTION_STRING', 'tcp://127.0.0.1:8002'); export const ZEROMQ_CONNECTION_STRING = getConfigField('ZEROMQ_CONNECTION_STRING', 'tcp://127.0.0.1:8002');
export const ANLYTICS_PING_INTERVAL = 500; // ms
function getConfigField(field, defaultVal?) { function getConfigField(field, defaultVal?) {

72
server/src/services/analytics_service.ts

@ -1,4 +1,4 @@
import { ANALYTICS_PATH, ZEROMQ_CONNECTION_STRING } from '../config' import { ANALYTICS_PATH, ZEROMQ_CONNECTION_STRING, ANLYTICS_PING_INTERVAL } from '../config'
const zmq = require('zeromq'); const zmq = require('zeromq');
@ -11,10 +11,11 @@ export class AnalyticsService {
private _requester: any; private _requester: any;
private _ready: boolean = false; private _ready: boolean = false;
private _pingResponded = false;
constructor(private _onResponse: (response: any) => void) { constructor(private _onResponse: (response: any) => void) {
this._initConnection(); this._init();
} }
public async sendTask(msgObj: any): Promise<void> { public async sendTask(msgObj: any): Promise<void> {
@ -44,7 +45,7 @@ export class AnalyticsService {
public get ready(): boolean { return this._ready; } public get ready(): boolean { return this._ready; }
private async _initConnection() { private async _init() {
this._requester = zmq.socket('pair'); this._requester = zmq.socket('pair');
if(process.env.NODE_ENV !== 'development') { if(process.env.NODE_ENV !== 'development') {
@ -53,26 +54,13 @@ export class AnalyticsService {
console.log("Binding to zmq...: %s", ZEROMQ_CONNECTION_STRING); console.log("Binding to zmq...: %s", ZEROMQ_CONNECTION_STRING);
this._requester.connect(ZEROMQ_CONNECTION_STRING); this._requester.connect(ZEROMQ_CONNECTION_STRING);
console.log('Ok');
console.log('Sending ping to analytics...');
await this._connectToAnalytics();
console.log('Ok')
this._requester.on("message", this._onAnalyticsMessage.bind(this)); this._requester.on("message", this._onAnalyticsMessage.bind(this));
console.log('Ok');
this._ready = true; console.log('Start analytics pinger...');
this._runAlalyticsPinger();
} console.log('Ok');
private async _connectToAnalytics() {
this.sendMessage('ping'); // we don`t await here
return new Promise(resolve => {
this._requester.once('message', (message) => {
console.log('Got message from analytics: ' + message);
resolve();
})
});
} }
private _runAnalyticsProcess() { private _runAnalyticsProcess() {
@ -88,10 +76,48 @@ export class AnalyticsService {
console.log('ok'); console.log('ok');
} }
private _onAnalyticsMessage(data) { private _onAnalyticsUp() {
console.log(`analytics message: ${data}`); console.log('Analytics is up');
let response = JSON.parse(data); }
private _onAnalyticsDown() {
console.log('Analytics is down');
if(process.env.NODE_ENV !== 'development') {
this._runAnalyticsProcess();
}
}
private _onAnalyticsMessage(text: any, error) {
if(text.toString() === 'pong') {
this._pingResponded = true;
if(!this._ready) {
this._ready = true;
this._onAnalyticsUp();
}
return;
}
console.log(`analytics message: "${text}"`);
let response;
try {
response = JSON.parse(text);
} catch (e) {
console.error("Can`t parse response from analytics as json:");
console.error(text);
throw new Error('Unexpected response');
}
this._onResponse(response); this._onResponse(response);
} }
private async _runAlalyticsPinger() {
setInterval(() => {
if(!this._pingResponded && this._ready) {
this._ready = false;
this._onAnalyticsDown();
}
this._pingResponded = false;
// TODO: set life limit for this ping
this.sendMessage('ping');
}, ANLYTICS_PING_INTERVAL);
}
} }

Loading…
Cancel
Save