2525import java .util .concurrent .BlockingQueue ;
2626import java .util .concurrent .ConcurrentHashMap ;
2727import java .util .concurrent .LinkedBlockingQueue ;
28+ import java .util .concurrent .atomic .AtomicBoolean ;
2829
2930import org .neo4j .driver .internal .util .Supplier ;
3031
@@ -37,6 +38,8 @@ public class BlockingPooledConnectionQueue
3738 /** The backing queue, keeps track of connections currently in queue */
3839 private final BlockingQueue <PooledConnection > queue ;
3940
41+ private final AtomicBoolean isTerminating = new AtomicBoolean ( false );
42+
4043 /** Keeps track of acquired connections */
4144 private final Set <PooledConnection > acquiredConnections =
4245 Collections .newSetFromMap (new ConcurrentHashMap <PooledConnection , Boolean >());
@@ -54,12 +57,30 @@ public BlockingPooledConnectionQueue( int capacity )
5457 */
5558 public boolean offer ( PooledConnection pooledConnection )
5659 {
57- boolean offer = queue .offer ( pooledConnection );
58- if ( offer )
60+ acquiredConnections .remove ( pooledConnection );
61+ //Don't offer to the queue if we are terminating.
62+ return !isTerminating .get () && queue .offer ( pooledConnection );
63+
64+ }
65+
66+ /**
67+ * Acquire connection or create a new one if the queue is empty
68+ * @param supplier used to create a new connection if queue is empty
69+ * @return a PooledConnection instance
70+ */
71+ public PooledConnection acquire ( Supplier <PooledConnection > supplier )
72+ {
73+ PooledConnection poll = queue .poll ();
74+ if ( poll == null )
5975 {
60- acquiredConnections . remove ( pooledConnection );
76+ poll = supplier . get ( );
6177 }
62- return offer ;
78+ //We are closing down, don't put anything back
79+ if ( isTerminating .get () )
80+ {
81+ acquiredConnections .add ( poll );
82+ }
83+ return poll ;
6384 }
6485
6586 public List <PooledConnection > toList ()
@@ -88,34 +109,21 @@ public boolean contains( PooledConnection pooledConnection )
88109 */
89110 public void terminate ()
90111 {
91- while ( ! queue . isEmpty () )
112+ if ( isTerminating . compareAndSet ( false , true ) )
92113 {
93- PooledConnection conn = queue .poll ();
94- if ( conn != null )
114+ while ( !queue .isEmpty () )
95115 {
96- //close the underlying connection without adding it back to the queue
97- conn .dispose ();
116+ PooledConnection conn = queue .poll ();
117+ if ( conn != null )
118+ {
119+ //close the underlying connection without adding it back to the queue
120+ conn .dispose ();
121+ }
122+ }
123+ for ( PooledConnection pooledConnection : acquiredConnections )
124+ {
125+ pooledConnection .dispose ();
98126 }
99127 }
100- for ( PooledConnection pooledConnection : acquiredConnections )
101- {
102- pooledConnection .dispose ();
103- }
104- }
105-
106- /**
107- * Acquire connection or create a new one if the queue is empty
108- * @param supplier used to create a new connection if queue is empty
109- * @return a PooledConnection instance
110- */
111- public PooledConnection acquire ( Supplier <PooledConnection > supplier )
112- {
113- PooledConnection poll = queue .poll ();
114- if ( poll == null )
115- {
116- poll = supplier .get ();
117- }
118- acquiredConnections .add ( poll );
119- return poll ;
120128 }
121129}
0 commit comments