From b053e336f3fa995ae9116592cddfd04eed957e8a Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Mon, 22 Apr 2019 22:48:17 +0300 Subject: [PATCH] Check python version in run time #592 (#595) --- analytics/analytics/server.py | 94 +++++++++++++++++++++++++++++++++ analytics/bin/server | 99 ++++------------------------------- 2 files changed, 105 insertions(+), 88 deletions(-) create mode 100644 analytics/analytics/server.py diff --git a/analytics/analytics/server.py b/analytics/analytics/server.py new file mode 100644 index 0000000..3ab76b4 --- /dev/null +++ b/analytics/analytics/server.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 + +import sys +import os + + +import config +import json +import logging +import asyncio +import traceback + +import services +from analytic_unit_manager import AnalyticUnitManager + + +server_service: services.ServerService = None +data_service: services.DataService = None +analytic_unit_manager: AnalyticUnitManager = None + +logger = logging.getLogger('SERVER') + + +async def handle_task(task: object): + try: + task_type = task['type'] + logger.info("Got {} task with id {}, analyticUnitId {}".format(task_type, task['_id'], task['analyticUnitId'])) + + task_result_payload = { + '_id': task['_id'], + 'task': task_type, + 'analyticUnitId': task['analyticUnitId'], + 'status': "IN_PROGRESS" + } + + if not task_type == 'PUSH': + message = services.server_service.ServerMessage('TASK_RESULT', task_result_payload) + await server_service.send_message_to_server(message) + + res = await analytic_unit_manager.handle_analytic_task(task) + res['_id'] = task['_id'] + + if not task_type == 'PUSH': + message = services.server_service.ServerMessage('TASK_RESULT', res) + await server_service.send_message_to_server(message) + + except Exception as e: + error_text = traceback.format_exc() + logger.error("handle_task Exception: '%s'" % error_text) + +async def handle_data(task: object): + res = await analytic_unit_manager.handle_analytic_task(task) + + if res['status'] == 'SUCCESS' and res['payload'] is not None: + res['_id'] = task['_id'] + message = services.server_service.ServerMessage('DETECT', res) + await server_service.send_message_to_server(message) + +async def handle_message(message: services.ServerMessage): + if message.method == 'TASK': + await handle_task(message.payload) + if message.method == 'DATA': + await handle_data(message.payload) + +def init_services(): + global server_service + global data_service + global analytic_unit_manager + + logger.info("Starting services...") + logger.info("Server...") + server_service = services.ServerService() + logger.info("Ok") + logger.info("Data service...") + data_service = services.DataService(server_service) + logger.info("Ok") + logger.info("Analytic unit manager...") + analytic_unit_manager = AnalyticUnitManager() + logger.info("Ok") + +async def app_loop(): + async for message in server_service: + asyncio.ensure_future(handle_message(message)) + + +def run_server(): + loop = asyncio.get_event_loop() + #loop.set_debug(True) + logger.info("Ok") + init_services() + print('Analytics process is running') # we need to print to stdout and flush + sys.stdout.flush() # because node.js expects it + + loop.run_until_complete(app_loop()) diff --git a/analytics/bin/server b/analytics/bin/server index a96b0c9..640e29a 100755 --- a/analytics/bin/server +++ b/analytics/bin/server @@ -3,107 +3,30 @@ import sys import os -#TODO: make wrapper script that set PYTHONPATH instead +if sys.version_info[:3] < (3, 6, 5) or sys.version_info[:2] >= (3, 7): + sys.stderr.write('Required python is >= 3.6.5 and < 3.7.0 \n') + sys.stderr.write('Your python version is: %d.%d.%d\n' % sys.version_info[:3]) + sys.exit(1) + +# #TODO: make wrapper script that set PYTHONPATH instead sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'analytics')) -import config -import json import logging -import asyncio -import traceback - -import services -from analytic_unit_manager import AnalyticUnitManager - - -root = logging.getLogger() -logger = logging.getLogger('SERVER') - -server_service: services.ServerService = None -data_service: services.DataService = None -analytic_unit_manager: AnalyticUnitManager = None - - -root.setLevel(logging.DEBUG) +root_logger = logging.getLogger() +root_logger.setLevel(logging.DEBUG) logging_formatter = logging.Formatter("%(asctime)s [Analytics] [%(levelname)-5.5s] %(message)s") logging_handler = logging.StreamHandler(sys.stdout) -#logging_handler = logging.FileHandler(config.DATA_FOLDER + '/analytics.log') logging_handler.setLevel(logging.DEBUG) logging_handler.setFormatter(logging_formatter) -root.addHandler(logging_handler) - - -async def handle_task(task: object): - try: - task_type = task['type'] - logger.info("Got {} task with id {}, analyticUnitId {}".format(task_type, task['_id'], task['analyticUnitId'])) - - task_result_payload = { - '_id': task['_id'], - 'task': task_type, - 'analyticUnitId': task['analyticUnitId'], - 'status': "IN_PROGRESS" - } +root_logger.addHandler(logging_handler) - if not task_type == 'PUSH': - message = services.server_service.ServerMessage('TASK_RESULT', task_result_payload) - await server_service.send_message_to_server(message) - - res = await analytic_unit_manager.handle_analytic_task(task) - res['_id'] = task['_id'] - - if not task_type == 'PUSH': - message = services.server_service.ServerMessage('TASK_RESULT', res) - await server_service.send_message_to_server(message) - - except Exception as e: - error_text = traceback.format_exc() - logger.error("handle_task Exception: '%s'" % error_text) - -async def handle_data(task: object): - res = await analytic_unit_manager.handle_analytic_task(task) - - if res['status'] == 'SUCCESS' and res['payload'] is not None: - res['_id'] = task['_id'] - message = services.server_service.ServerMessage('DETECT', res) - await server_service.send_message_to_server(message) - -async def handle_message(message: services.ServerMessage): - if message.method == 'TASK': - await handle_task(message.payload) - if message.method == 'DATA': - await handle_data(message.payload) - -def init_services(): - logger.info("Starting services...") - logger.info("Server...") - server_service = services.ServerService() - logger.info("Ok") - logger.info("Data service...") - data_service = services.DataService(server_service) - logger.info("Ok") - logger.info("Analytic unit manager...") - analytic_unit_manager = AnalyticUnitManager() - logger.info("Ok") - - return server_service, data_service, analytic_unit_manager - -async def app_loop(): - async for message in server_service: - asyncio.ensure_future(handle_message(message)) +from server import run_server if __name__ == "__main__": - loop = asyncio.get_event_loop() - #loop.set_debug(True) - logger.info("Ok") - server_service, data_service, analytic_unit_manager = init_services() - print('Analytics process is running') # we need to print to stdout and flush - sys.stdout.flush() # because node.js expects it - - loop.run_until_complete(app_loop()) + run_server()