diff --git a/analytics/server.py b/analytics/server.py index d058bd4..e14b271 100644 --- a/analytics/server.py +++ b/analytics/server.py @@ -1,18 +1,18 @@ import config import json import logging -import zmq -import zmq.asyncio import sys import asyncio +import services.server_service from analytic_unit_worker import AnalyticUnitWorker root = logging.getLogger() logger = logging.getLogger('SERVER') -socket = None + worker = None +server_service = None root.setLevel(logging.DEBUG) @@ -24,28 +24,13 @@ ch.setFormatter(formatter) root.addHandler(ch) -async def server_handle_loop(): - while True: - received_bytes = await socket.recv() - text = received_bytes.decode('utf-8') - - if text == 'ping': - asyncio.ensure_future(handle_ping()) - else: - asyncio.ensure_future(handle_task(text)) - -async def server_send_message(string): - await socket.send_string(string) - -async def handle_ping(): - await socket.send(b'pong') async def handle_task(text): try: task = json.loads(text) logger.info("Command is OK") - await server_send_message(json.dumps({ + await server_service.send_message(json.dumps({ '_taskId': task['_taskId'], 'task': task['type'], 'analyticUnitId': task['analyticUnitId'], @@ -54,19 +39,17 @@ async def handle_task(text): res = await worker.do_task(task) res['_taskId'] = task['_taskId'] - await server_send_message(json.dumps(res)) + await server_service.send_message(json.dumps(res)) except Exception as e: logger.error("Exception: '%s'" % str(e)) if __name__ == "__main__": loop = asyncio.get_event_loop() + logger.info("Starting worker...") worker = AnalyticUnitWorker() - logger.info("Worker was started") - - logger.info("Binding to %s ..." % config.ZEROMQ_CONNECTION_STRING) - context = zmq.asyncio.Context() - socket = context.socket(zmq.PAIR) - socket.bind(config.ZEROMQ_CONNECTION_STRING) logger.info("Ok") - loop.run_until_complete(server_handle_loop()) + logger.info("Starting server...") + server_service = services.server_service.ServerService(handle_task) + logger.info("Ok") + loop.run_until_complete(server_service.handle_loop()) diff --git a/analytics/services/server_service.py b/analytics/services/server_service.py new file mode 100644 index 0000000..41c3543 --- /dev/null +++ b/analytics/services/server_service.py @@ -0,0 +1,42 @@ +import config + +import zmq +import zmq.asyncio +import logging + +import asyncio + +logger = logging.getLogger('SERVER_SERVICE') + + +class ServerService: + + def __init__(self, on_message_handler): + self.on_message_handler = on_message_handler + + logger.info("Binding to %s ..." % config.ZEROMQ_CONNECTION_STRING) + self.context = zmq.asyncio.Context() + self.socket = self.context.socket(zmq.PAIR) + self.socket.bind(config.ZEROMQ_CONNECTION_STRING) + + async def handle_loop(self): + while True: + received_bytes = await self.socket.recv() + text = received_bytes.decode('utf-8') + + if text == 'ping': + asyncio.ensure_future(self.__handle_ping()) + else: + asyncio.ensure_future(self.__handle_message(text)) + + async def send_message(self, string): + await self.socket.send_string(string) + + async def __handle_ping(self): + await self.socket.send(b'pong') + + async def __handle_message(self, text): + try: + asyncio.ensure_future(self.on_message_handler(text)) + except Exception as e: + logger.error("Exception: '%s'" % str(e))