diff --git a/analytics/services/server_service.py b/analytics/services/server_service.py index ba80111..9b3db03 100644 --- a/analytics/services/server_service.py +++ b/analytics/services/server_service.py @@ -36,7 +36,6 @@ class ServerService(utils.concurrent.AsyncZmqActor): # this typing doesn't help vscode, maybe there is a mistake self.__server_socket: Optional[websockets.Connect] = None self.__request_next_id = 1 - self.__reconnecting = False self.__responses = dict() self.start() @@ -86,8 +85,6 @@ class ServerService(utils.concurrent.AsyncZmqActor): await self.__server_socket_recv_loop() async def _on_message_to_thread(self, message: str): - if self.__server_socket is None or self.__server_socket.closed: - await self.__reconnect() await self.__server_socket.send(message) async def __server_socket_recv_loop(self): @@ -98,31 +95,18 @@ class ServerService(utils.concurrent.AsyncZmqActor): else: asyncio.ensure_future(self._send_message_from_thread(received_string)) - async def __reconnect(self): - if not self.__reconnecting: - self.__reconnecting = True - else: - while self.__reconnecting: - await asyncio.sleep(1) - return - - if not self.__server_socket is None: - await self.__server_socket.close() - self.__server_socket = await websockets.connect(config.HASTIC_SERVER_URL) - first_message = await self.__server_socket.recv() - if first_message == 'EALREADYEXISTING': - raise ConnectionError('Can`t connect as a second analytics') - self.__reconnecting = False - async def __reconnect_recv(self) -> str: while not SERVER_SOCKET_RECV_LOOP_INTERRUPTED: try: - if self.__server_socket is None or self.__server_socket.closed: - await self.__reconnect() + if self.__server_socket is None: + self.__server_socket = await websockets.connect(config.HASTIC_SERVER_URL) + first_message = await self.__server_socket.recv() + if first_message == 'EALREADYEXISTING': + raise ConnectionError('Can`t connect as a second analytics') return await self.__server_socket.recv() except (ConnectionRefusedError, websockets.ConnectionClosedError): if not self.__server_socket is None: - await self.__server_socket.close() + self.__server_socket.close() # TODO: this logic increases the number of ThreadPoolExecutor self.__server_socket = None # TODO: move to config @@ -132,8 +116,7 @@ class ServerService(utils.concurrent.AsyncZmqActor): raise InterruptedError() async def __handle_ping(self): - if self.__server_socket is None or self.__server_socket.closed: - await self.__reconnect() + # TODO: self.__server_socket can be None await self.__server_socket.send('PONG') def __parse_message_or_save(self, text: str) -> Optional[ServerMessage]: