Description
`valkey-py` 6.1.0 appears to handle asyncio cluster connections uncleanly. As a result, I am seeing crashes due to `Unclosed ClusterNode object` errors being raised in the event loop.
I assume this is what triggers it:
1. In `valkey.asyncio.cluster.NodesManager.initialize()`, `.set_nodes()` is called with the `remove_old=True` argument if certain conditions are satisfied:

   valkey-py/valkey/asyncio/cluster.py, line 1406 in ca5c7c5

   ```python
   self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)
   ```
2. `.set_nodes()` creates asyncio tasks to disconnect any 'old' cluster nodes, but fails to await those tasks (the `noqa` pragma might be telling):

   valkey-py/valkey/asyncio/cluster.py, lines 1199 to 1215 in ca5c7c5

   ```python
   def set_nodes(
       self,
       old: Dict[str, "ClusterNode"],
       new: Dict[str, "ClusterNode"],
       remove_old: bool = False,
   ) -> None:
       if remove_old:
           for name in list(old.keys()):
               if name not in new:
                   task = asyncio.create_task(old.pop(name).disconnect())  # noqa

       for name, node in new.items():
           if name in old:
               if old[name] is node:
                   continue
               task = asyncio.create_task(old[name].disconnect())  # noqa
           old[name] = node
   ```
3. The finalizer of `valkey.asyncio.cluster.ClusterNode` checks for any connections that have not been disconnected yet. Since the disconnection tasks from 2. are not awaited, the failure is triggered:

   valkey-py/valkey/asyncio/cluster.py, lines 1022 to 1036 in ca5c7c5

   ```python
   def __del__(
       self,
       _warn: Any = warnings.warn,
       _grl: Any = asyncio.get_running_loop,
   ) -> None:
       for connection in self._connections:
           if connection.is_connected:
               _warn(f"{self._DEL_MESSAGE} {self!r}", ResourceWarning, source=self)

               try:
                   context = {"client": self, "message": self._DEL_MESSAGE}
                   _grl().call_exception_handler(context)
               except RuntimeError:
                   pass
               break
   ```
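
To illustrate the suspected pattern in isolation, here is a minimal sketch that does not use valkey-py at all. `FakeNode` and `drop_node_fire_and_forget()` are hypothetical stand-ins I made up for this example; they only mimic the "create a disconnect task, never await it" shape of `set_nodes()`. It shows that the node is still connected when the synchronous function returns, and only gets disconnected once the event loop has had a chance to run the pending task.

```python
import asyncio


class FakeNode:
    """Hypothetical stand-in for ClusterNode; not the valkey-py class."""

    def __init__(self) -> None:
        self.is_connected = True

    async def disconnect(self) -> None:
        await asyncio.sleep(0)  # simulate I/O during teardown
        self.is_connected = False


def drop_node_fire_and_forget(node: FakeNode) -> None:
    # Same shape as set_nodes(): the task is created but never awaited,
    # and no reference to it is kept.
    task = asyncio.create_task(node.disconnect())  # noqa: F841


async def demo() -> tuple[bool, bool]:
    node = FakeNode()
    drop_node_fire_and_forget(node)
    # The synchronous call has returned, but disconnect() has not run yet.
    still_connected_on_return = node.is_connected
    # Yield to the event loop so the pending task can finish.
    await asyncio.sleep(0.01)
    connected_after_yield = node.is_connected
    return still_connected_on_return, connected_after_yield


result = asyncio.run(demo())
print(result)
```

Whether this window is really what lets the finalizer see a connected node in valkey-py is my assumption; the sketch only demonstrates that such a window exists with this pattern.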
As a potential solution, `set_nodes()` could be made into a coroutine function that awaits the disconnect tasks before returning.
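
A rough sketch of what that could look like, written as a standalone function so it runs without valkey-py. `FakeNode` is again a hypothetical stand-in for `ClusterNode`, and this is not a tested patch against the library; it only shows the idea of collecting the stale nodes and awaiting their `disconnect()` calls with `asyncio.gather()` before returning.

```python
import asyncio
from typing import Dict, List


class FakeNode:
    """Hypothetical stand-in for ClusterNode; not the valkey-py class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.is_connected = True

    async def disconnect(self) -> None:
        await asyncio.sleep(0)  # simulate I/O during teardown
        self.is_connected = False


async def set_nodes(
    old: Dict[str, FakeNode],
    new: Dict[str, FakeNode],
    remove_old: bool = False,
) -> None:
    stale: List[FakeNode] = []

    if remove_old:
        for name in list(old.keys()):
            if name not in new:
                stale.append(old.pop(name))

    for name, node in new.items():
        if name in old and old[name] is not node:
            stale.append(old[name])
        old[name] = node

    # Wait for every replaced or removed node to finish disconnecting.
    if stale:
        await asyncio.gather(*(node.disconnect() for node in stale))


async def demo():
    a, b, b2, c = FakeNode("a"), FakeNode("b"), FakeNode("b"), FakeNode("c")
    old = {"a": a, "b": b}
    new = {"b": b2, "c": c}
    await set_nodes(old, new, remove_old=True)
    return sorted(old), a.is_connected, b.is_connected, old["b"] is b2


outcome = asyncio.run(demo())
print(outcome)
```

With this shape, every caller of `set_nodes()` would have to `await` it. An exception from one `disconnect()` would propagate out of `gather()`, so passing `return_exceptions=True` and logging failures might be preferable in the real code.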
Client: valkey-py 6.1.0, CPython 3.13.2, containerized Linux on arm64
Database: Amazon ElastiCache for Valkey 8.0.1, t4g based instances, cluster mode enabled