|
|
@ -1,6 +1,8 @@ |
|
|
|
import asyncio |
|
|
|
import asyncio |
|
|
|
import threading |
|
|
|
import threading |
|
|
|
import zmq |
|
|
|
import zmq |
|
|
|
|
|
|
|
import sys |
|
|
|
|
|
|
|
|
|
|
|
import zmq.asyncio |
|
|
|
import zmq.asyncio |
|
|
|
from abc import ABC, abstractmethod |
|
|
|
from abc import ABC, abstractmethod |
|
|
|
|
|
|
|
|
|
|
@ -18,11 +20,21 @@ class AsyncZmqThread(threading.Thread, ABC): |
|
|
|
""" |
|
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, |
|
|
|
def __init__(self, |
|
|
|
|
|
|
|
run_until_complete: bool, |
|
|
|
zmq_context: zmq.asyncio.Context, |
|
|
|
zmq_context: zmq.asyncio.Context, |
|
|
|
zmq_socket_addr: str, |
|
|
|
zmq_socket_addr: str, |
|
|
|
zmq_socket_type = zmq.PAIR |
|
|
|
zmq_socket_type = zmq.PAIR |
|
|
|
): |
|
|
|
): |
|
|
|
|
|
|
|
""" |
|
|
|
|
|
|
|
Parameters |
|
|
|
|
|
|
|
---------- |
|
|
|
|
|
|
|
run_until_complete : bool |
|
|
|
|
|
|
|
stop the thread when _run_thread completes. Set it to False |
|
|
|
|
|
|
|
to be abel to run _on_message_to_thread() and make work there after |
|
|
|
|
|
|
|
_run_thread completed |
|
|
|
|
|
|
|
""" |
|
|
|
super(AsyncZmqThread, self).__init__() |
|
|
|
super(AsyncZmqThread, self).__init__() |
|
|
|
|
|
|
|
self.run_until_complete = run_until_complete |
|
|
|
self._zmq_context = zmq_context # you can use it in child classes |
|
|
|
self._zmq_context = zmq_context # you can use it in child classes |
|
|
|
self.__zmq_socket_addr = zmq_socket_addr |
|
|
|
self.__zmq_socket_addr = zmq_socket_addr |
|
|
|
self.__zmq_socket_type = zmq_socket_type |
|
|
|
self.__zmq_socket_type = zmq_socket_type |
|
|
@ -40,6 +52,7 @@ class AsyncZmqThread(threading.Thread, ABC): |
|
|
|
@abstractmethod |
|
|
|
@abstractmethod |
|
|
|
async def _on_message_to_thread(self, message: str): |
|
|
|
async def _on_message_to_thread(self, message: str): |
|
|
|
"""Override this method to receive messages""" |
|
|
|
"""Override this method to receive messages""" |
|
|
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
@abstractmethod |
|
|
|
@abstractmethod |
|
|
|
async def _run_thread(self): |
|
|
|
async def _run_thread(self): |
|
|
@ -59,6 +72,7 @@ class AsyncZmqThread(threading.Thread, ABC): |
|
|
|
await self._send_message_from_thread(f'{self.name}: ping {i}') |
|
|
|
await self._send_message_from_thread(f'{self.name}: ping {i}') |
|
|
|
``` |
|
|
|
``` |
|
|
|
""" |
|
|
|
""" |
|
|
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
def run(self): |
|
|
|
def run(self): |
|
|
|
self.__asyncio_loop = asyncio.new_event_loop() |
|
|
|
self.__asyncio_loop = asyncio.new_event_loop() |
|
|
@ -66,7 +80,11 @@ class AsyncZmqThread(threading.Thread, ABC): |
|
|
|
self.__zmq_socket = self._zmq_context.socket(self.__zmq_socket_type) |
|
|
|
self.__zmq_socket = self._zmq_context.socket(self.__zmq_socket_type) |
|
|
|
self.__zmq_socket.connect(self.__zmq_socket_addr) |
|
|
|
self.__zmq_socket.connect(self.__zmq_socket_addr) |
|
|
|
asyncio.ensure_future(self.__message_recv_loop()) |
|
|
|
asyncio.ensure_future(self.__message_recv_loop()) |
|
|
|
|
|
|
|
if self.run_until_complete: |
|
|
|
self.__asyncio_loop.run_until_complete(self._run_thread()) |
|
|
|
self.__asyncio_loop.run_until_complete(self._run_thread()) |
|
|
|
|
|
|
|
else: |
|
|
|
|
|
|
|
asyncio.ensure_future(self._run_thread()) |
|
|
|
|
|
|
|
self.__asyncio_loop.run_forever() |
|
|
|
|
|
|
|
|
|
|
|
# TODO: implement stop signal handling |
|
|
|
# TODO: implement stop signal handling |
|
|
|
|
|
|
|
|
|
|
@ -86,26 +104,46 @@ class AsyncZmqActor(AsyncZmqThread): |
|
|
|
|
|
|
|
|
|
|
|
to receive it later in `self._recv_message_from_thread()`. |
|
|
|
to receive it later in `self._recv_message_from_thread()`. |
|
|
|
|
|
|
|
|
|
|
|
Example: |
|
|
|
Example 1: |
|
|
|
|
|
|
|
|
|
|
|
``` |
|
|
|
``` |
|
|
|
class MyActor(AsyncZmqActor): |
|
|
|
class MyActor(AsyncZmqActor): |
|
|
|
async def _run_thread(self): |
|
|
|
async def _run_thread(self): |
|
|
|
self.counter = 0 |
|
|
|
self.counter = 0 |
|
|
|
# runs in a different thread |
|
|
|
# runs in a different MyActor-thread |
|
|
|
await self._send_message_from_thread('some_txt_message_to_actor') |
|
|
|
await self._send_message_from_thread('some_txt_message_to_actor') |
|
|
|
|
|
|
|
|
|
|
|
def async _on_message_to_thread(self, message): |
|
|
|
async def _on_message_to_thread(self, message): |
|
|
|
# runs in Thread-actor |
|
|
|
# some work in handler in MyActor-thread |
|
|
|
self.counter++ |
|
|
|
self.counter++ |
|
|
|
|
|
|
|
|
|
|
|
asyncZmqActor = MyActor() |
|
|
|
asyncZmqActor = MyActor() |
|
|
|
asyncZmqActor.start() |
|
|
|
asyncZmqActor.start() |
|
|
|
``` |
|
|
|
``` |
|
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Example 2 |
|
|
|
|
|
|
|
``` |
|
|
|
|
|
|
|
import time |
|
|
|
|
|
|
|
class MyActor(AsyncZmqActor): |
|
|
|
def __init__(self): |
|
|
|
def __init__(self): |
|
|
|
super(AsyncZmqActor, self).__init__(zmq.asyncio.Context(), ZMQ_THREAD_ACTOR_ADDR) |
|
|
|
super(MyActor, self).__init__(False) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _run_thread(self): |
|
|
|
|
|
|
|
pass # no work here |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _on_message_to_thread(self, message): |
|
|
|
|
|
|
|
time.sleep(1) |
|
|
|
|
|
|
|
print('work on message finished') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
asyncZmqActor = MyActor() |
|
|
|
|
|
|
|
asyncZmqActor.start() |
|
|
|
|
|
|
|
``` |
|
|
|
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, run_until_complete: bool = True): |
|
|
|
|
|
|
|
super(AsyncZmqActor, self).__init__( |
|
|
|
|
|
|
|
run_until_complete, zmq.asyncio.Context(), ZMQ_THREAD_ACTOR_ADDR |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
self.__actor_socket = self._zmq_context.socket(zmq.PAIR) |
|
|
|
self.__actor_socket = self._zmq_context.socket(zmq.PAIR) |
|
|
|
self.__actor_socket.bind(ZMQ_THREAD_ACTOR_ADDR) |
|
|
|
self.__actor_socket.bind(ZMQ_THREAD_ACTOR_ADDR) |
|
|
|