Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 54 additions & 5 deletions src/neo4j/_async/io/_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import asyncio
import logging
import math
import sys
from collections import (
defaultdict,
deque,
Expand Down Expand Up @@ -53,6 +54,7 @@
DriverError,
Neo4jError,
ReadServiceUnavailable,
RoutingServiceUnavailable,
ServiceUnavailable,
SessionExpired,
WriteServiceUnavailable,
Expand Down Expand Up @@ -810,6 +812,7 @@ async def fetch_routing_table(
imp_user,
bookmarks,
auth,
ignored_errors=None,
):
"""
Fetch a routing table from a given router address.
Expand All @@ -823,6 +826,7 @@ async def fetch_routing_table(
:type imp_user: str or None
:param bookmarks: bookmarks used when fetching routing table
:param auth: auth
:param ignored_errors: optional list to accumulate ignored errors in

:returns: a new RoutingTable instance or None if the given router is
currently unable to provide routing information
Expand All @@ -843,15 +847,16 @@ async def fetch_routing_table(
# router. Hence, the driver should fail fast during discovery.
if e._is_fatal_during_discovery():
raise
except (ServiceUnavailable, SessionExpired):
pass
if ignored_errors is not None:
ignored_errors.append(e)
except (ServiceUnavailable, SessionExpired) as e:
if ignored_errors is not None:
ignored_errors.append(e)
if not new_routing_info:
log.debug(
"[#0000] _: <POOL> failed to fetch routing info from %r",
address,
)
# TODO: 7.0 - when Python 3.11+ is the minimum,
# use exception groups instead of swallowing discovery errors
return None
else:
servers = new_routing_info[0]["servers"]
Expand All @@ -876,6 +881,12 @@ async def fetch_routing_table(
"server %s",
address,
)
if ignored_errors is not None:
ignored_errors.append(
RoutingServiceUnavailable(
"Rejected routing table: no routers"
)
)
return None

# No readers
Expand All @@ -884,6 +895,12 @@ async def fetch_routing_table(
"[#0000] _: <POOL> no read servers returned from server %s",
address,
)
if ignored_errors is not None:
ignored_errors.append(
ReadServiceUnavailable(
"Rejected routing table: no readers"
)
)
return None

# At least one of each is fine, so return this table
Expand All @@ -898,6 +915,7 @@ async def _update_routing_table_from(
auth,
acquisition_timeout,
database_callback,
ignored_errors=None,
):
"""
Try to update routing tables with the given routers.
Expand All @@ -924,6 +942,7 @@ async def _update_routing_table_from(
imp_user=imp_user,
bookmarks=bookmarks,
auth=auth,
ignored_errors=ignored_errors,
)
if new_routing_table is not None:
new_database = new_routing_table.database
Expand Down Expand Up @@ -973,6 +992,7 @@ async def update_routing_table(
acquisition_timeout = acquisition_timeout_to_deadline(
acquisition_timeout
)
errors = []
async with self.refresh_lock:
routing_table = await self.get_routing_table(database)
if routing_table is not None:
Expand All @@ -997,6 +1017,7 @@ async def update_routing_table(
auth=auth,
acquisition_timeout=acquisition_timeout,
database_callback=database_callback,
ignored_errors=errors,
)
):
# Why is only the first initial routing address used?
Expand All @@ -1009,6 +1030,7 @@ async def update_routing_table(
auth=auth,
acquisition_timeout=acquisition_timeout,
database_callback=database_callback,
ignored_errors=errors,
):
return

Expand All @@ -1022,14 +1044,41 @@ async def update_routing_table(
auth=auth,
acquisition_timeout=acquisition_timeout,
database_callback=database_callback,
ignored_errors=errors,
)
):
# Why is only the first initial routing address used?
return

# None of the routers have been successful, so just fail
log.error("Unable to retrieve routing information")
raise ServiceUnavailable("Unable to retrieve routing information")
if sys.version_info >= (3, 11):
e = ExceptionGroup( # noqa: F821 # version guard in place
"All routing table requests failed", errors
)
else:
e = None
for error in errors:
if e is None:
e = error
continue
cause = error
seen_causes = {id(cause)}
while True:
next_cause = getattr(cause, "__cause__", None)
if next_cause is None:
break
if id(next_cause) in seen_causes:
# Avoid infinite recursion in case of circular
# references.
break
cause = next_cause
seen_causes.add(id(cause))
cause.__cause__ = e
e = error
raise ServiceUnavailable(
"Unable to retrieve routing information"
) from e

async def update_connection_pool(self):
async with self.refresh_lock:
Expand Down
59 changes: 54 additions & 5 deletions src/neo4j/_sync/io/_pool.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading