Skip to content

Commit 1a2dcec

Browse files
committed
Fix race condition in websocket transport close (graphql-python#133)
1 parent bd72e79 commit 1a2dcec

File tree

1 file changed

+55
-5
lines changed

1 file changed

+55
-5
lines changed

gql/transport/websockets.py

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,8 @@ async def connect(self) -> None:
488488

489489
GRAPHQLWS_SUBPROTOCOL: Subprotocol = cast(Subprotocol, "graphql-ws")
490490

491+
log.debug("connect: starting")
492+
491493
if self.websocket is None and not self._connecting:
492494

493495
# Set connecting to True to avoid a race condition if user is trying
@@ -543,6 +545,8 @@ async def connect(self) -> None:
543545
else:
544546
raise TransportAlreadyConnected("Transport is already connected")
545547

548+
log.debug("connect: done")
549+
546550
async def _clean_close(self, e: Exception) -> None:
547551
"""Coroutine which will:
548552
@@ -575,35 +579,81 @@ async def _close_coro(self, e: Exception, clean_close: bool = True) -> None:
575579
- close the websocket connection
576580
- send the exception to all the remaining listeners
577581
"""
578-
if self.websocket:
582+
583+
log.debug("_close_coro: starting")
584+
585+
try:
586+
587+
# We should always have an active websocket connection here
588+
assert self.websocket is not None
579589

580590
# Saving exception to raise it later if trying to use the transport
581591
# after it has already closed.
582592
self.close_exception = e
583593

584594
if clean_close:
585-
await self._clean_close(e)
595+
log.debug("_close_coro: starting clean_close")
596+
try:
597+
await self._clean_close(e)
598+
except Exception as exc: # pragma: no cover
599+
log.warning("Ignoring exception in _clean_close: " + repr(exc))
600+
601+
log.debug("_close_coro: sending exception to listeners")
586602

587603
# Send an exception to all remaining listeners
588604
for query_id, listener in self.listeners.items():
589605
await listener.set_exception(e)
590606

607+
log.debug("_close_coro: close websocket connection")
608+
591609
await self.websocket.close()
592610

593-
self.websocket = None
611+
log.debug("_close_coro: websocket connection closed")
612+
613+
except Exception as exc: # pragma: no cover
614+
log.warning("Exception catched in _close_coro: " + repr(exc))
615+
616+
finally:
594617

618+
log.debug("_close_coro: start cleanup")
619+
620+
self.websocket = None
595621
self.close_task = None
622+
596623
self._wait_closed.set()
597624

625+
log.debug("_close_coro: exiting")
626+
598627
async def _fail(self, e: Exception, clean_close: bool = True) -> None:
628+
log.debug("_fail: starting with exception: " + repr(e))
629+
599630
if self.close_task is None:
600-
self.close_task = asyncio.shield(
601-
asyncio.ensure_future(self._close_coro(e, clean_close=clean_close))
631+
632+
if self.websocket is None:
633+
log.debug("_fail started with self.websocket == None -> already closed")
634+
else:
635+
self.close_task = asyncio.shield(
636+
asyncio.ensure_future(self._close_coro(e, clean_close=clean_close))
637+
)
638+
else:
639+
log.debug(
640+
"close_task is not None in _fail. Previous exception is: "
641+
+ repr(self.close_exception)
642+
+ " New exception is: "
643+
+ repr(e)
602644
)
603645

604646
async def close(self) -> None:
647+
log.debug("close: starting")
648+
605649
await self._fail(TransportClosed("Websocket GraphQL transport closed by user"))
606650
await self.wait_closed()
607651

652+
log.debug("close: done")
653+
608654
async def wait_closed(self) -> None:
655+
log.debug("wait_close: starting")
656+
609657
await self._wait_closed.wait()
658+
659+
log.debug("wait_close: done")

0 commit comments

Comments
 (0)