Browse Source

Analytics server messaging #24 v2 (#49)

* add zmq to deps

* basic zmq usage & build system fixes

* continue zmq integration & refactorings

* server.py + logging

* some commit

* ping-pong server-analytics & pair type

* packing zmq.node for production
pull/1/head
Alexey Velikiy 6 years ago committed by rozetko
parent
commit
52b3a1286d
  1. 14
      analytics/.vscode/launch.json
  2. 1
      analytics/.vscode/settings.json
  3. 2
      analytics/Compilation.md
  4. 26
      analytics/config.py
  5. 1
      analytics/requirements.txt
  6. 63
      analytics/server.py
  7. 36
      analytics/worker.py
  8. 3
      config.example.json
  9. 13
      server/build/node-loader.js
  10. 4
      server/build/webpack.614.prod.conf.js
  11. 11
      server/build/webpack.prod.conf.js
  12. 9
      server/package.json
  13. 5
      server/src/config.ts
  14. 4
      server/src/services/analytics.ts
  15. 86
      server/src/services/analyticsConnection.ts

14
analytics/.vscode/launch.json vendored

@ -0,0 +1,14 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"type": "python",
"request": "launch",
"program": "${workspaceFolder}\\server.py"
}
]
}

1
analytics/.vscode/settings.json vendored

