From fbfa4cba5f5f5a1ebd13763da8611639f37e522f Mon Sep 17 00:00:00 2001 From: Hanusz Leszek Date: Mon, 17 Aug 2020 10:16:04 +0200 Subject: [PATCH 1/2] Adding debug logs + put _close_coro cleanup in finally block --- gql/transport/websockets.py | 69 ++++++++++++++++++++++++++++++------- 1 file changed, 57 insertions(+), 12 deletions(-) diff --git a/gql/transport/websockets.py b/gql/transport/websockets.py index 4367aef6..4a56b6b2 100644 --- a/gql/transport/websockets.py +++ b/gql/transport/websockets.py @@ -569,35 +569,80 @@ async def _close_coro(self, e: Exception, clean_close: bool = True) -> None: - close the websocket connection - send the exception to all the remaining listeners """ + + log.info("_close_coro: starting") + if self.websocket: - # Saving exception to raise it later if trying to use the transport - # after it has already closed. - self.close_exception = e + try: + # Saving exception to raise it later if trying to use the transport + # after it has already closed. + self.close_exception = e + + if clean_close: + log.info("_close_coro: starting clean_close") + try: + await self._clean_close(e) + except Exception as exc: + log.debug("Ignoring exception in _clean_close: " + repr(exc)) + + log.info("_close_coro: sending exception to listeners") + + # Send an exception to all remaining listeners + for query_id, listener in self.listeners.items(): + await listener.set_exception(e) + + log.info("_close_coro: close websocket connection") + + await self.websocket.close() + + log.info("_close_coro: websocket connection closed") + + except Exception as exc: + log.debug("Exception catched in _close_coro: " + repr(exc)) - if clean_close: - await self._clean_close(e) + finally: + + log.info("_close_coro: start cleanup") - # Send an exception to all remaining listeners - for query_id, listener in self.listeners.items(): - await listener.set_exception(e) + self.websocket = None - await self.websocket.close() + self.close_task = None + self._wait_closed.set() - self.websocket = None + else: + log.debug( + "websocket is falsy in _close_coro: " + repr(self.close_exception) + ) - self.close_task = None - self._wait_closed.set() + log.info("_close_coro: exiting") async def _fail(self, e: Exception, clean_close: bool = True) -> None: + log.debug("_fail: starting with exception: " + repr(e)) + if self.close_task is None: self.close_task = asyncio.shield( asyncio.ensure_future(self._close_coro(e, clean_close=clean_close)) ) + else: + log.debug( + "close_task is not None in _fail. Previous exception is: " + + repr(self.close_exception) + + " New exception is: " + + repr(e) + ) async def close(self) -> None: + log.info("close: starting") + await self._fail(TransportClosed("Websocket GraphQL transport closed by user")) await self.wait_closed() + log.info("close: done") + async def wait_closed(self) -> None: + log.info("wait_close: starting") + await self._wait_closed.wait() + + log.info("wait_close: done") From 10f41920d69d31c62ce1b9775dfb151de0495cfb Mon Sep 17 00:00:00 2001 From: Hanusz Leszek Date: Mon, 17 Aug 2020 16:42:42 +0200 Subject: [PATCH 2/2] Remove race condition when _fail is started with self.websocket == None --- gql/transport/websockets.py | 81 ++++++++++++++++++++----------------- 1 file changed, 43 insertions(+), 38 deletions(-) diff --git a/gql/transport/websockets.py b/gql/transport/websockets.py index 4a56b6b2..07f8a5ff 100644 --- a/gql/transport/websockets.py +++ b/gql/transport/websockets.py @@ -482,6 +482,8 @@ async def connect(self) -> None: GRAPHQLWS_SUBPROTOCOL: Subprotocol = cast(Subprotocol, "graphql-ws") + log.debug("connect: starting") + if self.websocket is None and not self._connecting: # Set connecting to True to avoid a race condition if user is trying @@ -537,6 +539,8 @@ async def connect(self) -> None: else: raise TransportAlreadyConnected("Transport is already connected") + log.debug("connect: done") + async def _clean_close(self, e: Exception) -> None: """Coroutine which will: @@ -570,60 +574,61 @@ async def _close_coro(self, e: Exception, clean_close: bool = True) -> None: - send the exception to all the remaining listeners """ - log.info("_close_coro: starting") + log.debug("_close_coro: starting") - if self.websocket: + try: - try: - # Saving exception to raise it later if trying to use the transport - # after it has already closed. - self.close_exception = e + # We should always have an active websocket connection here + assert self.websocket is not None - if clean_close: - log.info("_close_coro: starting clean_close") - try: - await self._clean_close(e) - except Exception as exc: - log.debug("Ignoring exception in _clean_close: " + repr(exc)) + # Saving exception to raise it later if trying to use the transport + # after it has already closed. + self.close_exception = e - log.info("_close_coro: sending exception to listeners") + if clean_close: + log.debug("_close_coro: starting clean_close") + try: + await self._clean_close(e) + except Exception as exc: # pragma: no cover + log.warning("Ignoring exception in _clean_close: " + repr(exc)) - # Send an exception to all remaining listeners - for query_id, listener in self.listeners.items(): - await listener.set_exception(e) + log.debug("_close_coro: sending exception to listeners") - log.info("_close_coro: close websocket connection") + # Send an exception to all remaining listeners + for query_id, listener in self.listeners.items(): + await listener.set_exception(e) - await self.websocket.close() + log.debug("_close_coro: close websocket connection") - log.info("_close_coro: websocket connection closed") + await self.websocket.close() - except Exception as exc: - log.debug("Exception catched in _close_coro: " + repr(exc)) + log.debug("_close_coro: websocket connection closed") - finally: + except Exception as exc: # pragma: no cover + log.warning("Exception catched in _close_coro: " + repr(exc)) - log.info("_close_coro: start cleanup") + finally: - self.websocket = None + log.debug("_close_coro: start cleanup") - self.close_task = None - self._wait_closed.set() + self.websocket = None + self.close_task = None - else: - log.debug( - "websocket is falsy in _close_coro: " + repr(self.close_exception) - ) + self._wait_closed.set() - log.info("_close_coro: exiting") + log.debug("_close_coro: exiting") async def _fail(self, e: Exception, clean_close: bool = True) -> None: log.debug("_fail: starting with exception: " + repr(e)) if self.close_task is None: - self.close_task = asyncio.shield( - asyncio.ensure_future(self._close_coro(e, clean_close=clean_close)) - ) + + if self.websocket is None: + log.debug("_fail started with self.websocket == None -> already closed") + else: + self.close_task = asyncio.shield( + asyncio.ensure_future(self._close_coro(e, clean_close=clean_close)) + ) else: log.debug( "close_task is not None in _fail. Previous exception is: " @@ -633,16 +638,16 @@ async def _fail(self, e: Exception, clean_close: bool = True) -> None: ) async def close(self) -> None: - log.info("close: starting") + log.debug("close: starting") await self._fail(TransportClosed("Websocket GraphQL transport closed by user")) await self.wait_closed() - log.info("close: done") + log.debug("close: done") async def wait_closed(self) -> None: - log.info("wait_close: starting") + log.debug("wait_close: starting") await self._wait_closed.wait() - log.info("wait_close: done") + log.debug("wait_close: done")