1818 */
1919package org .neo4j .driver .internal .net .pooling ;
2020
21- import java .util .ArrayList ;
2221import java .util .List ;
2322import java .util .Map ;
24- import java .util .concurrent .BlockingQueue ;
2523import java .util .concurrent .ConcurrentHashMap ;
26- import java .util .concurrent .LinkedBlockingQueue ;
2724import java .util .concurrent .atomic .AtomicBoolean ;
2825
2926import org .neo4j .driver .internal .ConnectionSettings ;
3532import org .neo4j .driver .internal .spi .Connection ;
3633import org .neo4j .driver .internal .spi .ConnectionPool ;
3734import org .neo4j .driver .internal .util .Clock ;
35+ import org .neo4j .driver .internal .util .Supplier ;
3836import org .neo4j .driver .v1 .AuthToken ;
3937import org .neo4j .driver .v1 .AuthTokens ;
4038import org .neo4j .driver .v1 .Logging ;
4846 * try to return the session into the session pool, however if we failed to return it back, either because the pool
4947 * is full or the pool is being cleaned on driver.close, then we directly close the connection attached with the
5048 * session.
51- *
49+ * <p>
5250 * The session is NOT meant to be thread safe, each thread should have an independent session and close it (return to
5351 * pool) when the work with the session has been done.
54- *
52+ * <p>
5553 * The driver is thread safe. Each thread could try to get a session from the pool and then return it to the pool
5654 * at the same time.
5755 */
@@ -60,7 +58,8 @@ public class SocketConnectionPool implements ConnectionPool
6058 /**
6159 * Pools, organized by server address.
6260 */
63- private final ConcurrentHashMap <BoltServerAddress ,BlockingQueue <PooledConnection >> pools = new ConcurrentHashMap <>();
61+ private final ConcurrentHashMap <BoltServerAddress ,BlockingPooledConnectionQueue > pools =
62+ new ConcurrentHashMap <>();
6463
6564 private final Clock clock = Clock .SYSTEM ;
6665
@@ -73,7 +72,7 @@ public class SocketConnectionPool implements ConnectionPool
7372 private final AtomicBoolean stopped = new AtomicBoolean ( false );
7473
7574 public SocketConnectionPool ( ConnectionSettings connectionSettings , SecurityPlan securityPlan ,
76- PoolSettings poolSettings , Logging logging )
75+ PoolSettings poolSettings , Logging logging )
7776 {
7877 this .connectionSettings = connectionSettings ;
7978 this .securityPlan = securityPlan ;
@@ -94,41 +93,48 @@ private Connection connect( BoltServerAddress address ) throws ClientException
9493
9594 private static Map <String ,Value > tokenAsMap ( AuthToken token )
9695 {
97- if ( token instanceof InternalAuthToken )
96+ if ( token instanceof InternalAuthToken )
9897 {
9998 return ((InternalAuthToken ) token ).toMap ();
10099 }
101100 else
102101 {
103- throw new ClientException ( "Unknown authentication token, `" + token + "`. Please use one of the supported " +
102+ throw new ClientException (
103+ "Unknown authentication token, `" + token + "`. Please use one of the supported " +
104104 "tokens from `" + AuthTokens .class .getSimpleName () + "`." );
105105 }
106106 }
107107
108108 @ Override
109- public Connection acquire ( BoltServerAddress address )
109+ public Connection acquire ( final BoltServerAddress address )
110110 {
111111 if ( stopped .get () )
112112 {
113113 throw new IllegalStateException ( "Pool has been closed, cannot acquire new values." );
114114 }
115- BlockingQueue <PooledConnection > connections = pool ( address );
116- PooledConnection conn = connections .poll ();
117- if ( conn == null )
115+ final BlockingPooledConnectionQueue connections = pool ( address );
116+ Supplier <PooledConnection > supplier = new Supplier <PooledConnection >()
118117 {
119- conn = new PooledConnection ( connect ( address ), new
120- PooledConnectionReleaseConsumer ( connections , stopped , new PooledConnectionValidator ( this , poolSettings ) ), clock );
121- }
118+ @ Override
119+ public PooledConnection get ()
120+ {
121+ return new PooledConnection ( connect ( address ), new
122+ PooledConnectionReleaseConsumer ( connections , stopped ,
123+ new PooledConnectionValidator ( SocketConnectionPool .this , poolSettings ) ), clock );
124+
125+ }
126+ };
127+ PooledConnection conn = connections .acquire ( supplier );
122128 conn .updateTimestamp ();
123129 return conn ;
124130 }
125131
126- private BlockingQueue < PooledConnection > pool ( BoltServerAddress address )
132+ private BlockingPooledConnectionQueue pool ( BoltServerAddress address )
127133 {
128- BlockingQueue < PooledConnection > pool = pools .get ( address );
134+ BlockingPooledConnectionQueue pool = pools .get ( address );
129135 if ( pool == null )
130136 {
131- pool = new LinkedBlockingQueue <>( poolSettings .maxIdleConnectionPoolSize ());
137+ pool = new BlockingPooledConnectionQueue ( poolSettings .maxIdleConnectionPoolSize () );
132138
133139 if ( pools .putIfAbsent ( address , pool ) != null )
134140 {
@@ -142,19 +148,13 @@ private BlockingQueue<PooledConnection> pool( BoltServerAddress address )
142148 @ Override
143149 public void purge ( BoltServerAddress address )
144150 {
145- BlockingQueue < PooledConnection > connections = pools .remove ( address );
151+ BlockingPooledConnectionQueue connections = pools .remove ( address );
146152 if ( connections == null )
147153 {
148154 return ;
149155 }
150- while (!connections .isEmpty ())
151- {
152- PooledConnection connection = connections .poll ();
153- if ( connection != null )
154- {
155- connection .dispose ();
156- }
157- }
156+
157+ connections .terminate ();
158158 }
159159
160160 @ Override
@@ -166,41 +166,34 @@ public boolean hasAddress( BoltServerAddress address )
166166 @ Override
167167 public void close ()
168168 {
169- if ( !stopped .compareAndSet ( false , true ) )
169+ if ( !stopped .compareAndSet ( false , true ) )
170170 {
171171 // already closed or some other thread already started close
172172 return ;
173173 }
174174
175- for ( BlockingQueue < PooledConnection > pool : pools .values () )
175+ for ( BlockingPooledConnectionQueue pool : pools .values () )
176176 {
177- while ( !pool .isEmpty () )
178- {
179- PooledConnection conn = pool .poll ();
180- if ( conn != null )
181- {
182- //close the underlying connection without adding it back to the queue
183- conn .dispose ();
184- }
185- }
177+ pool .terminate ();
186178 }
187179
188180 pools .clear ();
189181 }
190182
183+
191184 //for testing
192- public List <PooledConnection > connectionsForAddress (BoltServerAddress address )
185+ public List <PooledConnection > connectionsForAddress ( BoltServerAddress address )
193186 {
194- LinkedBlockingQueue <PooledConnection > pooledConnections =
195- (LinkedBlockingQueue <PooledConnection >) pools .get ( address );
196- if (pooledConnections == null )
187+ BlockingPooledConnectionQueue pooledConnections = pools .get ( address );
188+ if ( pooledConnections == null )
197189 {
198190 return emptyList ();
199191 }
200192 else
201193 {
202- return new ArrayList <>( pooledConnections );
194+ return pooledConnections . toList ( );
203195 }
204196 }
205197
198+
206199}
0 commit comments