From 73737174ceebe7b57076ee795df1efe5a9be31ad Mon Sep 17 00:00:00 2001
From: Dmitriy Tverdiakov <11927660+injectives@users.noreply.github.com>
Date: Tue, 5 Oct 2021 11:38:53 +0100
Subject: [PATCH 1/5] Fix sync and async testkit backends launch (#1027)
---
.../channel/handler/TestkitRequestProcessorHandler.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java
index 83b97b85be..bd5d0352ea 100644
--- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java
+++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java
@@ -51,13 +51,13 @@ public TestkitRequestProcessorHandler( BackendMode backendMode )
switch ( backendMode )
{
case ASYNC:
- processorImpl = TestkitRequestProcessorHandler::wrapSyncRequest;
+ processorImpl = TestkitRequest::processAsync;
break;
case REACTIVE:
processorImpl = ( request, state ) -> request.processRx( state ).toFuture();
break;
default:
- processorImpl = TestkitRequest::processAsync;
+ processorImpl = TestkitRequestProcessorHandler::wrapSyncRequest;
break;
}
}
From 9272db0ea3e7ad1f70828b9e1adecad5daa62963 Mon Sep 17 00:00:00 2001
From: Dmitriy Tverdiakov <11927660+injectives@users.noreply.github.com>
Date: Wed, 13 Oct 2021 12:56:00 +0100
Subject: [PATCH 2/5] Add toString to InternalSummaryCounters (#1031)
---
.../summary/InternalSummaryCounters.java | 19 +++++++++++++++++++
1 file changed, 19 insertions(+)
diff --git a/driver/src/main/java/org/neo4j/driver/internal/summary/InternalSummaryCounters.java b/driver/src/main/java/org/neo4j/driver/internal/summary/InternalSummaryCounters.java
index 63bb0c76c3..fb90ea1cb1 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/summary/InternalSummaryCounters.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/summary/InternalSummaryCounters.java
@@ -204,4 +204,23 @@ private boolean isPositive( int value )
{
return value > 0;
}
+
+ @Override
+ public String toString()
+ {
+ return "InternalSummaryCounters{" +
+ "nodesCreated=" + nodesCreated +
+ ", nodesDeleted=" + nodesDeleted +
+ ", relationshipsCreated=" + relationshipsCreated +
+ ", relationshipsDeleted=" + relationshipsDeleted +
+ ", propertiesSet=" + propertiesSet +
+ ", labelsAdded=" + labelsAdded +
+ ", labelsRemoved=" + labelsRemoved +
+ ", indexesAdded=" + indexesAdded +
+ ", indexesRemoved=" + indexesRemoved +
+ ", constraintsAdded=" + constraintsAdded +
+ ", constraintsRemoved=" + constraintsRemoved +
+ ", systemUpdates=" + systemUpdates +
+ '}';
+ }
}
From 8fa0211d4b8928cb50e71f6b6932c822d58dbec6 Mon Sep 17 00:00:00 2001
From: Dmitriy Tverdiakov <11927660+injectives@users.noreply.github.com>
Date: Fri, 15 Oct 2021 08:55:25 +0100
Subject: [PATCH 3/5] Make AddressSet retain resolved addresses (#1034)
Routing table address set update may override resolved router address. This leads to routing connection pool closures. This update aims to optimise this.
---
.../driver/internal/cluster/AddressSet.java | 12 ++++++-
.../cluster/RoutingTableHandlerImpl.java | 4 ++-
.../internal/cluster/AddressSetTest.java | 31 +++++++++++++++++++
.../messages/requests/GetRoutingTable.java | 14 +++++----
4 files changed, 53 insertions(+), 8 deletions(-)
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java
index c4cc3f2b20..a08a914ea9 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java
@@ -40,13 +40,23 @@ public int size()
return addresses.length;
}
+ /**
+ * Updates addresses using the provided set.
+ *
+ * It aims to retain existing addresses by checking if they are present in the new set. To benefit from this, the provided set MUST contain specifically
+ * {@link BoltServerAddress} instances with equal host and connection host values.
+ *
+ * @param newAddresses the new address set.
+ */
public synchronized void retainAllAndAdd( Set newAddresses )
{
BoltServerAddress[] addressesArr = new BoltServerAddress[newAddresses.size()];
int insertionIdx = 0;
for ( BoltServerAddress address : addresses )
{
- if ( newAddresses.remove( address ) )
+ BoltServerAddress lookupAddress =
+ BoltServerAddress.class.equals( address.getClass() ) ? address : new BoltServerAddress( address.host(), address.port() );
+ if ( newAddresses.remove( lookupAddress ) )
{
addressesArr[insertionIdx] = address;
insertionIdx++;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerImpl.java
index 7605b48ee8..0e8b1f0bf2 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerImpl.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerImpl.java
@@ -112,6 +112,7 @@ private synchronized void freshClusterCompositionFetched( ClusterCompositionLook
{
try
{
+ log.debug( "Fetched cluster composition for database '%s'. %s", databaseName.description(), compositionLookupResult.getClusterComposition() );
routingTable.update( compositionLookupResult.getClusterComposition() );
routingTableRegistry.removeAged();
@@ -142,7 +143,8 @@ private synchronized void freshClusterCompositionFetched( ClusterCompositionLook
private synchronized void clusterCompositionLookupFailed( Throwable error )
{
- log.error( String.format( "Failed to update routing table for database '%s'. Current routing table: %s.", databaseName.description(), routingTable ), error );
+ log.error( String.format( "Failed to update routing table for database '%s'. Current routing table: %s.", databaseName.description(), routingTable ),
+ error );
routingTableRegistry.remove( databaseName );
CompletableFuture routingTableFuture = refreshRoutingTableFuture;
refreshRoutingTableFuture = null;
diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java
index 57b80fac8a..a91d5676b6 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java
@@ -20,13 +20,18 @@
import org.junit.jupiter.api.Test;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import org.neo4j.driver.internal.BoltServerAddress;
+import org.neo4j.driver.internal.ResolvedBoltServerAddress;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
class AddressSetTest
{
@@ -142,6 +147,32 @@ void shouldHaveCorrectSize()
assertEquals( 2, addressSet.size() );
}
+ @Test
+ void shouldRetainExistingAddresses()
+ {
+ AddressSet addressSet = new AddressSet();
+ BoltServerAddress address0 = new BoltServerAddress( "node0", 7687 );
+ BoltServerAddress address1 = new ResolvedBoltServerAddress( "node1", 7687, new InetAddress[]{InetAddress.getLoopbackAddress()} );
+ BoltServerAddress address2 = new BoltServerAddress( "node2", 7687 );
+ BoltServerAddress address3 = new BoltServerAddress( "node3", 7687 );
+ BoltServerAddress address4 = new BoltServerAddress( "node4", 7687 );
+ addressSet.retainAllAndAdd( new HashSet<>( Arrays.asList( address0, address1, address2, address3, address4 ) ) );
+
+ BoltServerAddress sameAddress0 = new BoltServerAddress( "node0", 7687 );
+ BoltServerAddress sameAddress1 = new BoltServerAddress( "node1", 7687 );
+ BoltServerAddress differentAddress2 = new BoltServerAddress( "different-node2", 7687 );
+ BoltServerAddress sameAddress3 = new BoltServerAddress( "node3", 7687 );
+ BoltServerAddress sameAddress4 = new BoltServerAddress( "node4", 7687 );
+ addressSet.retainAllAndAdd( new HashSet<>( Arrays.asList( sameAddress0, sameAddress1, differentAddress2, sameAddress3, sameAddress4 ) ) );
+
+ assertEquals( 5, addressSet.size() );
+ assertSame( addressSet.toArray()[0], address0 );
+ assertSame( addressSet.toArray()[1], address1 );
+ assertSame( addressSet.toArray()[2], address3 );
+ assertSame( addressSet.toArray()[3], address4 );
+ assertSame( addressSet.toArray()[4], differentAddress2 );
+ }
+
private static Set addresses( String... strings )
{
Set set = new LinkedHashSet<>();
diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java
index 41d2bf0d66..99bea9d694 100644
--- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java
+++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java
@@ -32,7 +32,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;
-import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.DatabaseNameUtil;
import org.neo4j.driver.internal.cluster.AddressSet;
@@ -43,6 +42,11 @@
@Getter
public class GetRoutingTable implements TestkitRequest
{
+ private static final Function> ADDRESSES_TO_STRINGS =
+ ( addresses ) -> Arrays.stream( addresses.toArray() )
+ .map( address -> String.format( "%s:%d", address.host(), address.port() ) )
+ .collect( Collectors.toList() );
+
private GetRoutingTableBody data;
@Override
@@ -61,17 +65,15 @@ public TestkitResponse process( TestkitState testkitState )
String.format( "There is no routing table handler for the '%s' database.", databaseName.databaseName().orElse( "null" ) ) ) );
org.neo4j.driver.internal.cluster.RoutingTable routingTable = routingTableHandler.routingTable();
- Function> addressesToStrings = ( addresses ) -> Arrays.stream( addresses.toArray() )
- .map( BoltServerAddress::toString ).collect( Collectors.toList() );
return RoutingTable
.builder()
.data( RoutingTable.RoutingTableBody
.builder()
.database( databaseName.databaseName().orElse( null ) )
- .routers( addressesToStrings.apply( routingTable.routers() ) )
- .readers( addressesToStrings.apply( routingTable.readers() ) )
- .writers( addressesToStrings.apply( routingTable.writers() ) )
+ .routers( ADDRESSES_TO_STRINGS.apply( routingTable.routers() ) )
+ .readers( ADDRESSES_TO_STRINGS.apply( routingTable.readers() ) )
+ .writers( ADDRESSES_TO_STRINGS.apply( routingTable.writers() ) )
.build()
).build();
}
From c9198745c795b6d1bca4939918cd2c9486108bc4 Mon Sep 17 00:00:00 2001
From: Dmitriy Tverdiakov <11927660+injectives@users.noreply.github.com>
Date: Fri, 15 Oct 2021 12:11:54 +0100
Subject: [PATCH 4/5] Improve connection pool concurrent access (#1035)
This update introduces read/write lock for internal pool map access management. For instance, the `retainAll` method is executed with write lock.
---
.../async/pool/ConnectionPoolImpl.java | 153 ++++++++++++------
1 file changed, 107 insertions(+), 46 deletions(-)
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java
index 3d3eeafe8f..551066d5a5 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java
@@ -22,15 +22,20 @@
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
@@ -63,7 +68,8 @@ public class ConnectionPoolImpl implements ConnectionPool
private final MetricsListener metricsListener;
private final boolean ownsEventLoopGroup;
- private final ConcurrentMap pools = new ConcurrentHashMap<>();
+ private final ReadWriteLock addressToPoolLock = new ReentrantReadWriteLock();
+ private final Map addressToPool = new HashMap<>();
private final AtomicBoolean closed = new AtomicBoolean();
private final CompletableFuture closeFuture = new CompletableFuture<>();
private final ConnectionFactory connectionFactory;
@@ -126,25 +132,32 @@ public CompletionStage acquire( BoltServerAddress address )
@Override
public void retainAll( Set addressesToRetain )
{
- for ( BoltServerAddress address : pools.keySet() )
+ executeWithLock( addressToPoolLock.writeLock(), () ->
{
- if ( !addressesToRetain.contains( address ) )
+ Iterator> entryIterator = addressToPool.entrySet().iterator();
+ while ( entryIterator.hasNext() )
{
- int activeChannels = nettyChannelTracker.inUseChannelCount( address );
- if ( activeChannels == 0 )
+ Map.Entry entry = entryIterator.next();
+ BoltServerAddress address = entry.getKey();
+ if ( !addressesToRetain.contains( address ) )
{
- // address is not present in updated routing table and has no active connections
- // it's now safe to terminate corresponding connection pool and forget about it
- ExtendedChannelPool pool = pools.remove( address );
- if ( pool != null )
+ int activeChannels = nettyChannelTracker.inUseChannelCount( address );
+ if ( activeChannels == 0 )
{
- log.info( "Closing connection pool towards %s, it has no active connections " +
- "and is not in the routing table registry.", address );
- closePoolInBackground( address, pool );
+ // address is not present in updated routing table and has no active connections
+ // it's now safe to terminate corresponding connection pool and forget about it
+ ExtendedChannelPool pool = entry.getValue();
+ entryIterator.remove();
+ if ( pool != null )
+ {
+ log.info( "Closing connection pool towards %s, it has no active connections " +
+ "and is not in the routing table registry.", address );
+ closePoolInBackground( address, pool );
+ }
}
}
}
- }
+ } );
}
@Override
@@ -165,21 +178,26 @@ public CompletionStage close()
if ( closed.compareAndSet( false, true ) )
{
nettyChannelTracker.prepareToCloseChannels();
- CompletableFuture allPoolClosedFuture = closeAllPools();
- // We can only shutdown event loop group when all netty pools are fully closed,
- // otherwise the netty pools might missing threads (from event loop group) to execute clean ups.
- allPoolClosedFuture.whenComplete( ( ignored, pollCloseError ) -> {
- pools.clear();
- if ( !ownsEventLoopGroup )
- {
- completeWithNullIfNoError( closeFuture, pollCloseError );
- }
- else
- {
- shutdownEventLoopGroup( pollCloseError );
- }
- } );
+ executeWithLockAsync( addressToPoolLock.writeLock(),
+ () ->
+ {
+ // We can only shutdown event loop group when all netty pools are fully closed,
+ // otherwise the netty pools might missing threads (from event loop group) to execute clean ups.
+ return closeAllPools().whenComplete(
+ ( ignored, pollCloseError ) ->
+ {
+ addressToPool.clear();
+ if ( !ownsEventLoopGroup )
+ {
+ completeWithNullIfNoError( closeFuture, pollCloseError );
+ }
+ else
+ {
+ shutdownEventLoopGroup( pollCloseError );
+ }
+ } );
+ } );
}
return closeFuture;
}
@@ -187,13 +205,13 @@ public CompletionStage close()
@Override
public boolean isOpen( BoltServerAddress address )
{
- return pools.containsKey( address );
+ return executeWithLock( addressToPoolLock.readLock(), () -> addressToPool.containsKey( address ) );
}
@Override
public String toString()
{
- return "ConnectionPoolImpl{" + "pools=" + pools + '}';
+ return executeWithLock( addressToPoolLock.readLock(), () -> "ConnectionPoolImpl{" + "pools=" + addressToPool + '}' );
}
private void processAcquisitionError( ExtendedChannelPool pool, BoltServerAddress serverAddress, Throwable error )
@@ -239,7 +257,7 @@ private void assertNotClosed( BoltServerAddress address, Channel channel, Extend
{
pool.release( channel );
closePoolInBackground( address, pool );
- pools.remove( address );
+ executeWithLock( addressToPoolLock.writeLock(), () -> addressToPool.remove( address ) );
assertNotClosed();
}
}
@@ -247,7 +265,7 @@ private void assertNotClosed( BoltServerAddress address, Channel channel, Extend
// for testing only
ExtendedChannelPool getPool( BoltServerAddress address )
{
- return pools.get( address );
+ return executeWithLock( addressToPoolLock.readLock(), () -> addressToPool.get( address ) );
}
ExtendedChannelPool newPool( BoltServerAddress address )
@@ -258,12 +276,22 @@ ExtendedChannelPool newPool( BoltServerAddress address )
private ExtendedChannelPool getOrCreatePool( BoltServerAddress address )
{
- return pools.computeIfAbsent( address, ignored -> {
- ExtendedChannelPool pool = newPool( address );
- // before the connection pool is added I can add the metrics for the pool.
- metricsListener.putPoolMetrics( pool.id(), address, this );
- return pool;
- } );
+ ExtendedChannelPool existingPool = executeWithLock( addressToPoolLock.readLock(), () -> addressToPool.get( address ) );
+ return existingPool != null
+ ? existingPool
+ : executeWithLock( addressToPoolLock.writeLock(),
+ () ->
+ {
+ ExtendedChannelPool pool = addressToPool.get( address );
+ if ( pool == null )
+ {
+ pool = newPool( address );
+ // before the connection pool is added I can add the metrics for the pool.
+ metricsListener.putPoolMetrics( pool.id(), address, this );
+ addressToPool.put( address, pool );
+ }
+ return pool;
+ } );
}
private CompletionStage closePool( ExtendedChannelPool pool )
@@ -305,12 +333,45 @@ private void shutdownEventLoopGroup( Throwable pollCloseError )
private CompletableFuture closeAllPools()
{
return CompletableFuture.allOf(
- pools.entrySet().stream().map( entry -> {
- BoltServerAddress address = entry.getKey();
- ExtendedChannelPool pool = entry.getValue();
- log.info( "Closing connection pool towards %s", address );
- // Wait for all pools to be closed.
- return closePool( pool ).toCompletableFuture();
- } ).toArray( CompletableFuture[]::new ) );
+ addressToPool.entrySet().stream()
+ .map( entry ->
+ {
+ BoltServerAddress address = entry.getKey();
+ ExtendedChannelPool pool = entry.getValue();
+ log.info( "Closing connection pool towards %s", address );
+ // Wait for all pools to be closed.
+ return closePool( pool ).toCompletableFuture();
+ } )
+ .toArray( CompletableFuture[]::new ) );
+ }
+
+ private void executeWithLock( Lock lock, Runnable runnable )
+ {
+ executeWithLock( lock, () ->
+ {
+ runnable.run();
+ return null;
+ } );
+ }
+
+ private T executeWithLock( Lock lock, Supplier supplier )
+ {
+ lock.lock();
+ try
+ {
+ return supplier.get();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ private void executeWithLockAsync( Lock lock, Supplier> stageSupplier )
+ {
+ lock.lock();
+ CompletableFuture.completedFuture( lock )
+ .thenCompose( ignored -> stageSupplier.get() )
+ .whenComplete( ( ignored, throwable ) -> lock.unlock() );
}
}
From 6c17025b577adfea3d046786cdc2e175b928e364 Mon Sep 17 00:00:00 2001
From: Dmitriy Tverdiakov <11927660+injectives@users.noreply.github.com>
Date: Tue, 19 Oct 2021 16:28:19 +0100
Subject: [PATCH 5/5] Replace AddressSet with standard Java collections (#1037)
* Replace AddressSet with standard Java collections
This update also prevents unnecessary connection pool closures by keeping `disused` addresses until next routing table update and reusing them.
* Update reader, writer and router addresses handling and change type
---
.../async/pool/ConnectionPoolImpl.java | 34 +---
.../driver/internal/cluster/AddressSet.java | 113 -----------
.../internal/cluster/ClusterRoutingTable.java | 152 +++++++++-----
.../internal/cluster/RediscoveryImpl.java | 4 +-
.../driver/internal/cluster/RoutingTable.java | 29 ++-
.../LeastConnectedLoadBalancingStrategy.java | 26 +--
.../cluster/loadbalancing/LoadBalancer.java | 17 +-
.../loadbalancing/LoadBalancingStrategy.java | 6 +-
.../neo4j/driver/internal/util/LockUtil.java | 61 ++++++
.../internal/cluster/AddressSetTest.java | 185 ------------------
.../internal/cluster/RediscoveryTest.java | 5 +-
.../cluster/RoutingTableHandlerTest.java | 4 +-
...astConnectedLoadBalancingStrategyTest.java | 55 +++---
.../loadbalancing/LoadBalancerTest.java | 28 ++-
.../neo4j/driver/internal/util/Matchers.java | 78 --------
.../messages/requests/GetRoutingTable.java | 11 +-
16 files changed, 272 insertions(+), 536 deletions(-)
delete mode 100644 driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java
create mode 100644 driver/src/main/java/org/neo4j/driver/internal/util/LockUtil.java
delete mode 100644 driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java
index 551066d5a5..12ce4d1a9d 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java
@@ -32,10 +32,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Supplier;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
@@ -54,6 +52,8 @@
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setAuthorizationStateListener;
import static org.neo4j.driver.internal.util.Futures.combineErrors;
import static org.neo4j.driver.internal.util.Futures.completeWithNullIfNoError;
+import static org.neo4j.driver.internal.util.LockUtil.executeWithLock;
+import static org.neo4j.driver.internal.util.LockUtil.executeWithLockAsync;
public class ConnectionPoolImpl implements ConnectionPool
{
@@ -344,34 +344,4 @@ private CompletableFuture closeAllPools()
} )
.toArray( CompletableFuture[]::new ) );
}
-
- private void executeWithLock( Lock lock, Runnable runnable )
- {
- executeWithLock( lock, () ->
- {
- runnable.run();
- return null;
- } );
- }
-
- private T executeWithLock( Lock lock, Supplier supplier )
- {
- lock.lock();
- try
- {
- return supplier.get();
- }
- finally
- {
- lock.unlock();
- }
- }
-
- private void executeWithLockAsync( Lock lock, Supplier> stageSupplier )
- {
- lock.lock();
- CompletableFuture.completedFuture( lock )
- .thenCompose( ignored -> stageSupplier.get() )
- .whenComplete( ( ignored, throwable ) -> lock.unlock() );
- }
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java
deleted file mode 100644
index a08a914ea9..0000000000
--- a/driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Copyright (c) "Neo4j"
- * Neo4j Sweden AB [http://neo4j.com]
- *
- * This file is part of Neo4j.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.neo4j.driver.internal.cluster;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.neo4j.driver.internal.BoltServerAddress;
-
-public class AddressSet
-{
- private static final BoltServerAddress[] NONE = {};
-
- private volatile BoltServerAddress[] addresses = NONE;
-
- public BoltServerAddress[] toArray()
- {
- return addresses;
- }
-
- public int size()
- {
- return addresses.length;
- }
-
- /**
- * Updates addresses using the provided set.
- *
- * It aims to retain existing addresses by checking if they are present in the new set. To benefit from this, the provided set MUST contain specifically
- * {@link BoltServerAddress} instances with equal host and connection host values.
- *
- * @param newAddresses the new address set.
- */
- public synchronized void retainAllAndAdd( Set newAddresses )
- {
- BoltServerAddress[] addressesArr = new BoltServerAddress[newAddresses.size()];
- int insertionIdx = 0;
- for ( BoltServerAddress address : addresses )
- {
- BoltServerAddress lookupAddress =
- BoltServerAddress.class.equals( address.getClass() ) ? address : new BoltServerAddress( address.host(), address.port() );
- if ( newAddresses.remove( lookupAddress ) )
- {
- addressesArr[insertionIdx] = address;
- insertionIdx++;
- }
- }
- Iterator addressIterator = newAddresses.iterator();
- for ( ; insertionIdx < addressesArr.length && addressIterator.hasNext(); insertionIdx++ )
- {
- addressesArr[insertionIdx] = addressIterator.next();
- }
- addresses = addressesArr;
- }
-
- public synchronized void replaceIfPresent( BoltServerAddress oldAddress, BoltServerAddress newAddress )
- {
- for ( int i = 0; i < addresses.length; i++ )
- {
- if ( addresses[i].equals( oldAddress ) )
- {
- addresses[i] = newAddress;
- }
- }
- }
-
- public synchronized void remove( BoltServerAddress address )
- {
- BoltServerAddress[] addresses = this.addresses;
- if ( addresses != null )
- {
- for ( int i = 0; i < addresses.length; i++ )
- {
- if ( addresses[i].equals( address ) )
- {
- if ( addresses.length == 1 )
- {
- this.addresses = NONE;
- return;
- }
- BoltServerAddress[] copy = new BoltServerAddress[addresses.length - 1];
- System.arraycopy( addresses, 0, copy, 0, i );
- System.arraycopy( addresses, i + 1, copy, i, addresses.length - i - 1 );
- this.addresses = copy;
- return;
- }
- }
- }
- }
-
- @Override
- public String toString()
- {
- return "AddressSet=" + Arrays.toString( addresses );
- }
-}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java
index 3604a5ffc9..a1caf44aa0 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java
@@ -18,10 +18,15 @@
*/
package org.neo4j.driver.internal.cluster;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
-import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.internal.BoltServerAddress;
@@ -30,24 +35,27 @@
import static java.lang.String.format;
import static java.util.Arrays.asList;
+import static org.neo4j.driver.internal.util.LockUtil.executeWithLock;
public class ClusterRoutingTable implements RoutingTable
{
private static final int MIN_ROUTERS = 1;
+ private final ReadWriteLock tableLock = new ReentrantReadWriteLock();
+ private final DatabaseName databaseName;
private final Clock clock;
- private volatile long expirationTimestamp;
- private final AddressSet readers;
- private final AddressSet writers;
- private final AddressSet routers;
+ private final Set disused = new HashSet<>();
- private final DatabaseName databaseName; // specifies the database this routing table is acquired for
- private boolean preferInitialRouter;
+ private long expirationTimestamp;
+ private boolean preferInitialRouter = true;
+ private List readers = Collections.emptyList();
+ private List writers = Collections.emptyList();
+ private List routers = Collections.emptyList();
public ClusterRoutingTable( DatabaseName ofDatabase, Clock clock, BoltServerAddress... routingAddresses )
{
this( ofDatabase, clock );
- routers.retainAllAndAdd( new LinkedHashSet<>( asList( routingAddresses ) ) );
+ routers = Collections.unmodifiableList( asList( routingAddresses ) );
}
private ClusterRoutingTable( DatabaseName ofDatabase, Clock clock )
@@ -55,77 +63,85 @@ private ClusterRoutingTable( DatabaseName ofDatabase, Clock clock )
this.databaseName = ofDatabase;
this.clock = clock;
this.expirationTimestamp = clock.millis() - 1;
- this.preferInitialRouter = true;
-
- this.readers = new AddressSet();
- this.writers = new AddressSet();
- this.routers = new AddressSet();
}
@Override
public boolean isStaleFor( AccessMode mode )
{
- return expirationTimestamp < clock.millis() ||
- routers.size() < MIN_ROUTERS ||
- mode == AccessMode.READ && readers.size() == 0 ||
- mode == AccessMode.WRITE && writers.size() == 0;
+ return executeWithLock( tableLock.readLock(), () ->
+ expirationTimestamp < clock.millis() ||
+ routers.size() < MIN_ROUTERS ||
+ mode == AccessMode.READ && readers.size() == 0 ||
+ mode == AccessMode.WRITE && writers.size() == 0 );
}
@Override
public boolean hasBeenStaleFor( long extraTime )
{
- long totalTime = expirationTimestamp + extraTime;
+ long totalTime = executeWithLock( tableLock.readLock(), () -> expirationTimestamp ) + extraTime;
if ( totalTime < 0 )
{
totalTime = Long.MAX_VALUE;
}
- return totalTime < clock.millis();
+ return totalTime < clock.millis();
}
@Override
- public synchronized void update( ClusterComposition cluster )
+ public void update( ClusterComposition cluster )
{
- expirationTimestamp = cluster.expirationTimestamp();
- readers.retainAllAndAdd( cluster.readers() );
- writers.retainAllAndAdd( cluster.writers() );
- routers.retainAllAndAdd( cluster.routers() );
- preferInitialRouter = !cluster.hasWriters();
+ executeWithLock( tableLock.writeLock(), () ->
+ {
+ expirationTimestamp = cluster.expirationTimestamp();
+ readers = newWithReusedAddresses( readers, disused, cluster.readers() );
+ writers = newWithReusedAddresses( writers, disused, cluster.writers() );
+ routers = newWithReusedAddresses( routers, disused, cluster.routers() );
+ disused.clear();
+ preferInitialRouter = !cluster.hasWriters();
+ } );
}
@Override
- public synchronized void forget( BoltServerAddress address )
+ public void forget( BoltServerAddress address )
{
- routers.remove( address );
- readers.remove( address );
- writers.remove( address );
+ executeWithLock( tableLock.writeLock(), () ->
+ {
+ routers = newWithoutAddressIfPresent( routers, address );
+ readers = newWithoutAddressIfPresent( readers, address );
+ writers = newWithoutAddressIfPresent( writers, address );
+ disused.add( address );
+ } );
}
@Override
- public AddressSet readers()
+ public List readers()
{
- return readers;
+ return executeWithLock( tableLock.readLock(), () -> readers );
}
@Override
- public AddressSet writers()
+ public List writers()
{
- return writers;
+ return executeWithLock( tableLock.readLock(), () -> writers );
}
@Override
- public AddressSet routers()
+ public List routers()
{
- return routers;
+ return executeWithLock( tableLock.readLock(), () -> routers );
}
@Override
public Set servers()
{
- Set servers = new HashSet<>();
- Collections.addAll( servers, readers.toArray() );
- Collections.addAll( servers, writers.toArray() );
- Collections.addAll( servers, routers.toArray() );
- return servers;
+ return executeWithLock( tableLock.readLock(), () ->
+ {
+ Set servers = new HashSet<>();
+ servers.addAll( readers );
+ servers.addAll( writers );
+ servers.addAll( routers );
+ servers.addAll( disused );
+ return servers;
+ } );
}
@Override
@@ -137,25 +153,69 @@ public DatabaseName database()
@Override
public void forgetWriter( BoltServerAddress toRemove )
{
- writers.remove( toRemove );
+ executeWithLock( tableLock.writeLock(), () ->
+ {
+ writers = newWithoutAddressIfPresent( writers, toRemove );
+ disused.add( toRemove );
+ } );
}
@Override
public void replaceRouterIfPresent( BoltServerAddress oldRouter, BoltServerAddress newRouter )
{
- routers.replaceIfPresent( oldRouter, newRouter );
+ executeWithLock( tableLock.writeLock(), () -> routers = newWithAddressReplacedIfPresent( routers, oldRouter, newRouter ) );
}
@Override
public boolean preferInitialRouter()
{
- return preferInitialRouter;
+ return executeWithLock( tableLock.readLock(), () -> preferInitialRouter );
}
@Override
- public synchronized String toString()
+ public String toString()
+ {
+ return executeWithLock( tableLock.readLock(), () ->
+ format( "Ttl %s, currentTime %s, routers %s, writers %s, readers %s, database '%s'",
+ expirationTimestamp, clock.millis(), routers, writers, readers, databaseName.description() ) );
+ }
+
+ private List newWithoutAddressIfPresent( List addresses, BoltServerAddress addressToSkip )
+ {
+ List newList = new ArrayList<>( addresses.size() );
+ for ( BoltServerAddress address : addresses )
+ {
+ if ( !address.equals( addressToSkip ) )
+ {
+ newList.add( address );
+ }
+ }
+ return Collections.unmodifiableList( newList );
+ }
+
+ private List newWithAddressReplacedIfPresent( List addresses, BoltServerAddress oldAddress,
+ BoltServerAddress newAddress )
+ {
+ List newList = new ArrayList<>( addresses.size() );
+ for ( BoltServerAddress address : addresses )
+ {
+ newList.add( address.equals( oldAddress ) ? newAddress : address );
+ }
+ return Collections.unmodifiableList( newList );
+ }
+
+ private List newWithReusedAddresses( List currentAddresses, Set disusedAddresses,
+ Set newAddresses )
+ {
+ List newList = Stream.concat( currentAddresses.stream(), disusedAddresses.stream() )
+ .filter( address -> newAddresses.remove( toBoltServerAddress( address ) ) )
+ .collect( Collectors.toCollection( () -> new ArrayList<>( newAddresses.size() ) ) );
+ newList.addAll( newAddresses );
+ return Collections.unmodifiableList( newList );
+ }
+
+ private BoltServerAddress toBoltServerAddress( BoltServerAddress address )
{
- return format( "Ttl %s, currentTime %s, routers %s, writers %s, readers %s, database '%s'",
- expirationTimestamp, clock.millis(), routers, writers, readers, databaseName.description() );
+ return BoltServerAddress.class.equals( address.getClass() ) ? address : new BoltServerAddress( address.host(), address.port() );
}
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java
index adb06f7372..8e7d37952c 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java
@@ -199,10 +199,8 @@ private CompletionStage lookupOnKnownRouters( Ro
Set seenServers, Bookmark bookmark,
Throwable baseError )
{
- BoltServerAddress[] addresses = routingTable.routers().toArray();
-
CompletableFuture result = completedWithNull();
- for ( BoltServerAddress address : addresses )
+ for ( BoltServerAddress address : routingTable.routers() )
{
result = result
.thenCompose(
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java
index 7fa7000bda..04c48d228c 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java
@@ -18,6 +18,7 @@
*/
package org.neo4j.driver.internal.cluster;
+import java.util.List;
import java.util.Set;
import org.neo4j.driver.AccessMode;
@@ -34,12 +35,34 @@ public interface RoutingTable
void forget( BoltServerAddress address );
- AddressSet readers();
+ /**
+ * Returns an immutable list of reader addresses.
+ *
+ * @return the immutable list of reader addresses.
+ */
+ List readers();
- AddressSet writers();
+ /**
+ * Returns an immutable list of writer addresses.
+ *
+ * @return the immutable list of write addresses.
+ */
- AddressSet routers();
+ List writers();
+ /**
+ * Returns an immutable list of router addresses.
+ *
+ * @return the immutable list of router addresses.
+ */
+
+ List routers();
+
+ /**
+ * Returns an immutable unordered set of all addresses known by this routing table. This includes all router, reader, writer and disused addresses.
+ *
+ * @return the immutable set of all addresses.
+ */
Set servers();
DatabaseName database();
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategy.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategy.java
index fe30c1ae59..3788771d74 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategy.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategy.java
@@ -18,15 +18,17 @@
*/
package org.neo4j.driver.internal.cluster.loadbalancing;
-import org.neo4j.driver.internal.BoltServerAddress;
-import org.neo4j.driver.internal.spi.ConnectionPool;
+import java.util.List;
+
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
+import org.neo4j.driver.internal.BoltServerAddress;
+import org.neo4j.driver.internal.spi.ConnectionPool;
/**
- * Load balancing strategy that finds server with least amount of active (checked out of the pool) connections from
- * given readers or writers. It finds a start index for iteration in a round-robin fashion. This is done to prevent
- * choosing same first address over and over when all addresses have same amount of active connections.
+ * Load balancing strategy that finds server with the least amount of active (checked out of the pool) connections from given readers or writers. It finds a
+ * start index for iteration in a round-robin fashion. This is done to prevent choosing same first address over and over when all addresses have the same amount
+ * of active connections.
*/
public class LeastConnectedLoadBalancingStrategy implements LoadBalancingStrategy
{
@@ -45,21 +47,21 @@ public LeastConnectedLoadBalancingStrategy( ConnectionPool connectionPool, Loggi
}
@Override
- public BoltServerAddress selectReader( BoltServerAddress[] knownReaders )
+ public BoltServerAddress selectReader( List knownReaders )
{
return select( knownReaders, readersIndex, "reader" );
}
@Override
- public BoltServerAddress selectWriter( BoltServerAddress[] knownWriters )
+ public BoltServerAddress selectWriter( List knownWriters )
{
return select( knownWriters, writersIndex, "writer" );
}
- private BoltServerAddress select( BoltServerAddress[] addresses, RoundRobinArrayIndex addressesIndex,
- String addressType )
+ private BoltServerAddress select( List addresses, RoundRobinArrayIndex addressesIndex,
+ String addressType )
{
- int size = addresses.length;
+ int size = addresses.size();
if ( size == 0 )
{
log.trace( "Unable to select %s, no known addresses given", addressType );
@@ -73,10 +75,10 @@ private BoltServerAddress select( BoltServerAddress[] addresses, RoundRobinArray
BoltServerAddress leastConnectedAddress = null;
int leastActiveConnections = Integer.MAX_VALUE;
- // iterate over the array to find least connected address
+ // iterate over the array to find the least connected address
do
{
- BoltServerAddress address = addresses[index];
+ BoltServerAddress address = addresses.get( index );
int activeConnections = connectionPool.inUseConnections( address );
if ( activeConnections < leastActiveConnections )
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java
index 16a741efb8..004fe57dcf 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java
@@ -35,7 +35,6 @@
import org.neo4j.driver.internal.DomainNameResolver;
import org.neo4j.driver.internal.async.ConnectionContext;
import org.neo4j.driver.internal.async.connection.RoutingConnection;
-import org.neo4j.driver.internal.cluster.AddressSet;
import org.neo4j.driver.internal.cluster.ClusterCompositionProvider;
import org.neo4j.driver.internal.cluster.Rediscovery;
import org.neo4j.driver.internal.cluster.RediscoveryImpl;
@@ -68,6 +67,7 @@ public class LoadBalancer implements ConnectionProvider
"Failed to obtain connection towards %s server. Known routing table is: %s";
private static final String CONNECTION_ACQUISITION_ATTEMPT_FAILURE_MESSAGE =
"Failed to obtain a connection towards address %s, will try other addresses if available. Complete failure is reported separately from this entry.";
+ private static final BoltServerAddress[] BOLT_SERVER_ADDRESSES_EMPTY_ARRAY = new BoltServerAddress[0];
private final ConnectionPool connectionPool;
private final RoutingTableRegistry routingTables;
private final LoadBalancingStrategy loadBalancingStrategy;
@@ -192,16 +192,15 @@ private CompletionStage supportsMultiDb( BoltServerAddress address )
private CompletionStage acquire( AccessMode mode, RoutingTable routingTable )
{
- AddressSet addresses = addressSet( mode, routingTable );
CompletableFuture result = new CompletableFuture<>();
List attemptExceptions = new ArrayList<>();
- acquire( mode, routingTable, addresses, result, attemptExceptions );
+ acquire( mode, routingTable, result, attemptExceptions );
return result;
}
- private void acquire( AccessMode mode, RoutingTable routingTable, AddressSet addresses, CompletableFuture result,
- List attemptErrors )
+ private void acquire( AccessMode mode, RoutingTable routingTable, CompletableFuture result, List attemptErrors )
{
+ List addresses = getAddressesByMode( mode, routingTable );
BoltServerAddress address = selectAddress( mode, addresses );
if ( address == null )
@@ -226,7 +225,7 @@ private void acquire( AccessMode mode, RoutingTable routingTable, AddressSet add
log.debug( attemptMessage, error );
attemptErrors.add( error );
routingTable.forget( address );
- eventExecutorGroup.next().execute( () -> acquire( mode, routingTable, addresses, result, attemptErrors ) );
+ eventExecutorGroup.next().execute( () -> acquire( mode, routingTable, result, attemptErrors ) );
}
else
{
@@ -240,7 +239,7 @@ private void acquire( AccessMode mode, RoutingTable routingTable, AddressSet add
} );
}
- private static AddressSet addressSet( AccessMode mode, RoutingTable routingTable )
+ private static List getAddressesByMode( AccessMode mode, RoutingTable routingTable )
{
switch ( mode )
{
@@ -253,10 +252,8 @@ private static AddressSet addressSet( AccessMode mode, RoutingTable routingTable
}
}
- private BoltServerAddress selectAddress( AccessMode mode, AddressSet servers )
+ private BoltServerAddress selectAddress( AccessMode mode, List addresses )
{
- BoltServerAddress[] addresses = servers.toArray();
-
switch ( mode )
{
case READ:
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancingStrategy.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancingStrategy.java
index dbaecdb08f..d05189e79c 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancingStrategy.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancingStrategy.java
@@ -18,6 +18,8 @@
*/
package org.neo4j.driver.internal.cluster.loadbalancing;
+import java.util.List;
+
import org.neo4j.driver.internal.BoltServerAddress;
/**
@@ -31,7 +33,7 @@ public interface LoadBalancingStrategy
* @param knownReaders array of all known readers.
* @return most appropriate reader or {@code null} if it can't be selected.
*/
- BoltServerAddress selectReader( BoltServerAddress[] knownReaders );
+ BoltServerAddress selectReader( List knownReaders );
/**
* Select most appropriate write address from the given array of addresses.
@@ -39,5 +41,5 @@ public interface LoadBalancingStrategy
* @param knownWriters array of all known writers.
* @return most appropriate writer or {@code null} if it can't be selected.
*/
- BoltServerAddress selectWriter( BoltServerAddress[] knownWriters );
+ BoltServerAddress selectWriter( List knownWriters );
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/LockUtil.java b/driver/src/main/java/org/neo4j/driver/internal/util/LockUtil.java
new file mode 100644
index 0000000000..f308921ef3
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/util/LockUtil.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) "Neo4j"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.locks.Lock;
+import java.util.function.Supplier;
+
+public class LockUtil
+{
+ public static void executeWithLock( Lock lock, Runnable runnable )
+ {
+ lock.lock();
+ try
+ {
+ runnable.run();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ public static T executeWithLock( Lock lock, Supplier supplier )
+ {
+ lock.lock();
+ try
+ {
+ return supplier.get();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ public static void executeWithLockAsync( Lock lock, Supplier> stageSupplier )
+ {
+ lock.lock();
+ CompletableFuture.completedFuture( lock )
+ .thenCompose( ignored -> stageSupplier.get() )
+ .whenComplete( ( ignored, throwable ) -> lock.unlock() );
+ }
+}
diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java
deleted file mode 100644
index a91d5676b6..0000000000
--- a/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Copyright (c) "Neo4j"
- * Neo4j Sweden AB [http://neo4j.com]
- *
- * This file is part of Neo4j.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.neo4j.driver.internal.cluster;
-
-import org.junit.jupiter.api.Test;
-
-import java.net.InetAddress;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.Set;
-
-import org.neo4j.driver.internal.BoltServerAddress;
-import org.neo4j.driver.internal.ResolvedBoltServerAddress;
-
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertSame;
-
-class AddressSetTest
-{
- @Test
- void shouldPreserveOrderWhenAdding() throws Exception
- {
- // given
- Set servers = addresses( "one", "two", "tre" );
-
- AddressSet set = new AddressSet();
- set.retainAllAndAdd( servers );
-
- assertArrayEquals( new BoltServerAddress[]{
- new BoltServerAddress( "one" ),
- new BoltServerAddress( "two" ),
- new BoltServerAddress( "tre" )}, set.toArray() );
-
- // when
- servers.add( new BoltServerAddress( "fyr" ) );
- set.retainAllAndAdd( servers );
-
- // then
- assertArrayEquals( new BoltServerAddress[]{
- new BoltServerAddress( "one" ),
- new BoltServerAddress( "two" ),
- new BoltServerAddress( "tre" ),
- new BoltServerAddress( "fyr" )}, set.toArray() );
- }
-
- @Test
- void shouldPreserveOrderWhenRemoving() throws Exception
- {
- // given
- Set servers = addresses( "one", "two", "tre" );
- AddressSet set = new AddressSet();
- set.retainAllAndAdd( servers );
-
- assertArrayEquals( new BoltServerAddress[]{
- new BoltServerAddress( "one" ),
- new BoltServerAddress( "two" ),
- new BoltServerAddress( "tre" )}, set.toArray() );
-
- // when
- set.remove( new BoltServerAddress( "one" ) );
-
- // then
- assertArrayEquals( new BoltServerAddress[]{
- new BoltServerAddress( "two" ),
- new BoltServerAddress( "tre" )}, set.toArray() );
- }
-
- @Test
- void shouldPreserveOrderWhenRemovingThroughUpdate() throws Exception
- {
- // given
- Set servers = addresses( "one", "two", "tre" );
- AddressSet set = new AddressSet();
- set.retainAllAndAdd( servers );
-
- assertArrayEquals( new BoltServerAddress[]{
- new BoltServerAddress( "one" ),
- new BoltServerAddress( "two" ),
- new BoltServerAddress( "tre" )}, set.toArray() );
-
- // when
- servers.remove( new BoltServerAddress( "one" ) );
- set.retainAllAndAdd( servers );
-
- // then
- assertArrayEquals( new BoltServerAddress[]{
- new BoltServerAddress( "two" ),
- new BoltServerAddress( "tre" )}, set.toArray() );
- }
-
- @Test
- void shouldExposeEmptyArrayWhenEmpty()
- {
- AddressSet addressSet = new AddressSet();
-
- BoltServerAddress[] addresses = addressSet.toArray();
-
- assertEquals( 0, addresses.length );
- }
-
- @Test
- void shouldExposeCorrectArray()
- {
- AddressSet addressSet = new AddressSet();
- addressSet.retainAllAndAdd( addresses( "one", "two", "tre" ) );
-
- BoltServerAddress[] addresses = addressSet.toArray();
-
- assertArrayEquals( new BoltServerAddress[]{
- new BoltServerAddress( "one" ),
- new BoltServerAddress( "two" ),
- new BoltServerAddress( "tre" )}, addresses );
- }
-
- @Test
- void shouldHaveSizeZeroWhenEmpty()
- {
- AddressSet addressSet = new AddressSet();
-
- assertEquals( 0, addressSet.size() );
- }
-
- @Test
- void shouldHaveCorrectSize()
- {
- AddressSet addressSet = new AddressSet();
- addressSet.retainAllAndAdd( addresses( "one", "two" ) );
-
- assertEquals( 2, addressSet.size() );
- }
-
- @Test
- void shouldRetainExistingAddresses()
- {
- AddressSet addressSet = new AddressSet();
- BoltServerAddress address0 = new BoltServerAddress( "node0", 7687 );
- BoltServerAddress address1 = new ResolvedBoltServerAddress( "node1", 7687, new InetAddress[]{InetAddress.getLoopbackAddress()} );
- BoltServerAddress address2 = new BoltServerAddress( "node2", 7687 );
- BoltServerAddress address3 = new BoltServerAddress( "node3", 7687 );
- BoltServerAddress address4 = new BoltServerAddress( "node4", 7687 );
- addressSet.retainAllAndAdd( new HashSet<>( Arrays.asList( address0, address1, address2, address3, address4 ) ) );
-
- BoltServerAddress sameAddress0 = new BoltServerAddress( "node0", 7687 );
- BoltServerAddress sameAddress1 = new BoltServerAddress( "node1", 7687 );
- BoltServerAddress differentAddress2 = new BoltServerAddress( "different-node2", 7687 );
- BoltServerAddress sameAddress3 = new BoltServerAddress( "node3", 7687 );
- BoltServerAddress sameAddress4 = new BoltServerAddress( "node4", 7687 );
- addressSet.retainAllAndAdd( new HashSet<>( Arrays.asList( sameAddress0, sameAddress1, differentAddress2, sameAddress3, sameAddress4 ) ) );
-
- assertEquals( 5, addressSet.size() );
- assertSame( addressSet.toArray()[0], address0 );
- assertSame( addressSet.toArray()[1], address1 );
- assertSame( addressSet.toArray()[2], address3 );
- assertSame( addressSet.toArray()[3], address4 );
- assertSame( addressSet.toArray()[4], differentAddress2 );
- }
-
- private static Set addresses( String... strings )
- {
- Set set = new LinkedHashSet<>();
- for ( String string : strings )
- {
- set.add( new BoltServerAddress( string ) );
- }
- return set;
- }
-}
diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java
index 0987774f0b..1aba6ecf17 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -507,9 +508,7 @@ private static RoutingTable routingTableMock( BoltServerAddress... routers )
private static RoutingTable routingTableMock( boolean preferInitialRouter, BoltServerAddress... routers )
{
RoutingTable routingTable = mock( RoutingTable.class );
- AddressSet addressSet = new AddressSet();
- addressSet.retainAllAndAdd( asOrderedSet( routers ) );
- when( routingTable.routers() ).thenReturn( addressSet );
+ when( routingTable.routers() ).thenReturn( Arrays.asList( routers ) );
when( routingTable.database() ).thenReturn( defaultDatabase() );
when( routingTable.preferInitialRouter() ).thenReturn( preferInitialRouter );
return routingTable;
diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java
index b725731639..702ce41774 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java
@@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
@@ -259,8 +260,7 @@ private static RoutingTable newStaleRoutingTableMock( AccessMode mode )
RoutingTable routingTable = mock( RoutingTable.class );
when( routingTable.isStaleFor( mode ) ).thenReturn( true );
- AddressSet addresses = new AddressSet();
- addresses.retainAllAndAdd( new HashSet<>( singletonList( LOCAL_DEFAULT ) ) );
+ List addresses = singletonList( LOCAL_DEFAULT );
when( routingTable.readers() ).thenReturn( addresses );
when( routingTable.writers() ).thenReturn( addresses );
when( routingTable.database() ).thenReturn( defaultDatabase() );
diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategyTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategyTest.java
index c2592e4a51..856ae881a0 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategyTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategyTest.java
@@ -22,10 +22,13 @@
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
-import org.neo4j.driver.internal.BoltServerAddress;
-import org.neo4j.driver.internal.spi.ConnectionPool;
+import java.util.Arrays;
+import java.util.Collections;
+
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
+import org.neo4j.driver.internal.BoltServerAddress;
+import org.neo4j.driver.internal.spi.ConnectionPool;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -54,15 +57,15 @@ void setUp()
}
@Test
- void shouldHandleEmptyReadersArray()
+ void shouldHandleEmptyReaders()
{
- assertNull( strategy.selectReader( new BoltServerAddress[0] ) );
+ assertNull( strategy.selectReader( Collections.emptyList() ) );
}
@Test
- void shouldHandleEmptyWritersArray()
+ void shouldHandleEmptyWriters()
{
- assertNull( strategy.selectWriter( new BoltServerAddress[0] ) );
+ assertNull( strategy.selectWriter( Collections.emptyList() ) );
}
@Test
@@ -70,7 +73,7 @@ void shouldHandleSingleReaderWithoutActiveConnections()
{
BoltServerAddress address = new BoltServerAddress( "reader", 9999 );
- assertEquals( address, strategy.selectReader( new BoltServerAddress[]{address} ) );
+ assertEquals( address, strategy.selectReader( Collections.singletonList( address ) ) );
}
@Test
@@ -78,7 +81,7 @@ void shouldHandleSingleWriterWithoutActiveConnections()
{
BoltServerAddress address = new BoltServerAddress( "writer", 9999 );
- assertEquals( address, strategy.selectWriter( new BoltServerAddress[]{address} ) );
+ assertEquals( address, strategy.selectWriter( Collections.singletonList( address ) ) );
}
@Test
@@ -87,7 +90,7 @@ void shouldHandleSingleReaderWithActiveConnections()
BoltServerAddress address = new BoltServerAddress( "reader", 9999 );
when( connectionPool.inUseConnections( address ) ).thenReturn( 42 );
- assertEquals( address, strategy.selectReader( new BoltServerAddress[]{address} ) );
+ assertEquals( address, strategy.selectReader( Collections.singletonList( address ) ) );
}
@Test
@@ -96,7 +99,7 @@ void shouldHandleSingleWriterWithActiveConnections()
BoltServerAddress address = new BoltServerAddress( "writer", 9999 );
when( connectionPool.inUseConnections( address ) ).thenReturn( 24 );
- assertEquals( address, strategy.selectWriter( new BoltServerAddress[]{address} ) );
+ assertEquals( address, strategy.selectWriter( Collections.singletonList( address ) ) );
}
@Test
@@ -110,7 +113,7 @@ void shouldHandleMultipleReadersWithActiveConnections()
when( connectionPool.inUseConnections( address2 ) ).thenReturn( 4 );
when( connectionPool.inUseConnections( address3 ) ).thenReturn( 1 );
- assertEquals( address3, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) );
+ assertEquals( address3, strategy.selectReader( Arrays.asList( address1, address2, address3 ) ) );
}
@Test
@@ -127,7 +130,7 @@ void shouldHandleMultipleWritersWithActiveConnections()
when( connectionPool.inUseConnections( address4 ) ).thenReturn( 1 );
assertEquals( address3,
- strategy.selectWriter( new BoltServerAddress[]{address1, address2, address3, address4} ) );
+ strategy.selectWriter( Arrays.asList( address1, address2, address3, address4 ) ) );
}
@Test
@@ -137,13 +140,13 @@ void shouldReturnDifferentReaderOnEveryInvocationWhenNoActiveConnections()
BoltServerAddress address2 = new BoltServerAddress( "reader", 2 );
BoltServerAddress address3 = new BoltServerAddress( "reader", 3 );
- assertEquals( address1, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) );
- assertEquals( address2, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) );
- assertEquals( address3, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) );
+ assertEquals( address1, strategy.selectReader( Arrays.asList( address1, address2, address3 ) ) );
+ assertEquals( address2, strategy.selectReader( Arrays.asList( address1, address2, address3 ) ) );
+ assertEquals( address3, strategy.selectReader( Arrays.asList( address1, address2, address3 ) ) );
- assertEquals( address1, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) );
- assertEquals( address2, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) );
- assertEquals( address3, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) );
+ assertEquals( address1, strategy.selectReader( Arrays.asList( address1, address2, address3 ) ) );
+ assertEquals( address2, strategy.selectReader( Arrays.asList( address1, address2, address3 ) ) );
+ assertEquals( address3, strategy.selectReader( Arrays.asList( address1, address2, address3 ) ) );
}
@Test
@@ -152,11 +155,11 @@ void shouldReturnDifferentWriterOnEveryInvocationWhenNoActiveConnections()
BoltServerAddress address1 = new BoltServerAddress( "writer", 1 );
BoltServerAddress address2 = new BoltServerAddress( "writer", 2 );
- assertEquals( address1, strategy.selectReader( new BoltServerAddress[]{address1, address2} ) );
- assertEquals( address2, strategy.selectReader( new BoltServerAddress[]{address1, address2} ) );
+ assertEquals( address1, strategy.selectReader( Arrays.asList( address1, address2 ) ) );
+ assertEquals( address2, strategy.selectReader( Arrays.asList( address1, address2 ) ) );
- assertEquals( address1, strategy.selectReader( new BoltServerAddress[]{address1, address2} ) );
- assertEquals( address2, strategy.selectReader( new BoltServerAddress[]{address1, address2} ) );
+ assertEquals( address1, strategy.selectReader( Arrays.asList( address1, address2 ) ) );
+ assertEquals( address2, strategy.selectReader( Arrays.asList( address1, address2 ) ) );
}
@Test
@@ -168,8 +171,8 @@ void shouldTraceLogWhenNoAddressSelected()
LoadBalancingStrategy strategy = new LeastConnectedLoadBalancingStrategy( connectionPool, logging );
- strategy.selectReader( new BoltServerAddress[0] );
- strategy.selectWriter( new BoltServerAddress[0] );
+ strategy.selectReader( Collections.emptyList() );
+ strategy.selectWriter( Collections.emptyList() );
verify( logger ).trace( startsWith( "Unable to select" ), eq( "reader" ) );
verify( logger ).trace( startsWith( "Unable to select" ), eq( "writer" ) );
@@ -186,8 +189,8 @@ void shouldTraceLogSelectedAddress()
LoadBalancingStrategy strategy = new LeastConnectedLoadBalancingStrategy( connectionPool, logging );
- strategy.selectReader( new BoltServerAddress[]{A} );
- strategy.selectWriter( new BoltServerAddress[]{A} );
+ strategy.selectReader( Collections.singletonList( A ) );
+ strategy.selectWriter( Collections.singletonList( A ) );
verify( logger ).trace( startsWith( "Selected" ), eq( "reader" ), eq( A ), eq( 42 ) );
verify( logger ).trace( startsWith( "Selected" ), eq( "writer" ), eq( A ), eq( 42 ) );
diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java
index 520c81946c..9c530ec282 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java
@@ -25,8 +25,10 @@
import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
@@ -39,7 +41,6 @@
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.async.ConnectionContext;
import org.neo4j.driver.internal.async.connection.RoutingConnection;
-import org.neo4j.driver.internal.cluster.AddressSet;
import org.neo4j.driver.internal.cluster.ClusterComposition;
import org.neo4j.driver.internal.cluster.ClusterRoutingTable;
import org.neo4j.driver.internal.cluster.Rediscovery;
@@ -96,10 +97,8 @@ void returnsCorrectAccessMode( AccessMode mode )
{
ConnectionPool connectionPool = newConnectionPoolMock();
RoutingTable routingTable = mock( RoutingTable.class );
- AddressSet readerAddresses = mock( AddressSet.class );
- AddressSet writerAddresses = mock( AddressSet.class );
- when( readerAddresses.toArray() ).thenReturn( new BoltServerAddress[]{A} );
- when( writerAddresses.toArray() ).thenReturn( new BoltServerAddress[]{B} );
+ List readerAddresses = Collections.singletonList( A );
+ List writerAddresses = Collections.singletonList( B );
when( routingTable.readers() ).thenReturn( readerAddresses );
when( routingTable.writers() ).thenReturn( writerAddresses );
@@ -117,8 +116,7 @@ void returnsCorrectDatabaseName( String databaseName )
{
ConnectionPool connectionPool = newConnectionPoolMock();
RoutingTable routingTable = mock( RoutingTable.class );
- AddressSet writerAddresses = mock( AddressSet.class );
- when( writerAddresses.toArray() ).thenReturn( new BoltServerAddress[]{A} );
+ List writerAddresses = Collections.singletonList( A );
when( routingTable.writers() ).thenReturn( writerAddresses );
LoadBalancer loadBalancer = newLoadBalancer( connectionPool, routingTable );
@@ -135,15 +133,17 @@ void shouldThrowWhenRediscoveryReturnsNoSuitableServers()
{
ConnectionPool connectionPool = newConnectionPoolMock();
RoutingTable routingTable = mock( RoutingTable.class );
- when( routingTable.readers() ).thenReturn( new AddressSet() );
- when( routingTable.writers() ).thenReturn( new AddressSet() );
+ when( routingTable.readers() ).thenReturn( Collections.emptyList() );
+ when( routingTable.writers() ).thenReturn( Collections.emptyList() );
LoadBalancer loadBalancer = newLoadBalancer( connectionPool, routingTable );
- SessionExpiredException error1 = assertThrows( SessionExpiredException.class, () -> await( loadBalancer.acquireConnection( contextWithMode( READ ) ) ) );
+ SessionExpiredException error1 =
+ assertThrows( SessionExpiredException.class, () -> await( loadBalancer.acquireConnection( contextWithMode( READ ) ) ) );
assertThat( error1.getMessage(), startsWith( "Failed to obtain connection towards READ server" ) );
- SessionExpiredException error2 = assertThrows( SessionExpiredException.class, () -> await( loadBalancer.acquireConnection( contextWithMode( WRITE ) ) ) );
+ SessionExpiredException error2 =
+ assertThrows( SessionExpiredException.class, () -> await( loadBalancer.acquireConnection( contextWithMode( WRITE ) ) ) );
assertThat( error2.getMessage(), startsWith( "Failed to obtain connection towards WRITE server" ) );
}
@@ -157,8 +157,7 @@ void shouldSelectLeastConnectedAddress()
when( connectionPool.inUseConnections( C ) ).thenReturn( 0 );
RoutingTable routingTable = mock( RoutingTable.class );
- AddressSet readerAddresses = mock( AddressSet.class );
- when( readerAddresses.toArray() ).thenReturn( new BoltServerAddress[]{A, B, C} );
+ List readerAddresses = Arrays.asList( A, B, C );
when( routingTable.readers() ).thenReturn( readerAddresses );
@@ -182,8 +181,7 @@ void shouldRoundRobinWhenNoActiveConnections()
ConnectionPool connectionPool = newConnectionPoolMock();
RoutingTable routingTable = mock( RoutingTable.class );
- AddressSet readerAddresses = mock( AddressSet.class );
- when( readerAddresses.toArray() ).thenReturn( new BoltServerAddress[]{A, B, C} );
+ List readerAddresses = Arrays.asList( A, B, C );
when( routingTable.readers() ).thenReturn( readerAddresses );
LoadBalancer loadBalancer = newLoadBalancer( connectionPool, routingTable );
diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java b/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java
index 75e85df9e4..33c3897f42 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java
@@ -32,8 +32,6 @@
import org.neo4j.driver.internal.InternalDriver;
import org.neo4j.driver.internal.SessionFactory;
import org.neo4j.driver.internal.SessionFactoryImpl;
-import org.neo4j.driver.internal.cluster.AddressSet;
-import org.neo4j.driver.internal.cluster.RoutingTable;
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancer;
import org.neo4j.driver.internal.spi.ConnectionProvider;
import org.neo4j.driver.summary.ResultSummary;
@@ -44,69 +42,6 @@ private Matchers()
{
}
- public static Matcher containsRouter( final BoltServerAddress address )
- {
- return new TypeSafeMatcher()
- {
- @Override
- protected boolean matchesSafely( RoutingTable routingTable )
- {
- BoltServerAddress[] addresses = routingTable.routers().toArray();
-
- for ( BoltServerAddress currentAddress : addresses )
- {
- if ( currentAddress.equals( address ) )
- {
- return true;
- }
- }
- return false;
- }
-
- @Override
- public void describeTo( Description description )
- {
- description.appendText( "routing table that contains router " ).appendValue( address );
- }
- };
- }
-
- public static Matcher containsReader( final BoltServerAddress address )
- {
- return new TypeSafeMatcher()
- {
- @Override
- protected boolean matchesSafely( RoutingTable routingTable )
- {
- return contains( routingTable.readers(), address );
- }
-
- @Override
- public void describeTo( Description description )
- {
- description.appendText( "routing table that contains reader " ).appendValue( address );
- }
- };
- }
-
- public static Matcher containsWriter( final BoltServerAddress address )
- {
- return new TypeSafeMatcher()
- {
- @Override
- protected boolean matchesSafely( RoutingTable routingTable )
- {
- return contains( routingTable.writers(), address );
- }
-
- @Override
- public void describeTo( Description description )
- {
- description.appendText( "routing table that contains writer " ).appendValue( address );
- }
- };
- }
-
public static Matcher directDriver()
{
return new TypeSafeMatcher()
@@ -273,19 +208,6 @@ public void describeTo( Description description )
};
}
- private static boolean contains( AddressSet set, BoltServerAddress address )
- {
- BoltServerAddress[] addresses = set.toArray();
- for ( BoltServerAddress currentAddress : addresses )
- {
- if ( currentAddress.equals( address ) )
- {
- return true;
- }
- }
- return false;
- }
-
private static boolean hasConnectionProvider( Driver driver, Class extends ConnectionProvider> providerClass )
{
return extractConnectionProvider( driver, providerClass ) != null;
diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java
index 99bea9d694..0be682d5e3 100644
--- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java
+++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java
@@ -25,16 +25,15 @@
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
import reactor.core.publisher.Mono;
-import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.DatabaseNameUtil;
-import org.neo4j.driver.internal.cluster.AddressSet;
import org.neo4j.driver.internal.cluster.RoutingTableHandler;
import org.neo4j.driver.internal.cluster.RoutingTableRegistry;
@@ -42,10 +41,10 @@
@Getter
public class GetRoutingTable implements TestkitRequest
{
- private static final Function> ADDRESSES_TO_STRINGS =
- ( addresses ) -> Arrays.stream( addresses.toArray() )
- .map( address -> String.format( "%s:%d", address.host(), address.port() ) )
- .collect( Collectors.toList() );
+ private static final Function,List> ADDRESSES_TO_STRINGS =
+ ( addresses ) -> addresses.stream()
+ .map( address -> String.format( "%s:%d", address.host(), address.port() ) )
+ .collect( Collectors.toList() );
private GetRoutingTableBody data;