From c7d02cd63e6289a2860398cd553286b36f63e80c Mon Sep 17 00:00:00 2001 From: Hanusz Leszek Date: Wed, 11 Aug 2021 01:46:22 +0200 Subject: [PATCH 1/3] Initialize heartbeat_task correctly --- gql/transport/phoenix_channel_websockets.py | 1 + 1 file changed, 1 insertion(+) diff --git a/gql/transport/phoenix_channel_websockets.py b/gql/transport/phoenix_channel_websockets.py index 27e58f2a..26ab159b 100644 --- a/gql/transport/phoenix_channel_websockets.py +++ b/gql/transport/phoenix_channel_websockets.py @@ -34,6 +34,7 @@ def __init__( """ self.channel_name = channel_name self.heartbeat_interval = heartbeat_interval + self.heartbeat_task: Optional[asyncio.Future] = None self.subscription_ids_to_query_ids: Dict[str, int] = {} super(PhoenixChannelWebsocketsTransport, self).__init__(*args, **kwargs) From fdb8307245309229e71fd2d4c9527caebad3d769 Mon Sep 17 00:00:00 2001 From: Hanusz Leszek Date: Wed, 11 Aug 2021 01:47:52 +0200 Subject: [PATCH 2/3] Add debug logs to phoenix subscription test --- tests/test_phoenix_channel_subscription.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/test_phoenix_channel_subscription.py b/tests/test_phoenix_channel_subscription.py index ef46db47..16ce5c34 100644 --- a/tests/test_phoenix_channel_subscription.py +++ b/tests/test_phoenix_channel_subscription.py @@ -101,9 +101,13 @@ async def stopping_coro(): @pytest.mark.parametrize("server", [server_countdown], indirect=True) @pytest.mark.parametrize("subscription_str", [countdown_subscription_str]) async def test_phoenix_channel_subscription(event_loop, server, subscription_str): + import logging from gql.transport.phoenix_channel_websockets import ( PhoenixChannelWebsocketsTransport, ) + from gql.transport.websockets import log as websockets_logger + + websockets_logger.setLevel(logging.DEBUG) path = "/graphql" url = f"ws://{server.hostname}:{server.port}{path}" From f9e74e95bed343abc11528e04f409da66a71e259 Mon Sep 17 00:00:00 2001 From: Hanusz Leszek Date: Wed, 11 Aug 2021 01:48:51 +0200 Subject: [PATCH 3/3] Fix assertion error when transport is already closed when _receive is called --- gql/transport/websockets.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/gql/transport/websockets.py b/gql/transport/websockets.py index 7e26f31c..2dce0545 100644 --- a/gql/transport/websockets.py +++ b/gql/transport/websockets.py @@ -175,8 +175,9 @@ async def _receive(self) -> str: """Wait the next message from the websocket connection and log the answer """ - # We should always have an active websocket connection here - assert self.websocket is not None + # It is possible that the websocket has been already closed in another task + if self.websocket is None: + raise TransportClosed("Transport is already closed") # Wait for the next websocket frame. Can raise ConnectionClosed data: Data = await self.websocket.recv() @@ -387,6 +388,8 @@ async def _receive_data_loop(self) -> None: except (ConnectionClosed, TransportProtocolError) as e: await self._fail(e, clean_close=False) break + except TransportClosed: + break # Parse the answer try: