Browse Source

Missing communication between analytics and server in production #91 (#95)

* analytics ipc config via env vs dev mode

* binding to zmq before child python start

* better usage of ipc and handling server process stopping

* ipc via /tmp

* ZQM_DEV_PORT to config
pull/1/head
Alexey Velikiy 6 years ago committed by rozetko
parent
commit
42356821ad
  1. 5
      analytics/config.py
  2. 17
      analytics/server.py
  3. 4
      analytics/services/server_service.py
  4. 3
      config.example.json
  5. 4
      server/src/config.ts
  6. 10
      server/src/controllers/analytics_controller.ts
  7. 12
      server/src/index.ts
  8. 80
      server/src/services/analytics_service.ts
  9. 5
      server/src/services/data_service.ts
  10. 35
      server/src/services/process_service.ts

5
analytics/config.py

@ -27,11 +27,12 @@ def get_config_field(field, default_val = None):
raise Exception('Please configure {}'.format(field)) raise Exception('Please configure {}'.format(field))
DATASET_FOLDER = os.path.join(DATA_FOLDER, 'datasets') DATASET_FOLDER = os.path.join(DATA_FOLDER, 'datasets')
ANALYTIC_UNITS_FOLDER = os.path.join(DATA_FOLDER, 'analytic_units') ANALYTIC_UNITS_FOLDER = os.path.join(DATA_FOLDER, 'analytic_units')
MODELS_FOLDER = os.path.join(DATA_FOLDER, 'models') MODELS_FOLDER = os.path.join(DATA_FOLDER, 'models')
METRICS_FOLDER = os.path.join(DATA_FOLDER, 'metrics') METRICS_FOLDER = os.path.join(DATA_FOLDER, 'metrics')
HASTIC_API_KEY = get_config_field('HASTIC_API_KEY') HASTIC_API_KEY = get_config_field('HASTIC_API_KEY')
ZEROMQ_CONNECTION_STRING = get_config_field('ZEROMQ_CONNECTION_STRING', 'tcp://*:8002')
ZMQ_DEV_PORT = get_config_field('ZMQ_DEV_PORT', '8002')
ZMQ_CONNECTION_STRING = get_config_field('ZMQ_CONNECTION_STRING', 'tcp://*:%s' % ZMQ_DEV_PORT)

17
analytics/server.py

@ -8,7 +8,6 @@ import services
from analytic_unit_worker import AnalyticUnitWorker from analytic_unit_worker import AnalyticUnitWorker
root = logging.getLogger() root = logging.getLogger()
logger = logging.getLogger('SERVER') logger = logging.getLogger('SERVER')
@ -18,11 +17,15 @@ data_service = None
root.setLevel(logging.DEBUG) root.setLevel(logging.DEBUG)
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG) logging_formatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s")
formatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s")
ch.setFormatter(formatter) logging_handler = logging.StreamHandler(sys.stdout)
root.addHandler(ch) #logging_handler = logging.FileHandler(config.DATA_FOLDER + '/analytics.log')
logging_handler.setLevel(logging.DEBUG)
logging_handler.setFormatter(logging_formatter)
root.addHandler(logging_handler)
async def handle_task(text): async def handle_task(text):
@ -61,4 +64,6 @@ if __name__ == "__main__":
worker = AnalyticUnitWorker() worker = AnalyticUnitWorker()
logger.info("Ok") logger.info("Ok")
server_service, data_service = init_services() server_service, data_service = init_services()
print('Analytics process is running') # we need to print to stdout and flush
sys.stdout.flush() # because node.js expects it
loop.run_until_complete(server_service.handle_loop()) loop.run_until_complete(server_service.handle_loop())

4
analytics/services/server_service.py

@ -14,10 +14,10 @@ class ServerService:
def __init__(self, on_message_handler): def __init__(self, on_message_handler):
self.on_message_handler = on_message_handler self.on_message_handler = on_message_handler
logger.info("Binding to %s ..." % config.ZEROMQ_CONNECTION_STRING) logger.info("Binding to %s ..." % config.ZMQ_CONNECTION_STRING)
self.context = zmq.asyncio.Context() self.context = zmq.asyncio.Context()
self.socket = self.context.socket(zmq.PAIR) self.socket = self.context.socket(zmq.PAIR)
self.socket.bind(config.ZEROMQ_CONNECTION_STRING) self.socket.bind(config.ZMQ_CONNECTION_STRING)
async def handle_loop(self): async def handle_loop(self):
while True: while True:

