@@ -94,18 +94,22 @@ public ThreadCachingPool( int targetSize, Allocator<T> allocator, ValidationStra
9494
9595 public T acquire ( long timeout , TimeUnit unit ) throws InterruptedException
9696 {
97+ assert live .size () <= maxSize ;
9798 long deadline = clock .millis () + unit .toMillis ( timeout );
9899
99100 // 1. Try and value an object from our local slot
100101 Slot <T > slot = local .get ();
101102
102- if ( slot != null && slot .availableToClaimed () )
103+ if ( slot != null && slot .availableToThreadLocalClaimed () )
103104 {
104105 if ( slot .isValid ( validationStrategy ) )
105106 {
106107 allocator .onAcquire ( slot .value );
107108 return slot .value ;
108109 }
110+ else {
111+ dispose ( slot );
112+ }
109113
110114 //The slot was invalidated however we cannot put it to the
111115 //disposed queue yet since it already exists in the live queue
@@ -177,14 +181,32 @@ else if ( slot.isClaimedOrAvailableToClaimed() )
177181
178182 // Keep this slot cached with our thread, so that we can grab this value quickly next time,
179183 // assuming threads generally availableToClaimed one instance at a time
180- local . set ( slot );
184+ updateThreadLocal ( slot );
181185 allocator .onAcquire ( slot .value );
182186 return slot .value ;
183187 }
184188
189+ private void updateThreadLocal (Slot <T > slot )
190+ {
191+ Slot <T > localSlot = local .get ();
192+ if ( localSlot != null )
193+ {
194+ //The old slot is no longer in the tread local
195+ localSlot .threadLocalClaimedToClaimed ();
196+ }
197+ else
198+ {
199+ //There was nothing stored in thread local
200+ //no we must also add this slot to the live queue
201+ live .add ( slot );
202+ }
203+ slot .claimByThreadLocal ();
204+ local .set ( slot );
205+ }
206+
185207 private void dispose ( Slot <T > slot )
186208 {
187- if ( slot .claimedToDisposed () )
209+ if ( slot .claimedToDisposed () || slot . threadLocalClaimedToDisposed () )
188210 {
189211 // Done before below, in case dispose call fails. This is safe since objects on the
190212 // pool are used for read-only operations
@@ -213,7 +235,7 @@ private Slot<T> allocate( int slotIndex )
213235 // Return it :)
214236 return slot ;
215237 }
216- catch ( Neo4jException e )
238+ catch ( Neo4jException e )
217239 {
218240 // Failed to allocate slot, return it to the list of disposed slots, rethrow exception.
219241 slot .claimedToDisposed ();
@@ -230,7 +252,7 @@ private Consumer<T> createDisposeCallback( final Slot<T> slot )
230252 public void accept ( T t )
231253 {
232254 slot .updateUsageTimestamp ();
233- if ( !slot .isValid ( validationStrategy ) )
255+ if ( !slot .isValid ( validationStrategy ) )
234256 {
235257 dispose ( slot );
236258 return ;
@@ -256,6 +278,17 @@ public void accept( T t )
256278 }
257279 }
258280 }
281+
282+ // If we are claimed by thread local we are already in the live queue
283+ if ( slot .threadLocalClaimedToAvailable () && stopped .get () )
284+ {
285+ // As above, try to claim the slot back and dispose
286+ if ( slot .availableToClaimed () )
287+ {
288+ dispose ( slot );
289+ }
290+ }
291+
259292 }
260293 };
261294 }
@@ -286,6 +319,7 @@ class Slot<T>
286319 enum State
287320 {
288321 AVAILABLE ,
322+ THREAD_LOCAL_CLAIMED ,
289323 CLAIMED ,
290324 DISPOSED
291325 }
@@ -318,6 +352,11 @@ public boolean availableToClaimed()
318352 return state .compareAndSet ( State .AVAILABLE , State .CLAIMED );
319353 }
320354
355+ public boolean availableToThreadLocalClaimed ()
356+ {
357+ return state .compareAndSet ( State .AVAILABLE , State .THREAD_LOCAL_CLAIMED );
358+ }
359+
321360 public boolean claimedToAvailable ()
322361 {
323362 updateUsageTimestamp ();
@@ -329,6 +368,26 @@ public boolean claimedToDisposed()
329368 return state .compareAndSet ( State .CLAIMED , State .DISPOSED );
330369 }
331370
371+ public boolean threadLocalClaimedToDisposed ()
372+ {
373+ return state .compareAndSet ( State .THREAD_LOCAL_CLAIMED , State .DISPOSED );
374+ }
375+
376+ public boolean threadLocalClaimedToClaimed ()
377+ {
378+ return state .compareAndSet ( State .THREAD_LOCAL_CLAIMED , State .CLAIMED );
379+ }
380+
381+ public boolean threadLocalClaimedToAvailable ()
382+ {
383+ return state .compareAndSet ( State .THREAD_LOCAL_CLAIMED , State .AVAILABLE );
384+ }
385+
386+ public void claimByThreadLocal ()
387+ {
388+ state .set ( State .THREAD_LOCAL_CLAIMED );
389+ }
390+
332391 public boolean isClaimedOrAvailableToClaimed ()
333392 {
334393 return availableToClaimed () || state .get () == State .CLAIMED ;
0 commit comments