Browse Source

Websocket connectivity between server and analytics (#814)

pull/1/head
rozetko 4 years ago committed by GitHub
parent
commit
45dc3f22b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      analytics/analytics/config.py
  2. 38
      analytics/analytics/services/server_service.py
  3. 3
      analytics/requirements.txt
  4. 23
      server/build/webpack.prod.conf.js
  5. 7
      server/package.json
  6. 1
      server/spec/setup_tests.ts
  7. 34
      server/src/config.ts
  8. 23
      server/src/index.ts
  9. 1
      server/src/services/alert_service.ts
  10. 127
      server/src/services/analytics_service.ts
  11. 6
      server/src/services/data_puller.ts
  12. 3
      server/src/services/data_service/index.ts

3
analytics/analytics/config.py

@ -26,6 +26,5 @@ def get_config_field(field: str, default_val = None):
raise Exception('Please configure {}'.format(field))
ZMQ_DEV_PORT = get_config_field('ZMQ_DEV_PORT', '8002')
ZMQ_CONNECTION_STRING = get_config_field('ZMQ_CONNECTION_STRING', 'tcp://0.0.0.0:%s' % ZMQ_DEV_PORT)
HASTIC_SERVER_URL = get_config_field('HASTIC_SERVER_URL', 'ws://localhost:8002')
LEARNING_TIMEOUT = get_config_field('LEARNING_TIMEOUT', 120)

38
analytics/analytics/services/server_service.py

@ -1,7 +1,6 @@
import config
import zmq
import zmq.asyncio
import websockets
import logging
import json
@ -23,6 +22,7 @@ SERVER_SOCKET_RECV_LOOP_INTERRUPTED = False
@utils.meta.JSONClass
class ServerMessage:
def __init__(self, method: str, payload: object = None, request_id: int = None):
# TODO: add error type / case
self.method = method
self.payload = payload
self.request_id = request_id
@ -33,6 +33,8 @@ class ServerService(utils.concurrent.AsyncZmqActor):
def __init__(self):
super(ServerService, self).__init__()
self.__aiter_inited = False
# this typing doesn't help vscode, maybe there is a mistake
self.__server_socket: Optional[websockets.Connect] = None
self.__request_next_id = 1
self.__responses = dict()
self.start()
@ -78,24 +80,44 @@ class ServerService(utils.concurrent.AsyncZmqActor):
return server_message
async def _run_thread(self):
logger.info("Binding to %s ..." % config.ZMQ_CONNECTION_STRING)
self.__server_socket = self._zmq_context.socket(zmq.PAIR)
self.__server_socket.bind(config.ZMQ_CONNECTION_STRING)
logger.info("Binding to %s ..." % config.HASTIC_SERVER_URL)
# TODO: consider to use async context for socket
await self.__server_socket_recv_loop()
async def _on_message_to_thread(self, message: str):
await self.__server_socket.send_string(message)
await self.__server_socket.send(message)
async def __server_socket_recv_loop(self):
while not SERVER_SOCKET_RECV_LOOP_INTERRUPTED:
received_string = await self.__server_socket.recv_string()
received_string = await self.__reconnect_recv()
if received_string == 'PING':
asyncio.ensure_future(self.__handle_ping())
else:
asyncio.ensure_future(self._send_message_from_thread(received_string))
async def __reconnect_recv(self) -> str:
while not SERVER_SOCKET_RECV_LOOP_INTERRUPTED:
try:
if self.__server_socket is None:
self.__server_socket = await websockets.connect(config.HASTIC_SERVER_URL)
first_message = await self.__server_socket.recv()
if first_message == 'EALREADYEXISTING':
raise ConnectionError('Can`t connect as a second analytics')
return await self.__server_socket.recv()
except (ConnectionRefusedError, websockets.ConnectionClosedError):
if not self.__server_socket is None:
self.__server_socket.close()
# TODO: this logic increases the number of ThreadPoolExecutor
self.__server_socket = None
# TODO: move to config
reconnect_delay = 3
print('connection is refused or lost, trying to reconnect in %s seconds' % reconnect_delay)
await asyncio.sleep(reconnect_delay)
raise InterruptedError()
async def __handle_ping(self):
await self.__server_socket.send_string('PONG')
# TODO: self.__server_socket can be None
await self.__server_socket.send('PONG')
def __parse_message_or_save(self, text: str) -> Optional[ServerMessage]:
try:

3
analytics/requirements.txt

@ -3,4 +3,5 @@ aiounittest==1.1.0
numpy==1.14.5
pandas==0.20.3
pyzmq==18.0.1
scipy==1.1.0
scipy==1.1.0
websockets==8.1

23
server/build/webpack.prod.conf.js

@ -1,3 +1,5 @@
const semver = require('semver');
const webpack = require('webpack');
const path = require('path');
const fs = require('fs');
@ -6,20 +8,12 @@ var base = require('./webpack.base.conf');
const TARGET_NODE_VERSION = process.versions.node;
const PLATFORM = `${process.platform}-${process.arch}-node-${TARGET_NODE_VERSION.split('.')[0]}`;
const DEASYNC_NODE_MODULES_PATH = path.resolve('node_modules', 'deasync', 'bin', PLATFORM);
const DEASYNC_DIST_PATH = path.resolve('dist', 'bin', PLATFORM);
console.log(`Target node version: ${TARGET_NODE_VERSION}`);
console.log(`Platform: ${PLATFORM}`);
const DEASYNC_NODE_MODULES_PATH = path.resolve(
'node_modules',
'deasync',
'bin',
PLATFORM
);
const DEASYNC_DIST_PATH = path.resolve('dist', 'bin', PLATFORM);
if(!fs.existsSync(DEASYNC_NODE_MODULES_PATH)) {
throw new Error(`deasync doesn't support this platform: ${PLATFORM}`);
}
@ -45,12 +39,11 @@ const prodRules = [
use: {
loader: 'babel-loader',
options: {
plugins: ["transform-object-rest-spread"], // for transpiling "ws" lib
// it's necessare only for node < 8.3.0,
// so could be optimized
presets: [
["env", {
"targets": {
"node": TARGET_NODE_VERSION
}
}]
["env", { "targets": { "node": TARGET_NODE_VERSION }}]
]
}
}

7
server/package.json

@ -5,7 +5,7 @@
"scripts": {
"start": "node dist/server.js",
"dev": "NODE_ENV=development node build/dev-server.js",
"build": "npm rebuild zeromq && webpack --config build/webpack.prod.conf.js",
"build": "webpack --config build/webpack.prod.conf.js",
"test": "jest --config jest.config.js"
},
"repository": {
@ -28,11 +28,12 @@
"@types/lodash": "^4.14.116",
"@types/mongodb": "^3.3.1",
"@types/nedb": "^1.8.0",
"@types/zeromq": "^4.6.0",
"@types/ws": "^6.0.4",
"axios": "^0.18.0",
"babel-core": "^6.26.3",
"babel-jest": "^23.4.2",
"babel-loader": "^7.1.4",
"babel-plugin-transform-object-rest-spread": "^6.26.0",
"babel-polyfill": "^6.26.0",
"babel-preset-env": "^1.7.0",
"babel-preset-es2015": "^6.24.1",
@ -57,6 +58,6 @@
"url": "^0.11.0",
"webpack": "^4.12.0",
"webpack-cli": "^3.0.8",
"zeromq": "^4.6.0"
"ws": "^7.2.1"
}
}

1
server/spec/setup_tests.ts

@ -7,7 +7,6 @@ console.error = jest.fn();
jest.mock('../src/config.ts', () => ({
DATA_PATH: 'fake-data-path',
HASTIC_API_KEY: 'fake-key',
ZMQ_IPC_PATH: 'fake-zmq-path',
HASTIC_DB_CONNECTION_TYPE: 'nedb',
HASTIC_IN_MEMORY_PERSISTANCE: true,
HASTIC_ALERT_TYPE: 'webhook',

34
server/src/config.ts

@ -48,9 +48,6 @@ export const DETECTION_SPANS_DATABASE_PATH = path.join(DATA_PATH, 'detection_spa
export const DB_META_PATH = path.join(DATA_PATH, 'db_meta.db');
export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000');
export const ZMQ_IPC_PATH = getConfigField('ZMQ_IPC_PATH', path.join(os.tmpdir(), 'hastic'));
export const ZMQ_DEV_PORT = getConfigField('ZMQ_DEV_PORT', '8002');
export const ZMQ_HOST = getConfigField('ZMQ_HOST', '127.0.0.1');
export const HASTIC_API_KEY = getConfigField('HASTIC_API_KEY');
export const GRAFANA_URL = normalizeUrl(getConfigField('GRAFANA_URL', null));
@ -69,7 +66,7 @@ export const HASTIC_TIMEZONE_OFFSET = getTimeZoneOffset();
export const HASTIC_ALERTMANAGER_URL = getConfigField('HASTIC_ALERTMANAGER_URL', null);
export const ANLYTICS_PING_INTERVAL = 500; // ms
export const ANALYTICS_PING_INTERVAL = 500; // ms
export const PACKAGE_VERSION = getPackageVersion();
export const GIT_INFO = {
branch: GIT_BRANCH,
@ -79,7 +76,8 @@ export const GIT_INFO = {
export const INSIDE_DOCKER = process.env.INSIDE_DOCKER !== undefined;
export const PRODUCTION_MODE = process.env.NODE_ENV !== 'development';
export const ZMQ_CONNECTION_STRING = createZMQConnectionString();
// TODO: maybe rename it to "HASTIC_SERVER_ANALYTICS_URL"
export const HASTIC_SERVER_URL = getConfigField('HASTIC_SERVER_URL', 'ws://localhost:8002');
export const HASTIC_INSTANCE_NAME = getConfigField('HASTIC_INSTANCE_NAME', os.hostname());
@ -126,17 +124,23 @@ function getPackageVersion() {
}
}
function createZMQConnectionString() {
let zmq =`tcp://${ZMQ_HOST}:${ZMQ_DEV_PORT}`; //debug mode
let zmqConf = getConfigField('ZMQ_CONNECTION_STRING', null);
if(INSIDE_DOCKER) {
return zmqConf;
} else if(PRODUCTION_MODE) {
if(zmqConf === null) {
return 'ipc://' + `${path.join(ZMQ_IPC_PATH, process.pid.toString())}.ipc`;
}
function getGitInfo() {
let gitRoot = path.join(__dirname, '../../.git');
let gitHeadFile = path.join(gitRoot, 'HEAD');
if(!fs.existsSync(gitHeadFile)) {
console.error(`Can't find git HEAD file ${gitHeadFile}`);
return null;
}
const ref = fs.readFileSync(gitHeadFile).toString();
let branchPath = ref.indexOf(':') === -1 ? ref : ref.slice(5, -1);
let branch = branchPath.split('/').pop();
const branchFilename = `${gitRoot}/${branchPath}`;
if(!fs.existsSync(branchFilename)) {
console.error(`Can't find git branch file ${branchFilename}`);
return null;
}
return zmq;
let commitHash = fs.readFileSync(branchFilename).toString().slice(0, 7);
return { branch, commitHash };
}
// TODO: move to data_layer

23
server/src/index.ts

@ -8,7 +8,7 @@ import * as AnalyticsController from './controllers/analytics_controller';
import * as ProcessService from './services/process_service';
import { HASTIC_PORT, PACKAGE_VERSION, GIT_INFO, ZMQ_CONNECTION_STRING, HASTIC_INSTANCE_NAME } from './config';
import { HASTIC_PORT, PACKAGE_VERSION, GIT_INFO, HASTIC_INSTANCE_NAME } from './config';
import { applyDBMigrations } from './services/data_service/migrations';
@ -16,21 +16,24 @@ import * as Koa from 'koa';
import * as Router from 'koa-router';
import * as bodyParser from 'koa-bodyparser';
import { createServer, Server } from 'http';
init();
async function init() {
await applyDBMigrations();
AnalyticsController.init();
ProcessService.registerExitHandler(AnalyticsController.terminate);
const app = new Koa();
let httpServer = createServer(app.callback());
AnalyticsController.init();
ProcessService.registerExitHandler(AnalyticsController.terminate);
app.on('error', (err, ctx) => {
console.log('got server error:');
console.log(err);
});
app.use(bodyParser());
app.use(async function(ctx, next) {
@ -77,7 +80,6 @@ async function init() {
packageVersion: PACKAGE_VERSION,
npmUserAgent: process.env.npm_config_user_agent,
docker: process.env.INSIDE_DOCKER !== undefined,
zmqConectionString: ZMQ_CONNECTION_STRING,
serverPort: HASTIC_PORT,
git: GIT_INFO,
activeWebhooks: activeWebhooks.length,
@ -89,7 +91,16 @@ async function init() {
.use(rootRouter.routes())
.use(rootRouter.allowedMethods());
app.listen(HASTIC_PORT, () => {
httpServer.listen({ port: HASTIC_PORT, exclusive: true }, () => {
console.log(`Server is running on :${HASTIC_PORT}`);
});
httpServer.on('error', (err) => {
console.error(`Http server error: ${err.message}`)
});
ProcessService.registerExitHandler(() => {
httpServer.close();
});
}

1
server/src/services/alert_service.ts

@ -8,6 +8,7 @@ import { ORG_ID, HASTIC_API_KEY, HASTIC_ALERT_IMAGE } from '../config';
import axios from 'axios';
import * as _ from 'lodash';
const Notifier = getNotifier();
export class Alert {
public enabled = true;

127
server/src/services/analytics_service.ts

@ -4,7 +4,7 @@ import { WebhookType } from '../services/notification_service';
import * as config from '../config';
import { AlertService } from './alert_service';
import * as zmq from 'zeromq';
import * as WebSocket from 'ws';
import * as childProcess from 'child_process'
import * as fs from 'fs';
@ -15,13 +15,12 @@ import * as _ from 'lodash';
export class AnalyticsService {
private _alertService = new AlertService();
private _requester: any;
private _socket_server: WebSocket.Server;
private _socket_connection: WebSocket = null;
private _ready: boolean = false;
private _lastAlive: Date = null;
private _pingResponded = false;
private _zmqConnectionString: string = null;
private _ipcPath: string = null;
private _analyticsPinger: NodeJS.Timer = null;
private _analyticsPinger: NodeJS.Timeout = null;
private _isClosed = false;
private _productionMode = false;
private _inDocker = false;
@ -59,7 +58,10 @@ export class AnalyticsService {
public async sendText(text: string): Promise<void> {
return new Promise<void>((resolve, reject) => {
this._requester.send(text, undefined, (err: any) => {
if(this._socket_connection === null) {
reject('Can`t send because analytics is not connected');
}
this._socket_connection.send(text, undefined, (err: any) => {
if(err) {
console.trace(`got error while sending ${err}`);
reject(err);
@ -70,39 +72,22 @@ export class AnalyticsService {
});
}
public close() {
this._isClosed = true;
console.log('Terminating analytics service...');
clearInterval(this._analyticsPinger);
if(this._ipcPath !== null) {
console.log('Remove ipc path: ' + this._ipcPath);
fs.unlinkSync(this._ipcPath);
}
this._requester.close();
console.log('Terminating successful');
}
public get ready(): boolean { return this._ready; }
public get lastAlive(): Date { return this._lastAlive; }
private async _init() {
this._requester = zmq.socket('pair');
this._zmqConnectionString = config.ZMQ_CONNECTION_STRING;
this._socket_server = new WebSocket.Server({ port: 8002 });
if(this._zmqConnectionString.startsWith('ipc')) {
this._ipcPath = AnalyticsService.createIPCAddress(this._zmqConnectionString);
}
// TODO: move this to config OR use existing http server
console.log("Creating websocket server ... %s", 'ws://localhost:8002');
console.log("Binding to zmq... %s", this._zmqConnectionString);
this._requester.connect(this._zmqConnectionString);
this._requester.on("message", this._onAnalyticsMessage.bind(this));
console.log('Binding successful');
this._socket_server.on("connection", this._onNewConnection.bind(this));
// TODO: handle connection drop
if(this._productionMode && !this._inDocker) {
console.log('Creating analytics process...');
try {
var cp = await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString);
var cp = await AnalyticsService._runAnalyticsProcess();
} catch(error) {
console.error('Can`t run analytics process: %s', error);
return;
@ -110,10 +95,6 @@ export class AnalyticsService {
console.log('Analytics creating successful, pid: %s', cp.pid);
}
console.log('Start analytics pinger...');
this._runAlalyticsPinger();
console.log('Analytics pinger started');
}
/**
@ -123,13 +104,13 @@ export class AnalyticsService {
* @returns Creaded child process
* @throws Process start error or first exception during python start
*/
private static async _runAnalyticsProcess(zmqConnectionString: string): Promise<childProcess.ChildProcess> {
private static async _runAnalyticsProcess(): Promise<childProcess.ChildProcess> {
let cp: childProcess.ChildProcess;
let cpOptions = {
cwd: config.ANALYTICS_PATH,
env: {
...process.env,
ZMQ_CONNECTION_STRING: zmqConnectionString
HASTIC_SERVER_URL: config.HASTIC_SERVER_URL
}
};
@ -185,17 +166,7 @@ export class AnalyticsService {
this._alertService.sendMessage(msg, WebhookType.RECOVERY);
}
private async _onAnalyticsDown() {
const msg = 'Analytics is down';
console.log(msg);
this._alertService.sendMessage(msg, WebhookType.FAILURE);
if(this._productionMode && !this._inDocker) {
await AnalyticsService._runAnalyticsProcess(this._zmqConnectionString);
}
}
private _onAnalyticsMessage(data: any) {
let text = data.toString();
if(text === 'PONG') {
this._pingResponded = true;
@ -217,30 +188,82 @@ export class AnalyticsService {
}
this._onMessage(AnalyticsMessage.fromObject(response));
}
// cb(this: WebSocket, socket: WebSocket, request: http.IncomingMessage)
private async _onNewConnection(connection: WebSocket) {
if(this._socket_connection !== null) {
// TODO: use buildin websocket validator
console.error('There is already an analytics connection. Only one connection is supported.');
// we send error and then close connection
connection.send('EALREADYEXISTING', () => { connection.close(); });
return;
}
// TODO: log connection id
console.log('Got new analytic connection');
this._socket_connection = connection;
this._socket_connection.on("message", this._onAnalyticsMessage.bind(this));
// TODO: implement closing
this._socket_connection.on("close", this._onAnalyticsDown.bind(this));
await this.sendText('hey');
console.log('Start analytics pinger...');
// TODO: use websockets buildin pinger
this._runAlalyticsPinger();
console.log('Analytics pinger started');
}
private async _onAnalyticsDown() {
if(!this._ready) {
// it's possible that ping is too slow and connection is closed
return;
}
this._stopAlalyticsPinger();
if(this._socket_connection !== null) {
this._socket_connection.close();
this._socket_connection = null;
}
this._ready = false;
const msg = 'Analytics is down';
console.log(msg);
this._alertService.sendMessage(msg, WebhookType.FAILURE);
if(this._productionMode && !this._inDocker) {
await AnalyticsService._runAnalyticsProcess();
}
}
private async _runAlalyticsPinger() {
private _runAlalyticsPinger() {
this._analyticsPinger = setInterval(() => {
if(this._isClosed) {
return;
}
if(!this._pingResponded && this._ready) {
this._ready = false;
this._onAnalyticsDown();
}
this._pingResponded = false;
// TODO: set life limit for this ping
this.sendText('PING');
}, config.ANLYTICS_PING_INTERVAL);
}, config.ANALYTICS_PING_INTERVAL);
}
private static createIPCAddress(zmqConnectionString: string): string {
let filename = zmqConnectionString.substring(6); //without 'ipc://'
fs.writeFileSync(filename, '');
return filename;
private _stopAlalyticsPinger() {
if(this._analyticsPinger !== null) {
clearInterval(this._analyticsPinger);
}
this._analyticsPinger = null;
}
public get queueLength() {
return this._queue.length;
}
public close() {
this._isClosed = true;
console.log('Terminating analytics service...');
this._stopAlalyticsPinger();
if(this._socket_connection !== null) {
this._socket_connection.close();
}
console.log('Termination successful');
}
}

6
server/src/services/data_puller.ts

@ -10,13 +10,13 @@ import { getGrafanaUrl } from '../utils/grafana';
import { queryByMetric, GrafanaUnavailable, DatasourceUnavailable } from 'grafana-datasource-kit';
import * as _ from 'lodash';
import { WebhookType } from './notification_service';
type MetricDataChunk = { values: [number, number][], columns: string[] };
const PULL_PERIOD_MS = 5000;
export class DataPuller {
private _analyticReadyConsoleReporter = availableReporter(
@ -49,11 +49,9 @@ export class DataPuller {
if(unit === undefined) {
throw Error(`data puller: can't pull undefined unit`);
}
const grafanaUrl = getGrafanaUrl(unit.grafanaUrl);
let data = queryByMetric(unit.metric, grafanaUrl, from, to, HASTIC_API_KEY);
return data;
}
private pushData(unit: AnalyticUnit.AnalyticUnit, data: any) {
@ -74,7 +72,7 @@ export class DataPuller {
}
}
//TODO: group analyticUnits by panelID and send same dataset for group
// TODO: group analyticUnits by panelID and send same dataset for group
public async runPuller() {
const analyticUnits = await AnalyticUnit.findMany({ alert: true });

3
server/src/services/data_service/index.ts

@ -81,8 +81,7 @@ function maybeCreateDir(path: string): void {
function checkDataFolders(): void {
[
config.DATA_PATH,
config.ZMQ_IPC_PATH
config.DATA_PATH
].forEach(maybeCreateDir);
}

Loading…
Cancel
Save