Skip to content

Fast failing discovery on certain errors. #611

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions neo4j/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,11 @@ def verify_connectivity(self, **config):
return self._verify_routing_connectivity()

def _verify_routing_connectivity(self):
from neo4j.exceptions import ServiceUnavailable
from neo4j._exceptions import BoltHandshakeError
from neo4j.exceptions import (
Neo4jError,
ServiceUnavailable,
SessionExpired,
)

table = self._pool.get_routing_table_for_default_database()
routing_info = {}
Expand All @@ -450,9 +453,8 @@ def _verify_routing_connectivity(self):
timeout=self._default_workspace_config
.connection_acquisition_timeout
)
except BoltHandshakeError as error:
except (ServiceUnavailable, SessionExpired, Neo4jError):
routing_info[ix] = None

for key, val in routing_info.items():
if val is not None:
return routing_info
Expand Down
5 changes: 0 additions & 5 deletions neo4j/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,6 @@ class BoltTransactionError(BoltError):
# TODO: pass the transaction object in as an argument


class BoltRoutingError(BoltError):
""" Raised when a fault occurs with obtaining a routing table.
"""


class BoltFailure(BoltError):
""" Holds a Cypher failure.
"""
Expand Down
15 changes: 15 additions & 0 deletions neo4j/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,21 @@ def _extract_error_class(cls, classification, code):
def invalidates_all_connections(self):
return self.code == "Neo.ClientError.Security.AuthorizationExpired"

def is_fatal_during_discovery(self):
# checks if the code is an error that is caused by the client. In this
# case the driver should fail fast during discovery.
if not isinstance(self.code, str):
return False
if self.code in ("Neo.ClientError.Database.DatabaseNotFound",
"Neo.ClientError.Transaction.InvalidBookmark",
"Neo.ClientError.Transaction.InvalidBookmarkMixture"):
return True
if (self.code.startswith("Neo.ClientError.Security.")
and self.code != "Neo.ClientError.Security."
"AuthorizationExpired"):
return True
return False

def __str__(self):
return "{{code: {code}}} {{message: {message}}}".format(code=self.code, message=self.message)

Expand Down
46 changes: 16 additions & 30 deletions neo4j/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
BoltError,
BoltHandshakeError,
BoltProtocolError,
BoltRoutingError,
BoltSecurityError,
)
from neo4j.addressing import Address
Expand Down Expand Up @@ -937,35 +936,15 @@ def fetch_routing_info(self, address, database, imp_user, bookmarks,
:raise ServiceUnavailable: if the server does not support
routing, or if routing support is broken or outdated
"""
cx = self._acquire(address, timeout)
try:
cx = self._acquire(address, timeout)
try:
routing_table = cx.route(
database or self.workspace_config.database,
imp_user or self.workspace_config.impersonated_user,
bookmarks
)
finally:
self.release(cx)
except BoltRoutingError as error:
# Connection was successful, but routing support is
# broken. This may indicate that the routing procedure
# does not exist (for protocol versions prior to 4.3).
# This error is converted into ServiceUnavailable,
# therefore surfacing to the application as a signal that
# routing is broken.
log.debug("Routing is broken (%s)", error)
raise ServiceUnavailable(*error.args)
except (ServiceUnavailable, SessionExpired) as error:
# The routing table request suffered a connection
# failure. This should return a null routing table,
# signalling to the caller to retry the request
# elsewhere.
log.debug("Routing is unavailable (%s)", error)
routing_table = None
# If the routing table is empty, deactivate the address.
if not routing_table:
self.deactivate(address)
routing_table = cx.route(
database or self.workspace_config.database,
imp_user or self.workspace_config.impersonated_user,
bookmarks
)
finally:
self.release(cx)
return routing_table

def fetch_routing_table(self, *, address, timeout, database, imp_user,
Expand All @@ -984,12 +963,19 @@ def fetch_routing_table(self, *, address, timeout, database, imp_user,
:return: a new RoutingTable instance or None if the given router is
currently unable to provide routing information
"""
new_routing_info = None
try:
new_routing_info = self.fetch_routing_info(
address, database, imp_user, bookmarks, timeout
)
except Neo4jError as e:
# checks if the code is an error that is caused by the client. In
# this case there is no sense in trying to fetch a RT from another
# router. Hence, the driver should fail fast during discovery.
if e.is_fatal_during_discovery():
raise
except (ServiceUnavailable, SessionExpired):
new_routing_info = None
pass
if not new_routing_info:
log.debug("Failed to fetch routing info %s", address)
return None
Expand Down
9 changes: 1 addition & 8 deletions neo4j/io/_bolt3.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,21 +186,14 @@ def route(self, database=None, imp_user=None, bookmarks=None):
metadata = {}
records = []

def fail(md):
from neo4j._exceptions import BoltRoutingError
if md.get("code") == "Neo.ClientError.Procedure.ProcedureNotFound":
raise BoltRoutingError("Server does not support routing", self.unresolved_address)
else:
raise BoltRoutingError("Routing support broken on server", self.unresolved_address)

# Ignoring database and bookmarks because there is no multi-db support.
# The bookmarks are only relevant for making sure a previously created
# db exists before querying a routing table for it.
self.run(
"CALL dbms.cluster.routing.getRoutingTable($context)", # This is an internal procedure call. Only available if the Neo4j 3.5 is setup with clustering.
{"context": self.routing_context},
mode="r", # Bolt Protocol Version(3, 0) supports mode="r"
on_success=metadata.update, on_failure=fail
on_success=metadata.update
)
self.pull(on_success=metadata.update, on_records=records.extend)
self.send_all()
Expand Down
44 changes: 4 additions & 40 deletions neo4j/io/_bolt4.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,24 +135,14 @@ def route(self, database=None, imp_user=None, bookmarks=None):
metadata = {}
records = []

def fail(md):
from neo4j._exceptions import BoltRoutingError
code = md.get("code")
if code == "Neo.ClientError.Database.DatabaseNotFound":
return # surface this error to the user
elif code == "Neo.ClientError.Procedure.ProcedureNotFound":
raise BoltRoutingError("Server does not support routing", self.unresolved_address)
else:
raise BoltRoutingError("Routing support broken on server", self.unresolved_address)

if database is None: # default database
self.run(
"CALL dbms.routing.getRoutingTable($context)",
{"context": self.routing_context},
mode="r",
bookmarks=bookmarks,
db=SYSTEM_DATABASE,
on_success=metadata.update, on_failure=fail
on_success=metadata.update
)
else:
self.run(
Expand All @@ -161,7 +151,7 @@ def fail(md):
mode="r",
bookmarks=bookmarks,
db=SYSTEM_DATABASE,
on_success=metadata.update, on_failure=fail
on_success=metadata.update
)
self.pull(on_success=metadata.update, on_records=records.extend)
self.send_all()
Expand Down Expand Up @@ -409,18 +399,6 @@ def route(self, database=None, imp_user=None, bookmarks=None):
)
)

