Browse Source

AsyncZmqThread and AsyncZmqActor better method names and docs (#522)

* AsyncZmqThread and AsyncZmqActor better method names and docs

* add comment about ZMQ_THREAD_ACTOR_ADDR

* __zmq_socket_addr -> __zmq_socket_type in AsyncZmqThread

* comment about MRO for AsyncZmqThread(threading.Thread, ABC)
pull/1/head
Alexey Velikiy 5 years ago committed by GitHub
parent
commit
d8f14ffaf7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 92
      analytics/analytics/utils/concurrent.py

92
analytics/analytics/utils/concurrent.py

@ -5,35 +5,44 @@ import zmq.asyncio
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
# This const defines Thread <-> Actor zmq one-to-one connection
# We create a seperate zmq context, so zqm address 'inproc://xxx' doesn't matter
# It is default address and you may want to use AsyncZmqThread another way
ZMQ_THREAD_ACTOR_ADDR = 'inproc://xxx'
# Inherience order (threading.Thread, ABC) is essential. Otherwise it's a MRO error.
class AsyncZmqThread(threading.Thread, ABC): class AsyncZmqThread(threading.Thread, ABC):
"""Class for wrapping zmq socket into a thread with it's own asyncio event loop """Class for wrapping zmq socket into a thread with it's own asyncio event loop
""" """
def __init__(self, zmq_context: zmq.asyncio.Context, zmq_socket_addr: str, socket_type = zmq.PAIR): def __init__(self,
zmq_context: zmq.asyncio.Context,
zmq_socket_addr: str,
zmq_socket_type = zmq.PAIR
):
super(AsyncZmqThread, self).__init__() super(AsyncZmqThread, self).__init__()
self.zmq_socket_addr = zmq_socket_addr self._zmq_context = zmq_context # you can use it in child classes
self.socket_type = socket_type self.__zmq_socket_addr = zmq_socket_addr
self._zmq_context = zmq_context self.__zmq_socket_type = zmq_socket_type
self.__asyncio_loop = None self.__asyncio_loop = None
self.__zmq_socket = None self.__zmq_socket = None
async def __message_loop(self): async def __message_recv_loop(self):
while True: while True:
text = await self.__zmq_socket.recv_string() text = await self.__zmq_socket.recv_string()
asyncio.ensure_future(self._on_message(text)) asyncio.ensure_future(self._on_message_to_thread(text))
async def _send_message(self, message: str): async def _send_message_from_thread(self, message: str):
await self.__zmq_socket.send_string(message) await self.__zmq_socket.send_string(message)
@abstractmethod @abstractmethod
async def _on_message(self, message: str): async def _on_message_to_thread(self, message: str):
"""Override this method to receive messages""" """Override this method to receive messages"""
pass
@abstractmethod @abstractmethod
async def _run(self): async def _run_thread(self):
"""Override this method to do some async work. """Override this method to do some async work.
This method uses a separate thread. This method uses a separate thread.
@ -42,22 +51,22 @@ class AsyncZmqThread(threading.Thread, ABC):
Example: Example:
``` ```
async def _run(self): async def _run_thread(self):
i = 0 i = 0
while True: while True:
await asyncio.sleep(1) await asyncio.sleep(1)
i += 1 i += 1
await self._send_message(f'{self.name}: ping {i}') await self._send_message_from_thread(f'{self.name}: ping {i}')
``` ```
""" """
def run(self): def run(self):
self.__asyncio_loop = asyncio.new_event_loop() self.__asyncio_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.__asyncio_loop) asyncio.set_event_loop(self.__asyncio_loop)
self.__zmq_socket = self._zmq_context.socket(self.socket_type) self.__zmq_socket = self._zmq_context.socket(self.__zmq_socket_type)
self.__zmq_socket.connect(self.zmq_socket_addr) self.__zmq_socket.connect(self.__zmq_socket_addr)
asyncio.ensure_future(self.__message_loop()) asyncio.ensure_future(self.__message_recv_loop())
self.__asyncio_loop.run_until_complete(self._run()) self.__asyncio_loop.run_until_complete(self._run_thread())
# TODO: implement stop signal handling # TODO: implement stop signal handling
@ -67,42 +76,55 @@ class AsyncZmqActor(AsyncZmqThread):
override following: override following:
``` ```
async def _run(self) async def _run_thread(self)
async def _on_message(self, message: str) async def _on_message_to_thread(self, message: str)
``` ```
both methods run in actor's thread
you can call `self._send_message_from_thread('txt')`
to receive it later in `self._recv_message_from_thread()`.
Example: Example:
``` ```
class MyActor(AsyncZmqActor): class MyActor(AsyncZmqActor):
async def _run(self): async def _run_thread(self):
self.counter = 0 self.counter = 0
# runs in a different thread # runs in a different thread
await self._send_message('some_txt_message_to_outer_world') await self._send_message_from_thread('some_txt_message_to_actor')
def async _on_message(self, message): def async _on_message_to_thread(self, message):
# runs in Thread-actor # runs in Thread-actor
self.counter++ self.counter++
asyncZmqActor = MyActor() asyncZmqActor = MyActor()
asyncZmqActor.start() asyncZmqActor.start()
``` ```
""" """
def __init__(self): def __init__(self):
# we have a seperate zmq context, so zqm address 'inproc://xxx' doesn't matter super(AsyncZmqActor, self).__init__(zmq.asyncio.Context(), ZMQ_THREAD_ACTOR_ADDR)
super(AsyncZmqActor, self).__init__(zmq.asyncio.Context(), 'inproc://xxx')
self.__actor_socket = self._zmq_context.socket(zmq.PAIR) self.__actor_socket = self._zmq_context.socket(zmq.PAIR)
self.__actor_socket.bind(self.zmq_socket_addr) self.__actor_socket.bind(ZMQ_THREAD_ACTOR_ADDR)
async def _put_message_to_thread(self, message: str):
"""It "sends" `message` to thread,
but we can't await it's `AsyncZmqThread._on_message_to_thread()`
async def put_message(self, message: str): so it's "put", not "send"
"""
await self.__actor_socket.send_string(message) await self.__actor_socket.send_string(message)
def __aiter__(self): async def _recv_message_from_thread(self) -> str:
return self """Returns next message ``'txt'`` from thread sent by
async def __anext__(self) -> str: ``AsyncZmqActor._send_message_from_thread('txt')``
"""
return await self.__actor_socket.recv_string() return await self.__actor_socket.recv_string()
# TODO: implement graceful stopping # TODO: implement graceful stopping

Loading…
Cancel
Save