Browse Source

Zmq basic fs #97 (#104)

* async fs context manager

* ServerMessage class and using it

* handle resps

* requests to node & file saving ++

* fix FILE_GET -> FILE_LOAD

* ds usage

* basic files saving
pull/1/head
Alexey Velikiy 6 years ago committed by rozetko
parent
commit
753b7a0589
  1. 10
      analytics/analytic_unit_worker.py
  2. 37
      analytics/server.py
  3. 2
      analytics/services/__init__.py
  4. 88
      analytics/services/data_service.py
  5. 65
      analytics/services/server_service.py
  6. 1
      server/package.json
  7. 1
      server/src/config.ts
  8. 65
      server/src/controllers/analytics_controller.ts
  9. 2
      server/src/index.ts
  10. 45
      server/src/services/analytics_service.ts
  11. 50
      server/src/services/data_service.ts

10
analytics/analytic_unit_worker.py

@ -34,7 +34,7 @@ class AnalyticUnitWorker(object):
except Exception as e: except Exception as e:
#traceback.extract_stack() #traceback.extract_stack()
error_text = traceback.format_exc() error_text = traceback.format_exc()
logger.error("Exception: '%s'" % error_text) logger.error("do_task Exception: '%s'" % error_text)
# TODO: move result to a class which renders to json for messaging to analytics # TODO: move result to a class which renders to json for messaging to analytics
result = { result = {
'task': type, 'task': type,
@ -53,7 +53,7 @@ class AnalyticUnitWorker(object):
if pattern == 'drop' and len(segments) == 0: if pattern == 'drop' and len(segments) == 0:
# TODO: move result to a class which renders to json for messaging to analytics # TODO: move result to a class which renders to json for messaging to analytics
result = { result = {
'status': 'success', 'status': 'SUCCESS',
'analyticUnitId': analytic_unit_id, 'analyticUnitId': analytic_unit_id,
'segments': [], 'segments': [],
'lastPredictionTime': last_prediction_time 'lastPredictionTime': last_prediction_time
@ -69,8 +69,8 @@ class AnalyticUnitWorker(object):
model.synchronize_data() model.synchronize_data()
segments, last_prediction_time = await model.predict(last_prediction_time) segments, last_prediction_time = await model.predict(last_prediction_time)
return { return {
'task': "predict", 'task': 'PREDICT',
'status': "success", 'status': 'SUCCESS',
'analyticUnitId': analytic_unit_id, 'analyticUnitId': analytic_unit_id,
'segments': segments, 'segments': segments,
'lastPredictionTime': last_prediction_time 'lastPredictionTime': last_prediction_time
@ -78,7 +78,7 @@ class AnalyticUnitWorker(object):
def get_model(self, analytic_unit_id, pattern_type): def get_model(self, analytic_unit_id, pattern_type):
if analytic_unit_id not in self.models_cache: if analytic_unit_id not in self.models_cache:
if pattern_type == 'general': if pattern_type == 'GENERAL':
model = detectors.GeneralDetector(analytic_unit_id) model = detectors.GeneralDetector(analytic_unit_id)
else: else:
model = detectors.PatternDetector(analytic_unit_id, pattern_type) model = detectors.PatternDetector(analytic_unit_id, pattern_type)

37
analytics/server.py

@ -11,9 +11,9 @@ from analytic_unit_worker import AnalyticUnitWorker
root = logging.getLogger() root = logging.getLogger()
logger = logging.getLogger('SERVER') logger = logging.getLogger('SERVER')
worker = None worker: AnalyticUnitWorker = None
server_service = None server_service: services.ServerService = None
data_service = None data_service: services.DataService = None
root.setLevel(logging.DEBUG) root.setLevel(logging.DEBUG)
@ -28,29 +28,40 @@ logging_handler.setFormatter(logging_formatter)
root.addHandler(logging_handler) root.addHandler(logging_handler)
async def handle_task(text): async def handle_task(payload: str):
try: try:
task = json.loads(text) task = json.loads(payload)
logger.info("Command is OK") logger.info("Command is OK")
await server_service.send_message(json.dumps({ response_task_payload = json.dumps({
'_taskId': task['_taskId'], '_taskId': task['_taskId'],
'task': task['type'], 'task': task['type'],
'analyticUnitId': task['analyticUnitId'], 'analyticUnitId': task['analyticUnitId'],
'status': "in progress" 'status': "in progress"
})) })
message = services.server_service.ServerMessage('TASK_RESULT', response_task_payload)
await server_service.send_message(message)
res = await worker.do_task(task) res = await worker.do_task(task)
res['_taskId'] = task['_taskId'] res['_taskId'] = task['_taskId']
await server_service.send_message(json.dumps(res)) await server_service.send_message(json.dumps(res))
except Exception as e: except Exception as e:
logger.error("Exception: '%s'" % str(e)) logger.error("handle_task Exception: '%s'" % str(e))
async def handle_message(message: services.ServerMessage):
    """Dispatch an incoming ServerMessage from the node server by method.

    BUG FIX: the original json.loads-ed ``message.payload`` and passed the
    resulting dict to ``handle_task``, which immediately json.loads it
    again — a TypeError on a dict. The node side sends the task payload as
    a JSON string (``payload: JSON.stringify(msgObj)``), and ``handle_task``
    expects that string, so pass it through untouched.
    """
    if message.method == 'task' and message.payload is not None:
        await handle_task(message.payload)
def init_services(): def init_services():
logger.info("Starting services...") logger.info("Starting services...")
logger.info("Server...") logger.info("Server...")
server_service = services.ServerService(handle_task) server_service = services.ServerService(handle_message)
logger.info("Ok") logger.info("Ok")
logger.info("Data service...") logger.info("Data service...")
data_service = services.DataService(server_service) data_service = services.DataService(server_service)
@ -58,6 +69,11 @@ def init_services():
return server_service, data_service return server_service, data_service
async def app_loop():
    """Top-level application coroutine: run the server receive loop.

    BUG FIX: ``handle_loop`` is a coroutine function; the original called it
    without ``await``, so the loop coroutine was created and discarded and
    the process did nothing.
    """
    await server_service.handle_loop()
    # await asyncio.gather(server_service.handle_loop(), test_file_save())
if __name__ == "__main__": if __name__ == "__main__":
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
logger.info("Starting worker...") logger.info("Starting worker...")
@ -66,4 +82,5 @@ if __name__ == "__main__":
server_service, data_service = init_services() server_service, data_service = init_services()
print('Analytics process is running') # we need to print to stdout and flush print('Analytics process is running') # we need to print to stdout and flush
sys.stdout.flush() # because node.js expects it sys.stdout.flush() # because node.js expects it
loop.run_until_complete(server_service.handle_loop())
loop.run_until_complete(app_loop())

2
analytics/services/__init__.py

@ -1,2 +1,2 @@
from services.server_service import ServerService from services.server_service import ServerService, ServerMessage
from services.data_service import DataService from services.data_service import DataService

88
analytics/services/data_service.py

@ -1,9 +1,87 @@
from services.server_service import ServerMessage, ServerService
import json
import asyncio
"""
This is how you can save a file:
async def test_file_save():
async with data_service.open('filename') as f:
print('write content')
await f.write('test string')
async with data_service.open('filename') as f:
content = await f.load()
print(content)
print('test file ok')
"""
LOCK_WAIT_SLEEP_TIMESPAN = 100 # mc
class FileDescriptor:
    """Async handle to one remote file managed by a DataService.

    Supports ``async with`` to take and release the per-filename lock on
    the owning DataService around reads and writes.
    """

    def __init__(self, filename: str, data_service):
        # filename is the key the content is stored under remotely
        self.filename = filename
        self.data_service = data_service

    async def write(self, content: str):
        """Store `content` under this descriptor's filename."""
        await self.data_service.save_file_content(self, content)

    async def load(self) -> str:
        """Fetch and return the content stored for this filename."""
        return await self.data_service.load_file_content(self)

    async def __aenter__(self):
        # take the DataService-level lock for this filename
        await self.data_service.wait_and_lock(self)
        return self

    async def __aexit__(self, *exc):
        await self.data_service.unlock(self)
class DataService: class DataService:
def __init__(self, server_service):
def __init__(self, server_service: ServerService):
"""Creates fs over network via server_service"""
self.server_service = server_service self.server_service = server_service
self.locks = set()
def open(self, filename: str) -> FileDescriptor:
return FileDescriptor(filename, self)
async def wait_and_lock(self, file_descriptor: 'FileDescriptor'):
    """Wait until this descriptor's filename is unlocked, then lock it.

    BUG FIX: the original called ``asyncio.sleep(...)`` without ``await``,
    so while a lock was held this spun in a tight loop that never yielded
    to the event loop — the holder could never release the lock.
    Also converts LOCK_WAIT_SLEEP_TIMESPAN (documented as milliseconds)
    to the seconds that asyncio.sleep expects.
    """
    filename = file_descriptor.filename
    while filename in self.locks:
        await asyncio.sleep(LOCK_WAIT_SLEEP_TIMESPAN / 1000)
    self.locks.add(filename)
async def unlock(self, file_descriptor: 'FileDescriptor'):
    """Release the lock previously taken for this descriptor's filename."""
    self.locks.remove(file_descriptor.filename)
async def save_file_content(self, file_descriptor: 'FileDescriptor', content: str):
    """Persist a JSON-serializable `content` under the descriptor's filename.

    The caller must already hold the lock for this filename
    (raises RuntimeError otherwise).
    """
    self.__check_lock(file_descriptor)
    request = ServerMessage('FILE_SAVE', {
        'filename': file_descriptor.filename,
        'content': content,
    })
    await self.server_service.send_request(request)
async def load_file_content(self, file_descriptor: 'FileDescriptor') -> str:
    """Fetch and return the content stored under the descriptor's filename.

    The caller must already hold the lock for this filename
    (raises RuntimeError otherwise).
    """
    self.__check_lock(file_descriptor)
    request = ServerMessage('FILE_LOAD', {'filename': file_descriptor.filename})
    return await self.server_service.send_request(request)
def __check_lock(self, file_descriptor: 'FileDescriptor'):
    """Raise RuntimeError unless this descriptor's filename is locked."""
    name = file_descriptor.filename
    if name not in self.locks:
        raise RuntimeError('No lock for file %s' % name)
# NOTE(review): "safe_file" looks like a typo for "save_file"; both helpers
# are unimplemented placeholders. Names and no-op behavior kept as-is so any
# existing callers are unaffected.
async def safe_file(filename, content):
    """Placeholder for a module-level save helper (not implemented)."""
    pass


async def load_file(filename, content):
    """Placeholder for a module-level load helper (not implemented)."""
    pass

65
analytics/services/server_service.py

@ -2,13 +2,41 @@ import config
import zmq import zmq
import zmq.asyncio import zmq.asyncio
import logging
import logging
import json
import asyncio import asyncio
logger = logging.getLogger('SERVER_SERVICE') logger = logging.getLogger('SERVER_SERVICE')
class ServerMessage:
    """Envelope for messages exchanged with the node server over ZMQ.

    Attributes map to the wire JSON keys:
    method -> 'method', payload -> 'payload' (optional),
    request_id -> 'requestId' (optional, set for request/response pairs).
    """

    def __init__(self, method: str, payload: object = None, request_id: int = None):
        self.method = method
        self.payload = payload
        self.request_id = request_id

    def toJSON(self) -> dict:
        """Return a dict ready for json.dumps; unset optional fields are omitted."""
        result = {
            'method': self.method
        }
        if self.payload is not None:
            result['payload'] = self.payload
        if self.request_id is not None:
            result['requestId'] = self.request_id
        return result

    @staticmethod
    def fromJSON(data: dict) -> 'ServerMessage':
        """Build a ServerMessage from a decoded JSON dict.

        Fixed: now an explicit @staticmethod (it is always called on the
        class), and the parameter no longer shadows the stdlib ``json``
        module name.
        """
        return ServerMessage(
            data['method'],
            data.get('payload'),
            data.get('requestId'),
        )
class ServerService: class ServerService:
def __init__(self, on_message_handler): def __init__(self, on_message_handler):
@ -18,25 +46,46 @@ class ServerService:
self.context = zmq.asyncio.Context() self.context = zmq.asyncio.Context()
self.socket = self.context.socket(zmq.PAIR) self.socket = self.context.socket(zmq.PAIR)
self.socket.bind(config.ZMQ_CONNECTION_STRING) self.socket.bind(config.ZMQ_CONNECTION_STRING)
self.request_next_id = 1
self.responses = dict()
async def handle_loop(self): async def handle_loop(self):
while True: while True:
received_bytes = await self.socket.recv() received_bytes = await self.socket.recv()
text = received_bytes.decode('utf-8') text = received_bytes.decode('utf-8')
if text == 'ping': if text == 'PING':
asyncio.ensure_future(self.__handle_ping()) asyncio.ensure_future(self.__handle_ping())
else: else:
asyncio.ensure_future(self.__handle_message(text)) asyncio.ensure_future(self.__handle_message(text))
async def send_message(self, string): async def send_message(self, message: ServerMessage):
await self.socket.send_string(string) await self.socket.send_string(json.dumps(message.toJSON()))
async def send_request(self, message: ServerMessage) -> object:
if message.request_id is not None:
raise ValueError('Message can`t have request_id before it is scheduled')
request_id = message.request_id = self.request_next_id
self.request_next_id = self.request_next_id + 1
asyncio.ensure_future(self.send_message(message))
while request_id not in self.responses:
await asyncio.sleep(1)
response = self.responses[request_id]
del self.responses[request_id]
return response
async def __handle_ping(self): async def __handle_ping(self):
await self.socket.send(b'pong') await self.socket.send(b'PONG')
async def __handle_message(self, text): async def __handle_message(self, text: str):
try: try:
asyncio.ensure_future(self.on_message_handler(text)) message_object = json.loads(text)
message = ServerMessage.fromJSON(message_object)
if message.request_id is not None:
self.responses[message_object['requestId']] = message.payload
return
asyncio.ensure_future(self.on_message_handler(message))
except Exception as e: except Exception as e:
logger.error("Exception: '%s'" % str(e)) logger.error("__handle_message Exception: '%s'" % str(e))

1
server/package.json

@ -20,6 +20,7 @@
"homepage": "https://github.com/hastic/hastic-server#readme", "homepage": "https://github.com/hastic/hastic-server#readme",
"dependencies": {}, "dependencies": {},
"devDependencies": { "devDependencies": {
"@types/es6-promisify": "^6.0.0",
"@types/koa": "^2.0.45", "@types/koa": "^2.0.45",
"@types/koa-bodyparser": "^4.2.0", "@types/koa-bodyparser": "^4.2.0",
"@types/koa-router": "^7.0.28", "@types/koa-router": "^7.0.28",

1
server/src/config.ts

@ -13,6 +13,7 @@ export const DATA_PATH = path.join(__dirname, '../../data');
export const ANALYTIC_UNITS_DATABASE_PATH = path.join(DATA_PATH, 'analyticUnits.db'); export const ANALYTIC_UNITS_DATABASE_PATH = path.join(DATA_PATH, 'analyticUnits.db');
export const METRICS_DATABASE_PATH = path.join(DATA_PATH, 'metrics.db'); export const METRICS_DATABASE_PATH = path.join(DATA_PATH, 'metrics.db');
export const SEGMENTS_DATABASE_PATH = path.join(DATA_PATH, 'segments.db'); export const SEGMENTS_DATABASE_PATH = path.join(DATA_PATH, 'segments.db');
export const FILES_DATABASE_PATH = path.join(DATA_PATH, 'files.db');
export const DATASETS_PATH = path.join(DATA_PATH, 'datasets'); export const DATASETS_PATH = path.join(DATA_PATH, 'datasets');
export const ANALYTIC_UNITS_PATH = path.join(DATA_PATH, 'analytic_units'); export const ANALYTIC_UNITS_PATH = path.join(DATA_PATH, 'analytic_units');

65
server/src/controllers/analytics_controller.ts

@ -1,7 +1,8 @@
import * as DataService from '../services/data_service';
import { getTarget } from './metrics_controler'; import { getTarget } from './metrics_controler';
import { getLabeledSegments, insertSegments, removeSegments } from './segments_controller' import { getLabeledSegments, insertSegments, removeSegments } from './segments_controller';
import * as AnalyticUnit from '../models/analytic_unit' import * as AnalyticUnit from '../models/analytic_unit'
import { AnalyticsService } from '../services/analytics_service' import { AnalyticsService, AnalyticsMessage } from '../services/analytics_service';
const taskMap = {}; const taskMap = {};
@ -9,20 +10,58 @@ let nextTaskId = 0;
let analyticsService = undefined; let analyticsService = undefined;
function onResponse(response: any) {
let taskId = response._taskId; function onTaskResult(taskResult: any) {
let status = response.status; let taskId = taskResult._taskId;
if(status === 'success' || status === 'failed') { let status = taskResult.status;
if(status === 'SUCCESS' || status === 'FAILED') {
if(taskId in taskMap) { if(taskId in taskMap) {
let resolver = taskMap[taskId]; let resolver = taskMap[taskId];
resolver(response); resolver(taskResult);
delete taskMap[taskId]; delete taskMap[taskId];
} }
} }
} }
// Persist `payload.content` under `payload.filename` via the data service.
async function onFileSave(payload: any): Promise<any> {
  return DataService.saveFile(payload.filename, payload.content);
}

// Load and return the stored content for `payload.filename`.
async function onFileLoad(payload: any): Promise<any> {
  return DataService.loadFile(payload.filename);
}
// Dispatch a message from the analytics process to its handler. When the
// message carries a requestId, echo it back with the handler's result as
// the response payload.
async function onMessage(message: AnalyticsMessage) {
  let responsePayload = null;

  switch(message.method) {
    case 'TASK_RESULT':
      onTaskResult(JSON.parse(message.payload));
      break;
    case 'FILE_SAVE':
      responsePayload = await onFileSave(message.payload);
      break;
    case 'FILE_LOAD':
      responsePayload = await onFileLoad(message.payload);
      break;
    default:
      throw new TypeError('Unknown method ' + message.method);
  }

  // TODO: catch exception and send error in this case
  if(message.requestId !== undefined) {
    message.payload = responsePayload;
    analyticsService.sendMessage(message);
  }
}
export function init() { export function init() {
analyticsService = new AnalyticsService(onResponse); analyticsService = new AnalyticsService(onMessage);
} }
export function terminate() { export function terminate() {
@ -51,19 +90,19 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) {
let pattern = unit.type; let pattern = unit.type;
let task = { let task = {
analyticUnitId: id, analyticUnitId: id,
type: 'learn', type: 'LEARN',
pattern, pattern,
segments: segments segments: segments
}; };
let result = await runTask(task); let result = await runTask(task);
if (result.status === 'success') { if (result.status === 'SUCCESS') {
AnalyticUnit.setStatus(id, 'ready'); AnalyticUnit.setStatus(id, 'READY');
insertSegments(id, result.segments, false); insertSegments(id, result.segments, false);
AnalyticUnit.setPredictionTime(id, result.lastPredictionTime); AnalyticUnit.setPredictionTime(id, result.lastPredictionTime);
} else { } else {
AnalyticUnit.setStatus(id, 'failed', result.error); AnalyticUnit.setStatus(id, 'FAILED', result.error);
} }
} }
@ -79,7 +118,7 @@ export async function runPredict(id: AnalyticUnit.AnalyticUnitId) {
}; };
let result = await runTask(task); let result = await runTask(task);
if(result.status === 'failed') { if(result.status === 'FAILED') {
return []; return [];
} }
// Merging segments // Merging segments

2
server/src/index.ts

@ -36,7 +36,7 @@ rootRouter.use('/segments', segmentsRouter.routes(), segmentsRouter.allowedMetho
rootRouter.use('/alerts', alertsRouter.routes(), alertsRouter.allowedMethods()); rootRouter.use('/alerts', alertsRouter.routes(), alertsRouter.allowedMethods());
rootRouter.get('/', async (ctx) => { rootRouter.get('/', async (ctx) => {
ctx.response.body = { ctx.response.body = {
server: 'Ok', server: 'OK',
analyticsReady: AnalyticsController.isAnalyticReady(), analyticsReady: AnalyticsController.isAnalyticReady(),
version: process.env.npm_package_version version: process.env.npm_package_version
}; };

45
server/src/services/analytics_service.ts

@ -7,6 +7,23 @@ import * as fs from 'fs';
import * as path from 'path'; import * as path from 'path';
// Envelope for messages exchanged with the python analytics process.
// `payload` is optional message data; `requestId` is set when the message
// belongs to a request/response round-trip.
export class AnalyticsMessage {
  public constructor(public method: string, public payload?: string, public requestId?: number) {
  }

  // Build a message from a decoded JSON object, validating that a method
  // name is present.
  static fromJSON(obj: any): AnalyticsMessage {
    if(obj.method === undefined) {
      throw new Error('No method in obj:' + obj);
    }
    return new AnalyticsMessage(obj.method, obj.payload, obj.requestId);
  }
}

// BUG FIX: the original did `new AnalyticsMessage(obj)`, passing the whole
// object as the `method` argument and dropping payload/requestId. Delegate
// to the validating factory instead.
function analyticsMessageFromJson(obj: any): AnalyticsMessage {
  return AnalyticsMessage.fromJSON(obj);
}
export class AnalyticsService { export class AnalyticsService {
private _requester: any; private _requester: any;
@ -17,8 +34,7 @@ export class AnalyticsService {
private _analyticsPinger: NodeJS.Timer = null; private _analyticsPinger: NodeJS.Timer = null;
private _isClosed = false; private _isClosed = false;
constructor(private _onMessage: (message: AnalyticsMessage) => void) {
constructor(private _onResponse: (response: any) => void) {
this._init(); this._init();
} }
@ -26,13 +42,20 @@ export class AnalyticsService {
if(!this._ready) { if(!this._ready) {
return Promise.reject("Analytics is not ready"); return Promise.reject("Analytics is not ready");
} }
let message = JSON.stringify(msgObj); let message = {
method: 'task',
payload: JSON.stringify(msgObj)
}
return this.sendMessage(message); return this.sendMessage(message);
} }
public async sendMessage(message: string): Promise<void> { public async sendMessage(message: AnalyticsMessage): Promise<void> {
let strMessage = JSON.stringify(message);
if(message.method === 'PING') {
strMessage = 'PING';
}
return new Promise<void>((resolve, reject) => { return new Promise<void>((resolve, reject) => {
this._requester.send(message, undefined, (err) => { this._requester.send(strMessage, undefined, (err) => {
if(err) { if(err) {
reject(err); reject(err);
} else { } else {
@ -160,8 +183,8 @@ export class AnalyticsService {
} }
} }
private _onAnalyticsMessage(text: any, error) { private _onAnalyticsMessage(data: any, error) {
if(text.toString() === 'pong') { if(data.toString() === 'PONG') {
this._pingResponded = true; this._pingResponded = true;
if(!this._ready) { if(!this._ready) {
this._ready = true; this._ready = true;
@ -169,7 +192,9 @@ export class AnalyticsService {
} }
return; return;
} }
console.log(`analytics message: "${text}"`);
let text = data.toString();
let response; let response;
try { try {
response = JSON.parse(text); response = JSON.parse(text);
@ -178,7 +203,7 @@ export class AnalyticsService {
console.error(text); console.error(text);
throw new Error('Unexpected response'); throw new Error('Unexpected response');
} }
this._onResponse(response); this._onMessage(AnalyticsMessage.fromJSON(response));
} }
private async _runAlalyticsPinger() { private async _runAlalyticsPinger() {
@ -192,7 +217,7 @@ export class AnalyticsService {
} }
this._pingResponded = false; this._pingResponded = false;
// TODO: set life limit for this ping // TODO: set life limit for this ping
this.sendMessage('ping'); this.sendMessage({ method: 'PING' });
}, config.ANLYTICS_PING_INTERVAL); }, config.ANLYTICS_PING_INTERVAL);
} }

50
server/src/services/data_service.ts

@ -1,4 +1,4 @@
import * as config from '../config' import * as config from '../config';
import * as nedb from 'nedb'; import * as nedb from 'nedb';
import * as fs from 'fs'; import * as fs from 'fs';
@ -6,10 +6,42 @@ import * as fs from 'fs';
export const db = { export const db = {
analyticUnits: new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true }), analyticUnits: new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true }),
metrics: new nedb({ filename: config.METRICS_DATABASE_PATH, autoload: true }), metrics: new nedb({ filename: config.METRICS_DATABASE_PATH, autoload: true }),
segments: new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true }) segments: new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true }),
files: new nedb({ filename: config.FILES_DATABASE_PATH, autoload: true })
}; };
// see analytics/pattern_detection_model.py with folders available
// Upsert one document in the `files` collection, resolving once nedb
// reports the write has been applied.
// FIX: removed leftover profane console.log debug spam that fired on
// every single write.
let dbUpsertFile = (query: any, updateQuery: any) => {
  return new Promise<void>((resolve, reject) => {
    db.files.update(query, updateQuery, { upsert: true }, (err: Error) => {
      if(err) {
        reject(err);
      } else {
        resolve();
      }
    });
  });
}
// Find a single document in the `files` collection matching `query`.
// Resolves with the document, or null when nothing matches (nedb passes
// null to the callback in that case).
// FIX: removed leftover profane console.log debug spam that fired on
// every read.
let dbLoadFile = (query: any) => {
  return new Promise<any>((resolve, reject) => {
    db.files.findOne(query, (err, doc) => {
      if(err) {
        reject(err);
      } else {
        resolve(doc);
      }
    });
  });
}
function maybeCreate(path: string): void { function maybeCreate(path: string): void {
if(fs.existsSync(path)) { if(fs.existsSync(path)) {
@ -20,6 +52,18 @@ function maybeCreate(path: string): void {
console.log('exists: ' + fs.existsSync(path)); console.log('exists: ' + fs.existsSync(path));
} }
// Upsert `content` into the files collection keyed by `filename`.
export async function saveFile(filename: string, content: string): Promise<void> {
  return dbUpsertFile({ filename }, { filename, content });
}

// Return the stored content for `filename`, or null when the file was
// never saved. NOTE(review): the declared Promise<string> return type does
// not admit null under strict null checks — confirm callers handle null.
export async function loadFile(filename: string): Promise<string> {
  let doc = await dbLoadFile({ filename });
  if(doc === null) {
    return null;
  }
  return doc.content;
}
export function checkDataFolders(): void { export function checkDataFolders(): void {
[ [
config.DATA_PATH, config.DATA_PATH,

Loading…
Cancel
Save