From 45dc3f22b90e7334cfa4b0a15cdeb3c3dcf8e3c3 Mon Sep 17 00:00:00 2001 From: rozetko Date: Thu, 5 Mar 2020 15:04:24 +0300 Subject: [PATCH] Websocket connectivity between server and analytics (#814) --- analytics/analytics/config.py | 3 +- .../analytics/services/server_service.py | 38 ++++-- analytics/requirements.txt | 3 +- server/build/webpack.prod.conf.js | 23 ++-- server/package.json | 7 +- server/spec/setup_tests.ts | 1 - server/src/config.ts | 34 ++--- server/src/index.ts | 23 +++- server/src/services/alert_service.ts | 1 + server/src/services/analytics_service.ts | 127 +++++++++++------- server/src/services/data_puller.ts | 6 +- server/src/services/data_service/index.ts | 3 +- 12 files changed, 160 insertions(+), 109 deletions(-) diff --git a/analytics/analytics/config.py b/analytics/analytics/config.py index 293b0a6..a833df9 100644 --- a/analytics/analytics/config.py +++ b/analytics/analytics/config.py @@ -26,6 +26,5 @@ def get_config_field(field: str, default_val = None): raise Exception('Please configure {}'.format(field)) -ZMQ_DEV_PORT = get_config_field('ZMQ_DEV_PORT', '8002') -ZMQ_CONNECTION_STRING = get_config_field('ZMQ_CONNECTION_STRING', 'tcp://0.0.0.0:%s' % ZMQ_DEV_PORT) +HASTIC_SERVER_URL = get_config_field('HASTIC_SERVER_URL', 'ws://localhost:8002') LEARNING_TIMEOUT = get_config_field('LEARNING_TIMEOUT', 120) diff --git a/analytics/analytics/services/server_service.py b/analytics/analytics/services/server_service.py index a81a6de..039060a 100644 --- a/analytics/analytics/services/server_service.py +++ b/analytics/analytics/services/server_service.py @@ -1,7 +1,6 @@ import config -import zmq -import zmq.asyncio +import websockets import logging import json @@ -23,6 +22,7 @@ SERVER_SOCKET_RECV_LOOP_INTERRUPTED = False @utils.meta.JSONClass class ServerMessage: def __init__(self, method: str, payload: object = None, request_id: int = None): + # TODO: add error type / case self.method = method self.payload = payload self.request_id = request_id @@ -33,6 +33,8 @@ class ServerService(utils.concurrent.AsyncZmqActor): def __init__(self): super(ServerService, self).__init__() self.__aiter_inited = False + # this typing doesn't help vscode, maybe there is a mistake + self.__server_socket: Optional[websockets.Connect] = None self.__request_next_id = 1 self.__responses = dict() self.start() @@ -78,24 +80,44 @@ class ServerService(utils.concurrent.AsyncZmqActor): return server_message async def _run_thread(self): - logger.info("Binding to %s ..." % config.ZMQ_CONNECTION_STRING) - self.__server_socket = self._zmq_context.socket(zmq.PAIR) - self.__server_socket.bind(config.ZMQ_CONNECTION_STRING) + logger.info("Binding to %s ..." % config.HASTIC_SERVER_URL) + # TODO: consider to use async context for socket await self.__server_socket_recv_loop() async def _on_message_to_thread(self, message: str): - await self.__server_socket.send_string(message) + await self.__server_socket.send(message) async def __server_socket_recv_loop(self): while not SERVER_SOCKET_RECV_LOOP_INTERRUPTED: - received_string = await self.__server_socket.recv_string() + received_string = await self.__reconnect_recv() if received_string == 'PING': asyncio.ensure_future(self.__handle_ping()) else: asyncio.ensure_future(self._send_message_from_thread(received_string)) + + async def __reconnect_recv(self) -> str: + while not SERVER_SOCKET_RECV_LOOP_INTERRUPTED: + try: + if self.__server_socket is None: + self.__server_socket = await websockets.connect(config.HASTIC_SERVER_URL) + first_message = await self.__server_socket.recv() + if first_message == 'EALREADYEXISTING': + raise ConnectionError('Can`t connect as a second analytics') + return await self.__server_socket.recv() + except (ConnectionRefusedError, websockets.ConnectionClosedError): + if not self.__server_socket is None: + self.__server_socket.close() + # TODO: this logic increases the number of ThreadPoolExecutor + self.__server_socket = None + # TODO: move to config + reconnect_delay = 3 + print('connection is refused or lost, trying to reconnect in %s seconds' % reconnect_delay) + await asyncio.sleep(reconnect_delay) + raise InterruptedError() async def __handle_ping(self): - await self.__server_socket.send_string('PONG') + # TODO: self.__server_socket can be None + await self.__server_socket.send('PONG') def __parse_message_or_save(self, text: str) -> Optional[ServerMessage]: try: diff --git a/analytics/requirements.txt b/analytics/requirements.txt index 89139cb..f3bb4a1 100644 --- a/analytics/requirements.txt +++ b/analytics/requirements.txt @@ -3,4 +3,5 @@ aiounittest==1.1.0 numpy==1.14.5 pandas==0.20.3 pyzmq==18.0.1 -scipy==1.1.0 \ No newline at end of file +scipy==1.1.0 +websockets==8.1 \ No newline at end of file diff --git a/server/build/webpack.prod.conf.js b/server/build/webpack.prod.conf.js index af225ff..bd5d98b 100644 --- a/server/build/webpack.prod.conf.js +++ b/server/build/webpack.prod.conf.js @@ -1,3 +1,5 @@ +const semver = require('semver'); + const webpack = require('webpack'); const path = require('path'); const fs = require('fs'); @@ -6,20 +8,12 @@ var base = require('./webpack.base.conf'); const TARGET_NODE_VERSION = process.versions.node; const PLATFORM = `${process.platform}-${process.arch}-node-${TARGET_NODE_VERSION.split('.')[0]}`; +const DEASYNC_NODE_MODULES_PATH = path.resolve('node_modules', 'deasync', 'bin', PLATFORM); +const DEASYNC_DIST_PATH = path.resolve('dist', 'bin', PLATFORM); console.log(`Target node version: ${TARGET_NODE_VERSION}`); console.log(`Platform: ${PLATFORM}`); - -const DEASYNC_NODE_MODULES_PATH = path.resolve( - 'node_modules', - 'deasync', - 'bin', - PLATFORM -); - -const DEASYNC_DIST_PATH = path.resolve('dist', 'bin', PLATFORM); - if(!fs.existsSync(DEASYNC_NODE_MODULES_PATH)) { throw new Error(`deasync doesn't support this platform: ${PLATFORM}`); } @@ -45,12 +39,11 @@ const prodRules = [ use: { loader: 'babel-loader', options: { + plugins: ["transform-object-rest-spread"], // for transpiling "ws" lib + // it's necessare only for node < 8.3.0, + // so could be optimized presets: [ - ["env", { - "targets": { - "node": TARGET_NODE_VERSION - } - }] + ["env", { "targets": { "node": TARGET_NODE_VERSION }}] ] } } diff --git a/server/package.json b/server/package.json index a337ec3..0a5dbf7 100644 --- a/server/package.json +++ b/server/package.json @@ -5,7 +5,7 @@ "scripts": { "start": "node dist/server.js", "dev": "NODE_ENV=development node build/dev-server.js", - "build": "npm rebuild zeromq && webpack --config build/webpack.prod.conf.js", + "build": "webpack --config build/webpack.prod.conf.js", "test": "jest --config jest.config.js" }, "repository": { @@ -28,11 +28,12 @@ "@types/lodash": "^4.14.116", "@types/mongodb": "^3.3.1", "@types/nedb": "^1.8.0", - "@types/zeromq": "^4.6.0", + "@types/ws": "^6.0.4", "axios": "^0.18.0", "babel-core": "^6.26.3", "babel-jest": "^23.4.2", "babel-loader": "^7.1.4", + "babel-plugin-transform-object-rest-spread": "^6.26.0", "babel-polyfill": "^6.26.0", "babel-preset-env": "^1.7.0", "babel-preset-es2015": "^6.24.1", @@ -57,6 +58,6 @@ "url": "^0.11.0", "webpack": "^4.12.0", "webpack-cli": "^3.0.8", - "zeromq": "^4.6.0" + "ws": "^7.2.1" } } diff --git a/server/spec/setup_tests.ts b/server/spec/setup_tests.ts index c3c503c..f3f4eaa 100644 --- a/server/spec/setup_tests.ts +++ b/server/spec/setup_tests.ts @@ -7,7 +7,6 @@ console.error = jest.fn(); jest.mock('../src/config.ts', () => ({ DATA_PATH: 'fake-data-path', HASTIC_API_KEY: 'fake-key', - ZMQ_IPC_PATH: 'fake-zmq-path', HASTIC_DB_CONNECTION_TYPE: 'nedb', HASTIC_IN_MEMORY_PERSISTANCE: true, HASTIC_ALERT_TYPE: 'webhook', diff --git a/server/src/config.ts b/server/src/config.ts index 8acfd32..f6b837b 100644 --- a/server/src/config.ts +++ b/server/src/config.ts @@ -48,9 +48,6 @@ export const DETECTION_SPANS_DATABASE_PATH = path.join(DATA_PATH, 'detection_spa export const DB_META_PATH = path.join(DATA_PATH, 'db_meta.db'); export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000'); -export const ZMQ_IPC_PATH = getConfigField('ZMQ_IPC_PATH', path.join(os.tmpdir(), 'hastic')); -export const ZMQ_DEV_PORT = getConfigField('ZMQ_DEV_PORT', '8002'); -export const ZMQ_HOST = getConfigField('ZMQ_HOST', '127.0.0.1'); export const HASTIC_API_KEY = getConfigField('HASTIC_API_KEY'); export const GRAFANA_URL = normalizeUrl(getConfigField('GRAFANA_URL', null)); @@ -69,7 +66,7 @@ export const HASTIC_TIMEZONE_OFFSET = getTimeZoneOffset(); export const HASTIC_ALERTMANAGER_URL = getConfigField('HASTIC_ALERTMANAGER_URL', null); -export const ANLYTICS_PING_INTERVAL = 500; // ms +export const ANALYTICS_PING_INTERVAL = 500; // ms export const PACKAGE_VERSION = getPackageVersion(); export const GIT_INFO = { branch: GIT_BRANCH, @@ -79,7 +76,8 @@ export const GIT_INFO = { export const INSIDE_DOCKER = process.env.INSIDE_DOCKER !== undefined; export const PRODUCTION_MODE = process.env.NODE_ENV !== 'development'; -export const ZMQ_CONNECTION_STRING = createZMQConnectionString(); +// TODO: maybe rename it to "HASTIC_SERVER_ANALYTICS_URL" +export const HASTIC_SERVER_URL = getConfigField('HASTIC_SERVER_URL', 'ws://localhost:8002'); export const HASTIC_INSTANCE_NAME = getConfigField('HASTIC_INSTANCE_NAME', os.hostname()); @@ -126,17 +124,23 @@ function getPackageVersion() { } } -function createZMQConnectionString() { - let zmq =`tcp://${ZMQ_HOST}:${ZMQ_DEV_PORT}`; //debug mode - let zmqConf = getConfigField('ZMQ_CONNECTION_STRING', null); - if(INSIDE_DOCKER) { - return zmqConf; - } else if(PRODUCTION_MODE) { - if(zmqConf === null) { - return 'ipc://' + `${path.join(ZMQ_IPC_PATH, process.pid.toString())}.ipc`; - } +function getGitInfo() { + let gitRoot = path.join(__dirname, '../../.git'); + let gitHeadFile = path.join(gitRoot, 'HEAD'); + if(!fs.existsSync(gitHeadFile)) { + console.error(`Can't find git HEAD file ${gitHeadFile}`); + return null; + } + const ref = fs.readFileSync(gitHeadFile).toString(); + let branchPath = ref.indexOf(':') === -1 ? ref : ref.slice(5, -1); + let branch = branchPath.split('/').pop(); + const branchFilename = `${gitRoot}/${branchPath}`; + if(!fs.existsSync(branchFilename)) { + console.error(`Can't find git branch file ${branchFilename}`); + return null; } - return zmq; + let commitHash = fs.readFileSync(branchFilename).toString().slice(0, 7); + return { branch, commitHash }; } // TODO: move to data_layer diff --git a/server/src/index.ts b/server/src/index.ts index 3c672df..47cb9c5 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -8,7 +8,7 @@ import * as AnalyticsController from './controllers/analytics_controller'; import * as ProcessService from './services/process_service'; -import { HASTIC_PORT, PACKAGE_VERSION, GIT_INFO, ZMQ_CONNECTION_STRING, HASTIC_INSTANCE_NAME } from './config'; +import { HASTIC_PORT, PACKAGE_VERSION, GIT_INFO, HASTIC_INSTANCE_NAME } from './config'; import { applyDBMigrations } from './services/data_service/migrations'; @@ -16,21 +16,24 @@ import * as Koa from 'koa'; import * as Router from 'koa-router'; import * as bodyParser from 'koa-bodyparser'; +import { createServer, Server } from 'http'; + init(); async function init() { await applyDBMigrations(); - AnalyticsController.init(); - ProcessService.registerExitHandler(AnalyticsController.terminate); const app = new Koa(); + let httpServer = createServer(app.callback()); + + AnalyticsController.init(); + ProcessService.registerExitHandler(AnalyticsController.terminate); app.on('error', (err, ctx) => { console.log('got server error:'); console.log(err); }); - app.use(bodyParser()); app.use(async function(ctx, next) { @@ -77,7 +80,6 @@ async function init() { packageVersion: PACKAGE_VERSION, npmUserAgent: process.env.npm_config_user_agent, docker: process.env.INSIDE_DOCKER !== undefined, - zmqConectionString: ZMQ_CONNECTION_STRING, serverPort: HASTIC_PORT, git: GIT_INFO, activeWebhooks: activeWebhooks.length, @@ -89,7 +91,16 @@ async function init() { .use(rootRouter.routes()) .use(rootRouter.allowedMethods()); - app.listen(HASTIC_PORT, () => { + httpServer.listen({ port: HASTIC_PORT, exclusive: true }, () => { console.log(`Server is running on :${HASTIC_PORT}`); }); + + httpServer.on('error', (err) => { + console.error(`Http server error: ${err.message}`) + }); + + ProcessService.registerExitHandler(() => { + httpServer.close(); + }); + } diff --git a/server/src/services/alert_service.ts b/server/src/services/alert_service.ts index 0a7f895..096ab4d 100644 --- a/server/src/services/alert_service.ts +++ b/server/src/services/alert_service.ts @@ -8,6 +8,7 @@ import { ORG_ID, HASTIC_API_KEY, HASTIC_ALERT_IMAGE } from '../config'; import axios from 'axios'; import * as _ from 'lodash'; + const Notifier = getNotifier(); export class Alert { public enabled = true; diff --git a/server/src/services/analytics_service.ts b/server/src/services/analytics_service.ts index 81a1fa2..031e216 100644 --- a/server/src/services/analytics_service.ts +++ b/server/src/services/analytics_service.ts @@ -4,7 +4,7 @@ import { WebhookType } from '../services/notification_service'; import * as config from '../config'; import { AlertService } from './alert_service'; -import * as zmq from 'zeromq'; +import * as WebSocket from 'ws'; import * as childProcess from 'child_process' import * as fs from 'fs'; @@ -15,13 +15,12 @@ import * as _ from 'lodash'; export class AnalyticsService { private _alertService = new AlertService(); - private _requester: any; + private _socket_server: WebSocket.Server; + private _socket_connection: WebSocket = null; private _ready: boolean = false; private _lastAlive: Date = null; private _pingResponded = false; - private _zmqConnectionString: string = null; - private _ipcPath: string = null; - private _analyticsPinger: NodeJS.Timer = null; + private _analyticsPinger: NodeJS.Timeout = null; private _isClosed = false; private _productionMode = false; private _inDocker = false; @@ -59,7 +58,10 @@ export class AnalyticsService { public async sendText(text: string): Promise { return new Promise((resolve, reject) => { - this._requester.send(text, undefined, (err: any) => { + if(this._socket_connection === null) { + reject('Can`t send because analytics is not connected'); + } + this._socket_connection.send(text, undefined, (err: any) => { if(err) { console.trace(`got error while sending ${err}`); reject(err); @@ -70,39 +72,22 @@ export class AnalyticsService { }); } - public close() { - this._isClosed = true; - console.log('Terminating analytics service...'); - clearInterval(this._analyticsPinger); - if(this._ipcPath !== null) { - console.log('Remove ipc path: ' + this._ipcPath); - fs.unlinkSync(this._ipcPath); - } - this._requester.close(); - console.log('Terminating successful'); - } - public get ready(): boolean { return this._ready; } public get lastAlive(): Date { return this._lastAlive; } private async _init() { - this._requester = zmq.socket('pair'); - - this._zmqConnectionString = config.ZMQ_CONNECTION_STRING; + this._socket_server = new WebSocket.Server({ port: 8002 }); - if(this._zmqConnectionString.startsWith('ipc')) { - this._ipcPath = AnalyticsService.createIPCAddress(this._zmqConnectionString); - } + // TODO: move this to config OR use existing http server + console.log("Creating websocket server ... %s", 'ws://localhost:8002'); - console.log("Binding to zmq... %s", this._zmqConnectionString); - this._requester.connect(this._zmqConnectionString); - this._requester.on("message", this._onAnalyticsMessage.bind(this)); - console.log('Binding successful'); + this._socket_server.on("connection", this._onNewConnection.bind(this)); + // TODO: handle connection drop if(this._productionMode && !this._inDocker) { console.log('Creating analytics process...'); try { - var cp = await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString); + var cp = await AnalyticsService._runAnalyticsProcess(); } catch(error) { console.error('Can`t run analytics process: %s', error); return; @@ -110,10 +95,6 @@ export class AnalyticsService { console.log('Analytics creating successful, pid: %s', cp.pid); } - console.log('Start analytics pinger...'); - this._runAlalyticsPinger(); - console.log('Analytics pinger started'); - } /** @@ -123,13 +104,13 @@ export class AnalyticsService { * @returns Creaded child process * @throws Process start error or first exception during python start */ - private static async _runAnalyticsProcess(zmqConnectionString: string): Promise { + private static async _runAnalyticsProcess(): Promise { let cp: childProcess.ChildProcess; let cpOptions = { cwd: config.ANALYTICS_PATH, env: { ...process.env, - ZMQ_CONNECTION_STRING: zmqConnectionString + HASTIC_SERVER_URL: config.HASTIC_SERVER_URL } }; @@ -185,17 +166,7 @@ export class AnalyticsService { this._alertService.sendMessage(msg, WebhookType.RECOVERY); } - private async _onAnalyticsDown() { - const msg = 'Analytics is down'; - console.log(msg); - this._alertService.sendMessage(msg, WebhookType.FAILURE); - if(this._productionMode && !this._inDocker) { - await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString); - } - } - private _onAnalyticsMessage(data: any) { - let text = data.toString(); if(text === 'PONG') { this._pingResponded = true; @@ -217,30 +188,82 @@ export class AnalyticsService { } this._onMessage(AnalyticsMessage.fromObject(response)); } + + // cb(this: WebSocket, socket: WebSocket, request: http.IncomingMessage) + private async _onNewConnection(connection: WebSocket) { + if(this._socket_connection !== null) { + // TODO: use buildin websocket validator + console.error('There is already an analytics connection. Only one connection is supported.'); + // we send error and then close connection + connection.send('EALREADYEXISTING', () => { connection.close(); }); + return; + } + // TODO: log connection id + console.log('Got new analytic connection'); + this._socket_connection = connection; + this._socket_connection.on("message", this._onAnalyticsMessage.bind(this)); + // TODO: implement closing + this._socket_connection.on("close", this._onAnalyticsDown.bind(this)); + await this.sendText('hey'); + + console.log('Start analytics pinger...'); + // TODO: use websockets buildin pinger + this._runAlalyticsPinger(); + console.log('Analytics pinger started'); + } + + private async _onAnalyticsDown() { + if(!this._ready) { + // it's possible that ping is too slow and connection is closed + return; + } + this._stopAlalyticsPinger(); + if(this._socket_connection !== null) { + this._socket_connection.close(); + this._socket_connection = null; + } + this._ready = false; + const msg = 'Analytics is down'; + console.log(msg); + this._alertService.sendMessage(msg, WebhookType.FAILURE); + if(this._productionMode && !this._inDocker) { + await AnalyticsService._runAnalyticsProcess(); + } + } - private async _runAlalyticsPinger() { + private _runAlalyticsPinger() { this._analyticsPinger = setInterval(() => { if(this._isClosed) { return; } if(!this._pingResponded && this._ready) { - this._ready = false; this._onAnalyticsDown(); } this._pingResponded = false; // TODO: set life limit for this ping this.sendText('PING'); - }, config.ANLYTICS_PING_INTERVAL); + }, config.ANALYTICS_PING_INTERVAL); } - private static createIPCAddress(zmqConnectionString: string): string { - let filename = zmqConnectionString.substring(6); //without 'ipc://' - fs.writeFileSync(filename, ''); - return filename; + private _stopAlalyticsPinger() { + if(this._analyticsPinger !== null) { + clearInterval(this._analyticsPinger); + } + this._analyticsPinger = null; } public get queueLength() { return this._queue.length; } + public close() { + this._isClosed = true; + console.log('Terminating analytics service...'); + this._stopAlalyticsPinger(); + if(this._socket_connection !== null) { + this._socket_connection.close(); + } + console.log('Termination successful'); + } + } diff --git a/server/src/services/data_puller.ts b/server/src/services/data_puller.ts index e096383..0b00f3d 100644 --- a/server/src/services/data_puller.ts +++ b/server/src/services/data_puller.ts @@ -10,13 +10,13 @@ import { getGrafanaUrl } from '../utils/grafana'; import { queryByMetric, GrafanaUnavailable, DatasourceUnavailable } from 'grafana-datasource-kit'; import * as _ from 'lodash'; -import { WebhookType } from './notification_service'; type MetricDataChunk = { values: [number, number][], columns: string[] }; const PULL_PERIOD_MS = 5000; + export class DataPuller { private _analyticReadyConsoleReporter = availableReporter( @@ -49,11 +49,9 @@ export class DataPuller { if(unit === undefined) { throw Error(`data puller: can't pull undefined unit`); } - const grafanaUrl = getGrafanaUrl(unit.grafanaUrl); let data = queryByMetric(unit.metric, grafanaUrl, from, to, HASTIC_API_KEY); return data; - } private pushData(unit: AnalyticUnit.AnalyticUnit, data: any) { @@ -74,7 +72,7 @@ export class DataPuller { } } - //TODO: group analyticUnits by panelID and send same dataset for group + // TODO: group analyticUnits by panelID and send same dataset for group public async runPuller() { const analyticUnits = await AnalyticUnit.findMany({ alert: true }); diff --git a/server/src/services/data_service/index.ts b/server/src/services/data_service/index.ts index 49dd374..b89173d 100644 --- a/server/src/services/data_service/index.ts +++ b/server/src/services/data_service/index.ts @@ -81,8 +81,7 @@ function maybeCreateDir(path: string): void { function checkDataFolders(): void { [ - config.DATA_PATH, - config.ZMQ_IPC_PATH + config.DATA_PATH ].forEach(maybeCreateDir); }