Skip to content

Fix race condition in websocket transport close #133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 55 additions & 5 deletions gql/transport/websockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,8 @@ async def connect(self) -> None:

GRAPHQLWS_SUBPROTOCOL: Subprotocol = cast(Subprotocol, "graphql-ws")

log.debug("connect: starting")

if self.websocket is None and not self._connecting:

# Set connecting to True to avoid a race condition if user is trying
Expand Down Expand Up @@ -537,6 +539,8 @@ async def connect(self) -> None:
else:
raise TransportAlreadyConnected("Transport is already connected")

log.debug("connect: done")

async def _clean_close(self, e: Exception) -> None:
"""Coroutine which will:

Expand Down Expand Up @@ -569,35 +573,81 @@ async def _close_coro(self, e: Exception, clean_close: bool = True) -> None:
- close the websocket connection
- send the exception to all the remaining listeners
"""
if self.websocket:

log.debug("_close_coro: starting")

try:

# We should always have an active websocket connection here
assert self.websocket is not None

# Saving exception to raise it later if trying to use the transport
# after it has already closed.
self.close_exception = e

if clean_close:
await self._clean_close(e)
log.debug("_close_coro: starting clean_close")
try:
await self._clean_close(e)
except Exception as exc: # pragma: no cover
log.warning("Ignoring exception in _clean_close: " + repr(exc))

log.debug("_close_coro: sending exception to listeners")

# Send an exception to all remaining listeners
for query_id, listener in self.listeners.items():
await listener.set_exception(e)

log.debug("_close_coro: close websocket connection")

await self.websocket.close()

self.websocket = None
log.debug("_close_coro: websocket connection closed")

except Exception as exc: # pragma: no cover
log.warning("Exception catched in _close_coro: " + repr(exc))

finally:

log.debug("_close_coro: start cleanup")

self.websocket = None
self.close_task = None

self._wait_closed.set()

log.debug("_close_coro: exiting")

async def _fail(self, e: Exception, clean_close: bool = True) -> None:
log.debug("_fail: starting with exception: " + repr(e))

if self.close_task is None:
self.close_task = asyncio.shield(
asyncio.ensure_future(self._close_coro(e, clean_close=clean_close))

if self.websocket is None:
log.debug("_fail started with self.websocket == None -> already closed")
else:
self.close_task = asyncio.shield(
asyncio.ensure_future(self._close_coro(e, clean_close=clean_close))
)
else:
log.debug(
"close_task is not None in _fail. Previous exception is: "
+ repr(self.close_exception)
+ " New exception is: "
+ repr(e)
)

async def close(self) -> None:
log.debug("close: starting")

await self._fail(TransportClosed("Websocket GraphQL transport closed by user"))
await self.wait_closed()

log.debug("close: done")

async def wait_closed(self) -> None:
log.debug("wait_close: starting")

await self._wait_closed.wait()

log.debug("wait_close: done")