From 70e243f326e6aebcbf1cf64881fff21b41a9042d Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Sun, 31 Mar 2019 12:10:36 +0300 Subject: [PATCH] async iter server_service (#492) --- .../analytics/services/server_service.py | 30 ++++++++++++------- analytics/bin/server | 6 ++-- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/analytics/analytics/services/server_service.py b/analytics/analytics/services/server_service.py index 26b8655..8a957ba 100644 --- a/analytics/analytics/services/server_service.py +++ b/analytics/analytics/services/server_service.py @@ -7,6 +7,7 @@ import logging import json import asyncio import traceback +from typing import Optional logger = logging.getLogger('SERVER_SERVICE') @@ -40,17 +41,22 @@ class ServerMessage: class ServerService: - def __init__(self, on_message_handler): - self.on_message_handler = on_message_handler - + def __init__(self): logger.info("Binding to %s ..." % config.ZMQ_CONNECTION_STRING) self.context = zmq.asyncio.Context() self.socket = self.context.socket(zmq.PAIR) self.socket.bind(config.ZMQ_CONNECTION_STRING) self.request_next_id = 1 self.responses = dict() + self._aiter_inited = False + + def __aiter__(self): + if self._aiter_inited: + raise RuntimeError('Can`t iterate twice') + _aiter_inited = True + return self - async def handle_loop(self): + async def __anext__(self) -> ServerMessage: while True: received_bytes = await self.socket.recv() text = received_bytes.decode('utf-8') @@ -58,7 +64,11 @@ class ServerService: if text == 'PING': asyncio.ensure_future(self.__handle_ping()) else: - asyncio.ensure_future(self.__handle_message(text)) + message = self.__parse_message_or_save(text) + if message is None: + continue + else: + return message async def send_message(self, message: ServerMessage): await self.socket.send_string(json.dumps(message.toJSON())) @@ -78,16 +88,14 @@ class ServerService: async def __handle_ping(self): await self.socket.send(b'PONG') - async def __handle_message(self, text: str): + def __parse_message_or_save(self, text: str) -> Optional[ServerMessage]: try: message_object = json.loads(text) message = ServerMessage.fromJSON(message_object) - if message.request_id is not None: self.responses[message_object['requestId']] = message.payload - return - - asyncio.ensure_future(self.on_message_handler(message)) - except Exception as e: + return None + return message + except Exception: error_text = traceback.format_exc() logger.error("__handle_message Exception: '%s'" % error_text) diff --git a/analytics/bin/server b/analytics/bin/server index 33cb2e7..d84647f 100755 --- a/analytics/bin/server +++ b/analytics/bin/server @@ -74,7 +74,6 @@ async def handle_data(task: object): await server_service.send_message(message) async def handle_message(message: services.ServerMessage): - payload = None if message.method == 'TASK': await handle_task(message.payload) if message.method == 'DATA': @@ -83,7 +82,7 @@ async def handle_message(message: services.ServerMessage): def init_services(): logger.info("Starting services...") logger.info("Server...") - server_service = services.ServerService(handle_message) + server_service = services.ServerService() logger.info("Ok") logger.info("Data service...") data_service = services.DataService(server_service) @@ -95,7 +94,8 @@ def init_services(): return server_service, data_service, analytic_unit_manager async def app_loop(): - await server_service.handle_loop() + async for message in server_service: + asyncio.ensure_future(handle_message(message)) if __name__ == "__main__":