3
config.example.json

@ -1,5 +1,4 @@
{ {
"HASTIC_PORT": 8000, "HASTIC_PORT": 8000,
"HASTIC_API_KEY": "eyJrIjoiVjZqMHY0dHk4UEE3eEN4MzgzRnd2aURlMWlIdXdHNW4iLCJuIjoiaGFzdGljIiwiaWQiOjF9", "HASTIC_API_KEY": "eyJrIjoiVjZqMHY0dHk4UEE3eEN4MzgzRnd2aURlMWlIdXdHNW4iLCJuIjoiaGFzdGljIiwiaWQiOjF9"
"ZEROMQ_CONNECTION_STRING": "ipc:///tmp/hastic/8000"
} }

4
server/src/config.ts

@ -21,7 +21,9 @@ export const METRICS_PATH = path.join(DATA_PATH, 'metrics');
export const SEGMENTS_PATH = path.join(DATA_PATH, 'segments'); export const SEGMENTS_PATH = path.join(DATA_PATH, 'segments');
export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000'); export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000');
export const ZEROMQ_CONNECTION_STRING = getConfigField('ZEROMQ_CONNECTION_STRING', 'tcp://127.0.0.1:8002'); export const ZMQ_CONNECTION_STRING = getConfigField('ZMQ_CONNECTION_STRING', null);
export const ZMQ_IPC_PATH = getConfigField('ZMQ_IPC_PATH', path.join('/tmp', 'hastic'));
export const ZMQ_DEV_PORT = getConfigField('ZMQ_DEV_PORT', '8002');
export const ANLYTICS_PING_INTERVAL = 500; // ms export const ANLYTICS_PING_INTERVAL = 500; // ms

10
server/src/controllers/analytics_controller.ts

@ -7,7 +7,7 @@ import { AnalyticsService } from '../services/analytics_service'
const taskMap = {}; const taskMap = {};
let nextTaskId = 0; let nextTaskId = 0;
const analyticsService = new AnalyticsService(onResponse); let analyticsService = undefined;
function onResponse(response: any) { function onResponse(response: any) {
let taskId = response._taskId; let taskId = response._taskId;
@ -21,6 +21,14 @@ function onResponse(response: any) {
} }
} }
export function init() {
analyticsService = new AnalyticsService(onResponse);
}
export function terminate() {
analyticsService.close();
}
async function runTask(task): Promise<any> { async function runTask(task): Promise<any> {
let anomaly: AnalyticUnit.AnalyticUnit = AnalyticUnit.findById(task.analyticUnitId); let anomaly: AnalyticUnit.AnalyticUnit = AnalyticUnit.findById(task.analyticUnitId);
task.metric = { task.metric = {

12
server/src/index.ts

@ -5,6 +5,7 @@ import { router as alertsRouter } from './routes/alerts_router';
import * as AnalyticsController from './controllers/analytics_controller'; import * as AnalyticsController from './controllers/analytics_controller';
import * as Data from './services/data_service'; import * as Data from './services/data_service';
import * as ProcessService from './services/process_service';
import { HASTIC_PORT } from './config'; import { HASTIC_PORT } from './config';
@ -12,7 +13,10 @@ import * as Koa from 'koa';
import * as Router from 'koa-router'; import * as Router from 'koa-router';
import * as bodyParser from 'koa-bodyparser'; import * as bodyParser from 'koa-bodyparser';
Data.checkDataFolders(); Data.checkDataFolders();
AnalyticsController.init();
ProcessService.registerExitHandler(AnalyticsController.terminate);
var app = new Koa(); var app = new Koa();
@ -31,7 +35,6 @@ rootRouter.use('/analyticUnits', anomaliesRouter.routes(), anomaliesRouter.allow
rootRouter.use('/segments', segmentsRouter.routes(), segmentsRouter.allowedMethods()); rootRouter.use('/segments', segmentsRouter.routes(), segmentsRouter.allowedMethods());
rootRouter.use('/alerts', alertsRouter.routes(), alertsRouter.allowedMethods()); rootRouter.use('/alerts', alertsRouter.routes(), alertsRouter.allowedMethods());
rootRouter.get('/', async (ctx) => { rootRouter.get('/', async (ctx) => {
ctx.response.body = { ctx.response.body = {
server: 'Ok', server: 'Ok',
analyticsReady: AnalyticsController.isAnalyticReady(), analyticsReady: AnalyticsController.isAnalyticReady(),
@ -43,6 +46,11 @@ app
.use(rootRouter.routes()) .use(rootRouter.routes())
.use(rootRouter.allowedMethods()); .use(rootRouter.allowedMethods());
app.listen(HASTIC_PORT, () => { let server = app.listen(HASTIC_PORT, () => {
console.log(`Server is running on :${HASTIC_PORT}`); console.log(`Server is running on :${HASTIC_PORT}`);
}); });
ProcessService.registerExitHandler(() => {
console.log('Stopping server...');
server.close();
})

80
server/src/services/analytics_service.ts

@ -1,4 +1,4 @@
import { ANALYTICS_PATH, ZEROMQ_CONNECTION_STRING, ANLYTICS_PING_INTERVAL } from '../config' import * as config from '../config';
const zmq = require('zeromq'); const zmq = require('zeromq');
@ -12,6 +12,10 @@ export class AnalyticsService {
private _requester: any; private _requester: any;
private _ready: boolean = false; private _ready: boolean = false;
private _pingResponded = false; private _pingResponded = false;
private _zmqConnectionString = null;
private _ipcPath = null;
private _analyticsPinger: NodeJS.Timer = null;
private _isClosed = false;
constructor(private _onResponse: (response: any) => void) { constructor(private _onResponse: (response: any) => void) {
@ -39,19 +43,41 @@ export class AnalyticsService {
} }
public close() { public close() {
// TODO: close socket & terminate process if you have any this._isClosed = true;
console.log('Terminating analytics service...');
clearInterval(this._analyticsPinger);
if(this._ipcPath !== null) {
fs.unlinkSync(this._ipcPath);
}
this._requester.close(); this._requester.close();
console.log('Ok');
} }
public get ready(): boolean { return this._ready; } public get ready(): boolean { return this._ready; }
private async _init() { private async _init() {
this._requester = zmq.socket('pair'); this._requester = zmq.socket('pair');
let productionMode = process.env.NODE_ENV !== 'development';
this._zmqConnectionString = `tcp://127.0.0.1:${config.ZMQ_DEV_PORT}`; // debug mode
if(productionMode) {
this._zmqConnectionString = config.ZMQ_CONNECTION_STRING;
if(this._zmqConnectionString === null) {
var createResult = await AnalyticsService.createIPCAddress();
this._zmqConnectionString = createResult.address;
this._ipcPath = createResult.file;
}
}
if(process.env.NODE_ENV !== 'development') { console.log("Binding to zmq... %s", this._zmqConnectionString);
this._requester.connect(this._zmqConnectionString);
this._requester.on("message", this._onAnalyticsMessage.bind(this));
console.log('Ok');
if(productionMode) {
console.log('Creating analytics process...'); console.log('Creating analytics process...');
try { try {
var cp = await AnalyticsService._runAnalyticsProcess(); var cp = await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString);
} catch(error) { } catch(error) {
console.error('Can`t run analytics process: %s', error); console.error('Can`t run analytics process: %s', error);
return; return;
@ -59,11 +85,6 @@ export class AnalyticsService {
console.log('Ok, pid: %s', cp.pid); console.log('Ok, pid: %s', cp.pid);
} }
console.log("Binding to zmq...: %s", ZEROMQ_CONNECTION_STRING);
this._requester.connect(ZEROMQ_CONNECTION_STRING);
this._requester.on("message", this._onAnalyticsMessage.bind(this));
console.log('Ok');
console.log('Start analytics pinger...'); console.log('Start analytics pinger...');
this._runAlalyticsPinger(); this._runAlalyticsPinger();
console.log('Ok'); console.log('Ok');
@ -71,21 +92,30 @@ export class AnalyticsService {
} }
/** /**
* Spawns analytics process. Reads process stderr and fails if it * Spawns analytics process. Reads process stderr and fails if it isn`t empty.
* is not empty. No need to stop process later. * No need to stop the process later.
* *
* @returns creaded child process * @returns Creaded child process
* @throws Process start error or first exception during python start
*/ */
private static async _runAnalyticsProcess(): Promise<childProcess.ChildProcess> { private static async _runAnalyticsProcess(zmqConnectionString: string): Promise<childProcess.ChildProcess> {
let cp: childProcess.ChildProcess; let cp: childProcess.ChildProcess;
if(fs.existsSync(path.join(ANALYTICS_PATH, 'dist/worker/worker'))) { let cpOptions = {
cwd: config.ANALYTICS_PATH,
env: {
...process.env,
ZMQ_CONNECTION_STRING: zmqConnectionString
}
};
if(fs.existsSync(path.join(config.ANALYTICS_PATH, 'dist/worker/worker'))) {
console.log('dist/worker/worker'); console.log('dist/worker/worker');
cp = childProcess.spawn('dist/worker/worker', [], { cwd: ANALYTICS_PATH }); cp = childProcess.spawn('dist/worker/worker', [], cpOptions);
} else { } else {
console.log('python3 server.py'); console.log('python3 server.py');
// If compiled analytics script doesn't exist - fallback to regular python // If compiled analytics script doesn't exist - fallback to regular python
console.log(ANALYTICS_PATH); console.log(config.ANALYTICS_PATH);
cp = childProcess.spawn('python3', ['server.py'], { cwd: ANALYTICS_PATH }); cp = childProcess.spawn('python3', ['server.py'], cpOptions);
} }
if(cp.pid === undefined) { if(cp.pid === undefined) {
@ -126,7 +156,7 @@ export class AnalyticsService {
private async _onAnalyticsDown() { private async _onAnalyticsDown() {
console.log('Analytics is down'); console.log('Analytics is down');
if(process.env.NODE_ENV !== 'development') { if(process.env.NODE_ENV !== 'development') {
await AnalyticsService._runAnalyticsProcess(); await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString);
} }
} }
@ -152,7 +182,10 @@ export class AnalyticsService {
} }
private async _runAlalyticsPinger() { private async _runAlalyticsPinger() {
setInterval(() => { this._analyticsPinger = setInterval(() => {
if(this._isClosed) {
return;
}
if(!this._pingResponded && this._ready) { if(!this._pingResponded && this._ready) {
this._ready = false; this._ready = false;
this._onAnalyticsDown(); this._onAnalyticsDown();
@ -160,7 +193,14 @@ export class AnalyticsService {
this._pingResponded = false; this._pingResponded = false;
// TODO: set life limit for this ping // TODO: set life limit for this ping
this.sendMessage('ping'); this.sendMessage('ping');
}, ANLYTICS_PING_INTERVAL); }, config.ANLYTICS_PING_INTERVAL);
}
private static async createIPCAddress(): Promise<{ address: string, file: string }> {
let filename = `${process.pid}.ipc`
let p = path.join(config.ZMQ_IPC_PATH, filename);
fs.writeFileSync(p, '');
return Promise.resolve({ address: 'ipc://' + p, file: p });
} }
} }

5
server/src/services/data_service.ts

@ -15,7 +15,9 @@ function maybeCreate(path: string): void {
if(fs.existsSync(path)) { if(fs.existsSync(path)) {
return; return;
} }
console.log('mkdir: ' + path);
fs.mkdirSync(path); fs.mkdirSync(path);
console.log('exists: ' + fs.existsSync(path));
} }
export function checkDataFolders(): void { export function checkDataFolders(): void {
@ -25,6 +27,7 @@ export function checkDataFolders(): void {
config.ANALYTIC_UNITS_PATH, config.ANALYTIC_UNITS_PATH,
config.MODELS_PATH, config.MODELS_PATH,
config.METRICS_PATH, config.METRICS_PATH,
config.SEGMENTS_PATH config.SEGMENTS_PATH,
config.ZMQ_IPC_PATH
].forEach(maybeCreate); ].forEach(maybeCreate);
} }

35
server/src/services/process_service.ts

@ -0,0 +1,35 @@
var exitHandlers = []
var exitHandled = false;
/**
* Add a callback for closing programm bacause of any reason
*
* @param callback a sync function
*/
export function registerExitHandler(callback: () => void) {
exitHandlers.push(callback);
}
function exitHandler(options, err) {
if(exitHandled) {
return;
}
exitHandled = true;
for(let i = 0; i < exitHandlers.length; i++) {
exitHandlers[i]();
}
}
//do something when app is closing
process.on('exit', exitHandler.bind(null,{cleanup:true}));
//catches ctrl+c event
process.on('SIGINT', exitHandler.bind(null, {exit:true}));
// catches "kill pid" (for example: nodemon restart)
process.on('SIGUSR1', exitHandler.bind(null, {exit:true}));
process.on('SIGUSR2', exitHandler.bind(null, {exit:true}));
//catches uncaught exceptions
process.on('uncaughtException', exitHandler.bind(null, {exit:true}));
Loading…
Cancel
Save