@ -1,5 +1,6 @@
{ {
"python.pythonPath": "python", "python.pythonPath": "python",
"python.linting.enabled": false,
"terminal.integrated.shell.windows": "C:\\WINDOWS\\System32\\WindowsPowerShell\\v1.0\\powershell.exe", "terminal.integrated.shell.windows": "C:\\WINDOWS\\System32\\WindowsPowerShell\\v1.0\\powershell.exe",
"editor.tabSize": 4, "editor.tabSize": 4,
"editor.insertSpaces": true "editor.insertSpaces": true

2
analytics/Compilation.md

@ -7,6 +7,6 @@ Compiled module is supported by all *nix systems.
```bash ```bash
pip3 install pyinstaller pip3 install pyinstaller
cd $HASTIC_SERVER_PATH/analytics cd $HASTIC_SERVER_PATH/analytics
pyinstaller --additional-hooks-dir=pyinstaller_hooks worker.py pyinstaller --additional-hooks-dir=pyinstaller_hooks server.py
``` ```

26
analytics/config.py

@ -1,25 +1,31 @@
import os import os
import json import json
def get_config_field(field, default_val = None):
val = default_val DATA_FOLDER = '../data'
CONFIG_FILE = '../config.json'
config_exists = os.path.isfile(CONFIG_FILE) config_exists = os.path.isfile(CONFIG_FILE)
if config_exists: if config_exists:
with open(CONFIG_FILE) as f: with open(CONFIG_FILE) as f:
config = json.load(f) config = json.load(f)
def get_config_field(field, default_val = None):
if field in os.environ: if field in os.environ:
val = os.environ[field] return os.environ[field]
elif config_exists and field in config:
val = config[field] if config_exists and field in config:
else: return config[field]
if default_val is not None:
return default_val
raise Exception('Please configure {}'.format(field)) raise Exception('Please configure {}'.format(field))
return val
DATA_FOLDER = '../data'
CONFIG_FILE = '../config.json'
DATASET_FOLDER = os.path.join(DATA_FOLDER, 'datasets/') DATASET_FOLDER = os.path.join(DATA_FOLDER, 'datasets/')
ANOMALIES_FOLDER = os.path.join(DATA_FOLDER, 'anomalies/') ANOMALIES_FOLDER = os.path.join(DATA_FOLDER, 'anomalies/')
@ -27,4 +33,4 @@ MODELS_FOLDER = os.path.join(DATA_FOLDER, 'models/')
METRICS_FOLDER = os.path.join(DATA_FOLDER, 'metrics/') METRICS_FOLDER = os.path.join(DATA_FOLDER, 'metrics/')
HASTIC_API_KEY = get_config_field('HASTIC_API_KEY') HASTIC_API_KEY = get_config_field('HASTIC_API_KEY')
ZEROMQ_CONNECTION_STRING = get_config_field('ZEROMQ_CONNECTION_STRING', 'tcp://*:8002')

1
analytics/requirements.txt

@ -7,6 +7,7 @@ patsy==0.5.0
pefile==2017.11.5 pefile==2017.11.5
python-dateutil==2.7.3 python-dateutil==2.7.3
pytz==2018.4 pytz==2018.4
pyzmq==17.0.0
scikit-learn==0.19.1 scikit-learn==0.19.1
scipy==1.1.0 scipy==1.1.0
seglearn==0.1.6 seglearn==0.1.6

63
analytics/server.py

@ -0,0 +1,63 @@
import config
import json
import logging
import zmq
import sys
from worker import Worker
root = logging.getLogger()
root.setLevel(logging.DEBUG)
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s")
ch.setFormatter(formatter)
root.addHandler(ch)
logger = logging.getLogger('SERVER')
socket = None
def handlePing():
socket.send(b'pong')
def handleTask(text):
try:
task = json.loads(text)
logger.info("Command is OK")
socket.send_string(json.dumps({
'task': task['type'],
'anomaly_id': task['anomaly_id'],
'__task_id': task['__task_id'],
'status': "in progress"
}))
res = w.do_task(task)
res['__task_id'] = task['__task_id']
socket.send_string(json.dumps(res))
except Exception as e:
logger.error("Exception: '%s'" % str(e))
if __name__ == "__main__":
w = Worker()
logger.info("Worker was started")
logger.info("Binding to %s ..." % config.ZEROMQ_CONNECTION_STRING)
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.bind(config.ZEROMQ_CONNECTION_STRING)
logger.info("Ok")
while True:
text = socket.recv()
logger.info('Got message %s' % text)
if text == b'ping':
handlePing()
logger.info('Sent pong')
else:
handleTask(text)

36
analytics/worker.py

@ -1,3 +1,4 @@
import config
from anomaly_model import AnomalyModel from anomaly_model import AnomalyModel
from pattern_detection_model import PatternDetectionModel from pattern_detection_model import PatternDetectionModel
import queue import queue
@ -6,15 +7,13 @@ import json
import logging import logging
import sys import sys
import traceback import traceback
import time
logging.basicConfig(level=logging.WARNING,
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
filename='analytic_toolset.log',
filemode='a')
logger = logging.getLogger('analytic_toolset')
logger = logging.getLogger('WORKER')
class worker(object):
class Worker(object):
models_cache = {} models_cache = {}
thread = None thread = None
queue = queue.Queue() queue = queue.Queue()
@ -108,28 +107,3 @@ class worker(object):
return self.models_cache[anomaly_id] return self.models_cache[anomaly_id]
if __name__ == "__main__":
w = worker()
logger.info("Worker was started")
while True:
try:
text = input("")
task = json.loads(text)
logger.info("Received command '%s'" % text)
if task['type'] == "stop":
logger.info("Stopping...")
break
print(json.dumps({
'task': task['type'],
'anomaly_id': task['anomaly_id'],
'__task_id': task['__task_id'],
'status': "in progress"
}))
sys.stdout.flush()
res = w.do_task(task)
res['__task_id'] = task['__task_id']
print(json.dumps(res))
sys.stdout.flush()
except Exception as e:
logger.error("Exception: '%s'" % str(e))

3
config.example.json

@ -1,4 +1,5 @@
{ {
"HASTIC_PORT": 8000, "HASTIC_PORT": 8000,
"HASTIC_API_KEY": "eyJrIjoiVjZqMHY0dHk4UEE3eEN4MzgzRnd2aURlMWlIdXdHNW4iLCJuIjoiaGFzdGljIiwiaWQiOjF9" "HASTIC_API_KEY": "eyJrIjoiVjZqMHY0dHk4UEE3eEN4MzgzRnd2aURlMWlIdXdHNW4iLCJuIjoiaGFzdGljIiwiaWQiOjF9",
"ZEROMQ_CONNECTION_STRING": "ipc:///tmp/hastic/8000"
} }

13
server/build/node-loader.js

@ -0,0 +1,13 @@
var path = require('path');
// based on: https://github.com/webpack-contrib/node-loader/blob/master/index.js
module.exports = function nodeLoader(m, q) {
return (`
var modulePath = __dirname + '/${path.basename(this.resourcePath)}';
try {
global.process.dlopen(module, modulePath);
} catch(e) {
throw new Error('dlopen: Cannot open ' + modulePath + ': ' + e);
}
`);
}

4
server/build/webpack.614.prod.conf.js

@ -1,8 +1,4 @@
const path = require('path'); const path = require('path');
const fs = require('fs');
const webpack = require('webpack');
module.exports = { module.exports = {
mode: 'production', mode: 'production',

11
server/build/webpack.prod.conf.js

@ -1,4 +1,15 @@
var base = require('./webpack.base.conf'); var base = require('./webpack.base.conf');
base.mode = 'production'; base.mode = 'production';
base.module.rules.push({
test: /\.node$/,
use: [
{ loader: './build/node-loader' },
{ loader: 'file-loader', options: { name: '[name].[ext]' } },
]
});
module.exports = base; module.exports = base;

9
server/package.json

@ -4,8 +4,8 @@
"description": "REST server for managing data for analytics", "description": "REST server for managing data for analytics",
"scripts": { "scripts": {
"start": "node dist/server.js", "start": "node dist/server.js",
"dev": "node build/dev-server.js", "dev": "NODE_ENV=development node build/dev-server.js",
"build-prod": "webpack --config build/webpack.prod.conf.js", "build-prod": "npm rebuild zeromq && webpack --config build/webpack.prod.conf.js",
"build": "npm run build-prod && webpack --config build/webpack.614.prod.conf.js" "build": "npm run build-prod && webpack --config build/webpack.614.prod.conf.js"
}, },
"repository": { "repository": {
@ -31,13 +31,16 @@
"babel-preset-es2015": "^6.24.1", "babel-preset-es2015": "^6.24.1",
"es6-promise": "^4.2.4", "es6-promise": "^4.2.4",
"event-stream": "^3.3.4", "event-stream": "^3.3.4",
"file-loader": "^1.1.11",
"koa": "^2.5.1", "koa": "^2.5.1",
"koa-bodyparser": "^4.2.1", "koa-bodyparser": "^4.2.1",
"koa-router": "^7.4.0", "koa-router": "^7.4.0",
"node-loader": "^0.6.0",
"nodemon": "^1.17.5", "nodemon": "^1.17.5",
"ts-loader": "^4.4.1", "ts-loader": "^4.4.1",
"typescript": "^2.8.3", "typescript": "^2.8.3",
"webpack": "^4.12.0", "webpack": "^4.12.0",
"webpack-cli": "^3.0.8" "webpack-cli": "^3.0.8",
"zeromq": "^4.6.0"
} }
} }

5
server/src/config.ts

@ -17,6 +17,9 @@ export const METRICS_PATH = path.join(DATA_PATH, 'metrics');
export const SEGMENTS_PATH = path.join(DATA_PATH, 'segments'); export const SEGMENTS_PATH = path.join(DATA_PATH, 'segments');
export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000'); export const HASTIC_PORT = getConfigField('HASTIC_PORT', '8000');
export const ZEROMQ_CONNECTION_STRING = getConfigField('ZEROMQ_CONNECTION_STRING', 'tcp://127.0.0.1:8002');
function getConfigField(field, defaultVal?) { function getConfigField(field, defaultVal?) {
let val = defaultVal; let val = defaultVal;
@ -31,7 +34,7 @@ function getConfigField(field, defaultVal?) {
} }
if(val === undefined) { if(val === undefined) {
throw new Error(`Please configure ${field}`) throw new Error(`Please configure ${field}`);
} }
return val; return val;
} }

4
server/src/services/analytics.ts

@ -35,9 +35,9 @@ async function runTask(task): Promise<any> {
}; };
task.__task_id = nextTaskId++; task.__task_id = nextTaskId++;
await analyticsConnection.sendMessage(task); await analyticsConnection.sendTask(task);
return new Promise<void>((resolve, reject) => { return new Promise<void>(resolve => {
taskMap[task.__task_id] = resolve; taskMap[task.__task_id] = resolve;
}) })
} }

