Browse Source

server_service rollback

master
CorpGlory Inc. 5 years ago
parent
commit
c9c179cdae
  1. 29
      analytics/services/server_service.py

29
analytics/services/server_service.py

@ -36,7 +36,6 @@ class ServerService(utils.concurrent.AsyncZmqActor):
# this typing doesn't help vscode, maybe there is a mistake # this typing doesn't help vscode, maybe there is a mistake
self.__server_socket: Optional[websockets.Connect] = None self.__server_socket: Optional[websockets.Connect] = None
self.__request_next_id = 1 self.__request_next_id = 1
self.__reconnecting = False
self.__responses = dict() self.__responses = dict()
self.start() self.start()
@ -86,8 +85,6 @@ class ServerService(utils.concurrent.AsyncZmqActor):
await self.__server_socket_recv_loop() await self.__server_socket_recv_loop()
async def _on_message_to_thread(self, message: str): async def _on_message_to_thread(self, message: str):
if self.__server_socket is None or self.__server_socket.closed:
await self.__reconnect()
await self.__server_socket.send(message) await self.__server_socket.send(message)
async def __server_socket_recv_loop(self): async def __server_socket_recv_loop(self):
@ -98,31 +95,18 @@ class ServerService(utils.concurrent.AsyncZmqActor):
else: else:
asyncio.ensure_future(self._send_message_from_thread(received_string)) asyncio.ensure_future(self._send_message_from_thread(received_string))
async def __reconnect(self): async def __reconnect_recv(self) -> str:
if not self.__reconnecting: while not SERVER_SOCKET_RECV_LOOP_INTERRUPTED:
self.__reconnecting = True try:
else: if self.__server_socket is None:
while self.__reconnecting:
await asyncio.sleep(1)
return
if not self.__server_socket is None:
await self.__server_socket.close()
self.__server_socket = await websockets.connect(config.HASTIC_SERVER_URL) self.__server_socket = await websockets.connect(config.HASTIC_SERVER_URL)
first_message = await self.__server_socket.recv() first_message = await self.__server_socket.recv()
if first_message == 'EALREADYEXISTING': if first_message == 'EALREADYEXISTING':
raise ConnectionError('Can`t connect as a second analytics') raise ConnectionError('Can`t connect as a second analytics')
self.__reconnecting = False
async def __reconnect_recv(self) -> str:
while not SERVER_SOCKET_RECV_LOOP_INTERRUPTED:
try:
if self.__server_socket is None or self.__server_socket.closed:
await self.__reconnect()
return await self.__server_socket.recv() return await self.__server_socket.recv()
except (ConnectionRefusedError, websockets.ConnectionClosedError): except (ConnectionRefusedError, websockets.ConnectionClosedError):
if not self.__server_socket is None: if not self.__server_socket is None:
await self.__server_socket.close() self.__server_socket.close()
# TODO: this logic increases the number of ThreadPoolExecutor # TODO: this logic increases the number of ThreadPoolExecutor
self.__server_socket = None self.__server_socket = None
# TODO: move to config # TODO: move to config
@ -132,8 +116,7 @@ class ServerService(utils.concurrent.AsyncZmqActor):
raise InterruptedError() raise InterruptedError()
async def __handle_ping(self): async def __handle_ping(self):
if self.__server_socket is None or self.__server_socket.closed: # TODO: self.__server_socket can be None
await self.__reconnect()
await self.__server_socket.send('PONG') await self.__server_socket.send('PONG')
def __parse_message_or_save(self, text: str) -> Optional[ServerMessage]: def __parse_message_or_save(self, text: str) -> Optional[ServerMessage]:

Loading…
Cancel
Save