From 209ffe8edce644f2df23ea78386542827905a32b Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Sat, 6 Apr 2019 13:49:58 +0300 Subject: [PATCH] add AsyncZmqThread and AsyncZmqActor classes (#515) * add AsyncZmqThread and AsyncZmqActor classes * Update analytics/analytics/utils/concurrent.py * Update analytics/analytics/utils/concurrent.py Co-Authored-By: jonyrock * Update analytics/analytics/utils/concurrent.py * abstract class + better comments * rm test AsyncZmqActor creation --- analytics/analytics/utils/concurrent.py | 108 ++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 analytics/analytics/utils/concurrent.py diff --git a/analytics/analytics/utils/concurrent.py b/analytics/analytics/utils/concurrent.py new file mode 100644 index 0000000..e003799 --- /dev/null +++ b/analytics/analytics/utils/concurrent.py @@ -0,0 +1,108 @@ +import asyncio +import threading +import zmq +import zmq.asyncio +from abc import ABC, abstractmethod + + +class AsyncZmqThread(threading.Thread, ABC): + """Class for wrapping zmq socket into a thread with it's own asyncio event loop + + """ + + def __init__(self, zmq_context: zmq.asyncio.Context, zmq_socket_addr: str, socket_type = zmq.PAIR): + super(AsyncZmqThread, self).__init__() + self.zmq_socket_addr = zmq_socket_addr + self.socket_type = socket_type + self._zmq_context = zmq_context + + self.__asyncio_loop = None + self.__zmq_socket = None + + async def __message_loop(self): + while True: + text = await self.__zmq_socket.recv_string() + asyncio.ensure_future(self._on_message(text)) + + async def _send_message(self, message: str): + await self.__zmq_socket.send_string(message) + + @abstractmethod + async def _on_message(self, message: str): + """Override this method to receive messages""" + pass + + @abstractmethod + async def _run(self): + """Override this method to do some async work. + This method uses a separate thread. + + You can block yourself here if you don't do any await. + + Example: + + ``` + async def _run(self): + i = 0 + while True: + await asyncio.sleep(1) + i += 1 + await self._send_message(f'{self.name}: ping {i}') + ``` + """ + + def run(self): + self.__asyncio_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.__asyncio_loop) + self.__zmq_socket = self._zmq_context.socket(self.socket_type) + self.__zmq_socket.connect(self.zmq_socket_addr) + asyncio.ensure_future(self.__message_loop()) + self.__asyncio_loop.run_until_complete(self._run()) + + # TODO: implement stop signal handling + + +class AsyncZmqActor(AsyncZmqThread): + """Threaded and Async Actor model based on ZMQ inproc communication + + override following: + ``` + async def _run(self) + async def _on_message(self, message: str) + ``` + + Example: + + ``` + class MyActor(AsyncZmqActor): + async def _run(self): + self.counter = 0 + # runs in a different thread + await self._send_message('some_txt_message_to_outer_world') + + def async _on_message(self, message): + # runs in Thread-actor + self.counter++ + + asyncZmqActor = MyActor() + asyncZmqActor.start() + ``` + """ + + def __init__(self): + # we have a seperate zmq context, so zqm address 'inproc://xxx' doesn't matter + super(AsyncZmqActor, self).__init__(zmq.asyncio.Context(), 'inproc://xxx') + + self.__actor_socket = self._zmq_context.socket(zmq.PAIR) + self.__actor_socket.bind(self.zmq_socket_addr) + + async def put_message(self, message: str): + await self.__actor_socket.send_string(message) + + def __aiter__(self): + return self + + async def __anext__(self) -> str: + return await self.__actor_socket.recv_string() + + # TODO: implement graceful stopping