86
server/src/services/analyticsConnection.ts

@ -1,39 +1,89 @@
import { ANALYTICS_PATH } from '../config' const zmq = require('zeromq');
import { spawn, ChildProcess } from 'child_process' import { spawn } from 'child_process'
import { split, mapSync } from 'event-stream'; import { ANALYTICS_PATH, ZEROMQ_CONNECTION_STRING } from '../config'
import * as fs from 'fs'; import * as fs from 'fs';
import * as path from 'path'; import * as path from 'path';
export class AnalyticsConnection { export class AnalyticsConnection {
private _learnWorker: ChildProcess; private _requester: any;
constructor(private _onResponse: (response: any) => void) { constructor(private _onResponse: (response: any) => void) {
this._initConnection();
}
public async sendTask(msgObj: any): Promise<void> {
let message = JSON.stringify(msgObj);
return this.sendMessage(message);
}
public async sendMessage(message: string): Promise<void> {
return new Promise<void>((resolve, reject) => {
this._requester.send(message, undefined, (err) => {
if(err) {
reject(err);
} else {
resolve();
}
});
});
}
public close() {
// TODO: close socket & terminate process if you have any
this._requester.close();
}
private async _initConnection() {
this._requester = zmq.socket('pair');
if(process.env.NODE_ENV !== 'development') {
this._runAnalyticsProcess();
}
console.log("Binding to zmq...: %s", ZEROMQ_CONNECTION_STRING);
this._requester.connect(ZEROMQ_CONNECTION_STRING);
console.log('Ok');
console.log('Sending ping to analytics...');
await this._connectToAnalytics();
console.log('Ok')
this._requester.on("message", this._onAnalyticsMessage.bind(this));
}
private async _connectToAnalytics() {
this.sendMessage('ping'); // we don`t await here
return new Promise(resolve => {
this._requester.once('message', (message) => {
console.log('Got message from analytics: ' + message);
resolve();
})
});
}
private _runAnalyticsProcess() {
console.log('Creating analytics process...');
if(fs.existsSync(path.join(ANALYTICS_PATH, 'dist/worker/worker'))) { if(fs.existsSync(path.join(ANALYTICS_PATH, 'dist/worker/worker'))) {
this._learnWorker = spawn('dist/worker/worker', [], { cwd: ANALYTICS_PATH }) console.log('dist/worker/worker');
spawn('dist/worker/worker', [], { cwd: ANALYTICS_PATH })
} else { } else {
console.log('python3 server.py');
// If compiled analytics script doesn't exist - fallback to regular python // If compiled analytics script doesn't exist - fallback to regular python
this._learnWorker = spawn('python3', ['worker.py'], { cwd: ANALYTICS_PATH }) spawn('python3', ['server.py'], { cwd: ANALYTICS_PATH })
} }
console.log('ok');
this._learnWorker.stdout.pipe(
split()).pipe(mapSync(this._onPipeMessage.bind(this))
);
this._learnWorker.stderr.on('data', data => console.error(`worker stderr: ${data}`));
} }
private _onPipeMessage(data) { private _onAnalyticsMessage(data) {
console.log(`worker stdout: ${data}`); console.log(`analytics message: ${data}`);
let response = JSON.parse(data); let response = JSON.parse(data);
this._onResponse(response); this._onResponse(response);
} }
public async sendMessage(task: any): Promise<void> {
let command = JSON.stringify(task);
this._learnWorker.stdin.write(`${command}\n`);
}
} }

Loading…
Cancel
Save