diff --git a/analytics/.vscode/launch.json b/analytics/.vscode/launch.json new file mode 100644 index 0000000..9f78f4e --- /dev/null +++ b/analytics/.vscode/launch.json @@ -0,0 +1,14 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python: Current File", + "type": "python", + "request": "launch", + "program": "${workspaceFolder}\\server.py" + } + ] +} \ No newline at end of file diff --git a/analytics/.vscode/settings.json b/analytics/.vscode/settings.json index 0a043ed..194e1c0 100644 --- a/analytics/.vscode/settings.json +++ b/analytics/.vscode/settings.json @@ -1,5 +1,6 @@ { "python.pythonPath": "python", + "python.linting.enabled": false, "terminal.integrated.shell.windows": "C:\\WINDOWS\\System32\\WindowsPowerShell\\v1.0\\powershell.exe", "editor.tabSize": 4, "editor.insertSpaces": true diff --git a/analytics/Compilation.md b/analytics/Compilation.md index f2e54d4..c804490 100644 --- a/analytics/Compilation.md +++ b/analytics/Compilation.md @@ -7,6 +7,6 @@ Compiled module is supported by all *nix systems. ```bash pip3 install pyinstaller cd $HASTIC_SERVER_PATH/analytics -pyinstaller --additional-hooks-dir=pyinstaller_hooks worker.py +pyinstaller --additional-hooks-dir=pyinstaller_hooks server.py ``` diff --git a/analytics/config.py b/analytics/config.py index 709a198..f00ed86 100644 --- a/analytics/config.py +++ b/analytics/config.py @@ -1,25 +1,31 @@ import os import json -def get_config_field(field, default_val = None): - val = default_val - config_exists = os.path.isfile(CONFIG_FILE) - if config_exists: - with open(CONFIG_FILE) as f: - config = json.load(f) +DATA_FOLDER = '../data' +CONFIG_FILE = '../config.json' + + +config_exists = os.path.isfile(CONFIG_FILE) +if config_exists: + with open(CONFIG_FILE) as f: + config = json.load(f) + + +def get_config_field(field, default_val = None): if field in os.environ: - val = os.environ[field] - elif config_exists and field in config: - val = config[field] - else: - raise Exception('Please configure {}'.format(field)) + return os.environ[field] + + if config_exists and field in config: + return config[field] + + if default_val is not None: + return default_val + + raise Exception('Please configure {}'.format(field)) - return val -DATA_FOLDER = '../data' -CONFIG_FILE = '../config.json' DATASET_FOLDER = os.path.join(DATA_FOLDER, 'datasets/') ANOMALIES_FOLDER = os.path.join(DATA_FOLDER, 'anomalies/') @@ -27,4 +33,4 @@ MODELS_FOLDER = os.path.join(DATA_FOLDER, 'models/') METRICS_FOLDER = os.path.join(DATA_FOLDER, 'metrics/') HASTIC_API_KEY = get_config_field('HASTIC_API_KEY') - +ZEROMQ_CONNECTION_STRING = get_config_field('ZEROMQ_CONNECTION_STRING', 'tcp://*:8002') diff --git a/analytics/requirements.txt b/analytics/requirements.txt index 9caa056..01698dd 100644 --- a/analytics/requirements.txt +++ b/analytics/requirements.txt @@ -7,6 +7,7 @@ patsy==0.5.0 pefile==2017.11.5 python-dateutil==2.7.3 pytz==2018.4 +pyzmq==17.0.0 scikit-learn==0.19.1 scipy==1.1.0 seglearn==0.1.6 diff --git a/analytics/server.py b/analytics/server.py new file mode 100644 index 0000000..b4aa276 --- /dev/null +++ b/analytics/server.py @@ -0,0 +1,63 @@ +import config +import json +import logging +import zmq +import sys + +from worker import Worker + +root = logging.getLogger() +root.setLevel(logging.DEBUG) + +ch = logging.StreamHandler(sys.stdout) +ch.setLevel(logging.DEBUG) +formatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s") +ch.setFormatter(formatter) +root.addHandler(ch) + +logger = logging.getLogger('SERVER') +socket = None + +def handlePing(): + socket.send(b'pong') + +def handleTask(text): + try: + task = json.loads(text) + logger.info("Command is OK") + + socket.send_string(json.dumps({ + 'task': task['type'], + 'anomaly_id': task['anomaly_id'], + '__task_id': task['__task_id'], + 'status': "in progress" + })) + + res = w.do_task(task) + res['__task_id'] = task['__task_id'] + socket.send_string(json.dumps(res)) + + except Exception as e: + logger.error("Exception: '%s'" % str(e)) + + +if __name__ == "__main__": + w = Worker() + logger.info("Worker was started") + + logger.info("Binding to %s ..." % config.ZEROMQ_CONNECTION_STRING) + context = zmq.Context() + socket = context.socket(zmq.PAIR) + socket.bind(config.ZEROMQ_CONNECTION_STRING) + logger.info("Ok") + + while True: + text = socket.recv() + logger.info('Got message %s' % text) + if text == b'ping': + handlePing() + logger.info('Sent pong') + else: + handleTask(text) + + diff --git a/analytics/worker.py b/analytics/worker.py index 852e801..69d68e5 100644 --- a/analytics/worker.py +++ b/analytics/worker.py @@ -1,3 +1,4 @@ +import config from anomaly_model import AnomalyModel from pattern_detection_model import PatternDetectionModel import queue @@ -6,15 +7,13 @@ import json import logging import sys import traceback +import time -logging.basicConfig(level=logging.WARNING, - format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s', - filename='analytic_toolset.log', - filemode='a') -logger = logging.getLogger('analytic_toolset') +logger = logging.getLogger('WORKER') -class worker(object): + +class Worker(object): models_cache = {} thread = None queue = queue.Queue() @@ -108,28 +107,3 @@ class worker(object): return self.models_cache[anomaly_id] -if __name__ == "__main__": - w = worker() - logger.info("Worker was started") - while True: - try: - text = input("") - task = json.loads(text) - logger.info("Received command '%s'" % text) - if task['type'] == "stop": - logger.info("Stopping...") - break - print(json.dumps({ - 'task': task['type'], - 'anomaly_id': task['anomaly_id'], - '__task_id': task['__task_id'], - 'status': "in progress" - })) - sys.stdout.flush() - res = w.do_task(task) - res['__task_id'] = task['__task_id'] - print(json.dumps(res)) - sys.stdout.flush() - except Exception as e: - logger.error("Exception: '%s'" % str(e)) - diff --git a/config.example.json b/config.example.json index a27c12f..54d0ed7 100644 --- a/config.example.json +++ b/config.example.json @@ -1,4 +1,5 @@ { "HASTIC_PORT": 8000, - "HASTIC_API_KEY": "eyJrIjoiVjZqMHY0dHk4UEE3eEN4MzgzRnd2aURlMWlIdXdHNW4iLCJuIjoiaGFzdGljIiwiaWQiOjF9" + "HASTIC_API_KEY": "eyJrIjoiVjZqMHY0dHk4UEE3eEN4MzgzRnd2aURlMWlIdXdHNW4iLCJuIjoiaGFzdGljIiwiaWQiOjF9", + "ZEROMQ_CONNECTION_STRING": "ipc:///tmp/hastic/8000" } diff --git a/server/build/node-loader.js b/server/build/node-loader.js new file mode 100644 index 0000000..196d0ec --- /dev/null +++ b/server/build/node-loader.js @@ -0,0 +1,13 @@ +var path = require('path'); + +// based on: https://github.com/webpack-contrib/node-loader/blob/master/index.js +module.exports = function nodeLoader(m, q) { + return (` + var modulePath = __dirname + '/${path.basename(this.resourcePath)}'; + try { + global.process.dlopen(module, modulePath); + } catch(e) { + throw new Error('dlopen: Cannot open ' + modulePath + ': ' + e); + } + `); +} \ No newline at end of file diff --git a/server/build/webpack.614.prod.conf.js b/server/build/webpack.614.prod.conf.js index 3200acd..4c50954 100644 --- a/server/build/webpack.614.prod.conf.js +++ b/server/build/webpack.614.prod.conf.js @@ -1,8 +1,4 @@ const path = require('path'); -const fs = require('fs'); - -const webpack = require('webpack'); - module.exports = { mode: 'production', diff --git a/server/build/webpack.prod.conf.js b/server/build/webpack.prod.conf.js index 4022eca..5af7d63 100644 --- a/server/build/webpack.prod.conf.js +++ b/server/build/webpack.prod.conf.js @@ -1,4 +1,15 @@ var base = require('./webpack.base.conf'); + base.mode = 'production'; + +base.module.rules.push({ + test: /\.node$/, + use: [ + { loader: './build/node-loader' }, + { loader: 'file-loader', options: { name: '[name].[ext]' } }, + ] +}); + module.exports = base; + diff --git a/server/package.json b/server/package.json index 9f37fe6..2348acd 100644 --- a/server/package.json +++ b/server/package.json @@ -4,8 +4,8 @@ "description": "REST server for managing data for analytics", "scripts": { "start": "node dist/server.js", - "dev": "node build/dev-server.js", - "build-prod": "webpack --config build/webpack.prod.conf.js", + "dev": "NODE_ENV=development node build/dev-server.js", + "build-prod": "npm rebuild zeromq && webpack --config build/webpack.prod.conf.js", "build": "npm run build-prod && webpack --config build/webpack.614.prod.conf.js" }, "repository": { @@ -31,13 +31,16 @@ "babel-preset-es2015": "^6.24.1", "es6-promise": "^4.2.4", "event-stream": "^3.3.4", + "file-loader": "^1.1.11", "koa": "^2.5.1", "koa-bodyparser": "^4.2.1", "koa-router": "^7.4.0", + "node-loader": "^0.6.0", "nodemon": "^1.17.5", "ts-loader": "^4.4.1", "typescript": "^2.8.3", "webpack": "^4.12.0", - "webpack-cli": "^3.0.8" + "webpack-cli": "^3.0.8", + "zeromq": "^4.6.0" } } diff --git a/server/src/config.ts b/server/src/config.ts index 0bf14f8..57fe259 100644 --- a/server/src/config.ts +++ b/server/src/config.ts @@ -17,6 +17,9 @@ export const METRICS_PATH = path.join(DATA_PATH, 'metrics'); export const SEGMENTS_PATH = path.join(DATA_PATH, 'segments'); export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000'); +export const ZEROMQ_CONNECTION_STRING = getConfigField('ZEROMQ_CONNECTION_STRING', 'tcp://127.0.0.1:8002'); + + function getConfigField(field, defaultVal?) { let val = defaultVal; @@ -31,7 +34,7 @@ function getConfigField(field, defaultVal?) { } if(val === undefined) { - throw new Error(`Please configure ${field}`) + throw new Error(`Please configure ${field}`); } return val; } diff --git a/server/src/services/analytics.ts b/server/src/services/analytics.ts index 52185af..19f5f52 100644 --- a/server/src/services/analytics.ts +++ b/server/src/services/analytics.ts @@ -35,9 +35,9 @@ async function runTask(task): Promise { }; task.__task_id = nextTaskId++; - await analyticsConnection.sendMessage(task); + await analyticsConnection.sendTask(task); - return new Promise((resolve, reject) => { + return new Promise(resolve => { taskMap[task.__task_id] = resolve; }) } diff --git a/server/src/services/analyticsConnection.ts b/server/src/services/analyticsConnection.ts index 45f71e5..da28053 100644 --- a/server/src/services/analyticsConnection.ts +++ b/server/src/services/analyticsConnection.ts @@ -1,39 +1,89 @@ -import { ANALYTICS_PATH } from '../config' +const zmq = require('zeromq'); -import { spawn, ChildProcess } from 'child_process' -import { split, mapSync } from 'event-stream'; +import { spawn } from 'child_process' +import { ANALYTICS_PATH, ZEROMQ_CONNECTION_STRING } from '../config' import * as fs from 'fs'; import * as path from 'path'; + export class AnalyticsConnection { - private _learnWorker: ChildProcess; + private _requester: any; constructor(private _onResponse: (response: any) => void) { + this._initConnection(); + } + + public async sendTask(msgObj: any): Promise { + let message = JSON.stringify(msgObj); + return this.sendMessage(message); + } + + public async sendMessage(message: string): Promise { + return new Promise((resolve, reject) => { + this._requester.send(message, undefined, (err) => { + if(err) { + reject(err); + } else { + resolve(); + } + }); + }); + } + + public close() { + // TODO: close socket & terminate process if you have any + this._requester.close(); + } + + private async _initConnection() { + this._requester = zmq.socket('pair'); + + if(process.env.NODE_ENV !== 'development') { + this._runAnalyticsProcess(); + } + + console.log("Binding to zmq...: %s", ZEROMQ_CONNECTION_STRING); + this._requester.connect(ZEROMQ_CONNECTION_STRING); + console.log('Ok'); + + console.log('Sending ping to analytics...'); + await this._connectToAnalytics(); + console.log('Ok') + + this._requester.on("message", this._onAnalyticsMessage.bind(this)); + + } + + private async _connectToAnalytics() { + this.sendMessage('ping'); // we don`t await here + return new Promise(resolve => { + this._requester.once('message', (message) => { + console.log('Got message from analytics: ' + message); + resolve(); + }) + }); + } + + private _runAnalyticsProcess() { + console.log('Creating analytics process...'); if(fs.existsSync(path.join(ANALYTICS_PATH, 'dist/worker/worker'))) { - this._learnWorker = spawn('dist/worker/worker', [], { cwd: ANALYTICS_PATH }) + console.log('dist/worker/worker'); + spawn('dist/worker/worker', [], { cwd: ANALYTICS_PATH }) } else { + console.log('python3 server.py'); // If compiled analytics script doesn't exist - fallback to regular python - this._learnWorker = spawn('python3', ['worker.py'], { cwd: ANALYTICS_PATH }) + spawn('python3', ['server.py'], { cwd: ANALYTICS_PATH }) } - - this._learnWorker.stdout.pipe( - split()).pipe(mapSync(this._onPipeMessage.bind(this)) - ); - this._learnWorker.stderr.on('data', data => console.error(`worker stderr: ${data}`)); + console.log('ok'); } - private _onPipeMessage(data) { - console.log(`worker stdout: ${data}`); + private _onAnalyticsMessage(data) { + console.log(`analytics message: ${data}`); let response = JSON.parse(data); this._onResponse(response); } - public async sendMessage(task: any): Promise { - let command = JSON.stringify(task); - this._learnWorker.stdin.write(`${command}\n`); - } - }