diff --git a/analytics/analytic_unit_worker.py b/analytics/analytic_unit_worker.py index 63c19d6..f3e1584 100644 --- a/analytics/analytic_unit_worker.py +++ b/analytics/analytic_unit_worker.py @@ -34,7 +34,7 @@ class AnalyticUnitWorker(object): except Exception as e: #traceback.extract_stack() error_text = traceback.format_exc() - logger.error("Exception: '%s'" % error_text) + logger.error("do_task Exception: '%s'" % error_text) # TODO: move result to a class which renders to json for messaging to analytics result = { 'task': type, @@ -53,7 +53,7 @@ class AnalyticUnitWorker(object): if pattern == 'drop' and len(segments) == 0: # TODO: move result to a class which renders to json for messaging to analytics result = { - 'status': 'success', + 'status': 'SUCCESS', 'analyticUnitId': analytic_unit_id, 'segments': [], 'lastPredictionTime': last_prediction_time @@ -69,8 +69,8 @@ class AnalyticUnitWorker(object): model.synchronize_data() segments, last_prediction_time = await model.predict(last_prediction_time) return { - 'task': "predict", - 'status': "success", + 'task': 'PREDICT', + 'status': 'SUCCESS', 'analyticUnitId': analytic_unit_id, 'segments': segments, 'lastPredictionTime': last_prediction_time @@ -78,7 +78,7 @@ class AnalyticUnitWorker(object): def get_model(self, analytic_unit_id, pattern_type): if analytic_unit_id not in self.models_cache: - if pattern_type == 'general': + if pattern_type == 'GENERAL': model = detectors.GeneralDetector(analytic_unit_id) else: model = detectors.PatternDetector(analytic_unit_id, pattern_type) diff --git a/analytics/server.py b/analytics/server.py index dcf0952..7818167 100644 --- a/analytics/server.py +++ b/analytics/server.py @@ -11,9 +11,9 @@ from analytic_unit_worker import AnalyticUnitWorker root = logging.getLogger() logger = logging.getLogger('SERVER') -worker = None -server_service = None -data_service = None +worker: AnalyticUnitWorker = None +server_service: services.ServerService = None +data_service: services.DataService = None root.setLevel(logging.DEBUG) @@ -28,29 +28,40 @@ logging_handler.setFormatter(logging_formatter) root.addHandler(logging_handler) -async def handle_task(text): +async def handle_task(payload: str): try: - task = json.loads(text) + task = json.loads(payload) logger.info("Command is OK") - await server_service.send_message(json.dumps({ + response_task_payload = json.dumps({ '_taskId': task['_taskId'], 'task': task['type'], 'analyticUnitId': task['analyticUnitId'], 'status': "in progress" - })) + }) + + message = services.server_service.ServerMessage('TASK_RESULT', response_task_payload) + + await server_service.send_message(message) res = await worker.do_task(task) res['_taskId'] = task['_taskId'] await server_service.send_message(json.dumps(res)) except Exception as e: - logger.error("Exception: '%s'" % str(e)) + logger.error("handle_task Exception: '%s'" % str(e)) + +async def handle_message(message: services.ServerMessage): + payload = None + if message.payload is not None: + payload = json.loads(message.payload) + if message.method == 'task': + await handle_task(payload) def init_services(): logger.info("Starting services...") logger.info("Server...") - server_service = services.ServerService(handle_task) + server_service = services.ServerService(handle_message) logger.info("Ok") logger.info("Data service...") data_service = services.DataService(server_service) @@ -58,6 +69,11 @@ def init_services(): return server_service, data_service +async def app_loop(): + server_service.handle_loop() + # await 
asyncio.gather(server_service.handle_loop(), test_file_save()) + + if __name__ == "__main__": loop = asyncio.get_event_loop() logger.info("Starting worker...") @@ -66,4 +82,5 @@ if __name__ == "__main__": server_service, data_service = init_services() print('Analytics process is running') # we need to print to stdout and flush sys.stdout.flush() # because node.js expects it - loop.run_until_complete(server_service.handle_loop()) + + loop.run_until_complete(app_loop()) diff --git a/analytics/services/__init__.py b/analytics/services/__init__.py index d7fb414..8f5f5a4 100644 --- a/analytics/services/__init__.py +++ b/analytics/services/__init__.py @@ -1,2 +1,2 @@ -from services.server_service import ServerService +from services.server_service import ServerService, ServerMessage from services.data_service import DataService diff --git a/analytics/services/data_service.py b/analytics/services/data_service.py index a136b88..705e2a8 100644 --- a/analytics/services/data_service.py +++ b/analytics/services/data_service.py @@ -1,9 +1,87 @@ +from services.server_service import ServerMessage, ServerService + +import json +import asyncio + +""" +This is how you can save a file: + +async def test_file_save(): + async with data_service.open('filename') as f: + print('write content') + await f.write('test string') + + async with data_service.open('filename') as f: + content = await f.load() + print(content) + print('test file ok') +""" + + +LOCK_WAIT_SLEEP_TIMESPAN = 100 # mc + +class FileDescriptor: + def __init__(self, filename: str, data_service): + self.filename = filename + self.data_service = data_service + + async def write(self, content: str): + await self.data_service.save_file_content(self, content) + + async def load(self) -> str: + return await self.data_service.load_file_content(self) + + async def __aenter__(self): + await self.data_service.wait_and_lock(self) + return self + + async def __aexit__(self, *exc): + await self.data_service.unlock(self) + + class DataService: - def __init__(self, server_service): + + def __init__(self, server_service: ServerService): + """Creates fs over network via server_service""" self.server_service = server_service + self.locks = set() + + def open(self, filename: str) -> FileDescriptor: + return FileDescriptor(filename, self) + + async def wait_and_lock(self, file_descriptor: FileDescriptor): + filename = file_descriptor.filename + while True: + if filename in self.locks: + asyncio.sleep(LOCK_WAIT_SLEEP_TIMESPAN) + continue + else: + self.locks.add(filename) + break + + async def unlock(self, file_descriptor: FileDescriptor): + filename = file_descriptor.filename + self.locks.remove(filename) + + async def save_file_content(self, file_descriptor: FileDescriptor, content: str): + """ Saves json - serializable obj with file_descriptor.filename """ + self.__check_lock(file_descriptor) + message_payload = { + 'filename': file_descriptor.filename, + 'content': content + } + message = ServerMessage('FILE_SAVE', message_payload) + await self.server_service.send_request(message) + + async def load_file_content(self, file_descriptor: FileDescriptor) -> str: + self.__check_lock(file_descriptor) + message_payload = { 'filename': file_descriptor.filename } + message = ServerMessage('FILE_LOAD', message_payload) + return await self.server_service.send_request(message) + + def __check_lock(self, file_descriptor: FileDescriptor): + filename = file_descriptor.filename + if filename not in self.locks: + raise RuntimeError('No lock for file %s' % filename) - async def 
safe_file(filename, content): - pass - async def load_file(filename, content): - pass \ No newline at end of file diff --git a/analytics/services/server_service.py b/analytics/services/server_service.py index a43bada..82caa40 100644 --- a/analytics/services/server_service.py +++ b/analytics/services/server_service.py @@ -2,13 +2,41 @@ import config import zmq import zmq.asyncio -import logging +import logging +import json import asyncio logger = logging.getLogger('SERVER_SERVICE') +class ServerMessage: + def __init__(self, method: str, payload: object = None, request_id: int = None): + self.method = method + self.payload = payload + self.request_id = request_id + + def toJSON(self): + result = { + 'method': self.method + } + if self.payload is not None: + result['payload'] = self.payload + if self.request_id is not None: + result['requestId'] = self.request_id + return result + + def fromJSON(json: dict): + method = json['method'] + payload = None + request_id = None + if 'payload' in json: + payload = json['payload'] + if 'requestId' in json: + request_id = json['requestId'] + + return ServerMessage(method, payload, request_id) + class ServerService: def __init__(self, on_message_handler): @@ -18,25 +46,46 @@ class ServerService: self.context = zmq.asyncio.Context() self.socket = self.context.socket(zmq.PAIR) self.socket.bind(config.ZMQ_CONNECTION_STRING) + self.request_next_id = 1 + self.responses = dict() async def handle_loop(self): while True: received_bytes = await self.socket.recv() text = received_bytes.decode('utf-8') - if text == 'ping': + if text == 'PING': asyncio.ensure_future(self.__handle_ping()) else: asyncio.ensure_future(self.__handle_message(text)) - async def send_message(self, string): - await self.socket.send_string(string) + async def send_message(self, message: ServerMessage): + await self.socket.send_string(json.dumps(message.toJSON())) + + async def send_request(self, message: ServerMessage) -> object: + if message.request_id is not None: + raise ValueError('Message can`t have request_id before it is scheduled') + request_id = message.request_id = self.request_next_id + self.request_next_id = self.request_next_id + 1 + asyncio.ensure_future(self.send_message(message)) + while request_id not in self.responses: + await asyncio.sleep(1) + response = self.responses[request_id] + del self.responses[request_id] + return response async def __handle_ping(self): - await self.socket.send(b'pong') + await self.socket.send(b'PONG') - async def __handle_message(self, text): + async def __handle_message(self, text: str): try: - asyncio.ensure_future(self.on_message_handler(text)) + message_object = json.loads(text) + message = ServerMessage.fromJSON(message_object) + + if message.request_id is not None: + self.responses[message_object['requestId']] = message.payload + return + + asyncio.ensure_future(self.on_message_handler(message)) except Exception as e: - logger.error("Exception: '%s'" % str(e)) + logger.error("__handle_message Exception: '%s'" % str(e)) diff --git a/server/package.json b/server/package.json index dd92674..64d80f2 100644 --- a/server/package.json +++ b/server/package.json @@ -20,6 +20,7 @@ "homepage": "https://github.com/hastic/hastic-server#readme", "dependencies": {}, "devDependencies": { + "@types/es6-promisify": "^6.0.0", "@types/koa": "^2.0.45", "@types/koa-bodyparser": "^4.2.0", "@types/koa-router": "^7.0.28", diff --git a/server/src/config.ts b/server/src/config.ts index fc8f133..66930e4 100644 --- a/server/src/config.ts +++ b/server/src/config.ts @@ 
-13,6 +13,7 @@ export const DATA_PATH = path.join(__dirname, '../../data'); export const ANALYTIC_UNITS_DATABASE_PATH = path.join(DATA_PATH, 'analyticUnits.db'); export const METRICS_DATABASE_PATH = path.join(DATA_PATH, 'metrics.db'); export const SEGMENTS_DATABASE_PATH = path.join(DATA_PATH, 'segments.db'); +export const FILES_DATABASE_PATH = path.join(DATA_PATH, 'files.db'); export const DATASETS_PATH = path.join(DATA_PATH, 'datasets'); export const ANALYTIC_UNITS_PATH = path.join(DATA_PATH, 'analytic_units'); diff --git a/server/src/controllers/analytics_controller.ts b/server/src/controllers/analytics_controller.ts index aa3d77a..75ee87a 100644 --- a/server/src/controllers/analytics_controller.ts +++ b/server/src/controllers/analytics_controller.ts @@ -1,7 +1,8 @@ +import * as DataService from '../services/data_service'; import { getTarget } from './metrics_controler'; -import { getLabeledSegments, insertSegments, removeSegments } from './segments_controller' +import { getLabeledSegments, insertSegments, removeSegments } from './segments_controller'; import * as AnalyticUnit from '../models/analytic_unit' -import { AnalyticsService } from '../services/analytics_service' +import { AnalyticsService, AnalyticsMessage } from '../services/analytics_service'; const taskMap = {}; @@ -9,20 +10,58 @@ let nextTaskId = 0; let analyticsService = undefined; -function onResponse(response: any) { - let taskId = response._taskId; - let status = response.status; - if(status === 'success' || status === 'failed') { + +function onTaskResult(taskResult: any) { + let taskId = taskResult._taskId; + let status = taskResult.status; + if(status === 'SUCCESS' || status === 'FAILED') { if(taskId in taskMap) { let resolver = taskMap[taskId]; - resolver(response); + resolver(taskResult); delete taskMap[taskId]; } } } +async function onFileSave(payload: any): Promise { + return DataService.saveFile(payload.filename, payload.content); +} + +async function onFileLoad(payload: any): Promise { + return DataService.loadFile(payload.filename); +} + +async function onMessage(message: AnalyticsMessage) { + let responsePayload = null; + let resolvedMethod = false; + + if(message.method === 'TASK_RESULT') { + onTaskResult(JSON.parse(message.payload)); + resolvedMethod = true; + } + + if(message.method === 'FILE_SAVE') { + responsePayload = await onFileSave(message.payload); + resolvedMethod = true; + } + if(message.method === 'FILE_LOAD') { + responsePayload = await onFileLoad(message.payload); + resolvedMethod = true; + } + + if(!resolvedMethod) { + throw new TypeError('Unknown method ' + message.method); + } + + // TODO: catch exception and send error in this case + if(message.requestId !== undefined) { + message.payload = responsePayload; + analyticsService.sendMessage(message); + } +} + export function init() { - analyticsService = new AnalyticsService(onResponse); + analyticsService = new AnalyticsService(onMessage); } export function terminate() { @@ -51,19 +90,19 @@ export async function runLearning(id: AnalyticUnit.AnalyticUnitId) { let pattern = unit.type; let task = { analyticUnitId: id, - type: 'learn', + type: 'LEARN', pattern, segments: segments }; let result = await runTask(task); - if (result.status === 'success') { - AnalyticUnit.setStatus(id, 'ready'); + if (result.status === 'SUCCESS') { + AnalyticUnit.setStatus(id, 'READY'); insertSegments(id, result.segments, false); AnalyticUnit.setPredictionTime(id, result.lastPredictionTime); } else { - AnalyticUnit.setStatus(id, 'failed', result.error); + 
AnalyticUnit.setStatus(id, 'FAILED', result.error); } } @@ -79,7 +118,7 @@ export async function runPredict(id: AnalyticUnit.AnalyticUnitId) { }; let result = await runTask(task); - if(result.status === 'failed') { + if(result.status === 'FAILED') { return []; } // Merging segments @@ -101,4 +140,4 @@ export async function runPredict(id: AnalyticUnit.AnalyticUnitId) { export function isAnalyticReady(): boolean { return analyticsService.ready; -} \ No newline at end of file +} diff --git a/server/src/index.ts b/server/src/index.ts index 499d41f..a094837 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -35,8 +35,8 @@ rootRouter.use('/analyticUnits', anomaliesRouter.routes(), anomaliesRouter.allow rootRouter.use('/segments', segmentsRouter.routes(), segmentsRouter.allowedMethods()); rootRouter.use('/alerts', alertsRouter.routes(), alertsRouter.allowedMethods()); rootRouter.get('/', async (ctx) => { - ctx.response.body = { - server: 'Ok', + ctx.response.body = { + server: 'OK', analyticsReady: AnalyticsController.isAnalyticReady(), version: process.env.npm_package_version }; diff --git a/server/src/services/analytics_service.ts b/server/src/services/analytics_service.ts index 703146d..210dd58 100644 --- a/server/src/services/analytics_service.ts +++ b/server/src/services/analytics_service.ts @@ -7,6 +7,23 @@ import * as fs from 'fs'; import * as path from 'path'; +export class AnalyticsMessage { + public constructor(public method: string, public payload?: string, public requestId?: number) { + + } + + static fromJSON(obj: any): AnalyticsMessage { + if(obj.method === undefined) { + throw new Error('No method in obj:' + obj); + } + return new AnalyticsMessage(obj.method, obj.payload, obj.requestId); + } +} + +function analyticsMessageFromJson(obj: any): AnalyticsMessage { + return new AnalyticsMessage(obj); +} + export class AnalyticsService { private _requester: any; @@ -17,8 +34,7 @@ export class AnalyticsService { private _analyticsPinger: NodeJS.Timer = null; private _isClosed = false; - - constructor(private _onResponse: (response: any) => void) { + constructor(private _onMessage: (message: AnalyticsMessage) => void) { this._init(); } @@ -26,13 +42,20 @@ export class AnalyticsService { if(!this._ready) { return Promise.reject("Analytics is not ready"); } - let message = JSON.stringify(msgObj); + let message = { + method: 'task', + payload: JSON.stringify(msgObj) + } return this.sendMessage(message); } - public async sendMessage(message: string): Promise { + public async sendMessage(message: AnalyticsMessage): Promise { + let strMessage = JSON.stringify(message); + if(message.method === 'PING') { + strMessage = 'PING'; + } return new Promise((resolve, reject) => { - this._requester.send(message, undefined, (err) => { + this._requester.send(strMessage, undefined, (err) => { if(err) { reject(err); } else { @@ -160,8 +183,8 @@ export class AnalyticsService { } } - private _onAnalyticsMessage(text: any, error) { - if(text.toString() === 'pong') { + private _onAnalyticsMessage(data: any, error) { + if(data.toString() === 'PONG') { this._pingResponded = true; if(!this._ready) { this._ready = true; @@ -169,7 +192,9 @@ export class AnalyticsService { } return; } - console.log(`analytics message: "${text}"`); + + + let text = data.toString(); let response; try { response = JSON.parse(text); @@ -178,7 +203,7 @@ export class AnalyticsService { console.error(text); throw new Error('Unexpected response'); } - this._onResponse(response); + this._onMessage(AnalyticsMessage.fromJSON(response)); 
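Review note: this handler and the Python ServerService now have to agree on one JSON envelope, {"method": ..., "payload": ..., "requestId": ...}, with the plain-text 'PING'/'PONG' strings bypassing it entirely. A minimal sketch of that round trip, using only the ServerMessage class added in analytics/services/server_service.py (the filename and payload values are invented for illustration):

    import json

    from services.server_service import ServerMessage

    # Outgoing request, e.g. what DataService.save_file_content() builds.
    request = ServerMessage('FILE_SAVE', {'filename': 'model_state', 'content': '{"weights": []}'})
    request.request_id = 1  # normally assigned inside ServerService.send_request()
    wire = json.dumps(request.toJSON())
    # wire == '{"method": "FILE_SAVE", "payload": {...}, "requestId": 1}'
    # This is the string that reaches _onAnalyticsMessage() and gets JSON.parse()d above.

    # The same envelope parsed back, as the Python side does in __handle_message().
    incoming = ServerMessage.fromJSON(json.loads(wire))
    assert incoming.method == 'FILE_SAVE' and incoming.request_id == 1
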
} private async _runAlalyticsPinger() { @@ -192,7 +217,7 @@ export class AnalyticsService { } this._pingResponded = false; // TODO: set life limit for this ping - this.sendMessage('ping'); + this.sendMessage({ method: 'PING' }); }, config.ANLYTICS_PING_INTERVAL); } diff --git a/server/src/services/data_service.ts b/server/src/services/data_service.ts index 24851b6..a6005ea 100644 --- a/server/src/services/data_service.ts +++ b/server/src/services/data_service.ts @@ -1,4 +1,4 @@ -import * as config from '../config' +import * as config from '../config'; import * as nedb from 'nedb'; import * as fs from 'fs'; @@ -6,10 +6,42 @@ import * as fs from 'fs'; export const db = { analyticUnits: new nedb({ filename: config.ANALYTIC_UNITS_DATABASE_PATH, autoload: true }), metrics: new nedb({ filename: config.METRICS_DATABASE_PATH, autoload: true }), - segments: new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true }) + segments: new nedb({ filename: config.SEGMENTS_DATABASE_PATH, autoload: true }), + files: new nedb({ filename: config.FILES_DATABASE_PATH, autoload: true }) }; -// see analytics/pattern_detection_model.py with folders available + +let dbUpsertFile = (query: any, updateQuery: any) => { + return new Promise((resolve, reject) => { + db.files.update(query, updateQuery, { upsert: true }, (err: Error) => { + if(err) { + reject(err); + } else { + console.log('saved shit with query '); + console.log(query); + console.log('saved shit with updateQuery '); + console.log(updateQuery); + resolve(); + } + }); + }); +} + +let dbLoadFile = (query: any) => { + return new Promise((resolve, reject) => { + db.files.findOne(query, (err, doc) => { + if(err) { + reject(err); + } else { + console.log('got shit with query'); + console.log(query); + console.log('doc'); + console.log(doc); + resolve(doc); + } + }); + }); +} function maybeCreate(path: string): void { if(fs.existsSync(path)) { @@ -20,6 +52,18 @@ function maybeCreate(path: string): void { console.log('exists: ' + fs.existsSync(path)); } +export async function saveFile(filename: string, content: string): Promise { + return dbUpsertFile({ filename } , { filename, content }); +} + +export async function loadFile(filename: string): Promise { + let doc = await dbLoadFile({ filename }); + if(doc === null) { + return null; + } + return doc.content; +} + export function checkDataFolders(): void { [ config.DATA_PATH,
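Review note on the file storage round trip added here: on the Python side, DataService.open() takes a per-filename lock, save_file_content()/load_file_content() wrap their payload in a ServerMessage('FILE_SAVE'/'FILE_LOAD', ...) and go through ServerService.send_request(), which assigns a requestId and polls self.responses for the matching entry; on the Node side onMessage() dispatches to saveFile()/loadFile() backed by the new files.db nedb store and sends the payload back under the same requestId. One detail worth fixing before merge: wait_and_lock() calls asyncio.sleep() without await, so under contention the loop never yields, and LOCK_WAIT_SLEEP_TIMESPAN is commented as milliseconds while asyncio.sleep() takes seconds. A self-contained sketch of one possible fix, keeping the patch's set-based locks (the class and file names below are illustrative, not part of the patch; a per-filename asyncio.Lock would be an alternative):

    import asyncio

    LOCK_WAIT_SLEEP_TIMESPAN = 0.1  # seconds: asyncio.sleep() takes seconds, not milliseconds

    class FileLocks:
        """Illustrative stand-in for the lock bookkeeping inside DataService."""

        def __init__(self):
            self.locks = set()

        async def wait_and_lock(self, filename: str):
            # Poll until nobody holds the filename, yielding to the event loop on
            # every iteration; without the await, asyncio.sleep() only creates a
            # coroutine object and the while loop spins without ever yielding.
            while filename in self.locks:
                await asyncio.sleep(LOCK_WAIT_SLEEP_TIMESPAN)
            self.locks.add(filename)

        async def unlock(self, filename: str):
            self.locks.remove(filename)

    async def holder(locks: FileLocks):
        await locks.wait_and_lock('model_state')  # 'model_state' is an invented filename
        await asyncio.sleep(0.3)                  # pretend to write the file
        await locks.unlock('model_state')

    async def waiter(locks: FileLocks):
        await locks.wait_and_lock('model_state')  # waits while holder() holds the lock
        print('waiter acquired the lock after holder released it')
        await locks.unlock('model_state')

    async def demo():
        locks = FileLocks()
        await asyncio.gather(holder(locks), waiter(locks))

    asyncio.get_event_loop().run_until_complete(demo())
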