From 660498492ed07afacb5fa94f16274bfc220b47c5 Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Tue, 26 Oct 2021 11:30:06 +0200 Subject: [PATCH 1/2] Fast failing discovery on certain errors. The driver tries its best to fetch a routing table. It will try all possible routers while skipping routers on most errors. However, there are a few errors that are caused by the client. Those errors should be surfaced to the user for a better UX/DX and should fail fast: there is no reason to try another router if we expect it tho return the same error. Those errors are: - `Neo.ClientError.Database.DatabaseNotFound` - all `Neo.ClientError.Security.*` - except `Neo.ClientError.Security.AuthorizationExpired` - `Neo.ClientError.Transaction.InvalidBookmark` - `Neo.ClientError.Transaction.InvalidBookmarkMixture` --- neo4j/__init__.py | 10 +++++--- neo4j/_exceptions.py | 5 ---- neo4j/exceptions.py | 15 ++++++++++++ neo4j/io/__init__.py | 46 ++++++++++++----------------------- neo4j/io/_bolt3.py | 9 +------ neo4j/io/_bolt4.py | 44 +++------------------------------ tests/unit/test_exceptions.py | 1 - 7 files changed, 42 insertions(+), 88 deletions(-) diff --git a/neo4j/__init__.py b/neo4j/__init__.py index 707d0c437..cd0139b65 100644 --- a/neo4j/__init__.py +++ b/neo4j/__init__.py @@ -435,8 +435,11 @@ def verify_connectivity(self, **config): return self._verify_routing_connectivity() def _verify_routing_connectivity(self): - from neo4j.exceptions import ServiceUnavailable - from neo4j._exceptions import BoltHandshakeError + from neo4j.exceptions import ( + Neo4jError, + ServiceUnavailable, + SessionExpired, + ) table = self._pool.get_routing_table_for_default_database() routing_info = {} @@ -450,9 +453,8 @@ def _verify_routing_connectivity(self): timeout=self._default_workspace_config .connection_acquisition_timeout ) - except BoltHandshakeError as error: + except (ServiceUnavailable, SessionExpired, Neo4jError): routing_info[ix] = None - for key, val in routing_info.items(): if val is not None: return routing_info diff --git a/neo4j/_exceptions.py b/neo4j/_exceptions.py index d50978547..67db7f6cf 100644 --- a/neo4j/_exceptions.py +++ b/neo4j/_exceptions.py @@ -102,11 +102,6 @@ class BoltTransactionError(BoltError): # TODO: pass the transaction object in as an argument -class BoltRoutingError(BoltError): - """ Raised when a fault occurs with obtaining a routing table. - """ - - class BoltFailure(BoltError): """ Holds a Cypher failure. """ diff --git a/neo4j/exceptions.py b/neo4j/exceptions.py index efd3174bd..c69c75c73 100644 --- a/neo4j/exceptions.py +++ b/neo4j/exceptions.py @@ -130,6 +130,21 @@ def _extract_error_class(cls, classification, code): def invalidates_all_connections(self): return self.code == "Neo.ClientError.Security.AuthorizationExpired" + def is_fatal_during_discovery(self): + # checks if the code is an error that is caused by the client. In this + # case the driver should fail fast during discovery. + if not isinstance(self.code, str): + return False + if self.code in ("Neo.ClientError.Database.DatabaseNotFound", + "Neo.ClientError.Transaction.InvalidBookmark", + "Neo.ClientError.Transaction.InvalidBookmarkMixture"): + return True + if (self.code.startswith("Neo.ClientError.Security.") + and self.code != "Neo.ClientError.Security." + "AuthorizationExpired"): + return True + return False + def __str__(self): return "{{code: {code}}} {{message: {message}}}".format(code=self.code, message=self.message) diff --git a/neo4j/io/__init__.py b/neo4j/io/__init__.py index 71a35b247..863c73dfb 100644 --- a/neo4j/io/__init__.py +++ b/neo4j/io/__init__.py @@ -66,7 +66,6 @@ BoltError, BoltHandshakeError, BoltProtocolError, - BoltRoutingError, BoltSecurityError, ) from neo4j.addressing import Address @@ -937,35 +936,15 @@ def fetch_routing_info(self, address, database, imp_user, bookmarks, :raise ServiceUnavailable: if the server does not support routing, or if routing support is broken or outdated """ + cx = self._acquire(address, timeout) try: - cx = self._acquire(address, timeout) - try: - routing_table = cx.route( - database or self.workspace_config.database, - imp_user or self.workspace_config.impersonated_user, - bookmarks - ) - finally: - self.release(cx) - except BoltRoutingError as error: - # Connection was successful, but routing support is - # broken. This may indicate that the routing procedure - # does not exist (for protocol versions prior to 4.3). - # This error is converted into ServiceUnavailable, - # therefore surfacing to the application as a signal that - # routing is broken. - log.debug("Routing is broken (%s)", error) - raise ServiceUnavailable(*error.args) - except (ServiceUnavailable, SessionExpired) as error: - # The routing table request suffered a connection - # failure. This should return a null routing table, - # signalling to the caller to retry the request - # elsewhere. - log.debug("Routing is unavailable (%s)", error) - routing_table = None - # If the routing table is empty, deactivate the address. - if not routing_table: - self.deactivate(address) + routing_table = cx.route( + database or self.workspace_config.database, + imp_user or self.workspace_config.impersonated_user, + bookmarks + ) + finally: + self.release(cx) return routing_table def fetch_routing_table(self, *, address, timeout, database, imp_user, @@ -984,12 +963,19 @@ def fetch_routing_table(self, *, address, timeout, database, imp_user, :return: a new RoutingTable instance or None if the given router is currently unable to provide routing information """ + new_routing_info = None try: new_routing_info = self.fetch_routing_info( address, database, imp_user, bookmarks, timeout ) + except Neo4jError as e: + # checks if the code is an error that is caused by the client. In + # this case there is no sense in trying to fetch a RT from another + # router. Hence, the driver should fail fast during discovery. + if e.is_fatal_during_discovery(): + raise except (ServiceUnavailable, SessionExpired): - new_routing_info = None + pass if not new_routing_info: log.debug("Failed to fetch routing info %s", address) return None diff --git a/neo4j/io/_bolt3.py b/neo4j/io/_bolt3.py index f0e49dce5..704d8740b 100644 --- a/neo4j/io/_bolt3.py +++ b/neo4j/io/_bolt3.py @@ -186,13 +186,6 @@ def route(self, database=None, imp_user=None, bookmarks=None): metadata = {} records = [] - def fail(md): - from neo4j._exceptions import BoltRoutingError - if md.get("code") == "Neo.ClientError.Procedure.ProcedureNotFound": - raise BoltRoutingError("Server does not support routing", self.unresolved_address) - else: - raise BoltRoutingError("Routing support broken on server", self.unresolved_address) - # Ignoring database and bookmarks because there is no multi-db support. # The bookmarks are only relevant for making sure a previously created # db exists before querying a routing table for it. @@ -200,7 +193,7 @@ def fail(md): "CALL dbms.cluster.routing.getRoutingTable($context)", # This is an internal procedure call. Only available if the Neo4j 3.5 is setup with clustering. {"context": self.routing_context}, mode="r", # Bolt Protocol Version(3, 0) supports mode="r" - on_success=metadata.update, on_failure=fail + on_success=metadata.update ) self.pull(on_success=metadata.update, on_records=records.extend) self.send_all() diff --git a/neo4j/io/_bolt4.py b/neo4j/io/_bolt4.py index 4a9c47ab5..3bb90ab69 100644 --- a/neo4j/io/_bolt4.py +++ b/neo4j/io/_bolt4.py @@ -135,16 +135,6 @@ def route(self, database=None, imp_user=None, bookmarks=None): metadata = {} records = [] - def fail(md): - from neo4j._exceptions import BoltRoutingError - code = md.get("code") - if code == "Neo.ClientError.Database.DatabaseNotFound": - return # surface this error to the user - elif code == "Neo.ClientError.Procedure.ProcedureNotFound": - raise BoltRoutingError("Server does not support routing", self.unresolved_address) - else: - raise BoltRoutingError("Routing support broken on server", self.unresolved_address) - if database is None: # default database self.run( "CALL dbms.routing.getRoutingTable($context)", @@ -152,7 +142,7 @@ def fail(md): mode="r", bookmarks=bookmarks, db=SYSTEM_DATABASE, - on_success=metadata.update, on_failure=fail + on_success=metadata.update ) else: self.run( @@ -161,7 +151,7 @@ def fail(md): mode="r", bookmarks=bookmarks, db=SYSTEM_DATABASE, - on_success=metadata.update, on_failure=fail + on_success=metadata.update ) self.pull(on_success=metadata.update, on_records=records.extend) self.send_all() @@ -409,18 +399,6 @@ def route(self, database=None, imp_user=None, bookmarks=None): ) ) - def fail(md): - from neo4j._exceptions import BoltRoutingError - code = md.get("code") - if code == "Neo.ClientError.Database.DatabaseNotFound": - return # surface this error to the user - elif code == "Neo.ClientError.Procedure.ProcedureNotFound": - raise BoltRoutingError("Server does not support routing", - self.unresolved_address) - else: - raise BoltRoutingError("Routing support broken on server", - self.unresolved_address) - routing_context = self.routing_context or {} log.debug("[#%04X] C: ROUTE %r %r %r", self.local_port, routing_context, bookmarks, database) @@ -431,8 +409,7 @@ def fail(md): bookmarks = list(bookmarks) self._append(b"\x66", (routing_context, bookmarks, database), response=Response(self, "route", - on_success=metadata.update, - on_failure=fail)) + on_success=metadata.update)) self.send_all() self.fetch_all() return [metadata.get("rt")] @@ -476,18 +453,6 @@ class Bolt4x4(Bolt4x3): PROTOCOL_VERSION = Version(4, 4) def route(self, database=None, imp_user=None, bookmarks=None): - def fail(md): - from neo4j._exceptions import BoltRoutingError - code = md.get("code") - if code == "Neo.ClientError.Database.DatabaseNotFound": - return # surface this error to the user - elif code == "Neo.ClientError.Procedure.ProcedureNotFound": - raise BoltRoutingError("Server does not support routing", - self.unresolved_address) - else: - raise BoltRoutingError("Routing support broken on server", - self.unresolved_address) - routing_context = self.routing_context or {} db_context = {} if database is not None: @@ -503,8 +468,7 @@ def fail(md): bookmarks = list(bookmarks) self._append(b"\x66", (routing_context, bookmarks, db_context), response=Response(self, "route", - on_success=metadata.update, - on_failure=fail)) + on_success=metadata.update)) self.send_all() self.fetch_all() return [metadata.get("rt")] diff --git a/tests/unit/test_exceptions.py b/tests/unit/test_exceptions.py index 3681a82b8..2152a5b6d 100644 --- a/tests/unit/test_exceptions.py +++ b/tests/unit/test_exceptions.py @@ -55,7 +55,6 @@ from neo4j._exceptions import ( BoltError, BoltHandshakeError, - BoltRoutingError, BoltConnectionError, BoltSecurityError, BoltConnectionBroken, From aceeefc3ba35eef4ea06609e55bd47a1a7157ff0 Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Tue, 26 Oct 2021 14:15:27 +0200 Subject: [PATCH 2/2] Properly hydrate auth error as HELLO response --- neo4j/io/_common.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/neo4j/io/_common.py b/neo4j/io/_common.py index 7dab8e49d..0eec2e2d5 100644 --- a/neo4j/io/_common.py +++ b/neo4j/io/_common.py @@ -242,11 +242,12 @@ class InitResponse(Response): def on_failure(self, metadata): code = metadata.get("code") - message = metadata.get("message", "Connection initialisation failed") if code == "Neo.ClientError.Security.Unauthorized": - raise AuthError(message) + raise Neo4jError.hydrate(**metadata) else: - raise ServiceUnavailable(message) + raise ServiceUnavailable( + metadata.get("message", "Connection initialisation failed") + ) class CommitResponse(Response):