diff --git a/docs/source/index.rst b/docs/source/index.rst index b4789517..d7bcb23f 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -154,6 +154,7 @@ Argument Renaming Changes * :code:`Transaction.run(statement, ...` is now :code:`Transaction.run(query, ...` * :code:`StatementResultSummary.statement` is now :code:`ResultSummary.query` * :code:`StatementResultSummary.statement_type` is now :code:`ResultSummary.query_type` +* :code:`StatementResultSummary.protocol_version` is now :code:`ResultSummary.server.protocol_version` Dependency Changes diff --git a/docs/source/usage_patterns.rst b/docs/source/usage_patterns.rst index a98c5d5b..3bcfa99f 100644 --- a/docs/source/usage_patterns.rst +++ b/docs/source/usage_patterns.rst @@ -266,6 +266,6 @@ Transaction Object Work Pattern query = Query("RETURN 1 AS x, timeout=10, metadata={"hello": 123}) - tx = session.begin_transaction(bookmark=None, metadata=None, timeout=None) + tx = session.begin_transaction(metadata=None, timeout=None) tx.run(query) tx.commit() diff --git a/neo4j/__init__.py b/neo4j/__init__.py index 3210af22..efffeb96 100644 --- a/neo4j/__init__.py +++ b/neo4j/__init__.py @@ -134,10 +134,6 @@ def driver(cls, uri, *, auth=None, **config): URI_SCHEME_NEO4J_SELF_SIGNED_CERTIFICATE, URI_SCHEME_NEO4J_SECURE, ) - from neo4j.conf import ( - TRUST_ALL_CERTIFICATES, - TRUST_SYSTEM_CA_SIGNED_CERTIFICATES - ) driver_type, security_type, parsed = parse_neo4j_uri(uri) @@ -252,8 +248,7 @@ def parse_targets(cls, *targets): targets = " ".join(targets) if not targets: targets = cls.default_targets - addresses = Address.parse_list(targets, default_host=cls.default_host, - default_port=cls.default_port) + addresses = Address.parse_list(targets, default_host=cls.default_host, default_port=cls.default_port) return addresses @@ -314,6 +309,26 @@ def verify_connectivity(self, **config): """ raise NotImplementedError + @experimental("Feature support query, based on Bolt Protocol Version and Neo4j Server Version will change in the future.") + def supports_multi_db(self): + """ Check if the server or cluster supports multi-databases. + + :return: Returns true if the server or cluster the driver connects to supports multi-databases, otherwise false. + :rtype: bool + """ + from neo4j.io._bolt4x0 import Bolt4x0 + + multi_database = False + cx = self._pool.acquire(access_mode=READ_ACCESS, timeout=self._pool.workspace_config.connection_acquisition_timeout, database=self._pool.workspace_config.database) + + # TODO: This logic should be inside the Bolt subclasses, because it can change depending on Bolt Protocol Version. + if cx.PROTOCOL_VERSION >= Bolt4x0.PROTOCOL_VERSION and cx.server_info.version_info() >= Version(4, 0, 0): + multi_database = True + + self._pool.release(cx) + + return multi_database + class BoltDriver(Direct, Driver): """ A :class:`.BoltDriver` is created from a ``bolt`` URI and addresses @@ -326,10 +341,18 @@ class BoltDriver(Direct, Driver): @classmethod def open(cls, target, *, auth=None, **config): + """ + :param target: + :param auth: + :param config: The values that can be specified are found in :class: `neo4j.PoolConfig` and :class: `neo4j.WorkspaceConfig` + + :return: + :rtype: :class: `neo4j.BoltDriver` + """ from neo4j.io import BoltPool address = cls.parse_target(target) pool_config, default_workspace_config = Config.consume_chain(config, PoolConfig, WorkspaceConfig) - pool = BoltPool.open(address, auth=auth, **pool_config) + pool = BoltPool.open(address, auth=auth, pool_config=pool_config, workspace_config=default_workspace_config) return cls(pool, default_workspace_config) def __init__(self, pool, default_workspace_config): @@ -338,6 +361,12 @@ def __init__(self, pool, default_workspace_config): self._default_workspace_config = default_workspace_config def session(self, **config): + """ + :param config: The values that can be specified are found in :class: `neo4j.SessionConfig` + + :return: + :rtype: :class: `neo4j.Session` + """ from neo4j.work.simple import Session session_config = SessionConfig(self._default_workspace_config, config) SessionConfig.consume(config) # Consume the config @@ -372,16 +401,15 @@ def open(cls, *targets, auth=None, routing_context=None, **config): from neo4j.io import Neo4jPool addresses = cls.parse_targets(*targets) pool_config, default_workspace_config = Config.consume_chain(config, PoolConfig, WorkspaceConfig) - pool = Neo4jPool.open(*addresses, auth=auth, routing_context=routing_context, **pool_config) + pool = Neo4jPool.open(*addresses, auth=auth, routing_context=routing_context, pool_config=pool_config, workspace_config=default_workspace_config) return cls(pool, default_workspace_config) def __init__(self, pool, default_workspace_config): - Routing.__init__(self, pool.routing_table.initial_routers) + Routing.__init__(self, pool.get_default_database_initial_router_addresses()) Driver.__init__(self, pool) self._default_workspace_config = default_workspace_config def session(self, **config): - from neo4j.work.simple import Session session_config = SessionConfig(self._default_workspace_config, config) SessionConfig.consume(config) # Consume the config return Session(self._pool, session_config) @@ -392,9 +420,6 @@ def pipeline(self, **config): PipelineConfig.consume(config) # Consume the config return Pipeline(self._pool, pipeline_config) - def get_routing_table(self): - return self._pool.routing_table - def verify_connectivity(self, **config): """ :raise ServiceUnavailable: raised if the server does not support routing or if routing support is broken. @@ -406,11 +431,11 @@ def _verify_routing_connectivity(self): from neo4j.exceptions import ServiceUnavailable from neo4j._exceptions import BoltHandshakeError - table = self.get_routing_table() + table = self._pool.get_routing_table_for_default_database() routing_info = {} for ix in list(table.routers): try: - routing_info[ix] = self._pool.fetch_routing_info(table.routers[0]) + routing_info[ix] = self._pool.fetch_routing_info(address=table.routers[0], timeout=self._default_workspace_config.connection_acquisition_timeout, database=self._default_workspace_config.database) except BoltHandshakeError as error: routing_info[ix] = None diff --git a/neo4j/api.py b/neo4j/api.py index 148e6114..20f82d92 100644 --- a/neo4j/api.py +++ b/neo4j/api.py @@ -53,7 +53,7 @@ TRUST_ALL_CERTIFICATES = "TRUST_ALL_CERTIFICATES" SYSTEM_DATABASE = "system" -DEFAULT_DATABASE = None +DEFAULT_DATABASE = None # Must be a non string hashable value class Auth: diff --git a/neo4j/conf.py b/neo4j/conf.py index 223d6b79..426dbf83 100644 --- a/neo4j/conf.py +++ b/neo4j/conf.py @@ -304,14 +304,15 @@ class SessionConfig(WorkspaceConfig): #: Bookmarks bookmarks = () - # Default AccessMode + #: Default AccessMode default_access_mode = WRITE_ACCESS # access_mode = DeprecatedAlias("default_access_mode") class TransactionConfig(Config): - """ Transaction configuration. + """ Transaction configuration. This is internal for now. + neo4j.session.begin_transaction neo4j.Query neo4j.unit_of_work @@ -322,3 +323,18 @@ class TransactionConfig(Config): #: Timeout timeout = None # seconds + + +class RoutingConfig(Config): + """ Neo4jDriver routing settings. This is internal for now. + """ + + #: Routing Table Purge_Delay + routing_table_purge_delay = 30.0 # seconds + # The TTL + routing_table_purge_delay should be used to check if the database routing table should be removed. + + #: Max Routing Failures + # max_routing_failures = 1 + + #: Retry Timeout Delay + # retry_timeout_delay = 5.0 # seconds diff --git a/neo4j/io/__init__.py b/neo4j/io/__init__.py index cebe6124..f69ec3ec 100644 --- a/neo4j/io/__init__.py +++ b/neo4j/io/__init__.py @@ -78,8 +78,13 @@ SessionExpired, ReadServiceUnavailable, WriteServiceUnavailable, + ConfigurationError, ) from neo4j.routing import RoutingTable +from neo4j.conf import ( + PoolConfig, + WorkspaceConfig, +) # Set up logger log = getLogger("neo4j") @@ -123,8 +128,7 @@ def protocol_handlers(cls, protocol_version=None): :raise TypeError: if protocol version is not passed in a tuple """ - # Carry out subclass imports locally to avoid circular - # dependency issues. + # Carry out Bolt subclass imports locally to avoid circular dependency issues. from neo4j.io._bolt3 import Bolt3 from neo4j.io._bolt4x0 import Bolt4x0 @@ -167,32 +171,34 @@ def ping(cls, address, *, timeout=None, **config): return protocol_version @classmethod - def open(cls, address, *, auth=None, timeout=None, **config): + def open(cls, address, *, auth=None, timeout=None, **pool_config): """ Open a new Bolt connection to a given server address. :param address: :param auth: - :param timeout: - :param config: + :param timeout: The connection timeout + :param pool_config: :return: :raise BoltHandshakeError: raised if the Bolt Protocol can not negotiate a protocol version. :raise ServiceUnavailable: raised if there was a connection issue. """ - config = PoolConfig.consume(config) - s, config.protocol_version, handshake, data = connect( + pool_config = PoolConfig.consume(pool_config) + s, pool_config.protocol_version, handshake, data = connect( address, timeout=timeout, - custom_resolver=config.resolver, - ssl_context=config.get_ssl_context(), - keep_alive=config.keep_alive, + custom_resolver=pool_config.resolver, + ssl_context=pool_config.get_ssl_context(), + keep_alive=pool_config.keep_alive, ) - if config.protocol_version == (3, 0): + if pool_config.protocol_version == (3, 0): + # Carry out Bolt subclass imports locally to avoid circular dependency issues. from neo4j.io._bolt3 import Bolt3 - connection = Bolt3(address, s, auth=auth, **config) - elif config.protocol_version == (4, 0): + connection = Bolt3(address, s, pool_config.max_connection_lifetime, auth=auth, user_agent=pool_config.user_agent) + elif pool_config.protocol_version == (4, 0): + # Carry out Bolt subclass imports locally to avoid circular dependency issues. from neo4j.io._bolt4x0 import Bolt4x0 - connection = Bolt4x0(address, s, auth=auth, **config) + connection = Bolt4x0(address, s, pool_config.max_connection_lifetime, auth=auth, user_agent=pool_config.user_agent) else: log.debug("[#%04X] S: ", s.getpeername()[1]) s.shutdown(SHUT_RDWR) @@ -327,15 +333,15 @@ def defunct(self): class IOPool: """ A collection of connections to one or more server addresses. """ - from neo4j.conf import PoolConfig, WorkspaceConfig - _default_acquire_timeout = WorkspaceConfig.connection_acquisition_timeout - - def __init__(self, opener, pool_config): + def __init__(self, opener, pool_config, workspace_config): assert callable(opener) assert isinstance(pool_config, PoolConfig) + assert isinstance(workspace_config, WorkspaceConfig) + self.opener = opener self.pool_config = pool_config + self.workspace_config = workspace_config self.connections = {} self.lock = RLock() self.cond = Condition(self.lock) @@ -355,7 +361,7 @@ def _acquire(self, address, timeout): """ t0 = perf_counter() if timeout is None: - timeout = self._default_acquire_timeout + timeout = self.workspace_config.connection_acquisition_timeout with self.lock: try: @@ -453,7 +459,7 @@ def deactivate(self, address): if not connections: self.remove(address) - def on_write_failure(self, address): + def on_write_failure(self, *, address): raise WriteServiceUnavailable("No write service available for pool {}".format(self)) def remove(self, address): @@ -482,25 +488,25 @@ def close(self): class BoltPool(IOPool): @classmethod - def open(cls, address, *, auth=None, **config): - pool_config = PoolConfig.consume(config) + def open(cls, address, *, auth=None, pool_config, workspace_config): def opener(addr, timeout): return Bolt.open(addr, auth=auth, timeout=timeout, **pool_config) - pool = cls(opener, pool_config, address) + pool = cls(opener, pool_config, workspace_config, address) seeds = [pool.acquire() for _ in range(pool_config.init_size)] pool.release(*seeds) return pool - def __init__(self, opener, pool_config, address): - super(BoltPool, self).__init__(opener, pool_config) + def __init__(self, opener, pool_config, workspace_config, address): + super(BoltPool, self).__init__(opener, pool_config, workspace_config) self.address = address def __repr__(self): return "<{} address={!r}>".format(self.__class__.__name__, self.address) - def acquire(self, access_mode=None, timeout=None): + def acquire(self, *, access_mode=None, timeout=None, database=None): + # The access_mode and database is not needed for a direct connection, its just there for consistency. return self._acquire(self.address, timeout) @@ -509,46 +515,111 @@ class Neo4jPool(IOPool): """ @classmethod - def open(cls, *addresses, auth=None, routing_context=None, **config): - pool_config = PoolConfig.consume(config) + def open(cls, *addresses, auth=None, routing_context=None, pool_config=None, workspace_config=None): def opener(addr, timeout): return Bolt.open(addr, auth=auth, timeout=timeout, **pool_config) - pool = cls(opener, pool_config, addresses, routing_context) + pool = cls(opener, pool_config, workspace_config, routing_context, addresses) + try: - pool.update_routing_table() + pool.update_routing_table(database=workspace_config.database) except Exception: pool.close() raise else: return pool - def __init__(self, opener, pool_config, addresses, routing_context): - super(Neo4jPool, self).__init__(opener, pool_config) - self.routing_table = RoutingTable(addresses) + def __init__(self, opener, pool_config, workspace_config, routing_context, addresses): + super(Neo4jPool, self).__init__(opener, pool_config, workspace_config) + # Each database have a routing table, the default database is a special case. + log.debug("[#0000] C: routing addresses %r", addresses) + self.routing_tables = {workspace_config.database: RoutingTable(database=workspace_config.database, routers=addresses)} self.routing_context = routing_context - self.missing_writer = False + # self.missing_writer = False self.refresh_lock = Lock() def __repr__(self): - return "<{} addresses={!r}>".format(self.__class__.__name__, - self.routing_table.initial_routers) + """ The representation shows the initial routing addresses. + + :return: The representation + :rtype: str + """ + return "<{} addresses={!r}>".format(self.__class__.__name__, self.get_default_database_initial_router_addresses()) @property - def initial_address(self): - return self.routing_table.initial_routers[0] + def first_initial_routing_address(self): + return self.get_default_database_initial_router_addresses()[0] + + def get_default_database_initial_router_addresses(self): + """ Get the initial router addresses for the default database. + + :return: + :rtype: OrderedSet + """ + return self.get_routing_table_for_default_database().initial_routers + + def get_default_database_router_addresses(self): + """ Get the router addresses for the default database. + + :return: + :rtype: OrderedSet + """ + return self.get_routing_table_for_default_database().routers + + def get_routing_table_for_default_database(self): + return self.routing_tables[self.workspace_config.database] - def fetch_routing_info(self, address, timeout=None): + def create_routing_table(self, database): + if database not in self.routing_tables: + self.routing_tables[database] = RoutingTable(database=database, routers=self.get_default_database_initial_router_addresses()) + + def fetch_routing_info(self, *, address, timeout, database): """ Fetch raw routing info from a given router address. :param address: router address :param timeout: seconds + :param database: the data base name to get routing table for :return: list of routing records or None if no connection could be established :raise ServiceUnavailable: if the server does not support routing or if routing support is broken """ + + # The name of system database is fixed and named as "system". + # It cannot be changed for a single instance or a cluster. (We can reliably assume that the system db exists on each instance.) + # + # Database name is NOT case sensitive. + # + # Each cluster member will host the exact same databases. For example, if a cluster member A has databases "foo" and + # "system", then all other members in the cluster should also have and only have "foo" and "system". + # However at a certain time, the cluster members may or may not up-to-date, as a result, cluster members may contain different databases. + # + # Maintain a routing table for each database. + # + # Default database is named "neo4j", (this can be renamed on the Neo4j server). + # + # Any core member in a cluster can provide a routing table for any database inside the cluster. + # The seed_url can be used to find all databases in the cluster. + # + # If the driver failed to refresh routing table with all known routers, then the driver should retry a few times before it raises a ServiceUnavailable. + # + # A valid routing table should at least have one router and one reader. + # + # To prevent the routing tables from growing infinitely. + # Stale/Aged routing tables is removed when there is a failure to obtain a routing table. + # Remove a routing table if it have been aged, timeout = TTL + RoutingConfig.routing_table_purge_delay + + # Carry out Bolt subclass imports locally to avoid circular dependency issues. + from neo4j.io._bolt3 import Bolt3 + from neo4j.io._bolt4x0 import Bolt4x0 + + from neo4j.api import ( + SYSTEM_DATABASE, + DEFAULT_DATABASE, + READ_ACCESS, + ) + metadata = {} records = [] @@ -560,10 +631,43 @@ def fail(md): try: with self._acquire(address, timeout) as cx: - _, _, server_version = (cx.server.agent or "").partition("/") + _, _, server_version = (cx.server_info.agent or "").partition("/") log.debug("[#%04X] C: query=%r", cx.local_port, self.routing_context or {}) - cx.run("CALL dbms.cluster.routing.getRoutingTable($context)", - {"context": self.routing_context}, on_success=metadata.update, on_failure=fail) + + if database is None: + database = self.workspace_config.database + + # TODO: This logic should be inside the Bolt subclasses, because it can change depending on Bolt Protocol Version. + if cx.PROTOCOL_VERSION == Bolt3.PROTOCOL_VERSION: + if database != DEFAULT_DATABASE: + raise ConfigurationError("Database name parameter for selecting database is not supported in Bolt Protocol {!r}. Database name {!r}. Server Agent {!r}.".format( + Bolt3.PROTOCOL_VERSION, database, cx.server_info.agent)) + cx.run( + "CALL dbms.cluster.routing.getRoutingTable($context)", # This is an internal procedure call. Only available if the Neo4j 3.5 is setup with clustering. + {"context": self.routing_context}, + mode="r", # Bolt Protocol Version(3, 0) supports mode + on_success=metadata.update, + on_failure=fail, + ) + elif cx.PROTOCOL_VERSION == Bolt4x0.PROTOCOL_VERSION: + if database == DEFAULT_DATABASE: + cx.run( + "CALL dbms.routing.getRoutingTable($context)", + {"context": self.routing_context}, + mode="r", + db=SYSTEM_DATABASE, + on_success=metadata.update, + on_failure=fail, + ) + else: + cx.run( + "CALL dbms.routing.getRoutingTable($context, $database)", + {"context": self.routing_context, "database": database}, + mode="r", + db=SYSTEM_DATABASE, + on_success=metadata.update, + on_failure=fail, + ) cx.pull(on_success=metadata.update, on_records=records.extend) cx.send_all() cx.fetch_all() @@ -573,20 +677,24 @@ def fail(md): except BoltRoutingError as error: raise ServiceUnavailable(*error.args) except ServiceUnavailable: - self.deactivate(address) + self.deactivate(address=address) return None - def fetch_routing_table(self, address, timeout=None): + def fetch_routing_table(self, *, address, timeout, database): """ Fetch a routing table from a given router address. :param address: router address :param timeout: seconds + :param database: the database name + :type: str + :return: a new RoutingTable instance or None if the given router is currently unable to provide routing information - :raise ServiceUnavailable: if no writers are available - :raise BoltProtocolError: if the routing information received is unusable + + :raise neo4j.exceptions.ServiceUnavailable: if no writers are available + :raise neo4j._exceptions.BoltProtocolError: if the routing information received is unusable """ - new_routing_info = self.fetch_routing_info(address, timeout) + new_routing_info = self.fetch_routing_info(address=address, timeout=timeout, database=database) if new_routing_info is None: return None elif not new_routing_info: @@ -594,17 +702,15 @@ def fetch_routing_table(self, address, timeout=None): else: servers = new_routing_info[0]["servers"] ttl = new_routing_info[0]["ttl"] - new_routing_table = RoutingTable.parse_routing_info(servers, ttl) + new_routing_table = RoutingTable.parse_routing_info(database=database, servers=servers, ttl=ttl) # Parse routing info and count the number of each type of server num_routers = len(new_routing_table.routers) num_readers = len(new_routing_table.readers) - num_writers = len(new_routing_table.writers) - # No writers are available. This likely indicates a temporary state, + # num_writers = len(new_routing_table.writers) + # If no writers are available. This likely indicates a temporary state, # such as leader switching, so we should not signal an error. - # When no writers available, then we flag we are reading in absence of writer - self.missing_writer = (num_writers == 0) # No routers if num_routers == 0: @@ -617,54 +723,59 @@ def fetch_routing_table(self, address, timeout=None): # At least one of each is fine, so return this table return new_routing_table - def update_routing_table_from(self, *routers): + def update_routing_table_from(self, *routers, database=None): """ Try to update routing tables with the given routers. :return: True if the routing table is successfully updated, otherwise False """ - log.debug("Attempting to update routing table from " - "{}".format(", ".join(map(repr, routers)))) + log.debug("Attempting to update routing table from {}".format(", ".join(map(repr, routers)))) for router in routers: - new_routing_table = self.fetch_routing_table(router) + new_routing_table = self.fetch_routing_table(address=router, timeout=self.pool_config.connection_timeout, database=database) if new_routing_table is not None: - self.routing_table.update(new_routing_table) - log.debug("Successfully updated routing table from " - "{!r} ({!r})".format(router, self.routing_table)) + self.routing_tables[database].update(new_routing_table) + log.debug("[#0000] C: address={!r} ({!r})".format(router, self.routing_tables[database])) return True return False - def update_routing_table(self): + def update_routing_table(self, *, database): """ Update the routing table from the first router able to provide valid routing information. + + :param database: The database name + + :raise neo4j.exceptions.ServiceUnavailable: """ # copied because it can be modified - existing_routers = list(self.routing_table.routers) + existing_routers = list(self.routing_tables[database].routers) has_tried_initial_routers = False - if self.missing_writer: + if self.routing_tables[database].missing_fresh_writer(): + # TODO: Test this state has_tried_initial_routers = True - if self.update_routing_table_from(self.initial_address): + if self.update_routing_table_from(self.first_initial_routing_address, database=database): + # Why is only the first initial routing address used? return - if self.update_routing_table_from(*existing_routers): + if self.update_routing_table_from(*existing_routers, database=database): return - if not has_tried_initial_routers and self.initial_address not in existing_routers: - if self.update_routing_table_from(self.initial_address): + if not has_tried_initial_routers and self.first_initial_routing_address not in existing_routers: + if self.update_routing_table_from(self.first_initial_routing_address, database=database): + # Why is only the first initial routing address used? return # None of the routers have been successful, so just fail log.error("Unable to retrieve routing information") raise ServiceUnavailable("Unable to retrieve routing information") - def update_connection_pool(self): - servers = self.routing_table.servers() + def update_connection_pool(self, *, database): + servers = self.routing_tables[database].servers() for address in list(self.connections): if address not in servers: super(Neo4jPool, self).deactivate(address) - def ensure_routing_table_is_fresh(self, access_mode): + def ensure_routing_table_is_fresh(self, *, access_mode, database): """ Update the routing table if stale. This method performs two freshness checks, before and after acquiring @@ -678,27 +789,34 @@ def ensure_routing_table_is_fresh(self, access_mode): :return: `True` if an update was required, `False` otherwise. """ from neo4j.api import READ_ACCESS - if self.routing_table.is_fresh(readonly=(access_mode == READ_ACCESS)): + if self.routing_tables[database].is_fresh(readonly=(access_mode == READ_ACCESS)): + # Readers are fresh. return False with self.refresh_lock: - if self.routing_table.is_fresh(readonly=(access_mode == READ_ACCESS)): - if access_mode == READ_ACCESS: - # if reader is fresh but writers is not fresh, then we are reading in absence of writer - self.missing_writer = not self.routing_table.is_fresh(readonly=False) - return False - self.update_routing_table() - self.update_connection_pool() + + self.update_routing_table(database=database) + self.update_connection_pool(database=database) + + for database in list(self.routing_tables.keys()): + # Remove unused databases in the routing table + # Remove the routing table after a timeout = TTL + 30s + log.debug("[#0000] C: database=%s", database) + if self.routing_tables[database].should_be_purged_from_memory() and database != self.workspace_config.database: + del self.routing_tables[database] + return True - def _select_address(self, access_mode=None): + def _select_address(self, *, access_mode, database): from neo4j.api import READ_ACCESS """ Selects the address with the fewest in-use connections. """ - self.ensure_routing_table_is_fresh(access_mode) + self.create_routing_table(database) + self.ensure_routing_table_is_fresh(access_mode=access_mode, database=database) + log.debug("[#0000] C: %r", self.routing_tables) if access_mode == READ_ACCESS: - addresses = self.routing_table.readers + addresses = self.routing_tables[database].readers else: - addresses = self.routing_table.writers + addresses = self.routing_tables[database].writers addresses_by_usage = {} for address in addresses: addresses_by_usage.setdefault(self.in_use_connection_count(address), []).append(address) @@ -709,22 +827,24 @@ def _select_address(self, access_mode=None): raise WriteServiceUnavailable("No write service currently available") return choice(addresses_by_usage[min(addresses_by_usage)]) - def acquire(self, access_mode=None, timeout=None): + def acquire(self, *, access_mode, timeout, database): from neo4j.api import check_access_mode access_mode = check_access_mode(access_mode) while True: try: - address = self._select_address(access_mode) + # Get an address for a connection that have the fewest in-use connections. + address = self._select_address(access_mode=access_mode, database=database) + log.debug("[#0000] C: database=%r address=%r", database, address) except (ReadServiceUnavailable, WriteServiceUnavailable) as err: raise SessionExpired("Failed to obtain connection towards '%s' server." % access_mode) from err try: connection = self._acquire(address, timeout=timeout) # should always be a resolved address except ServiceUnavailable: - self.deactivate(address) + self.deactivate(address=address) else: return connection - def deactivate(self, address): + def deactivate(self, *, address): """ Deactivate an address from the connection pool, if present, remove from the routing table and also closing all idle connections to that address. @@ -732,18 +852,20 @@ def deactivate(self, address): log.debug("[#0000] C: Deactivating address %r", address) # We use `discard` instead of `remove` here since the former # will not fail if the address has already been removed. - self.routing_table.routers.discard(address) - self.routing_table.readers.discard(address) - self.routing_table.writers.discard(address) - log.debug("[#0000] C: table=%r", self.routing_table) + for database in self.routing_tables.keys(): + self.routing_tables[database].routers.discard(address) + self.routing_tables[database].readers.discard(address) + self.routing_tables[database].writers.discard(address) + log.debug("[#0000] C: table=%r", self.routing_tables) super(Neo4jPool, self).deactivate(address) - def on_write_failure(self, address): + def on_write_failure(self, *, address): """ Remove a writer address from the routing table, if present. """ log.debug("[#0000] C: Removing writer %r", address) - self.routing_table.writers.discard(address) - log.debug("[#0000] C: table=%r", self.routing_table) + for database in self.routing_tables.keys(): + self.routing_tables[database].writers.discard(address) + log.debug("[#0000] C: table=%r", self.routing_tables) def _connect(resolved_address, timeout, keep_alive): diff --git a/neo4j/io/_bolt3.py b/neo4j/io/_bolt3.py index e00bd3ee..7b1c200e 100644 --- a/neo4j/io/_bolt3.py +++ b/neo4j/io/_bolt3.py @@ -25,6 +25,8 @@ from time import perf_counter from neo4j.api import ( Version, + READ_ACCESS, + WRITE_ACCESS, ) from neo4j.io._courier import MessageInbox from neo4j.meta import get_user_agent @@ -36,6 +38,7 @@ NotALeader, ForbiddenOnReadOnlyDatabase, SessionExpired, + ConfigurationError, ) from neo4j._exceptions import ( BoltIncompleteCommitError, @@ -61,9 +64,6 @@ class Bolt3(Bolt): PROTOCOL_VERSION = Version(3, 0) - #: Server details for this connection - server = None - # The socket in_use = False @@ -76,21 +76,19 @@ class Bolt3(Bolt): #: The pool of which this connection is a member pool = None - def __init__(self, unresolved_address, sock, *, auth=None, **config): - self.config = PoolConfig.consume(config) + def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=None, user_agent=None): self.unresolved_address = unresolved_address self.socket = sock - self.server = ServerInfo(Address(sock.getpeername()), Bolt3.PROTOCOL_VERSION) + self.server_info = ServerInfo(Address(sock.getpeername()), Bolt3.PROTOCOL_VERSION) self.outbox = Outbox() self.inbox = Inbox(self.socket, on_error=self._set_defunct) self.packer = Packer(self.outbox) self.unpacker = Unpacker(self.inbox) self.responses = deque() - self._max_connection_lifetime = self.config.max_connection_lifetime + self._max_connection_lifetime = max_connection_lifetime self._creation_timestamp = perf_counter() # Determine the user agent - user_agent = self.config.user_agent if user_agent: self.user_agent = user_agent else: @@ -140,19 +138,18 @@ def hello(self): logged_headers["credentials"] = "*******" log.debug("[#%04X] C: HELLO %r", self.local_port, logged_headers) self._append(b"\x01", (headers,), - response=InitResponse(self, on_success=self.server.metadata.update)) + response=InitResponse(self, on_success=self.server_info.metadata.update)) self.send_all() self.fetch_all() - def run(self, query, parameters=None, mode=None, bookmarks=None, metadata=None, - timeout=None, db=None, **handlers): + def run(self, query, parameters=None, mode=None, bookmarks=None, metadata=None, timeout=None, db=None, **handlers): if db is not None: - raise ValueError("Database selection is not supported in Bolt 3") + raise ConfigurationError("Database name parameter for selecting database is not supported in Bolt Protocol {!r}. Database name {!r}.".format(Bolt3.PROTOCOL_VERSION, db)) if not parameters: parameters = {} extra = {} - if mode: - extra["mode"] = mode + if mode in (READ_ACCESS, "r"): + extra["mode"] = "r" # It will default to mode "w" if nothing is specified if bookmarks: try: extra["bookmarks"] = list(bookmarks) @@ -191,13 +188,12 @@ def pull(self, n=-1, qid=-1, **handlers): log.debug("[#%04X] C: PULL_ALL", self.local_port) self._append(b"\x3F", (), Response(self, **handlers)) - def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, - db=None, **handlers): + def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, db=None, **handlers): if db is not None: - raise ValueError("Database selection is not supported in Bolt 3") + raise ConfigurationError("Database name parameter for selecting database is not supported in Bolt Protocol {!r}. Database name {!r}.".format(Bolt3.PROTOCOL_VERSION, db)) extra = {} - if mode: - extra["mode"] = mode + if mode in (READ_ACCESS, "r"): + extra["mode"] = "r" # It will default to mode "w" if nothing is specified if bookmarks: try: extra["bookmarks"] = list(bookmarks) @@ -260,11 +256,11 @@ def send_all(self): """ if self.closed(): raise ServiceUnavailable("Failed to write to closed connection {!r} ({!r})".format( - self.unresolved_address, self.server.address)) + self.unresolved_address, self.server_info.address)) if self.defunct(): raise ServiceUnavailable("Failed to write to defunct connection {!r} ({!r})".format( - self.unresolved_address, self.server.address)) + self.unresolved_address, self.server_info.address)) try: self._send_all() @@ -272,7 +268,7 @@ def send_all(self): log.error("Failed to write data to connection " "{!r} ({!r}); ({!r})". format(self.unresolved_address, - self.server.address, + self.server_info.address, "; ".join(map(repr, error.args)))) if self.pool: self.pool.deactivate(self.unresolved_address) @@ -286,11 +282,11 @@ def fetch_message(self): """ if self._closed: raise ServiceUnavailable("Failed to read from closed connection {!r} ({!r})".format( - self.unresolved_address, self.server.address)) + self.unresolved_address, self.server_info.address)) if self._defunct: raise ServiceUnavailable("Failed to read from defunct connection {!r} ({!r})".format( - self.unresolved_address, self.server.address)) + self.unresolved_address, self.server_info.address)) if not self.responses: return 0, 0 @@ -302,7 +298,7 @@ def fetch_message(self): log.error("Failed to read data from connection " "{!r} ({!r}); ({!r})". format(self.unresolved_address, - self.server.address, + self.server_info.address, "; ".join(map(repr, error.args)))) if self.pool: self.pool.deactivate(self.unresolved_address) @@ -329,11 +325,11 @@ def fetch_message(self): response.on_failure(summary_metadata or {}) except (ServiceUnavailable, DatabaseUnavailable): if self.pool: - self.pool.deactivate(self.unresolved_address), + self.pool.deactivate(address=self.unresolved_address), raise except (NotALeader, ForbiddenOnReadOnlyDatabase): if self.pool: - self.pool.on_write_failure(self.unresolved_address), + self.pool.on_write_failure(address=self.unresolved_address), raise else: raise BoltProtocolError("Unexpected response message with signature %02X" % summary_signature, address=self.unresolved_address) @@ -344,7 +340,7 @@ def _set_defunct(self, error=None): direct_driver = isinstance(self.pool, BoltPool) message = ("Failed to read from defunct connection {!r} ({!r})".format( - self.unresolved_address, self.server.address)) + self.unresolved_address, self.server_info.address)) log.error(message) # We were attempting to receive data but the connection @@ -354,7 +350,7 @@ def _set_defunct(self, error=None): self._defunct = True self.close() if self.pool: - self.pool.deactivate(self.unresolved_address) + self.pool.deactivate(address=self.unresolved_address) # Iterate through the outstanding responses, and if any correspond # to COMMIT requests then raise an error to signal that we are # unable to confirm that the COMMIT completed successfully. diff --git a/neo4j/io/_bolt4x0.py b/neo4j/io/_bolt4x0.py index 3f60eba6..4cf380fe 100644 --- a/neo4j/io/_bolt4x0.py +++ b/neo4j/io/_bolt4x0.py @@ -25,6 +25,8 @@ from time import perf_counter from neo4j.api import ( Version, + READ_ACCESS, + WRITE_ACCESS, ) from neo4j.io._courier import MessageInbox from neo4j.meta import get_user_agent @@ -61,9 +63,6 @@ class Bolt4x0(Bolt): PROTOCOL_VERSION = Version(4, 0) - #: Server details for this connection - server = None - # The socket in_use = False @@ -76,21 +75,19 @@ class Bolt4x0(Bolt): #: The pool of which this connection is a member pool = None - def __init__(self, unresolved_address, sock, *, auth=None, **config): - self.config = PoolConfig.consume(config) + def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=None, user_agent=None): self.unresolved_address = unresolved_address self.socket = sock - self.server = ServerInfo(Address(sock.getpeername()), Bolt4x0.PROTOCOL_VERSION) + self.server_info = ServerInfo(Address(sock.getpeername()), Bolt4x0.PROTOCOL_VERSION) self.outbox = Outbox() self.inbox = Inbox(self.socket, on_error=self._set_defunct) self.packer = Packer(self.outbox) self.unpacker = Unpacker(self.inbox) self.responses = deque() - self._max_connection_lifetime = self.config.max_connection_lifetime + self._max_connection_lifetime = max_connection_lifetime # self.pool_config.max_connection_lifetime self._creation_timestamp = perf_counter() # Determine the user agent - user_agent = self.config.user_agent if user_agent: self.user_agent = user_agent else: @@ -140,17 +137,16 @@ def hello(self): logged_headers["credentials"] = "*******" log.debug("[#%04X] C: HELLO %r", self.local_port, logged_headers) self._append(b"\x01", (headers,), - response=InitResponse(self, on_success=self.server.metadata.update)) + response=InitResponse(self, on_success=self.server_info.metadata.update)) self.send_all() self.fetch_all() - def run(self, query, parameters=None, mode=None, bookmarks=None, metadata=None, - timeout=None, db=None, **handlers): + def run(self, query, parameters=None, mode=None, bookmarks=None, metadata=None, timeout=None, db=None, **handlers): if not parameters: parameters = {} extra = {} - if mode: - extra["mode"] = mode + if mode in (READ_ACCESS, "r"): + extra["mode"] = "r" # It will default to mode "w" if nothing is specified if db: extra["db"] = db if bookmarks: @@ -192,8 +188,8 @@ def pull(self, n=-1, qid=-1, **handlers): def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, db=None, **handlers): extra = {} - if mode: - extra["mode"] = mode + if mode in (READ_ACCESS, "r"): + extra["mode"] = "r" # It will default to mode "w" if nothing is specified if db: extra["db"] = db if bookmarks: @@ -258,11 +254,11 @@ def send_all(self): """ if self.closed(): raise ServiceUnavailable("Failed to write to closed connection {!r} ({!r})".format( - self.unresolved_address, self.server.address)) + self.unresolved_address, self.server_info.address)) if self.defunct(): raise ServiceUnavailable("Failed to write to defunct connection {!r} ({!r})".format( - self.unresolved_address, self.server.address)) + self.unresolved_address, self.server_info.address)) try: self._send_all() @@ -270,7 +266,7 @@ def send_all(self): log.error("Failed to write data to connection " "{!r} ({!r}); ({!r})". format(self.unresolved_address, - self.server.address, + self.server_info.address, "; ".join(map(repr, error.args)))) if self.pool: self.pool.deactivate(self.unresolved_address) @@ -284,11 +280,11 @@ def fetch_message(self): """ if self._closed: raise ServiceUnavailable("Failed to read from closed connection {!r} ({!r})".format( - self.unresolved_address, self.server.address)) + self.unresolved_address, self.server_info.address)) if self._defunct: raise ServiceUnavailable("Failed to read from defunct connection {!r} ({!r})".format( - self.unresolved_address, self.server.address)) + self.unresolved_address, self.server_info.address)) if not self.responses: return 0, 0 @@ -300,7 +296,7 @@ def fetch_message(self): log.error("Failed to read data from connection " "{!r} ({!r}); ({!r})". format(self.unresolved_address, - self.server.address, + self.server_info.address, "; ".join(map(repr, error.args)))) if self.pool: self.pool.deactivate(self.unresolved_address) @@ -327,11 +323,11 @@ def fetch_message(self): response.on_failure(summary_metadata or {}) except (ServiceUnavailable, DatabaseUnavailable): if self.pool: - self.pool.deactivate(self.unresolved_address), + self.pool.deactivate(address=self.unresolved_address), raise except (NotALeader, ForbiddenOnReadOnlyDatabase): if self.pool: - self.pool.on_write_failure(self.unresolved_address), + self.pool.on_write_failure(address=self.unresolved_address), raise else: raise BoltProtocolError("Unexpected response message with signature %02X" % summary_signature, self.unresolved_address) @@ -342,7 +338,7 @@ def _set_defunct(self, error=None): direct_driver = isinstance(self.pool, BoltPool) message = ("Failed to read from defunct connection {!r} ({!r})".format( - self.unresolved_address, self.server.address)) + self.unresolved_address, self.server_info.address)) log.error(message) # We were attempting to receive data but the connection @@ -352,7 +348,7 @@ def _set_defunct(self, error=None): self._defunct = True self.close() if self.pool: - self.pool.deactivate(self.unresolved_address) + self.pool.deactivate(address=self.unresolved_address) # Iterate through the outstanding responses, and if any correspond # to COMMIT requests then raise an error to signal that we are # unable to confirm that the COMMIT completed successfully. diff --git a/neo4j/routing.py b/neo4j/routing.py index 109a2395..b79c2cb0 100644 --- a/neo4j/routing.py +++ b/neo4j/routing.py @@ -81,7 +81,7 @@ def replace(self, elements=()): class RoutingTable: @classmethod - def parse_routing_info(cls, servers, ttl): + def parse_routing_info(cls, *, database, servers, ttl): """ Parse the records returned from the procedure call and return a new RoutingTable instance. """ @@ -103,18 +103,20 @@ def parse_routing_info(cls, servers, ttl): except (KeyError, TypeError): raise ValueError("Cannot parse routing info") else: - return cls(routers, readers, writers, ttl) + return cls(database=database, routers=routers, readers=readers, writers=writers, ttl=ttl) - def __init__(self, routers=(), readers=(), writers=(), ttl=0): + def __init__(self, *, database, routers=(), readers=(), writers=(), ttl=0): self.initial_routers = OrderedSet(routers) self.routers = OrderedSet(routers) self.readers = OrderedSet(readers) self.writers = OrderedSet(writers) self.last_updated_time = perf_counter() self.ttl = ttl + self.database = database def __repr__(self): - return "RoutingTable(routers=%r, readers=%r, writers=%r, last_updated_time=%r, ttl=%r)" % ( + return "RoutingTable(database=%r routers=%r, readers=%r, writers=%r, last_updated_time=%r, ttl=%r)" % ( + self.database, self.routers, self.readers, self.writers, @@ -140,6 +142,25 @@ def is_fresh(self, readonly=False): log.debug("[#0000] C: Table has_server_for_mode=%r", has_server_for_mode) return not expired and self.routers and has_server_for_mode + def missing_fresh_writer(self): + """ Check if the routing table have a fresh write address. + + :return: Return true if it does not have a fresh write address. + :rtype: bool + """ + return not self.is_fresh(readonly=False) + + def should_be_purged_from_memory(self): + """ Check if the routing table is stale and not used for a long time and should be removed from memory. + + :return: Returns true if it is old and not used for a while. + :rtype: bool + """ + from neo4j.conf import RoutingConfig + perf_time = perf_counter() + log.debug("[#0000] C: last_updated_time=%r perf_time=%r", self.last_updated_time, perf_time) + return self.last_updated_time + self.ttl + RoutingConfig.routing_table_purge_delay <= perf_time + def update(self, new_routing_table): """ Update the current routing table with new routing information from a replacement table. diff --git a/neo4j/work/__init__.py b/neo4j/work/__init__.py index ed4448ef..c97cc7ca 100644 --- a/neo4j/work/__init__.py +++ b/neo4j/work/__init__.py @@ -49,7 +49,7 @@ def _connect(self, access_mode): if access_mode == self._connection_access_mode: return self._disconnect(sync=True) - self._connection = self._pool.acquire(access_mode) + self._connection = self._pool.acquire(access_mode=access_mode, timeout=self._config.connection_acquisition_timeout, database=self._config.database) self._connection_access_mode = access_mode def _disconnect(self, sync): diff --git a/neo4j/work/pipelining.py b/neo4j/work/pipelining.py index 1470d0ef..ce5d0c9d 100644 --- a/neo4j/work/pipelining.py +++ b/neo4j/work/pipelining.py @@ -25,7 +25,9 @@ from neo4j.work import Workspace from neo4j.conf import WorkspaceConfig - +from neo4j.api import ( + WRITE_ACCESS, +) class PipelineConfig(WorkspaceConfig): @@ -38,7 +40,7 @@ class Pipeline(Workspace): def __init__(self, pool, config): assert isinstance(config, PipelineConfig) super(Pipeline, self).__init__(pool, config) - self._connect("WRITE") + self._connect(WRITE_ACCESS) self._flush_every = config.flush_every self._data = deque() self._pull_lock = Lock() diff --git a/neo4j/work/simple.py b/neo4j/work/simple.py index 23912bd5..2a64fa5c 100644 --- a/neo4j/work/simple.py +++ b/neo4j/work/simple.py @@ -97,7 +97,7 @@ def __enter__(self): def __exit__(self, exc_type, exc_value, traceback): self.close() - def _connect(self, access_mode): + def _connect(self, access_mode, database): if access_mode is None: access_mode = self._config.default_access_mode if self._connection: @@ -105,7 +105,7 @@ def _connect(self, access_mode): self._connection.send_all() self._connection.fetch_all() self._disconnect() - self._connection = self._pool.acquire(access_mode, timeout=self._config.connection_acquisition_timeout) + self._connection = self._pool.acquire(access_mode=access_mode, timeout=self._config.connection_acquisition_timeout, database=database) def _disconnect(self): if self._connection: @@ -157,10 +157,10 @@ def run(self, query, parameters=None, **kwparameters): raise TypeError("query must be a string or a Query instance") if not self._connection: - self._connect(self._config.default_access_mode) + self._connect(self._config.default_access_mode, database=self._config.database) cx = self._connection protocol_version = cx.PROTOCOL_VERSION - server = cx.server + server_info = cx.server_info has_transaction = self.has_transaction() @@ -176,7 +176,7 @@ def fail(_): result_metadata = { "query": query_text, "parameters": parameters, - "server": server, + "server": server_info, "protocol_version": protocol_version, } run_metadata = { @@ -195,21 +195,40 @@ def done(summary_metadata): self._last_result = result = Result(self, hydrant, result_metadata) + access_mode = None + db = None + bookmarks = None + if has_transaction: + # Explicit Transaction Run does not carry any extra values. RUN "query" {parameters} {extra} if query_metadata: raise ValueError("Metadata can only be attached at transaction level") if query_timeout: raise ValueError("Timeouts only apply at transaction level") - # TODO: fail if explicit database name has been set + access_mode = None + db = None + bookmarks = None else: run_metadata["bookmarks"] = self._bookmarks_in - - # TODO: capture ValueError and surface as SessionError/TransactionError if - # TODO: explicit database selection has been made - cx.run(query_text, parameters, **run_metadata) + access_mode = self._config.default_access_mode + db = self._config.database + bookmarks = run_metadata.get("bookmarks", self._config.bookmarks) + + # BOLT RUN + cx.run( + query_text, + parameters=parameters, + mode=access_mode, + bookmarks=bookmarks, + metadata=run_metadata["metadata"], + timeout=run_metadata["timeout"], + db=db, + on_success=run_metadata["on_success"], + on_failure=run_metadata["on_failure"], + ) + # BOLT PULL cx.pull( - on_records=lambda records: result._records.extend( - hydrant.hydrate_records(result.keys(), records)), + on_records=lambda records: result._records.extend(hydrant.hydrate_records(result.keys(), records)), on_success=done, on_failure=fail, on_summary=lambda: result.detach(sync=False), @@ -291,35 +310,39 @@ def has_transaction(self): def _close_transaction(self): self._transaction = None - def begin_transaction(self, bookmark=None, metadata=None, timeout=None): - """ Create a new :class:`.Transaction` within this session. - Calling this method with a bookmark is equivalent to + def begin_transaction(self, metadata=None, timeout=None): + """ Begin a new unmanaged transaction. Creates a new :class:`.Transaction` within this session. + At most one transaction may exist in a session at any point in time. + To maintain multiple concurrent transactions, use multiple concurrent sessions. - :param bookmark: a bookmark to which the server should - synchronise before beginning the transaction :param metadata: :param timeout: - :returns: new :class:`.Transaction` instance. - :raise: :class:`.TransactionError` if a transaction is already open + + :returns: A new transaction instance. + :rtype: :class:`neo4j.Transaction` + + :raises TransactionError: :class:`neo4j.exceptions.TransactionError` if a transaction is already open. """ + # TODO: Implement TransactionConfig consumption + if self.has_transaction(): raise TransactionError("Explicit transaction already open") - self._open_transaction(metadata=metadata, timeout=timeout) + self._open_transaction(access_mode=self._config.default_access_mode, database=self._config.database, metadata=metadata, timeout=timeout) return self._transaction - def _open_transaction(self, access_mode=None, metadata=None, timeout=None): + def _open_transaction(self, *, access_mode, database, metadata=None, timeout=None): self._transaction = Transaction(self, on_close=self._close_transaction) - self._connect(access_mode) - # TODO: capture ValueError and surface as SessionError/TransactionError if - # TODO: explicit database selection has been made - self._connection.begin(bookmarks=self._bookmarks_in, metadata=metadata, timeout=timeout) + self._connect(access_mode=access_mode, database=database) + self._connection.begin(bookmarks=self._bookmarks_in, metadata=metadata, timeout=timeout, mode=access_mode, db=database) def commit_transaction(self): """ Commit the current transaction. :returns: the bookmark returned from the server, if any - :raise: :class:`.TransactionError` if no transaction is currently open + :rtype: :class: `neo4j.Bookmark` + + :raises TransactionError: :class:`neo4j.exceptions.TransactionError` if no transaction is currently open """ if not self._transaction: raise TransactionError("No transaction to commit") @@ -371,7 +394,7 @@ def _run_transaction(self, access_mode, unit_of_work, *args, **kwargs): t0 = perf_counter() while True: try: - self._open_transaction(access_mode, metadata, timeout) + self._open_transaction(access_mode=access_mode, database=self._config.database, metadata=metadata, timeout=timeout) tx = self._transaction try: result = unit_of_work(tx, *args, **kwargs) diff --git a/neo4j/work/summary.py b/neo4j/work/summary.py index 82292de8..2dab7121 100644 --- a/neo4j/work/summary.py +++ b/neo4j/work/summary.py @@ -26,17 +26,19 @@ BOLT_VERSION_3 = 3 BOLT_VERSION_4 = 4 +# TODO: This logic should be inside the Bolt subclasses, because it can change depending on Bolt Protocol Version. + class ResultSummary: """ A summary of execution returned with a :class:`.Result` object. """ - #: The version of Bolt protocol over which this result was obtained. - protocol_version = None - - #: The server on which this result was generated. + #: A :class: `neo4j.ServerInfo` instance. Provides some basic information of the server where the result is obtained from. server = None + #: The database name where this summary is obtained from. + database = None + #: The query that was executed to produce this result. query = None @@ -46,10 +48,10 @@ class ResultSummary: #: The type of query (``'r'`` = read-only, ``'rw'`` = read/write). query_type = None - #: A set of statistical information held in a :class:`.Counters` instance. + #: A :class:`.Counters` instance. Counters for operations the query triggered. counters = None - #: A :class:`.Plan` instance + #: A :class:`.Plan` instance. This describes how the database will execute the query. plan = None #: A :class:`.ProfiledPlan` instance @@ -61,6 +63,7 @@ class ResultSummary: #: The time it took for the server to consume the result. (milliseconds) result_consumed_after = None + #: A list of :class: `.Notification` instances #: Notifications provide extra information for a user executing a statement. #: They can be warnings about problematic queries or other valuable information that can be #: presented in a client. @@ -69,13 +72,13 @@ class ResultSummary: def __init__(self, **metadata): self.metadata = metadata - self.protocol_version = metadata.get("protocol_version") self.server = metadata.get("server") + self.database = metadata.get("db") self.query = metadata.get("query") self.parameters = metadata.get("parameters") self.query_type = metadata.get("type") self.counters = SummaryCounters(metadata.get("stats", {})) - if self.protocol_version[0] < BOLT_VERSION_3: + if self.server.protocol_version[0] < BOLT_VERSION_3: self.result_available_after = metadata.get("result_available_after") self.result_consumed_after = metadata.get("result_consumed_after") else: diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 6eff3b4f..9ec7e8e4 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -35,7 +35,6 @@ from neo4j.io import Bolt - NEO4J_RELEASES = getenv("NEO4J_RELEASES", "snapshot-enterprise 3.5-enterprise").split() NEO4J_HOST = "localhost" NEO4J_PORTS = { @@ -331,7 +330,7 @@ def session(bolt_driver): @pytest.fixture() def protocol_version(session): result = session.run("RETURN 1") - yield session._connection.protocol_version + yield session._connection.server_info.protocol_version result.consume() diff --git a/tests/integration/test_bolt_driver.py b/tests/integration/test_bolt_driver.py index 09480213..0f772c71 100644 --- a/tests/integration/test_bolt_driver.py +++ b/tests/integration/test_bolt_driver.py @@ -21,11 +21,16 @@ import pytest -from neo4j import GraphDatabase +from neo4j import ( + GraphDatabase, + BoltDriver, + Version, +) from neo4j.exceptions import ( ServiceUnavailable, AuthError, ConfigurationError, + ClientError, ) from neo4j._exceptions import BoltHandshakeError @@ -51,20 +56,6 @@ def test_bolt_uri(bolt_uri, auth): # assert value == 1 -def test_neo4j_uri(neo4j_uri, auth): - # python -m pytest tests/integration/test_bolt_driver.py -s -v -k test_neo4j_uri - try: - with GraphDatabase.driver(neo4j_uri, auth=auth) as driver: - with driver.session() as session: - value = session.run("RETURN 1").single().value() - assert value == 1 - except ServiceUnavailable as error: - if error.args[0] == "Server does not support routing": - pytest.skip(error.args[0]) - elif isinstance(error.__cause__, BoltHandshakeError): - pytest.skip(error.args[0]) - - def test_normal_use_case(bolt_driver): # python -m pytest tests/integration/test_bolt_driver.py -s -v -k test_normal_use_case session = bolt_driver.session() @@ -129,3 +120,50 @@ def test_should_fail_on_incorrect_password(bolt_uri): except ServiceUnavailable as error: if isinstance(error.__cause__, BoltHandshakeError): pytest.skip(error.args[0]) + + +def test_supports_multi_db(bolt_uri, auth): + # python -m pytest tests/integration/test_bolt_driver.py -s -v -k test_supports_multi_db + try: + driver = GraphDatabase.driver(bolt_uri, auth=auth) + assert isinstance(driver, BoltDriver) + except ServiceUnavailable as error: + if isinstance(error.__cause__, BoltHandshakeError): + pytest.skip(error.args[0]) + + with driver.session() as session: + result = session.run("RETURN 1") + value = result.single().value() # Consumes the result + summary = result.summary() + server_info = summary.server + + result = driver.supports_multi_db() + driver.close() + + if server_info.version_info() >= Version(4, 0, 0) and server_info.protocol_version >= Version(4, 0): + assert result is True + assert summary.database == "neo4j" # This is the default database name if not set explicitly on the Neo4j Server + assert summary.query_type == "r" + else: + assert result is False + assert summary.database is None + assert summary.query_type == "r" + + +def test_test_multi_db_specify_database(bolt_uri, auth): + # python -m pytest tests/integration/test_bolt_driver.py -s -v -k test_test_multi_db_specify_database + try: + with GraphDatabase.driver(bolt_uri, auth=auth, database="test_database") as driver: + with driver.session() as session: + result = session.run("RETURN 1") + assert next(result) == 1 + summary = result.summary() + assert summary.database == "test_database" + except ServiceUnavailable as error: + if isinstance(error.__cause__, BoltHandshakeError): + pytest.skip(error.args[0]) + except ConfigurationError as error: + assert "Database name parameter for selecting database is not supported in Bolt Protocol Version(3, 0)" in error.args[0] + except ClientError as error: + # FAILURE {'code': 'Neo.ClientError.Database.DatabaseNotFound' - This message is sent from the server + assert error.args[0] == "Database does not exist. Database name: 'test_database'." diff --git a/tests/integration/test_neo4j_driver.py b/tests/integration/test_neo4j_driver.py new file mode 100644 index 00000000..dc8de0cd --- /dev/null +++ b/tests/integration/test_neo4j_driver.py @@ -0,0 +1,324 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- + +# Copyright (c) 2002-2020 "Neo4j," +# Neo4j Sweden AB [http://neo4j.com] +# +# This file is part of Neo4j. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import pytest + +from neo4j import ( + GraphDatabase, + Neo4jDriver, + Version, +) +from neo4j.exceptions import ( + ServiceUnavailable, + ConfigurationError, + ClientError, +) +from neo4j._exceptions import ( + BoltHandshakeError, +) +from neo4j.conf import ( + RoutingConfig, +) + +# python -m pytest tests/integration/test_neo4j_driver.py -s -v + + +def test_neo4j_uri(neo4j_uri, auth, target): + # python -m pytest tests/integration/test_neo4j_driver.py -s -v -k test_neo4j_uri + try: + with GraphDatabase.driver(neo4j_uri, auth=auth) as driver: + with driver.session() as session: + value = session.run("RETURN 1").single().value() + assert value == 1 + except ServiceUnavailable as error: + if error.args[0] == "Server does not support routing": + # This is because a single instance Neo4j 3.5 does not have dbms.routing.cluster.getRoutingTable() call + pytest.skip(error.args[0]) + elif isinstance(error.__cause__, BoltHandshakeError): + pytest.skip(error.args[0]) + + +def test_supports_multi_db(neo4j_uri, auth, target): + # python -m pytest tests/integration/test_neo4j_driver.py -s -v -k test_supports_multi_db + try: + driver = GraphDatabase.driver(neo4j_uri, auth=auth) + assert isinstance(driver, Neo4jDriver) + except ServiceUnavailable as error: + if error.args[0] == "Server does not support routing": + # This is because a single instance Neo4j 3.5 does not have dbms.routing.cluster.getRoutingTable() call + pytest.skip(error.args[0]) + elif isinstance(error.__cause__, BoltHandshakeError): + pytest.skip(error.args[0]) + + with driver.session() as session: + result = session.run("RETURN 1") + value = result.single().value() # Consumes the result + summary = result.summary() + server_info = summary.server + + result = driver.supports_multi_db() + driver.close() + + if server_info.version_info() >= Version(4, 0, 0) and server_info.protocol_version >= Version(4, 0): + assert result is True + assert summary.database == "neo4j" # This is the default database name if not set explicitly on the Neo4j Server + assert summary.query_type == "r" + else: + assert result is False + assert summary.database is None + assert summary.query_type == "r" + + +def test_test_multi_db_specify_database(neo4j_uri, auth, target): + # python -m pytest tests/integration/test_neo4j_driver.py -s -v -k test_test_multi_db_specify_database + try: + with GraphDatabase.driver(neo4j_uri, auth=auth, database="test_database") as driver: + with driver.session() as session: + result = session.run("RETURN 1") + assert next(result) == 1 + summary = result.summary() + assert summary.database == "test_database" + except ServiceUnavailable as error: + if isinstance(error.__cause__, BoltHandshakeError): + pytest.skip(error.args[0]) + except ConfigurationError as error: + assert "Database name parameter for selecting database is not supported in Bolt Protocol Version(3, 0)." in error.args[0] + except ClientError as error: + # FAILURE {'code': 'Neo.ClientError.Database.DatabaseNotFound' - This message is sent from the server + assert error.args[0] == "Unable to get a routing table for database 'test_database' because this database does not exist" + + +def test_neo4j_multi_database_support_create(neo4j_uri, auth, target): + # python -m pytest tests/integration/test_neo4j_driver.py -s -v -k test_neo4j_multi_database_support_create + try: + with GraphDatabase.driver(neo4j_uri, auth=auth) as driver: + with driver.session(database="system") as session: + session.run("DROP DATABASE test IF EXISTS").consume() + result = session.run("SHOW DATABASES") + databases = set() + for record in result: + databases.add(record.get("name")) + assert databases == {"system", "neo4j"} + + session.run("CREATE DATABASE test").consume() + result = session.run("SHOW DATABASES") + for record in result: + databases.add(record.get("name")) + assert databases == {"system", "neo4j", "test"} + with driver.session(database="system") as session: + session.run("DROP DATABASE test IF EXISTS").consume() + except ServiceUnavailable as error: + if error.args[0] == "Server does not support routing": + # This is because a single instance Neo4j 3.5 does not have dbms.routing.cluster.getRoutingTable() call + pytest.skip(error.args[0]) + elif isinstance(error.__cause__, BoltHandshakeError): + pytest.skip(error.args[0]) + + +def test_neo4j_multi_database_support_different(neo4j_uri, auth, target): + # python -m pytest tests/integration/test_neo4j_driver.py -s -v -k test_neo4j_multi_database_support_different + try: + with GraphDatabase.driver(neo4j_uri, auth=auth) as driver: + with driver.session() as session: + # Test that default database is empty + session.run("MATCH (n) DETACH DELETE n").consume() + result = session.run("MATCH (p:Person) RETURN p") + names = set() + for ix in result: + names.add(ix["p"].get("name")) + assert names == set() # THIS FAILS? + with driver.session(database="system") as session: + session.run("DROP DATABASE testa IF EXISTS").consume() + session.run("DROP DATABASE testb IF EXISTS").consume() + with driver.session(database="system") as session: + result = session.run("SHOW DATABASES") + databases = set() + for record in result: + databases.add(record.get("name")) + assert databases == {"system", "neo4j"} + result = session.run("CREATE DATABASE testa") + result.consume() + result = session.run("CREATE DATABASE testb") + result.consume() + with driver.session(database="testa") as session: + result = session.run('CREATE (p:Person {name: "ALICE"})') + result.consume() + with driver.session(database="testb") as session: + result = session.run('CREATE (p:Person {name: "BOB"})') + result.consume() + with driver.session() as session: + # Test that default database is still empty + result = session.run("MATCH (p:Person) RETURN p") + names = set() + for ix in result: + names.add(ix["p"].get("name")) + assert names == set() # THIS FAILS? + with driver.session(database="testa") as session: + result = session.run("MATCH (p:Person) RETURN p") + names = set() + for ix in result: + names.add(ix["p"].get("name")) + assert names == {"ALICE", } + with driver.session(database="testb") as session: + result = session.run("MATCH (p:Person) RETURN p") + names = set() + for ix in result: + names.add(ix["p"].get("name")) + assert names == {"BOB", } + with driver.session(database="system") as session: + session.run("DROP DATABASE testa IF EXISTS").consume() + with driver.session(database="system") as session: + session.run("DROP DATABASE testb IF EXISTS").consume() + with driver.session() as session: + session.run("MATCH (n) DETACH DELETE n").consume() + except ServiceUnavailable as error: + if error.args[0] == "Server does not support routing": + # This is because a single instance Neo4j 3.5 does not have dbms.routing.cluster.getRoutingTable() call + pytest.skip(error.args[0]) + elif isinstance(error.__cause__, BoltHandshakeError): + pytest.skip(error.args[0]) + + +def test_neo4j_multi_database_test_routing_table_creates_new_if_deleted(neo4j_uri, auth, target): + # python -m pytest tests/integration/test_neo4j_driver.py -s -v -k test_neo4j_multi_database_test_routing_table_creates_new_if_deleted + try: + with GraphDatabase.driver(neo4j_uri, auth=auth) as driver: + with driver.session(database="system") as session: + result = session.run("DROP DATABASE test IF EXISTS") + result.consume() + result = session.run("SHOW DATABASES") + databases = set() + for record in result: + databases.add(record.get("name")) + assert databases == {"system", "neo4j"} + + result = session.run("CREATE DATABASE test") + result.consume() + result = session.run("SHOW DATABASES") + for record in result: + databases.add(record.get("name")) + assert databases == {"system", "neo4j", "test"} + with driver.session(database="test") as session: + result = session.run("RETURN 1 AS x") + result.consume() + del driver._pool.routing_tables["test"] + with driver.session(database="test") as session: + result = session.run("RETURN 1 AS x") + result.consume() + with driver.session(database="system") as session: + result = session.run("DROP DATABASE test IF EXISTS") + result.consume() + except ServiceUnavailable as error: + if error.args[0] == "Server does not support routing": + # This is because a single instance Neo4j 3.5 does not have dbms.routing.cluster.getRoutingTable() call + pytest.skip(error.args[0]) + elif isinstance(error.__cause__, BoltHandshakeError): + pytest.skip(error.args[0]) + + +def test_neo4j_multi_database_test_routing_table_updates_if_stale(neo4j_uri, auth, target): + # python -m pytest tests/integration/test_neo4j_driver.py -s -v -k test_neo4j_multi_database_test_routing_table_updates_if_stale + try: + with GraphDatabase.driver(neo4j_uri, auth=auth) as driver: + with driver.session(database="system") as session: + result = session.run("DROP DATABASE test IF EXISTS") + result.consume() + result = session.run("SHOW DATABASES") + databases = set() + for record in result: + databases.add(record.get("name")) + assert databases == {"system", "neo4j"} + + result = session.run("CREATE DATABASE test") + result.consume() + result = session.run("SHOW DATABASES") + for record in result: + databases.add(record.get("name")) + assert databases == {"system", "neo4j", "test"} + with driver.session(database="test") as session: + result = session.run("RETURN 1 AS x") + result.consume() + driver._pool.routing_tables["test"].ttl = 0 + old_value = driver._pool.routing_tables["test"].last_updated_time + with driver.session(database="test") as session: + result = session.run("RETURN 1 AS x") + result.consume() + with driver.session(database="system") as session: + result = session.run("DROP DATABASE test IF EXISTS") + result.consume() + assert driver._pool.routing_tables["test"].last_updated_time > old_value + except ServiceUnavailable as error: + if error.args[0] == "Server does not support routing": + # This is because a single instance Neo4j 3.5 does not have dbms.routing.cluster.getRoutingTable() call + pytest.skip(error.args[0]) + elif isinstance(error.__cause__, BoltHandshakeError): + pytest.skip(error.args[0]) + + +def test_neo4j_multi_database_test_routing_table_removes_aged(neo4j_uri, auth, target): + # python -m pytest tests/integration/test_neo4j_driver.py -s -v -k test_neo4j_multi_database_test_routing_table_removes_aged + try: + with GraphDatabase.driver(neo4j_uri, auth=auth) as driver: + with driver.session(database="system") as session: + result = session.run("DROP DATABASE testa IF EXISTS") + result.consume() + result = session.run("DROP DATABASE testb IF EXISTS") + result.consume() + result = session.run("SHOW DATABASES") + databases = set() + for record in result: + databases.add(record.get("name")) + assert databases == {"system", "neo4j"} + + result = session.run("CREATE DATABASE testa") + result.consume() + result = session.run("CREATE DATABASE testb") + result.consume() + result = session.run("SHOW DATABASES") + for record in result: + databases.add(record.get("name")) + assert databases == {"system", "neo4j", "testa", "testb"} + with driver.session(database="testa") as session: + result = session.run("RETURN 1 AS x") + result.consume() + with driver.session(database="testb") as session: + result = session.run("RETURN 1 AS x") + result.consume() + driver._pool.routing_tables["testa"].ttl = 0 + driver._pool.routing_tables["testb"].ttl = -1 * RoutingConfig.routing_table_purge_delay + old_value = driver._pool.routing_tables["testa"].last_updated_time + with driver.session(database="testa") as session: + # This will refresh the routing table for "testa" and the refresh will trigger a cleanup of aged routing tables + result = session.run("RETURN 1 AS x") + result.consume() + with driver.session(database="system") as session: + result = session.run("DROP DATABASE testa IF EXISTS") + result.consume() + result = session.run("DROP DATABASE testb IF EXISTS") + result.consume() + assert driver._pool.routing_tables["testa"].last_updated_time > old_value + assert "testb" not in driver._pool.routing_tables + except ServiceUnavailable as error: + if error.args[0] == "Server does not support routing": + # This is because a single instance Neo4j 3.5 does not have dbms.routing.cluster.getRoutingTable() call + pytest.skip(error.args[0]) + elif isinstance(error.__cause__, BoltHandshakeError): + pytest.skip(error.args[0]) diff --git a/tests/integration/test_summary.py b/tests/integration/test_summary.py index 19bc3b3f..3e0b0ed1 100644 --- a/tests/integration/test_summary.py +++ b/tests/integration/test_summary.py @@ -107,8 +107,8 @@ def test_contains_time_information(session): def test_protocol_version_information(session): summary = session.run("UNWIND range(1,100) AS n RETURN n AS number").consume() - assert isinstance(summary.protocol_version, tuple) - assert isinstance(summary.protocol_version[0], int) - assert isinstance(summary.protocol_version[1], int) + assert isinstance(summary.server.protocol_version, tuple) + assert isinstance(summary.server.protocol_version[0], int) + assert isinstance(summary.server.protocol_version[1], int) diff --git a/tests/integration/test_tx_functions.py b/tests/integration/test_tx_functions.py index 6baafb2f..99f325ce 100644 --- a/tests/integration/test_tx_functions.py +++ b/tests/integration/test_tx_functions.py @@ -26,6 +26,8 @@ from neo4j.work.simple import unit_of_work from neo4j.exceptions import ClientError +# python -m pytest tests/integration/test_tx_functions.py -s -v + def test_simple_read(session): @@ -47,6 +49,12 @@ def work(tx, x): def test_read_with_arg_and_metadata(session): + # TODO: Investigate the Query object work pattern + # from neo4j import Query + # def work(tx, *args, **kwargs): + # query = Query("CALL dbms.getTXMetaData", timeout=10, metadata={"foo": "bar"}) + # return tx.run(query).single().value() + @unit_of_work(timeout=25, metadata={"foo": "bar"}) def work(tx): return tx.run("CALL dbms.getTXMetaData").single().value() @@ -79,6 +87,13 @@ def work(tx, x): def test_write_with_arg_and_metadata(session): + # TODO: Investigate the Query object work pattern + # TODO: Raise TypeError you are doing it wrong dont input a Query instance use a string query. + # from neo4j import Query + # def work(tx, x, **kwargs): + # query = Query("CREATE (a {x: $x}) RETURN a.x", timeout=10, metadata={"foo": "bar"}) # Session.run(Query, ) + # return tx.run(query, x=x).single().value() + @unit_of_work(timeout=25, metadata={"foo": "bar"}) def work(tx, x): return tx.run("CREATE (a {x: $x}) RETURN a.x", x=x).single().value() diff --git a/tests/stub/scripts/v3/bookmark_chain.script b/tests/stub/scripts/v3/bookmark_chain.script index f7a11e0e..dac23edb 100644 --- a/tests/stub/scripts/v3/bookmark_chain.script +++ b/tests/stub/scripts/v3/bookmark_chain.script @@ -4,12 +4,12 @@ !: AUTO RESET !: PORT 9004 -C: BEGIN {"bookmarks": ["bookmark:0", "bookmark:1"]} +C: BEGIN {"bookmarks": ["bookmark:0", "bookmark:1"], "mode": "r"} S: SUCCESS {} C: COMMIT S: SUCCESS {"bookmark": "bookmark:2"} -C: BEGIN {"bookmarks": ["bookmark:2"]} +C: BEGIN {"bookmarks": ["bookmark:2"], "mode": "r"} S: SUCCESS {} C: COMMIT S: SUCCESS {"bookmark": "bookmark:3"} diff --git a/tests/stub/scripts/v3/bookmark_chain_with_autocommit.script b/tests/stub/scripts/v3/bookmark_chain_with_autocommit.script index f928eb75..87714ebb 100644 --- a/tests/stub/scripts/v3/bookmark_chain_with_autocommit.script +++ b/tests/stub/scripts/v3/bookmark_chain_with_autocommit.script @@ -4,17 +4,17 @@ !: AUTO RESET !: PORT 9004 -C: BEGIN {"bookmarks": ["bookmark:1"]} +C: BEGIN {"bookmarks": ["bookmark:1"], "mode": "r"} S: SUCCESS {} C: COMMIT S: SUCCESS {"bookmark": "bookmark:2"} -C: RUN "RETURN 1" {} {"bookmarks": ["bookmark:2"]} +C: RUN "RETURN 1" {} {"bookmarks": ["bookmark:2"], "mode": "r"} PULL_ALL S: SUCCESS {"bookmark": "bookmark:3"} SUCCESS {} -C: BEGIN {"bookmarks": ["bookmark:3"]} +C: BEGIN {"bookmarks": ["bookmark:3"], "mode": "r"} S: SUCCESS {} C: COMMIT S: SUCCESS {"bookmark": "bookmark:4"} diff --git a/tests/stub/scripts/v3/database_unavailable.script b/tests/stub/scripts/v3/database_unavailable.script index 705d2e8f..4bfe832f 100644 --- a/tests/stub/scripts/v3/database_unavailable.script +++ b/tests/stub/scripts/v3/database_unavailable.script @@ -7,7 +7,7 @@ !: AUTO ROLLBACK !: PORT 9004 -C: RUN "RETURN 1" {} {} +C: RUN "RETURN 1" {} {"mode": "r"} C: PULL_ALL S: FAILURE {"code": "Neo.TransientError.General.DatabaseUnavailable", "message": "Database is busy doing store copy"} S: IGNORED diff --git a/tests/stub/scripts/v3/dbms_cluster_routing_get_routing_table_system.script b/tests/stub/scripts/v3/dbms_cluster_routing_get_routing_table_system.script new file mode 100644 index 00000000..8e4061ae --- /dev/null +++ b/tests/stub/scripts/v3/dbms_cluster_routing_get_routing_table_system.script @@ -0,0 +1,14 @@ +!: BOLT 3 +!: PORT 9001 + +C: HELLO {"user_agent": "test", "scheme": "basic", "principal": "test", "credentials": "test"} +S: SUCCESS {"server": "Neo4j/3.5.0", "connection_id": "12345678-1234-1234-1234-123456789000"} + +C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {"mode": "r"} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [1234, [{"role":"WRITE", "addresses":["127.0.0.1:9001"]}, {"role":"READ", "addresses":["127.0.0.1:9002", "127.0.0.1:9003"]}, {"role":"ROUTE", "addresses":["127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"]}]] + SUCCESS {} + +C: GOODBYE +S: \ No newline at end of file diff --git a/tests/stub/scripts/v3/disconnect_on_pull_all.script b/tests/stub/scripts/v3/disconnect_on_pull_all.script index 899e2d91..e509e432 100644 --- a/tests/stub/scripts/v3/disconnect_on_pull_all.script +++ b/tests/stub/scripts/v3/disconnect_on_pull_all.script @@ -4,6 +4,6 @@ !: AUTO RESET !: PORT 9001 -C: RUN "RETURN $x" {"x": 1} {} +C: RUN "RETURN $x" {"x": 1} {"mode": "r"} PULL_ALL S: diff --git a/tests/stub/scripts/v3/disconnect_on_pull_all_9004.script b/tests/stub/scripts/v3/disconnect_on_pull_all_9004.script index 761754c5..d97d2d4f 100644 --- a/tests/stub/scripts/v3/disconnect_on_pull_all_9004.script +++ b/tests/stub/scripts/v3/disconnect_on_pull_all_9004.script @@ -4,6 +4,6 @@ !: AUTO RESET !: PORT 9004 -C: RUN "RETURN $x" {"x": 1} {} +C: RUN "RETURN $x" {"x": 1} {"mode": "r"} PULL_ALL S: diff --git a/tests/stub/scripts/v3/disconnect_on_run.script b/tests/stub/scripts/v3/disconnect_on_run.script index 22dca06e..ca1640f6 100644 --- a/tests/stub/scripts/v3/disconnect_on_run.script +++ b/tests/stub/scripts/v3/disconnect_on_run.script @@ -4,5 +4,5 @@ !: AUTO RESET !: PORT 9001 -C: RUN "RETURN 1 AS x" {} {} +C: RUN "RETURN 1 AS x" {} {"mode": "r"} S: diff --git a/tests/stub/scripts/v3/disconnect_on_run_9004.script b/tests/stub/scripts/v3/disconnect_on_run_9004.script index a60c49b7..ca17f705 100644 --- a/tests/stub/scripts/v3/disconnect_on_run_9004.script +++ b/tests/stub/scripts/v3/disconnect_on_run_9004.script @@ -4,5 +4,5 @@ !: AUTO RESET !: PORT 9004 -C: RUN "RETURN $x" {"x": 1} {} +C: RUN "RETURN $x" {"x": 1} {"mode": "r"} S: diff --git a/tests/stub/scripts/v3/error_in_read_tx.script b/tests/stub/scripts/v3/error_in_read_tx.script index a78e7407..aeceb5c0 100644 --- a/tests/stub/scripts/v3/error_in_read_tx.script +++ b/tests/stub/scripts/v3/error_in_read_tx.script @@ -3,7 +3,7 @@ !: AUTO GOODBYE !: PORT 9004 -C: BEGIN {} +C: BEGIN {"mode": "r"} S: SUCCESS {} C: RUN "X" {} {} diff --git a/tests/stub/scripts/v3/get_routing_table.script b/tests/stub/scripts/v3/get_routing_table.script index 7dd3ba0e..4a8f04c0 100644 --- a/tests/stub/scripts/v3/get_routing_table.script +++ b/tests/stub/scripts/v3/get_routing_table.script @@ -3,7 +3,7 @@ !: AUTO GOODBYE !: AUTO RESET -C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {} +C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {"mode": "r"} PULL_ALL S: SUCCESS {"fields": ["ttl", "servers"]} RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9002"], "role": "READ"},{"addresses": ["127.0.0.1:9001", "127.0.0.1:9002"], "role": "ROUTE"}]] diff --git a/tests/stub/scripts/v3/get_routing_table_with_context.script b/tests/stub/scripts/v3/get_routing_table_with_context.script index 8e4ba1b7..a79a24c7 100644 --- a/tests/stub/scripts/v3/get_routing_table_with_context.script +++ b/tests/stub/scripts/v3/get_routing_table_with_context.script @@ -3,7 +3,7 @@ !: AUTO GOODBYE !: AUTO RESET -C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {"name": "molly", "age": "1"}} {} +C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {"name": "molly", "age": "1"}} {"mode": "r"} PULL_ALL S: SUCCESS {"fields": ["ttl", "servers"]} RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9002"], "role": "READ"},{"addresses": ["127.0.0.1:9001", "127.0.0.1:9002"], "role": "ROUTE"}]] diff --git a/tests/stub/scripts/v3/non_router.script b/tests/stub/scripts/v3/non_router.script index 802b2401..b6b58e2b 100644 --- a/tests/stub/scripts/v3/non_router.script +++ b/tests/stub/scripts/v3/non_router.script @@ -2,7 +2,7 @@ !: AUTO HELLO !: AUTO GOODBYE -C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {} +C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {"mode":"r"} PULL_ALL S: FAILURE {"code": "Neo.ClientError.Procedure.ProcedureNotFound", "message": "Not a router"} IGNORED diff --git a/tests/stub/scripts/v3/readonly_true.script b/tests/stub/scripts/v3/readonly_true.script index 016638b1..0b80d8bc 100644 --- a/tests/stub/scripts/v3/readonly_true.script +++ b/tests/stub/scripts/v3/readonly_true.script @@ -3,7 +3,7 @@ !: AUTO GOODBYE !: AUTO RESET -C: RUN "RETURN 1" {} {"mode": "R"} +C: RUN "RETURN 1" {} {"mode": "r"} PULL_ALL S: SUCCESS {"fields": ["1"]} RECORD [1] diff --git a/tests/stub/scripts/v3/return_1.script b/tests/stub/scripts/v3/return_1.script index 752a366e..70891f7e 100644 --- a/tests/stub/scripts/v3/return_1.script +++ b/tests/stub/scripts/v3/return_1.script @@ -4,7 +4,7 @@ !: AUTO RESET !: PORT 9004 -C: RUN "RETURN $x" {"x": 1} {} +C: RUN "RETURN $x" {"x": 1} {"mode": "r"} PULL_ALL S: SUCCESS {"fields": ["x"]} RECORD [1] diff --git a/tests/stub/scripts/v3/return_1_four_times.script b/tests/stub/scripts/v3/return_1_four_times.script index 4573d607..42ffdd8c 100644 --- a/tests/stub/scripts/v3/return_1_four_times.script +++ b/tests/stub/scripts/v3/return_1_four_times.script @@ -4,25 +4,25 @@ !: AUTO RESET !: PORT 9004 -C: RUN "RETURN $x" {"x": 1} {} +C: RUN "RETURN $x" {"x": 1} {"mode": "r"} PULL_ALL S: SUCCESS {"fields": ["x"]} RECORD [1] SUCCESS {} -C: RUN "RETURN $x" {"x": 1} {} +C: RUN "RETURN $x" {"x": 1} {"mode": "r"} PULL_ALL S: SUCCESS {"fields": ["x"]} RECORD [1] SUCCESS {} -C: RUN "RETURN $x" {"x": 1} {} +C: RUN "RETURN $x" {"x": 1} {"mode": "r"} PULL_ALL S: SUCCESS {"fields": ["x"]} RECORD [1] SUCCESS {} -C: RUN "RETURN $x" {"x": 1} {} +C: RUN "RETURN $x" {"x": 1} {"mode": "r"} PULL_ALL S: SUCCESS {"fields": ["x"]} RECORD [1] diff --git a/tests/stub/scripts/v3/return_1_in_read_tx.script b/tests/stub/scripts/v3/return_1_in_read_tx.script index 75492b42..a759ac20 100644 --- a/tests/stub/scripts/v3/return_1_in_read_tx.script +++ b/tests/stub/scripts/v3/return_1_in_read_tx.script @@ -4,7 +4,7 @@ !: AUTO RESET !: PORT 9004 -C: BEGIN {} +C: BEGIN {"mode": "r"} S: SUCCESS {} C: RUN "RETURN 1" {} {} diff --git a/tests/stub/scripts/v3/return_1_in_read_tx_twice.script b/tests/stub/scripts/v3/return_1_in_read_tx_twice.script index a01605a9..19c2dc9f 100644 --- a/tests/stub/scripts/v3/return_1_in_read_tx_twice.script +++ b/tests/stub/scripts/v3/return_1_in_read_tx_twice.script @@ -4,7 +4,7 @@ !: AUTO RESET !: PORT 9004 -C: BEGIN {} +C: BEGIN {"mode": "r"} S: SUCCESS {} C: RUN "RETURN 1" {} {} @@ -16,7 +16,7 @@ S: SUCCESS {"fields": ["1"]} C: COMMIT S: SUCCESS {"bookmark": "bookmark:1"} -C: BEGIN {"bookmarks": ["bookmark:1"]} +C: BEGIN {"bookmarks": ["bookmark:1"], "mode": "r"} S: SUCCESS {} C: RUN "RETURN 1" {} {} diff --git a/tests/stub/scripts/v3/return_1_on_9002.script b/tests/stub/scripts/v3/return_1_on_9002.script index 119fab20..6877f6ba 100644 --- a/tests/stub/scripts/v3/return_1_on_9002.script +++ b/tests/stub/scripts/v3/return_1_on_9002.script @@ -4,7 +4,7 @@ !: AUTO RESET !: PORT 9002 -C: RUN "RETURN $x" {"x": 1} {} +C: RUN "RETURN $x" {"x": 1} {"mode": "r"} PULL_ALL S: SUCCESS {"fields": ["x"]} RECORD [1] diff --git a/tests/stub/scripts/v3/return_1_on_9005.script b/tests/stub/scripts/v3/return_1_on_9005.script index 8ddd630a..90942149 100644 --- a/tests/stub/scripts/v3/return_1_on_9005.script +++ b/tests/stub/scripts/v3/return_1_on_9005.script @@ -4,7 +4,7 @@ !: AUTO RESET !: PORT 9005 -C: RUN "RETURN $x" {"x": 1} {} +C: RUN "RETURN $x" {"x": 1} {"mode": "r"} PULL_ALL S: SUCCESS {"fields": ["x"]} RECORD [1] diff --git a/tests/stub/scripts/v3/return_1_port_9001.script b/tests/stub/scripts/v3/return_1_port_9001.script index cb87bd3a..a73cbf14 100644 --- a/tests/stub/scripts/v3/return_1_port_9001.script +++ b/tests/stub/scripts/v3/return_1_port_9001.script @@ -5,7 +5,7 @@ C: HELLO {"user_agent": "test", "scheme": "basic", "principal": "test", "credentials": "test"} S: SUCCESS {"server": "Neo4j/3.0.0", "connection_id": "123e4567-e89b-12d3-a456-426655440000"} -C: RUN "RETURN 1 AS x" {} {} +C: RUN "RETURN 1 AS x" {} {"mode": "r"} PULL_ALL S: SUCCESS {"fields": ["x"]} RECORD [1] diff --git a/tests/stub/scripts/v3/return_1_twice.script b/tests/stub/scripts/v3/return_1_twice.script index 6bbaa5b7..7807a6a0 100644 --- a/tests/stub/scripts/v3/return_1_twice.script +++ b/tests/stub/scripts/v3/return_1_twice.script @@ -4,13 +4,13 @@ !: AUTO RESET !: PORT 9004 -C: RUN "RETURN $x" {"x": 1} {} +C: RUN "RETURN $x" {"x": 1} {"mode": "r"} PULL_ALL S: SUCCESS {"fields": ["x"]} RECORD [1] SUCCESS {} -C: RUN "RETURN $x" {"x": 1} {} +C: RUN "RETURN $x" {"x": 1} {"mode": "r"} PULL_ALL S: SUCCESS {"fields": ["x"]} RECORD [1] diff --git a/tests/stub/scripts/v3/return_1_twice_in_read_tx.script b/tests/stub/scripts/v3/return_1_twice_in_read_tx.script index d59112cc..d6055cef 100644 --- a/tests/stub/scripts/v3/return_1_twice_in_read_tx.script +++ b/tests/stub/scripts/v3/return_1_twice_in_read_tx.script @@ -4,7 +4,7 @@ !: AUTO RESET !: PORT 9004 -C: BEGIN {} +C: BEGIN {"mode": "r"} S: SUCCESS {} C: RUN "RETURN $x" {"x": 1} {} diff --git a/tests/stub/scripts/v3/return_2_in_read_tx.script b/tests/stub/scripts/v3/return_2_in_read_tx.script index 4da1938d..bbda4f3d 100644 --- a/tests/stub/scripts/v3/return_2_in_read_tx.script +++ b/tests/stub/scripts/v3/return_2_in_read_tx.script @@ -4,7 +4,7 @@ !: AUTO RESET !: PORT 9004 -C: BEGIN {"bookmarks": ["bookmark:1"]} +C: BEGIN {"bookmarks": ["bookmark:1"], "mode": "r"} S: SUCCESS {} C: RUN "RETURN 2" {} {} diff --git a/tests/stub/scripts/v3/router.script b/tests/stub/scripts/v3/router.script index 8703d73a..a0901bc4 100644 --- a/tests/stub/scripts/v3/router.script +++ b/tests/stub/scripts/v3/router.script @@ -4,7 +4,7 @@ !: AUTO RESET !: PORT 9001 -C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {} +C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {"mode": "r"} PULL_ALL S: SUCCESS {"fields": ["ttl", "servers"]} RECORD [300, [{"role":"ROUTE","addresses":["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"]},{"role":"READ","addresses":["127.0.0.1:9004","127.0.0.1:9005"]},{"role":"WRITE","addresses":["127.0.0.1:9006"]}]] diff --git a/tests/stub/scripts/v3/router_no_readers.script b/tests/stub/scripts/v3/router_no_readers.script index c744988e..44794cca 100644 --- a/tests/stub/scripts/v3/router_no_readers.script +++ b/tests/stub/scripts/v3/router_no_readers.script @@ -3,7 +3,7 @@ !: AUTO GOODBYE !: AUTO RESET -C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {} +C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {"mode": "r"} PULL_ALL S: SUCCESS {"fields": ["ttl", "servers"]} RECORD [300, [{"role":"ROUTE","addresses":["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"]},{"role":"READ","addresses":[]},{"role":"WRITE","addresses":["127.0.0.1:9006"]}]] diff --git a/tests/stub/scripts/v3/router_no_routers.script b/tests/stub/scripts/v3/router_no_routers.script index b47ef7c4..9bcb05c9 100644 --- a/tests/stub/scripts/v3/router_no_routers.script +++ b/tests/stub/scripts/v3/router_no_routers.script @@ -3,7 +3,7 @@ !: AUTO GOODBYE !: AUTO RESET -C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {} +C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {"mode": "r"} PULL_ALL S: SUCCESS {"fields": ["ttl", "servers"]} RECORD [300, [{"role":"ROUTE","addresses":[]},{"role":"READ","addresses":["127.0.0.1:9004","127.0.0.1:9005"]},{"role":"WRITE","addresses":["127.0.0.1:9006"]}]] diff --git a/tests/stub/scripts/v3/router_no_writers.script b/tests/stub/scripts/v3/router_no_writers.script index fa14868d..1b6fb694 100644 --- a/tests/stub/scripts/v3/router_no_writers.script +++ b/tests/stub/scripts/v3/router_no_writers.script @@ -3,7 +3,7 @@ !: AUTO GOODBYE !: AUTO RESET -C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {} +C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {"mode": "r"} PULL_ALL S: SUCCESS {"fields": ["ttl", "servers"]} RECORD [300, [{"role":"ROUTE","addresses":["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"]},{"role":"READ","addresses":["127.0.0.1:9004","127.0.0.1:9005"]},{"role":"WRITE","addresses":[]}]] diff --git a/tests/stub/scripts/v3/router_with_multiple_servers.script b/tests/stub/scripts/v3/router_with_multiple_servers.script index 7b908a52..e338aa13 100644 --- a/tests/stub/scripts/v3/router_with_multiple_servers.script +++ b/tests/stub/scripts/v3/router_with_multiple_servers.script @@ -3,7 +3,7 @@ !: AUTO GOODBYE !: AUTO RESET -C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {} +C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {"mode": "r"} PULL_ALL S: SUCCESS {"fields": ["ttl", "servers"]} RECORD [300, [{"role":"ROUTE","addresses":["127.0.0.1:9001","127.0.0.1:9002"]},{"role":"READ","addresses":["127.0.0.1:9001","127.0.0.1:9003"]},{"role":"WRITE","addresses":["127.0.0.1:9004"]}]] diff --git a/tests/stub/scripts/v3/router_with_multiple_writers.script b/tests/stub/scripts/v3/router_with_multiple_writers.script index 9995e628..eda5069b 100644 --- a/tests/stub/scripts/v3/router_with_multiple_writers.script +++ b/tests/stub/scripts/v3/router_with_multiple_writers.script @@ -3,7 +3,7 @@ !: AUTO GOODBYE !: AUTO RESET -C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {} +C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {"mode": "r"} PULL_ALL S: SUCCESS {"fields": ["ttl", "servers"]} RECORD [300, [{"role":"ROUTE","addresses":["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"]},{"role":"READ","addresses":["127.0.0.1:9004","127.0.0.1:9005"]},{"role":"WRITE","addresses":["127.0.0.1:9006","127.0.0.1:9007"]}]] diff --git a/tests/stub/scripts/v3/rude_reader.script b/tests/stub/scripts/v3/rude_reader.script index 97e6ee6e..055c06f1 100644 --- a/tests/stub/scripts/v3/rude_reader.script +++ b/tests/stub/scripts/v3/rude_reader.script @@ -4,7 +4,7 @@ !: AUTO RESET !: PORT 9004 -C: RUN "RETURN 1" {} {} +C: RUN "RETURN 1" {} {"mode": "r"} PULL_ALL S: diff --git a/tests/stub/scripts/v3/rude_router.script b/tests/stub/scripts/v3/rude_router.script index f5a0181f..9c135e1c 100644 --- a/tests/stub/scripts/v3/rude_router.script +++ b/tests/stub/scripts/v3/rude_router.script @@ -3,7 +3,7 @@ !: AUTO GOODBYE !: AUTO RESET -C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {} +C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {"mode": "r"} PULL_ALL S: diff --git a/tests/stub/scripts/v3/silent_router.script b/tests/stub/scripts/v3/silent_router.script index fedf2c25..6fde3a0c 100644 --- a/tests/stub/scripts/v3/silent_router.script +++ b/tests/stub/scripts/v3/silent_router.script @@ -3,7 +3,7 @@ !: AUTO GOODBYE !: AUTO RESET -C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {} +C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {"mode": "r"} PULL_ALL S: SUCCESS {"fields": ["ttl", "servers"]} SUCCESS {} diff --git a/tests/stub/scripts/v3/user_canceled_read.script b/tests/stub/scripts/v3/user_canceled_read.script index 80871850..2beeb89d 100644 --- a/tests/stub/scripts/v3/user_canceled_read.script +++ b/tests/stub/scripts/v3/user_canceled_read.script @@ -3,7 +3,7 @@ !: AUTO GOODBYE !: PORT 9004 -C: BEGIN {} +C: BEGIN {"mode": "r"} S: SUCCESS {} C: RUN "RETURN 1" {} {} diff --git a/tests/stub/scripts/v4x0/dbms_routing_get_routing_table_system_default.script b/tests/stub/scripts/v4x0/dbms_routing_get_routing_table_system_default.script new file mode 100644 index 00000000..0e22da90 --- /dev/null +++ b/tests/stub/scripts/v4x0/dbms_routing_get_routing_table_system_default.script @@ -0,0 +1,14 @@ +!: BOLT 4 +!: PORT 9001 + +C: HELLO {"user_agent": "test", "scheme": "basic", "principal": "test", "credentials": "test"} +S: SUCCESS {"server": "Neo4j/4.0.0", "connection_id": "12345678-1234-1234-1234-123456789000"} + +C: RUN "CALL dbms.routing.getRoutingTable($context)" {"context": {}} {"mode": "r", "db": "system"} + PULL {"n": -1} +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [1234, [{"role":"WRITE", "addresses":["127.0.0.1:9001"]}, {"role":"READ", "addresses":["127.0.0.1:9002", "127.0.0.1:9003"]}, {"role":"ROUTE", "addresses":["127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"]}]] + SUCCESS {"bookmark": "neo4j:bookmark-test-1", "type": "s", "t_last": 15, "db": "system"} + +C: GOODBYE +S: \ No newline at end of file diff --git a/tests/stub/scripts/v4x0/dbms_routing_get_routing_table_system_neo4j.script b/tests/stub/scripts/v4x0/dbms_routing_get_routing_table_system_neo4j.script new file mode 100644 index 00000000..ba136984 --- /dev/null +++ b/tests/stub/scripts/v4x0/dbms_routing_get_routing_table_system_neo4j.script @@ -0,0 +1,14 @@ +!: BOLT 4 +!: PORT 9001 + +C: HELLO {"user_agent": "test", "scheme": "basic", "principal": "test", "credentials": "test"} +S: SUCCESS {"server": "Neo4j/4.0.0", "connection_id": "12345678-1234-1234-1234-123456789000"} + +C: RUN "CALL dbms.routing.getRoutingTable($context, $database)" {"context": {}, "database": "neo4j"} {"mode": "r", "db": "system"} + PULL {"n": -1} +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [1234, [{"role":"WRITE", "addresses":["127.0.0.1:9001"]}, {"role":"READ", "addresses":["127.0.0.1:9002", "127.0.0.1:9003"]}, {"role":"ROUTE", "addresses":["127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"]}]] + SUCCESS {"bookmark": "neo4j:bookmark-test-1", "type": "r", "t_last": 15, "db": "neo4j"} + +C: GOODBYE +S: \ No newline at end of file diff --git a/tests/stub/scripts/v4x0/disconnect_on_pull.script b/tests/stub/scripts/v4x0/disconnect_on_pull.script index 9cc33086..9200226e 100644 --- a/tests/stub/scripts/v4x0/disconnect_on_pull.script +++ b/tests/stub/scripts/v4x0/disconnect_on_pull.script @@ -4,6 +4,6 @@ !: AUTO RESET !: PORT 9001 -C: RUN "RETURN $x" {"x": 1} {} +C: RUN "RETURN $x" {"x": 1} {"mode": "r"} PULL {"n": -1} S: diff --git a/tests/stub/scripts/v4x0/disconnect_on_pull_port_9004.script b/tests/stub/scripts/v4x0/disconnect_on_pull_port_9004.script index 12c9d51e..59f5727f 100644 --- a/tests/stub/scripts/v4x0/disconnect_on_pull_port_9004.script +++ b/tests/stub/scripts/v4x0/disconnect_on_pull_port_9004.script @@ -4,6 +4,6 @@ !: AUTO RESET !: PORT 9004 -C: RUN "RETURN $x" {"x": 1} {} +C: RUN "RETURN $x" {"x": 1} {"mode": "r"} PULL {"n": -1} S: diff --git a/tests/stub/scripts/v4x0/disconnect_on_run.script b/tests/stub/scripts/v4x0/disconnect_on_run.script index d60b9022..5508ee78 100644 --- a/tests/stub/scripts/v4x0/disconnect_on_run.script +++ b/tests/stub/scripts/v4x0/disconnect_on_run.script @@ -4,5 +4,5 @@ !: AUTO RESET !: PORT 9001 -C: RUN "RETURN 1 AS x" {} {} +C: RUN "RETURN 1 AS x" {} {"mode": "r"} S: diff --git a/tests/stub/scripts/v4x0/disconnect_on_run_port_9004.script b/tests/stub/scripts/v4x0/disconnect_on_run_port_9004.script index 23b3884f..42cfdde2 100644 --- a/tests/stub/scripts/v4x0/disconnect_on_run_port_9004.script +++ b/tests/stub/scripts/v4x0/disconnect_on_run_port_9004.script @@ -4,5 +4,5 @@ !: AUTO RESET !: PORT 9004 -C: RUN "RETURN $x" {"x": 1} {} +C: RUN "RETURN $x" {"x": 1} {"mode": "r"} S: diff --git a/tests/stub/scripts/v4x0/return_1_four_times_port_9004.script b/tests/stub/scripts/v4x0/return_1_four_times_port_9004.script index f983c267..47698633 100644 --- a/tests/stub/scripts/v4x0/return_1_four_times_port_9004.script +++ b/tests/stub/scripts/v4x0/return_1_four_times_port_9004.script @@ -4,25 +4,25 @@ !: AUTO RESET !: PORT 9004 -C: RUN "RETURN 1" {} {} +C: RUN "RETURN 1" {} {"mode": "r"} PULL {"n": -1} S: SUCCESS {"fields": ["1"]} RECORD [1] SUCCESS {} -C: RUN "RETURN 1" {} {} +C: RUN "RETURN 1" {} {"mode": "r"} PULL {"n": -1} S: SUCCESS {"fields": ["1"]} RECORD [1] SUCCESS {} -C: RUN "RETURN 1" {} {} +C: RUN "RETURN 1" {} {"mode": "r"} PULL {"n": -1} S: SUCCESS {"fields": ["1"]} RECORD [1] SUCCESS {} -C: RUN "RETURN 1" {} {} +C: RUN "RETURN 1" {} {"mode": "r"} PULL {"n": -1} S: SUCCESS {"fields": ["1"]} RECORD [1] diff --git a/tests/stub/scripts/v4x0/return_1_port_9001.script b/tests/stub/scripts/v4x0/return_1_port_9001.script index 6d90053c..773afd76 100644 --- a/tests/stub/scripts/v4x0/return_1_port_9001.script +++ b/tests/stub/scripts/v4x0/return_1_port_9001.script @@ -5,7 +5,7 @@ C: HELLO {"user_agent": "test", "scheme": "basic", "principal": "test", "credentials": "test"} S: SUCCESS {"server": "Neo4j/4.0.0", "connection_id": "123e4567-e89b-12d3-a456-426655440000"} -C: RUN "RETURN 1 AS x" {} {} +C: RUN "RETURN 1 AS x" {} {"mode": "r"} PULL {"n": -1} S: SUCCESS {"fields": ["x"]} RECORD [1] diff --git a/tests/stub/scripts/v4x0/return_1_port_9002.script b/tests/stub/scripts/v4x0/return_1_port_9002.script index fdd74916..c2778b13 100644 --- a/tests/stub/scripts/v4x0/return_1_port_9002.script +++ b/tests/stub/scripts/v4x0/return_1_port_9002.script @@ -4,7 +4,7 @@ !: AUTO RESET !: PORT 9002 -C: RUN "RETURN 1 AS x" {} {} +C: RUN "RETURN 1 AS x" {} {"mode": "r"} PULL {"n": -1} S: SUCCESS {"fields": ["x"]} RECORD [1] diff --git a/tests/stub/scripts/v4x0/return_1_port_9004.script b/tests/stub/scripts/v4x0/return_1_port_9004.script index 8fb52de3..f523408c 100644 --- a/tests/stub/scripts/v4x0/return_1_port_9004.script +++ b/tests/stub/scripts/v4x0/return_1_port_9004.script @@ -4,7 +4,7 @@ !: AUTO RESET !: PORT 9004 -C: RUN "RETURN $x" {"x": 1} {} +C: RUN "RETURN $x" {"x": 1} {"mode": "r"} PULL {"n": -1} S: SUCCESS {"fields": ["x"]} RECORD [1] diff --git a/tests/stub/scripts/v4x0/return_1_port_9005.script b/tests/stub/scripts/v4x0/return_1_port_9005.script index 10b15265..d08f7246 100644 --- a/tests/stub/scripts/v4x0/return_1_port_9005.script +++ b/tests/stub/scripts/v4x0/return_1_port_9005.script @@ -4,7 +4,7 @@ !: AUTO RESET !: PORT 9005 -C: RUN "RETURN 1 AS x" {} {} +C: RUN "RETURN 1 AS x" {} {"mode": "r"} PULL {"n": -1} S: SUCCESS {"fields": ["x"]} RECORD [1] diff --git a/tests/stub/scripts/v4x0/return_1_twice_port_9004.script b/tests/stub/scripts/v4x0/return_1_twice_port_9004.script index 42405213..b97142b3 100644 --- a/tests/stub/scripts/v4x0/return_1_twice_port_9004.script +++ b/tests/stub/scripts/v4x0/return_1_twice_port_9004.script @@ -4,13 +4,13 @@ !: AUTO RESET !: PORT 9004 -C: RUN "RETURN 1" {} {} +C: RUN "RETURN 1" {} {"mode": "r"} PULL {"n": -1} S: SUCCESS {"fields": ["1"]} RECORD [1] SUCCESS {} -C: RUN "RETURN 1" {} {} +C: RUN "RETURN 1" {} {"mode": "r"} PULL {"n": -1} S: SUCCESS {"fields": ["1"]} RECORD [1] diff --git a/tests/stub/scripts/v4x0/router.script b/tests/stub/scripts/v4x0/router.script index 7b2f180d..af0e8c19 100644 --- a/tests/stub/scripts/v4x0/router.script +++ b/tests/stub/scripts/v4x0/router.script @@ -4,8 +4,8 @@ !: AUTO RESET !: PORT 9001 -C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {} +C: RUN "CALL dbms.routing.getRoutingTable($context)" {"context": {}} {"mode": "r", "db": "system"} PULL {"n": -1} S: SUCCESS {"fields": ["ttl", "servers"]} RECORD [300, [{"role":"ROUTE","addresses":["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"]},{"role":"READ","addresses":["127.0.0.1:9004","127.0.0.1:9005"]},{"role":"WRITE","addresses":["127.0.0.1:9006"]}]] - SUCCESS {} + SUCCESS {"bookmark": "neo4j:bookmark-test-1", "type": "s", "t_last": 15, "db": "system"} diff --git a/tests/stub/scripts/v4x0/router_get_routing_table_with_context.script b/tests/stub/scripts/v4x0/router_get_routing_table_with_context.script index 630b0744..15cbf13d 100644 --- a/tests/stub/scripts/v4x0/router_get_routing_table_with_context.script +++ b/tests/stub/scripts/v4x0/router_get_routing_table_with_context.script @@ -4,8 +4,8 @@ !: AUTO RESET !: PORT 9001 -C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {"name": "molly", "age": "1"}} {} +C: RUN "CALL dbms.routing.getRoutingTable($context)" {"context": {"name": "molly", "age": "1"}} {"mode": "r", "db": "system"} PULL {"n": -1} S: SUCCESS {"fields": ["ttl", "servers"]} RECORD [302, [{"role":"ROUTE", "addresses":["127.0.0.1:9001", "127.0.0.1:9002"]}, {"role":"READ", "addresses":["127.0.0.1:9002"]}, {"role":"WRITE", "addresses":["127.0.0.1:9001"]}]] - SUCCESS {} + SUCCESS {"bookmark": "neo4j:bookmark-test-1", "type": "s", "t_last": 15, "db": "system"} diff --git a/tests/stub/scripts/v4x0/router_port_9001_one_read_port_9004_one_write_port_9006.script b/tests/stub/scripts/v4x0/router_port_9001_one_read_port_9004_one_write_port_9006.script index 83a82ee2..ac7887d7 100644 --- a/tests/stub/scripts/v4x0/router_port_9001_one_read_port_9004_one_write_port_9006.script +++ b/tests/stub/scripts/v4x0/router_port_9001_one_read_port_9004_one_write_port_9006.script @@ -6,8 +6,8 @@ C: HELLO {"user_agent": "test", "scheme": "basic", "principal": "test", "credentials": "test"} S: SUCCESS {"server": "Neo4j/4.0.0", "connection_id": "123e4567-e89b-12d3-a456-426655440000"} -C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context":{}} {} +C: RUN "CALL dbms.routing.getRoutingTable($context)" {"context":{}} {"mode": "r", "db": "system"} PULL {"n": -1} S: SUCCESS {"fields": ["ttl", "servers"]} RECORD [300, [{"role":"ROUTE", "addresses":["127.0.0.1:9001", "127.0.0.1:9002"]}, {"role":"READ", "addresses":["127.0.0.1:9004"]}, {"role":"WRITE", "addresses":["127.0.0.1:9006"]}]] - SUCCESS {"bookmark": "neo4j:bookmark-test-1", type": "r", "t_last": 5, "db": "system"} \ No newline at end of file + SUCCESS {"bookmark": "neo4j:bookmark-test-1", type": "s", "t_last": 5, "db": "system"} \ No newline at end of file diff --git a/tests/stub/scripts/v4x0/router_role_route_share_port_with_role_read_and_role_write.script b/tests/stub/scripts/v4x0/router_role_route_share_port_with_role_read_and_role_write.script index 74f2cb02..d0a130c9 100644 --- a/tests/stub/scripts/v4x0/router_role_route_share_port_with_role_read_and_role_write.script +++ b/tests/stub/scripts/v4x0/router_role_route_share_port_with_role_read_and_role_write.script @@ -4,8 +4,8 @@ !: AUTO RESET !: PORT 9001 -C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {} +C: RUN "CALL dbms.routing.getRoutingTable($context)" {"context": {}} {"mode": "r", "db": "system"} PULL {"n": -1} S: SUCCESS {"fields": ["ttl", "servers"]} RECORD [302, [{"role":"ROUTE", "addresses":["127.0.0.1:9001", "127.0.0.1:9002"]}, {"role":"READ", "addresses":["127.0.0.1:9002"]}, {"role":"WRITE", "addresses":["127.0.0.1:9001"]}]] - SUCCESS {} + SUCCESS {"bookmark": "neo4j:bookmark-test-1", type": "s", "t_last": 5, "db": "system"} diff --git a/tests/stub/scripts/v4x0/router_with_no_role_read.script b/tests/stub/scripts/v4x0/router_with_no_role_read.script index 0874a03b..4dc0dbb9 100644 --- a/tests/stub/scripts/v4x0/router_with_no_role_read.script +++ b/tests/stub/scripts/v4x0/router_with_no_role_read.script @@ -4,8 +4,8 @@ !: AUTO RESET !: PORT 9001 -C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {} +C: RUN "CALL dbms.routing.getRoutingTable($context)" {"context": {}} {"mode": "r", "db": "system"} PULL {"n": -1} S: SUCCESS {"fields": ["ttl", "servers"]} RECORD [304, [{"role":"ROUTE", "addresses":["127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"]}, {"role":"READ", "addresses":[]}, {"role":"WRITE", "addresses":["127.0.0.1:9006"]}]] - SUCCESS {} + SUCCESS {"bookmark": "neo4j:bookmark-test-1", type": "s", "t_last": 5, "db": "system"} diff --git a/tests/stub/scripts/v4x0/router_with_no_role_write.script b/tests/stub/scripts/v4x0/router_with_no_role_write.script index 6fcd87a8..f2ca7c40 100644 --- a/tests/stub/scripts/v4x0/router_with_no_role_write.script +++ b/tests/stub/scripts/v4x0/router_with_no_role_write.script @@ -4,8 +4,8 @@ !: AUTO RESET !: PORT 9001 -C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {} +C: RUN "CALL dbms.routing.getRoutingTable($context)" {"context": {}} {"mode": "r", "db": "system"} PULL {"n": -1} S: SUCCESS {"fields": ["ttl", "servers"]} RECORD [303, [{"role":"ROUTE", "addresses":["127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"]}, {"role":"READ", "addresses":["127.0.0.1:9004", "127.0.0.1:9005"]}, {"role":"WRITE", "addresses":[]}]] - SUCCESS {} + SUCCESS {"bookmark": "neo4j:bookmark-test-1", type": "s", "t_last": 5, "db": "system"} diff --git a/tests/stub/scripts/v4x0/routing_table_failure_not_a_router.script b/tests/stub/scripts/v4x0/routing_table_failure_not_a_router.script index e0cf6ae7..eaf28300 100644 --- a/tests/stub/scripts/v4x0/routing_table_failure_not_a_router.script +++ b/tests/stub/scripts/v4x0/routing_table_failure_not_a_router.script @@ -2,7 +2,7 @@ !: AUTO HELLO !: AUTO GOODBYE -C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {} +C: RUN "CALL dbms.routing.getRoutingTable($context)" {"context": {}} {"mode": "r", "db": "system"} PULL {"n": -1} S: FAILURE {"code": "Neo.ClientError.Procedure.ProcedureNotFound", "message": "Not a router"} IGNORED diff --git a/tests/stub/scripts/v4x0/routing_table_silent_router.script b/tests/stub/scripts/v4x0/routing_table_silent_router.script index 064e6b3a..439a086d 100644 --- a/tests/stub/scripts/v4x0/routing_table_silent_router.script +++ b/tests/stub/scripts/v4x0/routing_table_silent_router.script @@ -3,7 +3,7 @@ !: AUTO GOODBYE !: AUTO RESET -C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {} +C: RUN "CALL dbms.routing.getRoutingTable($context)" {"context": {}} {"mode": "r", "db": "system"} PULL {"n": -1} S: SUCCESS {"fields": ["ttl", "servers"]} SUCCESS {} diff --git a/tests/stub/scripts/v4x0/run_with_failure_database_unavailable.script b/tests/stub/scripts/v4x0/run_with_failure_database_unavailable.script index 2c1997f6..5796291a 100644 --- a/tests/stub/scripts/v4x0/run_with_failure_database_unavailable.script +++ b/tests/stub/scripts/v4x0/run_with_failure_database_unavailable.script @@ -7,7 +7,7 @@ !: AUTO ROLLBACK !: PORT 9004 -C: RUN "RETURN 1" {} {} +C: RUN "RETURN 1" {} {"mode": "r"} C: PULL {"n": -1} S: FAILURE {"code": "Neo.TransientError.General.DatabaseUnavailable", "message": "Database is busy doing store copy"} S: IGNORED diff --git a/tests/stub/scripts/v4x0/tx_bookmark_chain.script b/tests/stub/scripts/v4x0/tx_bookmark_chain.script index 207cba82..aa0e1270 100644 --- a/tests/stub/scripts/v4x0/tx_bookmark_chain.script +++ b/tests/stub/scripts/v4x0/tx_bookmark_chain.script @@ -4,12 +4,12 @@ !: AUTO RESET !: PORT 9004 -C: BEGIN {"bookmarks": ["bookmark:0", "bookmark:1"]} +C: BEGIN {"bookmarks": ["bookmark:0", "bookmark:1"], "mode": "r"} S: SUCCESS {} C: COMMIT S: SUCCESS {"bookmark": "bookmark:2"} -C: BEGIN {"bookmarks": ["bookmark:2"]} +C: BEGIN {"bookmarks": ["bookmark:2"], "mode": "r"} S: SUCCESS {} C: COMMIT S: SUCCESS {"bookmark": "bookmark:3"} diff --git a/tests/stub/scripts/v4x0/tx_bookmark_chain_with_autocommit.script b/tests/stub/scripts/v4x0/tx_bookmark_chain_with_autocommit.script index 28f3ed5c..f26d109a 100644 --- a/tests/stub/scripts/v4x0/tx_bookmark_chain_with_autocommit.script +++ b/tests/stub/scripts/v4x0/tx_bookmark_chain_with_autocommit.script @@ -4,17 +4,17 @@ !: AUTO RESET !: PORT 9004 -C: BEGIN {"bookmarks": ["bookmark:1"]} +C: BEGIN {"bookmarks": ["bookmark:1"], "mode": "r"} S: SUCCESS {} C: COMMIT S: SUCCESS {"bookmark": "bookmark:2"} -C: RUN "RETURN 1" {} {"bookmarks": ["bookmark:2"]} +C: RUN "RETURN 1" {} {"bookmarks": ["bookmark:2"], "mode": "r"} PULL {"n": -1} S: SUCCESS {"bookmark": "bookmark:3"} SUCCESS {} -C: BEGIN {"bookmarks": ["bookmark:3"]} +C: BEGIN {"bookmarks": ["bookmark:3"], "mode": "r"} S: SUCCESS {} C: COMMIT S: SUCCESS {"bookmark": "bookmark:4"} diff --git a/tests/stub/scripts/v4x0/tx_return_1_port_9004.script b/tests/stub/scripts/v4x0/tx_return_1_port_9004.script index 954ba756..872b1ee2 100644 --- a/tests/stub/scripts/v4x0/tx_return_1_port_9004.script +++ b/tests/stub/scripts/v4x0/tx_return_1_port_9004.script @@ -4,7 +4,7 @@ !: AUTO RESET !: PORT 9004 -C: BEGIN {} +C: BEGIN {"mode": "r"} S: SUCCESS {} C: RUN "RETURN 1" {} {} diff --git a/tests/stub/scripts/v4x0/tx_return_1_reset_port_9004.script b/tests/stub/scripts/v4x0/tx_return_1_reset_port_9004.script index 5cae442c..f655e90a 100644 --- a/tests/stub/scripts/v4x0/tx_return_1_reset_port_9004.script +++ b/tests/stub/scripts/v4x0/tx_return_1_reset_port_9004.script @@ -3,7 +3,7 @@ !: AUTO GOODBYE !: PORT 9004 -C: BEGIN {} +C: BEGIN {"mode": "r"} S: SUCCESS {} C: RUN "RETURN 1" {} {} diff --git a/tests/stub/scripts/v4x0/tx_return_1_twice_port_9004.script b/tests/stub/scripts/v4x0/tx_return_1_twice_port_9004.script index 42e9fa0e..cf17f264 100644 --- a/tests/stub/scripts/v4x0/tx_return_1_twice_port_9004.script +++ b/tests/stub/scripts/v4x0/tx_return_1_twice_port_9004.script @@ -4,7 +4,7 @@ !: AUTO RESET !: PORT 9004 -C: BEGIN {} +C: BEGIN {"mode": "r"} S: SUCCESS {} C: RUN "RETURN 1" {} {} diff --git a/tests/stub/scripts/v4x0/tx_return_2_with_bookmark_port_9004.script b/tests/stub/scripts/v4x0/tx_return_2_with_bookmark_port_9004.script index 11336229..e6589258 100644 --- a/tests/stub/scripts/v4x0/tx_return_2_with_bookmark_port_9004.script +++ b/tests/stub/scripts/v4x0/tx_return_2_with_bookmark_port_9004.script @@ -4,7 +4,7 @@ !: AUTO RESET !: PORT 9004 -C: BEGIN {"bookmarks": ["bookmark:1"]} +C: BEGIN {"bookmarks": ["bookmark:1"], "mode": "r"} S: SUCCESS {} C: RUN "RETURN 2" {} {} diff --git a/tests/stub/scripts/v4x0/tx_run_with_failure_syntax_error_port_9004.script b/tests/stub/scripts/v4x0/tx_run_with_failure_syntax_error_port_9004.script index 502c8a5d..8bd71132 100644 --- a/tests/stub/scripts/v4x0/tx_run_with_failure_syntax_error_port_9004.script +++ b/tests/stub/scripts/v4x0/tx_run_with_failure_syntax_error_port_9004.script @@ -3,7 +3,7 @@ !: AUTO GOODBYE !: PORT 9004 -C: BEGIN {} +C: BEGIN {"mode": "r"} S: SUCCESS {} C: RUN "X" {} {} diff --git a/tests/stub/scripts/v4x0/tx_two_subsequent_return_1_port_9004.script b/tests/stub/scripts/v4x0/tx_two_subsequent_return_1_port_9004.script index 6e195ec9..95afab57 100644 --- a/tests/stub/scripts/v4x0/tx_two_subsequent_return_1_port_9004.script +++ b/tests/stub/scripts/v4x0/tx_two_subsequent_return_1_port_9004.script @@ -4,7 +4,7 @@ !: AUTO RESET !: PORT 9004 -C: BEGIN {} +C: BEGIN {"mode": "r"} S: SUCCESS {} C: RUN "RETURN 1" {} {} @@ -16,7 +16,7 @@ S: SUCCESS {"fields": ["1"]} C: COMMIT S: SUCCESS {"bookmark": "bookmark:1"} -C: BEGIN {"bookmarks": ["bookmark:1"]} +C: BEGIN {"bookmarks": ["bookmark:1"], "mode": "r"} S: SUCCESS {} C: RUN "RETURN 1" {} {} diff --git a/tests/stub/test_bookmarking.py b/tests/stub/test_bookmarking.py index 5c7ef94a..9e993103 100644 --- a/tests/stub/test_bookmarking.py +++ b/tests/stub/test_bookmarking.py @@ -91,8 +91,7 @@ def test_should_automatically_chain_bookmarks(driver_info, test_scripts): with StubCluster(*test_scripts): uri = "neo4j://localhost:9001" with GraphDatabase.driver(uri, auth=driver_info["auth_token"]) as driver: - with driver.session(default_access_mode=READ_ACCESS, - bookmarks=["bookmark:0", "bookmark:1"]) as session: + with driver.session(default_access_mode=READ_ACCESS, bookmarks=["bookmark:0", "bookmark:1"]) as session: with session.begin_transaction(): pass assert session.last_bookmark() == "bookmark:2" @@ -113,8 +112,7 @@ def test_autocommit_transaction_included_in_chain(driver_info, test_scripts): with StubCluster(*test_scripts): uri = "neo4j://localhost:9001" with GraphDatabase.driver(uri, auth=driver_info["auth_token"]) as driver: - with driver.session(default_access_mode=READ_ACCESS, - bookmarks=["bookmark:1"]) as session: + with driver.session(default_access_mode=READ_ACCESS, bookmarks=["bookmark:1"]) as session: with session.begin_transaction(): pass assert session.last_bookmark() == "bookmark:2" diff --git a/tests/stub/test_directdriver.py b/tests/stub/test_directdriver.py index 47f836db..0d833178 100644 --- a/tests/stub/test_directdriver.py +++ b/tests/stub/test_directdriver.py @@ -35,6 +35,7 @@ BoltDriver, Query, WRITE_ACCESS, + READ_ACCESS, TRUST_ALL_CERTIFICATES, TRUST_SYSTEM_CA_SIGNED_CERTIFICATES, ) @@ -57,7 +58,7 @@ session_config = { - "default_access_mode": WRITE_ACCESS, + "default_access_mode": READ_ACCESS, "connection_acquisition_timeout": 1.0, "max_retry_time": 1.0, "initial_retry_delay": 1.0, @@ -124,7 +125,7 @@ def test_direct_verify_connectivity(driver_info, test_script, test_expected): uri = "bolt://127.0.0.1:9001" with GraphDatabase.driver(uri, auth=driver_info["auth_token"], **driver_config) as driver: assert isinstance(driver, BoltDriver) - assert driver.verify_connectivity() == test_expected + assert driver.verify_connectivity(default_access_mode=READ_ACCESS) == test_expected @pytest.mark.parametrize( @@ -140,7 +141,7 @@ def test_direct_verify_connectivity_disconnect_on_run(driver_info, test_script): uri = "bolt://127.0.0.1:9001" with GraphDatabase.driver(uri, auth=driver_info["auth_token"], **driver_config) as driver: with pytest.raises(ServiceUnavailable): - driver.verify_connectivity() + driver.verify_connectivity(default_access_mode=READ_ACCESS) @pytest.mark.parametrize( diff --git a/tests/stub/test_multi_database.py b/tests/stub/test_multi_database.py new file mode 100644 index 00000000..77679851 --- /dev/null +++ b/tests/stub/test_multi_database.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- + +# Copyright (c) 2002-2020 "Neo4j," +# Neo4j Sweden AB [http://neo4j.com] +# +# This file is part of Neo4j. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import pytest + +from neo4j import ( + GraphDatabase, + Neo4jDriver, + DEFAULT_DATABASE, +) +from tests.stub.conftest import StubCluster + +# python -m pytest tests/stub/test_multi_database.py -s -v + + +@pytest.mark.parametrize( + "test_script, test_database", + [ + ("v3/dbms_cluster_routing_get_routing_table_system.script", DEFAULT_DATABASE), + ("v4x0/dbms_routing_get_routing_table_system_default.script", DEFAULT_DATABASE), + ("v4x0/dbms_routing_get_routing_table_system_neo4j.script", "neo4j"), + ] +) +def test_dbms_cluster_routing_get_routing_table(driver_info, test_script, test_database): + # python -m pytest tests/stub/test_multi_database.py -s -v -k test_dbms_cluster_routing_get_routing_table + + test_config = { + "user_agent": "test", + "database": test_database, + } + + with StubCluster(test_script): + uri = "neo4j://127.0.0.1:9001" + driver = GraphDatabase.driver(uri, auth=driver_info["auth_token"], **test_config) + assert isinstance(driver, Neo4jDriver) + driver.close() diff --git a/tests/stub/test_routingdriver.py b/tests/stub/test_routingdriver.py index da026163..90c52c11 100644 --- a/tests/stub/test_routingdriver.py +++ b/tests/stub/test_routingdriver.py @@ -26,6 +26,7 @@ Neo4jDriver, TRUST_ALL_CERTIFICATES, TRUST_SYSTEM_CA_SIGNED_CERTIFICATES, + DEFAULT_DATABASE, ) from neo4j.api import ( READ_ACCESS, @@ -230,7 +231,7 @@ def test_should_discover_servers_on_driver_construction(driver_info, test_script with StubCluster(test_script): uri = "neo4j://127.0.0.1:9001" with GraphDatabase.driver(uri, auth=driver_info["auth_token"]) as driver: - table = driver._pool.routing_table + table = driver._pool.routing_tables[DEFAULT_DATABASE] assert table.routers == {('127.0.0.1', 9001), ('127.0.0.1', 9002), ('127.0.0.1', 9003)} assert table.readers == {('127.0.0.1', 9004), ('127.0.0.1', 9005)} @@ -370,6 +371,31 @@ def test_should_disconnect_after_explicit_commit(driver_info, test_scripts, test assert session._connection is None +@pytest.mark.parametrize( + "test_scripts, test_run_args", + [ + (("v3/router.script", "v3/return_1_twice_in_read_tx.script"), ("RETURN $x", {"x": 1})), + (("v4x0/router.script", "v4x0/tx_return_1_twice_port_9004.script"), ("RETURN 1", )), + ] +) +def test_default_access_mode_defined_at_session_level(driver_info, test_scripts, test_run_args): + # python -m pytest tests/stub/test_routingdriver.py -s -v -k test_default_access_mode_defined_at_session_level + with StubCluster(*test_scripts): + uri = "neo4j://127.0.0.1:9001" + with GraphDatabase.driver(uri, auth=driver_info["auth_token"]) as driver: + with driver.session(default_access_mode=READ_ACCESS) as session: + with session.begin_transaction() as tx: + result = tx.run(*test_run_args) + assert session._connection is not None + result.consume() + assert session._connection is not None + result = tx.run(*test_run_args) + assert session._connection is not None + result.consume() + assert session._connection is not None + assert session._connection is None + + @pytest.mark.parametrize( "test_scripts, test_run_args", [ @@ -539,7 +565,7 @@ def test_forgets_address_on_not_a_leader_error(driver_info, test_scripts, test_r _ = session.run(*test_run_args) pool = driver._pool - table = pool.routing_table + table = driver._pool.routing_tables[DEFAULT_DATABASE] # address might still have connections in the pool, failed instance just can't serve writes assert ('127.0.0.1', 9006) in pool.connections @@ -566,7 +592,7 @@ def test_forgets_address_on_forbidden_on_read_only_database_error(driver_info, t _ = session.run(*test_run_args) pool = driver._pool - table = pool.routing_table + table = driver._pool.routing_tables[DEFAULT_DATABASE] # address might still have connections in the pool, failed instance just can't serve writes assert ('127.0.0.1', 9006) in pool.connections @@ -591,7 +617,7 @@ def test_forgets_address_on_service_unavailable_error(driver_info, test_scripts, with driver.session(default_access_mode=READ_ACCESS) as session: pool = driver._pool - table = pool.routing_table + table = driver._pool.routing_tables[DEFAULT_DATABASE] table.readers.remove(('127.0.0.1', 9005)) with pytest.raises(SessionExpired): @@ -626,7 +652,7 @@ def test_forgets_address_on_database_unavailable_error(driver_info, test_scripts with driver.session(default_access_mode=READ_ACCESS) as session: pool = driver._pool - table = pool.routing_table + table = driver._pool.routing_tables[DEFAULT_DATABASE] table.readers.remove(('127.0.0.1', 9005)) with pytest.raises(TransientError) as raised: @@ -634,7 +660,7 @@ def test_forgets_address_on_database_unavailable_error(driver_info, test_scripts assert raised.exception.title == "DatabaseUnavailable" pool = driver._pool - table = pool.routing_table + table = driver._pool.routing_tables[DEFAULT_DATABASE] # address should not have connections in the pool, it has failed assert ('127.0.0.1', 9004) not in pool.connections diff --git a/tests/unit/io/test_class_bolt3.py b/tests/unit/io/test_class_bolt3.py index 2421be7a..491deb97 100644 --- a/tests/unit/io/test_class_bolt3.py +++ b/tests/unit/io/test_class_bolt3.py @@ -22,44 +22,53 @@ import pytest from neo4j.io._bolt3 import Bolt3 +from neo4j.conf import PoolConfig +from neo4j.exceptions import ( + ConfigurationError, +) + +# python -m pytest tests/unit/io/test_class_bolt3.py -s -v def test_conn_timed_out(fake_socket): address = ("127.0.0.1", 7687) - connection = Bolt3(address, fake_socket(address), max_connection_lifetime=0) + max_connection_lifetime = 0 + connection = Bolt3(address, fake_socket(address), max_connection_lifetime) assert connection.timedout() is True def test_conn_not_timed_out_if_not_enabled(fake_socket): address = ("127.0.0.1", 7687) - connection = Bolt3(address, fake_socket(address), max_connection_lifetime=-1) + max_connection_lifetime = -1 + connection = Bolt3(address, fake_socket(address), max_connection_lifetime) assert connection.timedout() is False def test_conn_not_timed_out(fake_socket): address = ("127.0.0.1", 7687) - connection = Bolt3(address, fake_socket(address), max_connection_lifetime=999999999) + max_connection_lifetime = 999999999 + connection = Bolt3(address, fake_socket(address), max_connection_lifetime) assert connection.timedout() is False def test_db_extra_not_supported_in_begin(fake_socket): address = ("127.0.0.1", 7687) - connection = Bolt3(address, fake_socket(address)) - with pytest.raises(ValueError): + connection = Bolt3(address, fake_socket(address), PoolConfig.max_connection_lifetime) + with pytest.raises(ConfigurationError): connection.begin(db="something") def test_db_extra_not_supported_in_run(fake_socket): address = ("127.0.0.1", 7687) - connection = Bolt3(address, fake_socket(address)) - with pytest.raises(ValueError): + connection = Bolt3(address, fake_socket(address), PoolConfig.max_connection_lifetime) + with pytest.raises(ConfigurationError): connection.run("", db="something") def test_simple_discard(fake_socket): address = ("127.0.0.1", 7687) socket = fake_socket(address) - connection = Bolt3(address, socket) + connection = Bolt3(address, socket, PoolConfig.max_connection_lifetime) connection.discard() connection.send_all() tag, fields = socket.pop_message() @@ -69,14 +78,14 @@ def test_simple_discard(fake_socket): def test_n_extra_not_supported_in_discard(fake_socket): address = ("127.0.0.1", 7687) - connection = Bolt3(address, fake_socket(address)) + connection = Bolt3(address, fake_socket(address), PoolConfig.max_connection_lifetime) with pytest.raises(ValueError): connection.discard(n=666) def test_qid_extra_not_supported_in_discard(fake_socket): address = ("127.0.0.1", 7687) - connection = Bolt3(address, fake_socket(address)) + connection = Bolt3(address, fake_socket(address), PoolConfig.max_connection_lifetime) with pytest.raises(ValueError): connection.discard(qid=666) @@ -84,7 +93,7 @@ def test_qid_extra_not_supported_in_discard(fake_socket): def test_simple_pull(fake_socket): address = ("127.0.0.1", 7687) socket = fake_socket(address) - connection = Bolt3(address, socket) + connection = Bolt3(address, socket, PoolConfig.max_connection_lifetime) connection.pull() connection.send_all() tag, fields = socket.pop_message() @@ -94,14 +103,13 @@ def test_simple_pull(fake_socket): def test_n_extra_not_supported_in_pull(fake_socket): address = ("127.0.0.1", 7687) - connection = Bolt3(address, fake_socket(address)) + connection = Bolt3(address, fake_socket(address), PoolConfig.max_connection_lifetime) with pytest.raises(ValueError): connection.pull(n=666) def test_qid_extra_not_supported_in_pull(fake_socket): address = ("127.0.0.1", 7687) - connection = Bolt3(address, fake_socket(address)) + connection = Bolt3(address, fake_socket(address), PoolConfig.max_connection_lifetime) with pytest.raises(ValueError): connection.pull(qid=666) - diff --git a/tests/unit/io/test_class_bolt4x0.py b/tests/unit/io/test_class_bolt4x0.py index 1db5c545..1886e5fc 100644 --- a/tests/unit/io/test_class_bolt4x0.py +++ b/tests/unit/io/test_class_bolt4x0.py @@ -20,31 +20,36 @@ import pytest + from neo4j.io._bolt4x0 import Bolt4x0 +from neo4j.conf import PoolConfig def test_conn_timed_out(fake_socket): address = ("127.0.0.1", 7687) - connection = Bolt4x0(address, fake_socket(address), max_connection_lifetime=0) + max_connection_lifetime = 0 + connection = Bolt4x0(address, fake_socket(address), max_connection_lifetime) assert connection.timedout() is True def test_conn_not_timed_out_if_not_enabled(fake_socket): address = ("127.0.0.1", 7687) - connection = Bolt4x0(address, fake_socket(address), max_connection_lifetime=-1) + max_connection_lifetime = -1 + connection = Bolt4x0(address, fake_socket(address), max_connection_lifetime) assert connection.timedout() is False def test_conn_not_timed_out(fake_socket): address = ("127.0.0.1", 7687) - connection = Bolt4x0(address, fake_socket(address), max_connection_lifetime=999999999) + max_connection_lifetime = 999999999 + connection = Bolt4x0(address, fake_socket(address), max_connection_lifetime) assert connection.timedout() is False def test_db_extra_in_begin(fake_socket): address = ("127.0.0.1", 7687) socket = fake_socket(address) - connection = Bolt4x0(address, socket) + connection = Bolt4x0(address, socket, PoolConfig.max_connection_lifetime) connection.begin(db="something") connection.send_all() tag, fields = socket.pop_message() @@ -56,7 +61,7 @@ def test_db_extra_in_begin(fake_socket): def test_db_extra_in_run(fake_socket): address = ("127.0.0.1", 7687) socket = fake_socket(address) - connection = Bolt4x0(address, socket) + connection = Bolt4x0(address, socket, PoolConfig.max_connection_lifetime) connection.run("", {}, db="something") connection.send_all() tag, fields = socket.pop_message() @@ -70,7 +75,7 @@ def test_db_extra_in_run(fake_socket): def test_n_extra_in_discard(fake_socket): address = ("127.0.0.1", 7687) socket = fake_socket(address) - connection = Bolt4x0(address, socket) + connection = Bolt4x0(address, socket, PoolConfig.max_connection_lifetime) connection.discard(n=666) connection.send_all() tag, fields = socket.pop_message() @@ -89,7 +94,7 @@ def test_n_extra_in_discard(fake_socket): def test_qid_extra_in_discard(fake_socket, test_input, expected): address = ("127.0.0.1", 7687) socket = fake_socket(address) - connection = Bolt4x0(address, socket) + connection = Bolt4x0(address, socket, PoolConfig.max_connection_lifetime) connection.discard(qid=test_input) connection.send_all() tag, fields = socket.pop_message() @@ -109,7 +114,7 @@ def test_n_and_qid_extras_in_discard(fake_socket, test_input, expected): # python -m pytest tests/unit/io/test_class_bolt4x0.py -s -k test_n_and_qid_extras_in_discard address = ("127.0.0.1", 7687) socket = fake_socket(address) - connection = Bolt4x0(address, socket) + connection = Bolt4x0(address, socket, PoolConfig.max_connection_lifetime) connection.discard(n=666, qid=test_input) connection.send_all() tag, fields = socket.pop_message() @@ -128,7 +133,7 @@ def test_n_and_qid_extras_in_discard(fake_socket, test_input, expected): def test_n_extra_in_pull(fake_socket, test_input, expected): address = ("127.0.0.1", 7687) socket = fake_socket(address) - connection = Bolt4x0(address, socket) + connection = Bolt4x0(address, socket, PoolConfig.max_connection_lifetime) connection.pull(n=test_input) connection.send_all() tag, fields = socket.pop_message() @@ -148,7 +153,7 @@ def test_qid_extra_in_pull(fake_socket, test_input, expected): # python -m pytest tests/unit/io/test_class_bolt4x0.py -s -k test_qid_extra_in_pull address = ("127.0.0.1", 7687) socket = fake_socket(address) - connection = Bolt4x0(address, socket) + connection = Bolt4x0(address, socket, PoolConfig.max_connection_lifetime) connection.pull(qid=test_input) connection.send_all() tag, fields = socket.pop_message() @@ -160,7 +165,7 @@ def test_qid_extra_in_pull(fake_socket, test_input, expected): def test_n_and_qid_extras_in_pull(fake_socket): address = ("127.0.0.1", 7687) socket = fake_socket(address) - connection = Bolt4x0(address, socket) + connection = Bolt4x0(address, socket, PoolConfig.max_connection_lifetime) connection.pull(n=666, qid=777) connection.send_all() tag, fields = socket.pop_message() diff --git a/tests/unit/io/test_direct.py b/tests/unit/io/test_direct.py index 40975292..464605bb 100644 --- a/tests/unit/io/test_direct.py +++ b/tests/unit/io/test_direct.py @@ -25,7 +25,11 @@ Thread, Event, ) -from neo4j import PoolConfig +from neo4j import ( + Config, + PoolConfig, + WorkspaceConfig, +) from neo4j.io import ( Bolt, BoltPool, @@ -76,14 +80,14 @@ def timedout(self): class FakeBoltPool(IOPool): def __init__(self, address, *, auth=None, **config): - self.config = PoolConfig._consume(config) + self.pool_config, self.workspace_config = Config.consume_chain(config, PoolConfig, WorkspaceConfig) if config: raise ValueError("Unexpected config keys: %s" % ", ".join(config.keys())) def opener(addr, timeout): return QuickConnection(FakeSocket(addr)) - super().__init__(opener, self.config) + super().__init__(opener, self.pool_config, self.workspace_config) self.address = address def acquire(self, access_mode=None, timeout=None): diff --git a/tests/unit/io/test_routing.py b/tests/unit/io/test_routing.py index 05a32543..0380468b 100644 --- a/tests/unit/io/test_routing.py +++ b/tests/unit/io/test_routing.py @@ -21,9 +21,17 @@ from unittest import TestCase -from neo4j.io import Bolt, Neo4jPool -from neo4j.routing import OrderedSet, RoutingTable - +from neo4j.io import ( + Bolt, + Neo4jPool, +) +from neo4j.routing import ( + OrderedSet, + RoutingTable, +) +from neo4j.api import ( + DEFAULT_DATABASE, +) VALID_ROUTING_RECORD = { "ttl": 300, @@ -45,10 +53,6 @@ } -def opener(address, error_handler): - return Bolt.open(address, error_handler=error_handler, auth=("neotest", "neotest")) - - class OrderedSetTestCase(TestCase): def test_should_repr_as_set(self): s = OrderedSet([1, 2, 3]) @@ -129,23 +133,29 @@ def test_should_be_able_to_replace(self): class RoutingTableConstructionTestCase(TestCase): def test_should_be_initially_stale(self): - table = RoutingTable() + table = RoutingTable(database=DEFAULT_DATABASE) assert not table.is_fresh(readonly=True) assert not table.is_fresh(readonly=False) class RoutingTableParseRoutingInfoTestCase(TestCase): def test_should_return_routing_table_on_valid_record(self): - table = RoutingTable.parse_routing_info(VALID_ROUTING_RECORD["servers"], - VALID_ROUTING_RECORD["ttl"]) + table = RoutingTable.parse_routing_info( + database=DEFAULT_DATABASE, + servers=VALID_ROUTING_RECORD["servers"], + ttl=VALID_ROUTING_RECORD["ttl"], + ) assert table.routers == {('127.0.0.1', 9001), ('127.0.0.1', 9002), ('127.0.0.1', 9003)} assert table.readers == {('127.0.0.1', 9004), ('127.0.0.1', 9005)} assert table.writers == {('127.0.0.1', 9006)} assert table.ttl == 300 def test_should_return_routing_table_on_valid_record_with_extra_role(self): - table = RoutingTable.parse_routing_info(VALID_ROUTING_RECORD_WITH_EXTRA_ROLE["servers"], - VALID_ROUTING_RECORD_WITH_EXTRA_ROLE["ttl"]) + table = RoutingTable.parse_routing_info( + database=DEFAULT_DATABASE, + servers=VALID_ROUTING_RECORD_WITH_EXTRA_ROLE["servers"], + ttl=VALID_ROUTING_RECORD_WITH_EXTRA_ROLE["ttl"], + ) assert table.routers == {('127.0.0.1', 9001), ('127.0.0.1', 9002), ('127.0.0.1', 9003)} assert table.readers == {('127.0.0.1', 9004), ('127.0.0.1', 9005)} assert table.writers == {('127.0.0.1', 9006)} @@ -162,34 +172,50 @@ def test_should_return_all_distinct_servers_in_routing_table(self): {"role": "WRITE", "addresses": ["127.0.0.1:9002"]}, ], } - table = RoutingTable.parse_routing_info(routing_table["servers"], routing_table["ttl"]) + table = RoutingTable.parse_routing_info( + database=DEFAULT_DATABASE, + servers=routing_table["servers"], + ttl=routing_table["ttl"], + ) assert table.servers() == {('127.0.0.1', 9001), ('127.0.0.1', 9002), ('127.0.0.1', 9003), ('127.0.0.1', 9005)} class RoutingTableFreshnessTestCase(TestCase): def test_should_be_fresh_after_update(self): - table = RoutingTable.parse_routing_info(VALID_ROUTING_RECORD["servers"], - VALID_ROUTING_RECORD["ttl"]) + table = RoutingTable.parse_routing_info( + database=DEFAULT_DATABASE, + servers=VALID_ROUTING_RECORD["servers"], + ttl=VALID_ROUTING_RECORD["ttl"], + ) assert table.is_fresh(readonly=True) assert table.is_fresh(readonly=False) def test_should_become_stale_on_expiry(self): - table = RoutingTable.parse_routing_info(VALID_ROUTING_RECORD["servers"], - VALID_ROUTING_RECORD["ttl"]) + table = RoutingTable.parse_routing_info( + database=DEFAULT_DATABASE, + servers=VALID_ROUTING_RECORD["servers"], + ttl=VALID_ROUTING_RECORD["ttl"], + ) table.ttl = 0 assert not table.is_fresh(readonly=True) assert not table.is_fresh(readonly=False) def test_should_become_stale_if_no_readers(self): - table = RoutingTable.parse_routing_info(VALID_ROUTING_RECORD["servers"], - VALID_ROUTING_RECORD["ttl"]) + table = RoutingTable.parse_routing_info( + database=DEFAULT_DATABASE, + servers=VALID_ROUTING_RECORD["servers"], + ttl=VALID_ROUTING_RECORD["ttl"], + ) table.readers.clear() assert not table.is_fresh(readonly=True) assert table.is_fresh(readonly=False) def test_should_become_stale_if_no_writers(self): - table = RoutingTable.parse_routing_info(VALID_ROUTING_RECORD["servers"], - VALID_ROUTING_RECORD["ttl"]) + table = RoutingTable.parse_routing_info( + database=DEFAULT_DATABASE, + servers=VALID_ROUTING_RECORD["servers"], + ttl=VALID_ROUTING_RECORD["ttl"], + ) table.writers.clear() assert table.is_fresh(readonly=True) assert not table.is_fresh(readonly=False) @@ -198,10 +224,19 @@ def test_should_become_stale_if_no_writers(self): class RoutingTableUpdateTestCase(TestCase): def setUp(self): self.table = RoutingTable( - [("192.168.1.1", 7687), ("192.168.1.2", 7687)], [("192.168.1.3", 7687)], [], 0) + database=DEFAULT_DATABASE, + routers=[("192.168.1.1", 7687), ("192.168.1.2", 7687)], + readers=[("192.168.1.3", 7687)], + writers=[], + ttl=0, + ) self.new_table = RoutingTable( - [("127.0.0.1", 9001), ("127.0.0.1", 9002), ("127.0.0.1", 9003)], - [("127.0.0.1", 9004), ("127.0.0.1", 9005)], [("127.0.0.1", 9006)], 300) + database=DEFAULT_DATABASE, + routers=[("127.0.0.1", 9001), ("127.0.0.1", 9002), ("127.0.0.1", 9003)], + readers=[("127.0.0.1", 9004), ("127.0.0.1", 9005)], + writers=[("127.0.0.1", 9006)], + ttl=300, + ) def test_update_should_replace_routers(self): self.table.update(self.new_table) diff --git a/tests/unit/test_api.py b/tests/unit/test_api.py index 9ca16dcd..d7a3d5c6 100644 --- a/tests/unit/test_api.py +++ b/tests/unit/test_api.py @@ -245,7 +245,7 @@ def test_bookmark_initialization_with_invalid_strings(test_input, expected): (("3",), "3", "Version('3',)"), (("3", "0"), "3.0", "Version('3', '0')"), ((3,), "3", "Version(3,)"), - ((3,0), "3.0", "Version(3, 0)"), + ((3, 0), "3.0", "Version(3, 0)"), ((3, 0, 0), "3.0.0", "Version(3, 0, 0)"), ((3, 0, 0, 0), "3.0.0.0", "Version(3, 0, 0, 0)"), ]