From d8f14ffaf7af9a573f96cd0ea306af45c7c944dd Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Sat, 6 Apr 2019 22:45:55 +0300 Subject: [PATCH] AsyncZmqThread and AsyncZmqActor better method names and docs (#522) * AsyncZmqThread and AsyncZmqActor better method names and docs * add comment about ZMQ_THREAD_ACTOR_ADDR * __zmq_socket_addr -> __zmq_socket_type in AsyncZmqThread * comment about MRO for AsyncZmqThread(threading.Thread, ABC) --- analytics/analytics/utils/concurrent.py | 92 +++++++++++++++---------- 1 file changed, 57 insertions(+), 35 deletions(-) diff --git a/analytics/analytics/utils/concurrent.py b/analytics/analytics/utils/concurrent.py index e003799..356c24e 100644 --- a/analytics/analytics/utils/concurrent.py +++ b/analytics/analytics/utils/concurrent.py @@ -5,35 +5,44 @@ import zmq.asyncio from abc import ABC, abstractmethod +# This const defines Thread <-> Actor zmq one-to-one connection +# We create a seperate zmq context, so zqm address 'inproc://xxx' doesn't matter +# It is default address and you may want to use AsyncZmqThread another way +ZMQ_THREAD_ACTOR_ADDR = 'inproc://xxx' + + +# Inherience order (threading.Thread, ABC) is essential. Otherwise it's a MRO error. class AsyncZmqThread(threading.Thread, ABC): """Class for wrapping zmq socket into a thread with it's own asyncio event loop """ - def __init__(self, zmq_context: zmq.asyncio.Context, zmq_socket_addr: str, socket_type = zmq.PAIR): + def __init__(self, + zmq_context: zmq.asyncio.Context, + zmq_socket_addr: str, + zmq_socket_type = zmq.PAIR + ): super(AsyncZmqThread, self).__init__() - self.zmq_socket_addr = zmq_socket_addr - self.socket_type = socket_type - self._zmq_context = zmq_context - + self._zmq_context = zmq_context # you can use it in child classes + self.__zmq_socket_addr = zmq_socket_addr + self.__zmq_socket_type = zmq_socket_type self.__asyncio_loop = None self.__zmq_socket = None - async def __message_loop(self): + async def __message_recv_loop(self): while True: text = await self.__zmq_socket.recv_string() - asyncio.ensure_future(self._on_message(text)) + asyncio.ensure_future(self._on_message_to_thread(text)) - async def _send_message(self, message: str): + async def _send_message_from_thread(self, message: str): await self.__zmq_socket.send_string(message) @abstractmethod - async def _on_message(self, message: str): + async def _on_message_to_thread(self, message: str): """Override this method to receive messages""" - pass @abstractmethod - async def _run(self): + async def _run_thread(self): """Override this method to do some async work. This method uses a separate thread. @@ -42,22 +51,22 @@ class AsyncZmqThread(threading.Thread, ABC): Example: ``` - async def _run(self): + async def _run_thread(self): i = 0 while True: await asyncio.sleep(1) i += 1 - await self._send_message(f'{self.name}: ping {i}') + await self._send_message_from_thread(f'{self.name}: ping {i}') ``` """ def run(self): self.__asyncio_loop = asyncio.new_event_loop() asyncio.set_event_loop(self.__asyncio_loop) - self.__zmq_socket = self._zmq_context.socket(self.socket_type) - self.__zmq_socket.connect(self.zmq_socket_addr) - asyncio.ensure_future(self.__message_loop()) - self.__asyncio_loop.run_until_complete(self._run()) + self.__zmq_socket = self._zmq_context.socket(self.__zmq_socket_type) + self.__zmq_socket.connect(self.__zmq_socket_addr) + asyncio.ensure_future(self.__message_recv_loop()) + self.__asyncio_loop.run_until_complete(self._run_thread()) # TODO: implement stop signal handling @@ -67,42 +76,55 @@ class AsyncZmqActor(AsyncZmqThread): override following: ``` - async def _run(self) - async def _on_message(self, message: str) - ``` - + async def _run_thread(self) + async def _on_message_to_thread(self, message: str) + ``` + + both methods run in actor's thread + + you can call `self._send_message_from_thread('txt')` + + to receive it later in `self._recv_message_from_thread()`. + Example: ``` class MyActor(AsyncZmqActor): - async def _run(self): + async def _run_thread(self): self.counter = 0 # runs in a different thread - await self._send_message('some_txt_message_to_outer_world') + await self._send_message_from_thread('some_txt_message_to_actor') - def async _on_message(self, message): + def async _on_message_to_thread(self, message): # runs in Thread-actor self.counter++ - + asyncZmqActor = MyActor() asyncZmqActor.start() ``` """ def __init__(self): - # we have a seperate zmq context, so zqm address 'inproc://xxx' doesn't matter - super(AsyncZmqActor, self).__init__(zmq.asyncio.Context(), 'inproc://xxx') + super(AsyncZmqActor, self).__init__(zmq.asyncio.Context(), ZMQ_THREAD_ACTOR_ADDR) self.__actor_socket = self._zmq_context.socket(zmq.PAIR) - self.__actor_socket.bind(self.zmq_socket_addr) + self.__actor_socket.bind(ZMQ_THREAD_ACTOR_ADDR) + + async def _put_message_to_thread(self, message: str): + """It "sends" `message` to thread, + + but we can't await it's `AsyncZmqThread._on_message_to_thread()` - async def put_message(self, message: str): + so it's "put", not "send" + """ await self.__actor_socket.send_string(message) - - def __aiter__(self): - return self - - async def __anext__(self) -> str: + + async def _recv_message_from_thread(self) -> str: + """Returns next message ``'txt'`` from thread sent by + + ``AsyncZmqActor._send_message_from_thread('txt')`` + + """ return await self.__actor_socket.recv_string() - + # TODO: implement graceful stopping