diff --git a/analytics/utils/concurrent.py b/analytics/utils/concurrent.py index 356c24e..f1ea152 100755 --- a/analytics/utils/concurrent.py +++ b/analytics/utils/concurrent.py @@ -1,6 +1,8 @@ import asyncio import threading import zmq +import sys + import zmq.asyncio from abc import ABC, abstractmethod @@ -18,11 +20,21 @@ class AsyncZmqThread(threading.Thread, ABC): """ def __init__(self, + run_until_complete: bool, zmq_context: zmq.asyncio.Context, zmq_socket_addr: str, zmq_socket_type = zmq.PAIR ): + """ + Parameters + ---------- + run_until_complete : bool + stop the thread when _run_thread completes. Set it to False + to be abel to run _on_message_to_thread() and make work there after + _run_thread completed + """ super(AsyncZmqThread, self).__init__() + self.run_until_complete = run_until_complete self._zmq_context = zmq_context # you can use it in child classes self.__zmq_socket_addr = zmq_socket_addr self.__zmq_socket_type = zmq_socket_type @@ -40,6 +52,7 @@ class AsyncZmqThread(threading.Thread, ABC): @abstractmethod async def _on_message_to_thread(self, message: str): """Override this method to receive messages""" + pass @abstractmethod async def _run_thread(self): @@ -59,6 +72,7 @@ class AsyncZmqThread(threading.Thread, ABC): await self._send_message_from_thread(f'{self.name}: ping {i}') ``` """ + pass def run(self): self.__asyncio_loop = asyncio.new_event_loop() @@ -66,7 +80,11 @@ class AsyncZmqThread(threading.Thread, ABC): self.__zmq_socket = self._zmq_context.socket(self.__zmq_socket_type) self.__zmq_socket.connect(self.__zmq_socket_addr) asyncio.ensure_future(self.__message_recv_loop()) - self.__asyncio_loop.run_until_complete(self._run_thread()) + if self.run_until_complete: + self.__asyncio_loop.run_until_complete(self._run_thread()) + else: + asyncio.ensure_future(self._run_thread()) + self.__asyncio_loop.run_forever() # TODO: implement stop signal handling @@ -86,26 +104,46 @@ class AsyncZmqActor(AsyncZmqThread): to receive it later in `self._recv_message_from_thread()`. - Example: + Example 1: ``` class MyActor(AsyncZmqActor): async def _run_thread(self): self.counter = 0 - # runs in a different thread + # runs in a different MyActor-thread await self._send_message_from_thread('some_txt_message_to_actor') - def async _on_message_to_thread(self, message): - # runs in Thread-actor + async def _on_message_to_thread(self, message): + # some work in handler in MyActor-thread self.counter++ + asyncZmqActor = MyActor() + asyncZmqActor.start() + ``` + + Example 2 + ``` + import time + class MyActor(AsyncZmqActor): + def __init__(self): + super(MyActor, self).__init__(False) + + async def _run_thread(self): + pass # no work here + + async def _on_message_to_thread(self, message): + time.sleep(1) + print('work on message finished') + asyncZmqActor = MyActor() asyncZmqActor.start() ``` """ - def __init__(self): - super(AsyncZmqActor, self).__init__(zmq.asyncio.Context(), ZMQ_THREAD_ACTOR_ADDR) + def __init__(self, run_until_complete: bool = True): + super(AsyncZmqActor, self).__init__( + run_until_complete, zmq.asyncio.Context(), ZMQ_THREAD_ACTOR_ADDR + ) self.__actor_socket = self._zmq_context.socket(zmq.PAIR) self.__actor_socket.bind(ZMQ_THREAD_ACTOR_ADDR)