def fail(md):
from neo4j._exceptions import BoltRoutingError
code = md.get("code")
if code == "Neo.ClientError.Database.DatabaseNotFound":
return # surface this error to the user
elif code == "Neo.ClientError.Procedure.ProcedureNotFound":
raise BoltRoutingError("Server does not support routing",
self.unresolved_address)
else:
raise BoltRoutingError("Routing support broken on server",
self.unresolved_address)

routing_context = self.routing_context or {}
log.debug("[#%04X] C: ROUTE %r %r %r", self.local_port,
routing_context, bookmarks, database)
Expand All @@ -431,8 +409,7 @@ def fail(md):
bookmarks = list(bookmarks)
self._append(b"\x66", (routing_context, bookmarks, database),
response=Response(self, "route",
on_success=metadata.update,
on_failure=fail))
on_success=metadata.update))
self.send_all()
self.fetch_all()
return [metadata.get("rt")]
Expand Down Expand Up @@ -476,18 +453,6 @@ class Bolt4x4(Bolt4x3):
PROTOCOL_VERSION = Version(4, 4)

def route(self, database=None, imp_user=None, bookmarks=None):
def fail(md):
from neo4j._exceptions import BoltRoutingError
code = md.get("code")
if code == "Neo.ClientError.Database.DatabaseNotFound":
return # surface this error to the user
elif code == "Neo.ClientError.Procedure.ProcedureNotFound":
raise BoltRoutingError("Server does not support routing",
self.unresolved_address)
else:
raise BoltRoutingError("Routing support broken on server",
self.unresolved_address)

routing_context = self.routing_context or {}
db_context = {}
if database is not None:
Expand All @@ -503,8 +468,7 @@ def fail(md):
bookmarks = list(bookmarks)
self._append(b"\x66", (routing_context, bookmarks, db_context),
response=Response(self, "route",
on_success=metadata.update,
on_failure=fail))
on_success=metadata.update))
self.send_all()
self.fetch_all()
return [metadata.get("rt")]
Expand Down
7 changes: 4 additions & 3 deletions neo4j/io/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,12 @@ class InitResponse(Response):

def on_failure(self, metadata):
code = metadata.get("code")
message = metadata.get("message", "Connection initialisation failed")
if code == "Neo.ClientError.Security.Unauthorized":
raise AuthError(message)
raise Neo4jError.hydrate(**metadata)
else:
raise ServiceUnavailable(message)
raise ServiceUnavailable(
metadata.get("message", "Connection initialisation failed")
)


class CommitResponse(Response):
Expand Down
1 change: 0 additions & 1 deletion tests/unit/test_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
from neo4j._exceptions import (
BoltError,
BoltHandshakeError,
BoltRoutingError,
BoltConnectionError,
BoltSecurityError,
BoltConnectionBroken,
Expand Down