diff --git a/analytics/config.py b/analytics/config.py index 0fdda4b..90ed22a 100644 --- a/analytics/config.py +++ b/analytics/config.py @@ -27,11 +27,12 @@ def get_config_field(field, default_val = None): raise Exception('Please configure {}'.format(field)) - DATASET_FOLDER = os.path.join(DATA_FOLDER, 'datasets') ANALYTIC_UNITS_FOLDER = os.path.join(DATA_FOLDER, 'analytic_units') MODELS_FOLDER = os.path.join(DATA_FOLDER, 'models') METRICS_FOLDER = os.path.join(DATA_FOLDER, 'metrics') HASTIC_API_KEY = get_config_field('HASTIC_API_KEY') -ZEROMQ_CONNECTION_STRING = get_config_field('ZEROMQ_CONNECTION_STRING', 'tcp://*:8002') + +ZMQ_DEV_PORT = get_config_field('ZMQ_DEV_PORT', '8002') +ZMQ_CONNECTION_STRING = get_config_field('ZMQ_CONNECTION_STRING', 'tcp://*:%s' % ZMQ_DEV_PORT) diff --git a/analytics/server.py b/analytics/server.py index 75a0fa0..dcf0952 100644 --- a/analytics/server.py +++ b/analytics/server.py @@ -8,7 +8,6 @@ import services from analytic_unit_worker import AnalyticUnitWorker - root = logging.getLogger() logger = logging.getLogger('SERVER') @@ -18,11 +17,15 @@ data_service = None root.setLevel(logging.DEBUG) -ch = logging.StreamHandler(sys.stdout) -ch.setLevel(logging.DEBUG) -formatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s") -ch.setFormatter(formatter) -root.addHandler(ch) + +logging_formatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s") + +logging_handler = logging.StreamHandler(sys.stdout) +#logging_handler = logging.FileHandler(config.DATA_FOLDER + '/analytics.log') +logging_handler.setLevel(logging.DEBUG) +logging_handler.setFormatter(logging_formatter) + +root.addHandler(logging_handler) async def handle_task(text): @@ -61,4 +64,6 @@ if __name__ == "__main__": worker = AnalyticUnitWorker() logger.info("Ok") server_service, data_service = init_services() + print('Analytics process is running') # we need to print to stdout and flush + sys.stdout.flush() # because node.js expects it loop.run_until_complete(server_service.handle_loop()) diff --git a/analytics/services/server_service.py b/analytics/services/server_service.py index b88c250..a43bada 100644 --- a/analytics/services/server_service.py +++ b/analytics/services/server_service.py @@ -14,10 +14,10 @@ class ServerService: def __init__(self, on_message_handler): self.on_message_handler = on_message_handler - logger.info("Binding to %s ..." % config.ZEROMQ_CONNECTION_STRING) + logger.info("Binding to %s ..." % config.ZMQ_CONNECTION_STRING) self.context = zmq.asyncio.Context() self.socket = self.context.socket(zmq.PAIR) - self.socket.bind(config.ZEROMQ_CONNECTION_STRING) + self.socket.bind(config.ZMQ_CONNECTION_STRING) async def handle_loop(self): while True: diff --git a/config.example.json b/config.example.json index 54d0ed7..a27c12f 100644 --- a/config.example.json +++ b/config.example.json @@ -1,5 +1,4 @@ { "HASTIC_PORT": 8000, - "HASTIC_API_KEY": "eyJrIjoiVjZqMHY0dHk4UEE3eEN4MzgzRnd2aURlMWlIdXdHNW4iLCJuIjoiaGFzdGljIiwiaWQiOjF9", - "ZEROMQ_CONNECTION_STRING": "ipc:///tmp/hastic/8000" + "HASTIC_API_KEY": "eyJrIjoiVjZqMHY0dHk4UEE3eEN4MzgzRnd2aURlMWlIdXdHNW4iLCJuIjoiaGFzdGljIiwiaWQiOjF9" } diff --git a/server/src/config.ts b/server/src/config.ts index 43efee1..fc8f133 100644 --- a/server/src/config.ts +++ b/server/src/config.ts @@ -21,7 +21,9 @@ export const METRICS_PATH = path.join(DATA_PATH, 'metrics'); export const SEGMENTS_PATH = path.join(DATA_PATH, 'segments'); export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000'); -export const ZEROMQ_CONNECTION_STRING = getConfigField('ZEROMQ_CONNECTION_STRING', 'tcp://127.0.0.1:8002'); +export const ZMQ_CONNECTION_STRING = getConfigField('ZMQ_CONNECTION_STRING', null); +export const ZMQ_IPC_PATH = getConfigField('ZMQ_IPC_PATH', path.join('/tmp', 'hastic')); +export const ZMQ_DEV_PORT = getConfigField('ZMQ_DEV_PORT', '8002'); export const ANLYTICS_PING_INTERVAL = 500; // ms diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index 51eb271..aa3d77a 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -7,7 +7,7 @@ import { AnalyticsService } from '../services/analytics_service' const taskMap = {}; let nextTaskId = 0; -const analyticsService = new AnalyticsService(onResponse); +let analyticsService = undefined; function onResponse(response: any) { let taskId = response._taskId; @@ -21,6 +21,14 @@ function onResponse(response: any) { } } +export function init() { + analyticsService = new AnalyticsService(onResponse); +} + +export function terminate() { + analyticsService.close(); +} + async function runTask(task): Promise { let anomaly: AnalyticUnit.AnalyticUnit = AnalyticUnit.findById(task.analyticUnitId); task.metric = { diff --git a/server/src/index.ts b/server/src/index.ts index 7e80e29..499d41f 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -5,6 +5,7 @@ import { router as alertsRouter } from './routes/alerts_router'; import * as AnalyticsController from './controllers/analytics_controller'; import * as Data from './services/data_service'; +import * as ProcessService from './services/process_service'; import { HASTIC_PORT } from './config'; @@ -12,7 +13,10 @@ import * as Koa from 'koa'; import * as Router from 'koa-router'; import * as bodyParser from 'koa-bodyparser'; + Data.checkDataFolders(); +AnalyticsController.init(); +ProcessService.registerExitHandler(AnalyticsController.terminate); var app = new Koa(); @@ -31,7 +35,6 @@ rootRouter.use('/analyticUnits', anomaliesRouter.routes(), anomaliesRouter.allow rootRouter.use('/segments', segmentsRouter.routes(), segmentsRouter.allowedMethods()); rootRouter.use('/alerts', alertsRouter.routes(), alertsRouter.allowedMethods()); rootRouter.get('/', async (ctx) => { - ctx.response.body = { server: 'Ok', analyticsReady: AnalyticsController.isAnalyticReady(), @@ -43,6 +46,11 @@ app .use(rootRouter.routes()) .use(rootRouter.allowedMethods()); -app.listen(HASTIC_PORT, () => { +let server = app.listen(HASTIC_PORT, () => { console.log(`Server is running on :${HASTIC_PORT}`); }); + +ProcessService.registerExitHandler(() => { + console.log('Stopping server...'); + server.close(); +}) diff --git a/server/src/services/analytics_service.ts b/server/src/services/analytics_service.ts index b58c189..703146d 100644 --- a/server/src/services/analytics_service.ts +++ b/server/src/services/analytics_service.ts @@ -1,4 +1,4 @@ -import { ANALYTICS_PATH, ZEROMQ_CONNECTION_STRING, ANLYTICS_PING_INTERVAL } from '../config' +import * as config from '../config'; const zmq = require('zeromq'); @@ -12,6 +12,10 @@ export class AnalyticsService { private _requester: any; private _ready: boolean = false; private _pingResponded = false; + private _zmqConnectionString = null; + private _ipcPath = null; + private _analyticsPinger: NodeJS.Timer = null; + private _isClosed = false; constructor(private _onResponse: (response: any) => void) { @@ -39,19 +43,41 @@ export class AnalyticsService { } public close() { - // TODO: close socket & terminate process if you have any + this._isClosed = true; + console.log('Terminating analytics service...'); + clearInterval(this._analyticsPinger); + if(this._ipcPath !== null) { + fs.unlinkSync(this._ipcPath); + } this._requester.close(); + console.log('Ok'); } public get ready(): boolean { return this._ready; } private async _init() { this._requester = zmq.socket('pair'); + let productionMode = process.env.NODE_ENV !== 'development'; + + this._zmqConnectionString = `tcp://127.0.0.1:${config.ZMQ_DEV_PORT}`; // debug mode + if(productionMode) { + this._zmqConnectionString = config.ZMQ_CONNECTION_STRING; + if(this._zmqConnectionString === null) { + var createResult = await AnalyticsService.createIPCAddress(); + this._zmqConnectionString = createResult.address; + this._ipcPath = createResult.file; + } + } - if(process.env.NODE_ENV !== 'development') { + console.log("Binding to zmq... %s", this._zmqConnectionString); + this._requester.connect(this._zmqConnectionString); + this._requester.on("message", this._onAnalyticsMessage.bind(this)); + console.log('Ok'); + + if(productionMode) { console.log('Creating analytics process...'); try { - var cp = await AnalyticsService._runAnalyticsProcess(); + var cp = await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString); } catch(error) { console.error('Can`t run analytics process: %s', error); return; @@ -59,11 +85,6 @@ export class AnalyticsService { console.log('Ok, pid: %s', cp.pid); } - console.log("Binding to zmq...: %s", ZEROMQ_CONNECTION_STRING); - this._requester.connect(ZEROMQ_CONNECTION_STRING); - this._requester.on("message", this._onAnalyticsMessage.bind(this)); - console.log('Ok'); - console.log('Start analytics pinger...'); this._runAlalyticsPinger(); console.log('Ok'); @@ -71,21 +92,30 @@ export class AnalyticsService { } /** - * Spawns analytics process. Reads process stderr and fails if it - * is not empty. No need to stop process later. + * Spawns analytics process. Reads process stderr and fails if it isn`t empty. + * No need to stop the process later. * - * @returns creaded child process + * @returns Creaded child process + * @throws Process start error or first exception during python start */ - private static async _runAnalyticsProcess(): Promise { + private static async _runAnalyticsProcess(zmqConnectionString: string): Promise { let cp: childProcess.ChildProcess; - if(fs.existsSync(path.join(ANALYTICS_PATH, 'dist/worker/worker'))) { + let cpOptions = { + cwd: config.ANALYTICS_PATH, + env: { + ...process.env, + ZMQ_CONNECTION_STRING: zmqConnectionString + } + }; + + if(fs.existsSync(path.join(config.ANALYTICS_PATH, 'dist/worker/worker'))) { console.log('dist/worker/worker'); - cp = childProcess.spawn('dist/worker/worker', [], { cwd: ANALYTICS_PATH }); + cp = childProcess.spawn('dist/worker/worker', [], cpOptions); } else { console.log('python3 server.py'); // If compiled analytics script doesn't exist - fallback to regular python - console.log(ANALYTICS_PATH); - cp = childProcess.spawn('python3', ['server.py'], { cwd: ANALYTICS_PATH }); + console.log(config.ANALYTICS_PATH); + cp = childProcess.spawn('python3', ['server.py'], cpOptions); } if(cp.pid === undefined) { @@ -126,7 +156,7 @@ export class AnalyticsService { private async _onAnalyticsDown() { console.log('Analytics is down'); if(process.env.NODE_ENV !== 'development') { - await AnalyticsService._runAnalyticsProcess(); + await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString); } } @@ -152,7 +182,10 @@ export class AnalyticsService { } private async _runAlalyticsPinger() { - setInterval(() => { + this._analyticsPinger = setInterval(() => { + if(this._isClosed) { + return; + } if(!this._pingResponded && this._ready) { this._ready = false; this._onAnalyticsDown(); @@ -160,7 +193,14 @@ export class AnalyticsService { this._pingResponded = false; // TODO: set life limit for this ping this.sendMessage('ping'); - }, ANLYTICS_PING_INTERVAL); + }, config.ANLYTICS_PING_INTERVAL); + } + + private static async createIPCAddress(): Promise<{ address: string, file: string }> { + let filename = `${process.pid}.ipc` + let p = path.join(config.ZMQ_IPC_PATH, filename); + fs.writeFileSync(p, ''); + return Promise.resolve({ address: 'ipc://' + p, file: p }); } } diff --git a/server/src/services/data_service.ts b/server/src/services/data_service.ts index 8897b68..24851b6 100644 --- a/server/src/services/data_service.ts +++ b/server/src/services/data_service.ts @@ -15,7 +15,9 @@ function maybeCreate(path: string): void { if(fs.existsSync(path)) { return; } + console.log('mkdir: ' + path); fs.mkdirSync(path); + console.log('exists: ' + fs.existsSync(path)); } export function checkDataFolders(): void { @@ -25,6 +27,7 @@ export function checkDataFolders(): void { config.ANALYTIC_UNITS_PATH, config.MODELS_PATH, config.METRICS_PATH, - config.SEGMENTS_PATH + config.SEGMENTS_PATH, + config.ZMQ_IPC_PATH ].forEach(maybeCreate); } diff --git a/server/src/services/process_service.ts b/server/src/services/process_service.ts new file mode 100644 index 0000000..9651293 --- /dev/null +++ b/server/src/services/process_service.ts @@ -0,0 +1,35 @@ + +var exitHandlers = [] +var exitHandled = false; + +/** + * Add a callback for closing programm bacause of any reason + * + * @param callback a sync function + */ +export function registerExitHandler(callback: () => void) { + exitHandlers.push(callback); +} + +function exitHandler(options, err) { + if(exitHandled) { + return; + } + exitHandled = true; + for(let i = 0; i < exitHandlers.length; i++) { + exitHandlers[i](); + } +} + +//do something when app is closing +process.on('exit', exitHandler.bind(null,{cleanup:true})); + +//catches ctrl+c event +process.on('SIGINT', exitHandler.bind(null, {exit:true})); + +// catches "kill pid" (for example: nodemon restart) +process.on('SIGUSR1', exitHandler.bind(null, {exit:true})); +process.on('SIGUSR2', exitHandler.bind(null, {exit:true})); + +//catches uncaught exceptions +process.on('uncaughtException', exitHandler.bind(null, {exit:true})); \ No newline at end of file