From 29a06e6781dfea22575929b920373fd25d7f6748 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Fri, 17 Jul 2020 20:54:11 +0200 Subject: [PATCH 1/3] actor with non stoping execution --- analytics/utils/concurrent.py | 52 ++++++++++++++++++++++++++++++----- 1 file changed, 45 insertions(+), 7 deletions(-) diff --git a/analytics/utils/concurrent.py b/analytics/utils/concurrent.py index 356c24e..f1ea152 100755 --- a/analytics/utils/concurrent.py +++ b/analytics/utils/concurrent.py @@ -1,6 +1,8 @@ import asyncio import threading import zmq +import sys + import zmq.asyncio from abc import ABC, abstractmethod @@ -18,11 +20,21 @@ class AsyncZmqThread(threading.Thread, ABC): """ def __init__(self, + run_until_complete: bool, zmq_context: zmq.asyncio.Context, zmq_socket_addr: str, zmq_socket_type = zmq.PAIR ): + """ + Parameters + ---------- + run_until_complete : bool + stop the thread when _run_thread completes. Set it to False + to be abel to run _on_message_to_thread() and make work there after + _run_thread completed + """ super(AsyncZmqThread, self).__init__() + self.run_until_complete = run_until_complete self._zmq_context = zmq_context # you can use it in child classes self.__zmq_socket_addr = zmq_socket_addr self.__zmq_socket_type = zmq_socket_type @@ -40,6 +52,7 @@ class AsyncZmqThread(threading.Thread, ABC): @abstractmethod async def _on_message_to_thread(self, message: str): """Override this method to receive messages""" + pass @abstractmethod async def _run_thread(self): @@ -59,6 +72,7 @@ class AsyncZmqThread(threading.Thread, ABC): await self._send_message_from_thread(f'{self.name}: ping {i}') ``` """ + pass def run(self): self.__asyncio_loop = asyncio.new_event_loop() @@ -66,7 +80,11 @@ class AsyncZmqThread(threading.Thread, ABC): self.__zmq_socket = self._zmq_context.socket(self.__zmq_socket_type) self.__zmq_socket.connect(self.__zmq_socket_addr) asyncio.ensure_future(self.__message_recv_loop()) - self.__asyncio_loop.run_until_complete(self._run_thread()) + if self.run_until_complete: + self.__asyncio_loop.run_until_complete(self._run_thread()) + else: + asyncio.ensure_future(self._run_thread()) + self.__asyncio_loop.run_forever() # TODO: implement stop signal handling @@ -86,26 +104,46 @@ class AsyncZmqActor(AsyncZmqThread): to receive it later in `self._recv_message_from_thread()`. - Example: + Example 1: ``` class MyActor(AsyncZmqActor): async def _run_thread(self): self.counter = 0 - # runs in a different thread + # runs in a different MyActor-thread await self._send_message_from_thread('some_txt_message_to_actor') - def async _on_message_to_thread(self, message): - # runs in Thread-actor + async def _on_message_to_thread(self, message): + # some work in handler in MyActor-thread self.counter++ + asyncZmqActor = MyActor() + asyncZmqActor.start() + ``` + + Example 2 + ``` + import time + class MyActor(AsyncZmqActor): + def __init__(self): + super(MyActor, self).__init__(False) + + async def _run_thread(self): + pass # no work here + + async def _on_message_to_thread(self, message): + time.sleep(1) + print('work on message finished') + asyncZmqActor = MyActor() asyncZmqActor.start() ``` """ - def __init__(self): - super(AsyncZmqActor, self).__init__(zmq.asyncio.Context(), ZMQ_THREAD_ACTOR_ADDR) + def __init__(self, run_until_complete: bool = True): + super(AsyncZmqActor, self).__init__( + run_until_complete, zmq.asyncio.Context(), ZMQ_THREAD_ACTOR_ADDR + ) self.__actor_socket = self._zmq_context.socket(zmq.PAIR) self.__actor_socket.bind(ZMQ_THREAD_ACTOR_ADDR) From 5d9fb4a394d9b68f59a629c558835d8f6e3380e2 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Fri, 17 Jul 2020 22:08:33 +0200 Subject: [PATCH 2/3] run_until_complete -> run_until_run_thread_complete --- analytics/utils/concurrent.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/analytics/utils/concurrent.py b/analytics/utils/concurrent.py index f1ea152..eb89342 100755 --- a/analytics/utils/concurrent.py +++ b/analytics/utils/concurrent.py @@ -20,7 +20,7 @@ class AsyncZmqThread(threading.Thread, ABC): """ def __init__(self, - run_until_complete: bool, + run_until_run_thread_complete: bool, zmq_context: zmq.asyncio.Context, zmq_socket_addr: str, zmq_socket_type = zmq.PAIR @@ -28,13 +28,13 @@ class AsyncZmqThread(threading.Thread, ABC): """ Parameters ---------- - run_until_complete : bool + run_until_run_thread_complete : bool stop the thread when _run_thread completes. Set it to False to be abel to run _on_message_to_thread() and make work there after _run_thread completed """ super(AsyncZmqThread, self).__init__() - self.run_until_complete = run_until_complete + self.run_until_run_thread_complete = run_until_run_thread_complete self._zmq_context = zmq_context # you can use it in child classes self.__zmq_socket_addr = zmq_socket_addr self.__zmq_socket_type = zmq_socket_type @@ -80,7 +80,7 @@ class AsyncZmqThread(threading.Thread, ABC): self.__zmq_socket = self._zmq_context.socket(self.__zmq_socket_type) self.__zmq_socket.connect(self.__zmq_socket_addr) asyncio.ensure_future(self.__message_recv_loop()) - if self.run_until_complete: + if self.run_until_run_thread_complete: self.__asyncio_loop.run_until_complete(self._run_thread()) else: asyncio.ensure_future(self._run_thread()) From ef7b721843f4577ec96763be1001f6be68efe4c1 Mon Sep 17 00:00:00 2001 From: Alexey Velikiy Date: Fri, 17 Jul 2020 22:10:30 +0200 Subject: [PATCH 3/3] run_until_run_thread_complete --- analytics/utils/concurrent.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/analytics/utils/concurrent.py b/analytics/utils/concurrent.py index eb89342..19a87f5 100755 --- a/analytics/utils/concurrent.py +++ b/analytics/utils/concurrent.py @@ -140,9 +140,9 @@ class AsyncZmqActor(AsyncZmqThread): ``` """ - def __init__(self, run_until_complete: bool = True): + def __init__(self, run_until_run_thread_complete: bool = True): super(AsyncZmqActor, self).__init__( - run_until_complete, zmq.asyncio.Context(), ZMQ_THREAD_ACTOR_ADDR + run_until_run_thread_complete, zmq.asyncio.Context(), ZMQ_THREAD_ACTOR_ADDR ) self.__actor_socket = self._zmq_context.socket(zmq.PAIR)