From bed130bf46d129ce26ec233660757d48cc0d187b Mon Sep 17 00:00:00 2001 From: Ro'e Katz Date: Thu, 9 Jun 2022 16:28:47 +0300 Subject: [PATCH] Postgres backend: handle connection loss --- broadcaster/_backends/postgres.py | 8 ++++++-- broadcaster/_base.py | 8 ++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/broadcaster/_backends/postgres.py b/broadcaster/_backends/postgres.py index 47ef4f6..0fd20d5 100644 --- a/broadcaster/_backends/postgres.py +++ b/broadcaster/_backends/postgres.py @@ -14,6 +14,7 @@ def __init__(self, url: str): async def connect(self) -> None: self._conn = await asyncpg.connect(self._url) self._listen_queue: asyncio.Queue = asyncio.Queue() + self._conn.add_termination_listener(self._termination_listener) async def disconnect(self) -> None: await self._conn.close() @@ -27,10 +28,13 @@ async def unsubscribe(self, channel: str) -> None: async def publish(self, channel: str, message: str) -> None: await self._conn.execute("SELECT pg_notify($1, $2);", channel, message) - def _listener(self, *args: Any) -> None: + async def _listener(self, *args: Any) -> None: connection, pid, channel, payload = args event = Event(channel=channel, message=payload) - self._listen_queue.put_nowait(event) + await self._listen_queue.put(event) + + async def _termination_listener(self, *args: Any) -> None: + await self._listen_queue.put(None) async def next_published(self) -> Event: return await self._listen_queue.get() diff --git a/broadcaster/_base.py b/broadcaster/_base.py index c58cb1d..045999b 100644 --- a/broadcaster/_base.py +++ b/broadcaster/_base.py @@ -72,9 +72,17 @@ async def disconnect(self) -> None: async def _listener(self) -> None: while True: event = await self._backend.next_published() + if event is None: + # Backend is disconnected + break + for queue in list(self._subscribers.get(event.channel, [])): await queue.put(event) + # Ubsubscribe all + for queue in sum([list(qs) for qs in self._subscribers.values()], []): + await queue.put(None) + async def publish(self, channel: str, message: Any) -> None: await self._backend.publish(channel, message)