@@ -94,24 +94,25 @@ public ThreadCachingPool( int targetSize, Allocator<T> allocator, ValidationStra
9494
9595 public T acquire ( long timeout , TimeUnit unit ) throws InterruptedException
9696 {
97+ assert live .size () <= maxSize ;
9798 long deadline = clock .millis () + unit .toMillis ( timeout );
9899
99100 // 1. Try and value an object from our local slot
100101 Slot <T > slot = local .get ();
101102
102- if ( slot != null && slot .availableToClaimed () )
103+ if ( slot != null && slot .availableToThreadLocalClaimed () )
103104 {
104105 if ( slot .isValid ( validationStrategy ) )
105106 {
106107 allocator .onAcquire ( slot .value );
107108 return slot .value ;
108109 }
109- else
110- {
111- // We've acquired the slot, but the validation strategy says it's time for it to die. Dispose of it,
112- // and go to the global pool.
110+ else {
113111 dispose ( slot );
114112 }
113+
114+ //The slot was invalidated however we cannot put it to the
115+ //disposed queue yet since it already exists in the live queue
115116 }
116117
117118 // 2. If that fails, acquire from big pool
@@ -133,17 +134,18 @@ private T acquireFromGlobal( long deadline ) throws InterruptedException
133134 if ( slot != null )
134135 {
135136 // Yay, got a slot - can we keep it?
136- if ( slot .availableToClaimed ( ) )
137+ if ( slot .isValid ( validationStrategy ) )
137138 {
138- if ( slot .isValid ( validationStrategy ) )
139+ if ( slot .availableToClaimed ( ) )
139140 {
140141 break ;
141142 }
142- else
143- {
144- // We've acquired the slot, but the validation strategy says it's time for it to die.
145- dispose ( slot );
146- }
143+ }
144+ // We've acquired the slot, but the validation strategy says it's time for it to die.
145+ // Either the slot is already claimed or if it is available make it claimed
146+ else if ( slot .isClaimedOrAvailableToClaimed () )
147+ {
148+ dispose ( slot );
147149 }
148150 }
149151 else
@@ -179,22 +181,38 @@ private T acquireFromGlobal( long deadline ) throws InterruptedException
179181
180182 // Keep this slot cached with our thread, so that we can grab this value quickly next time,
181183 // assuming threads generally availableToClaimed one instance at a time
182- local . set ( slot );
184+ updateThreadLocal ( slot );
183185 allocator .onAcquire ( slot .value );
184186 return slot .value ;
185187 }
186188
187- private void dispose ( Slot <T > slot )
189+ private void updateThreadLocal ( Slot <T > slot )
188190 {
189- if ( !slot .claimedToDisposed () )
191+ Slot <T > localSlot = local .get ();
192+ if ( localSlot != null )
190193 {
191- throw new IllegalStateException ( "Cannot dispose unclaimed pool object: " + slot );
194+ //The old slot is no longer in the tread local
195+ localSlot .threadLocalClaimedToClaimed ();
192196 }
197+ else
198+ {
199+ //There was nothing stored in thread local
200+ //no we must also add this slot to the live queue
201+ live .add ( slot );
202+ }
203+ slot .claimByThreadLocal ();
204+ local .set ( slot );
205+ }
193206
194- // Done before below, in case dispose call fails. This is safe since objects on the
195- // pool are used for read-only operations
196- disposed .add ( slot );
197- allocator .onDispose ( slot .value );
207+ private void dispose ( Slot <T > slot )
208+ {
209+ if ( slot .claimedToDisposed () || slot .threadLocalClaimedToDisposed () )
210+ {
211+ // Done before below, in case dispose call fails. This is safe since objects on the
212+ // pool are used for read-only operations
213+ disposed .add ( slot );
214+ allocator .onDispose ( slot .value );
215+ }
198216 }
199217
200218 /**
@@ -217,7 +235,7 @@ private Slot<T> allocate( int slotIndex )
217235 // Return it :)
218236 return slot ;
219237 }
220- catch ( Neo4jException e )
238+ catch ( Neo4jException e )
221239 {
222240 // Failed to allocate slot, return it to the list of disposed slots, rethrow exception.
223241 slot .claimedToDisposed ();
@@ -236,33 +254,41 @@ public void accept( T t )
236254 slot .updateUsageTimestamp ();
237255 if ( !slot .isValid ( validationStrategy ) )
238256 {
239- // The value has for some reason become invalid, dispose of it
240257 dispose ( slot );
241258 return ;
242259 }
243260
244- if ( ! slot .claimedToAvailable () )
261+ if ( slot .claimedToAvailable () )
245262 {
246- throw new IllegalStateException ( "Failed to release pooled object: " + slot );
263+ // Make sure the pool isn't being stopped in the middle of all these shenanigans
264+ if ( !stopped .get () )
265+ {
266+ // All good, as you were.
267+ live .add ( slot );
268+ }
269+ else
270+ {
271+ // Another thread concurrently closing the pool may have started closing before we
272+ // set our slot to "available". In that case, the slot will not be disposed of by the closing thread
273+ // We mitigate this by trying to claim the slot back - if we are able to, we dispose the slot.
274+ // If we can't claim the slot back, that means another thread is dealing with it.
275+ if ( slot .availableToClaimed () )
276+ {
277+ dispose ( slot );
278+ }
279+ }
247280 }
248281
249- // Make sure the pool isn't being stopped in the middle of all these shenanigans
250- if ( ! stopped .get () )
282+ // If we are claimed by thread local we are already in the live queue
283+ if ( slot . threadLocalClaimedToAvailable () && stopped .get () )
251284 {
252- // All good, as you were.
253- live .add ( slot );
254- }
255- else
256- {
257- // Another thread concurrently closing the pool may have started closing before we
258- // set our slot to "available". In that case, the slot will not be disposed of by the closing thread
259- // We mitigate this by trying to claim the slot back - if we are able to, we dispose the slot.
260- // If we can't claim the slot back, that means another thread is dealing with it.
285+ // As above, try to claim the slot back and dispose
261286 if ( slot .availableToClaimed () )
262287 {
263288 dispose ( slot );
264289 }
265290 }
291+
266292 }
267293 };
268294 }
@@ -293,6 +319,7 @@ class Slot<T>
293319 enum State
294320 {
295321 AVAILABLE ,
322+ THREAD_LOCAL_CLAIMED ,
296323 CLAIMED ,
297324 DISPOSED
298325 }
@@ -304,13 +331,6 @@ enum State
304331 long lastUsed ;
305332 T value ;
306333
307- public static <T > Slot <T > disposed ( int index , Clock clock )
308- {
309- Slot <T > slot = new Slot <>( index , clock );
310- slot .claimedToDisposed ();
311- return slot ;
312- }
313-
314334 /**
315335 * @param index the index into the {@link ThreadCachingPool#all all} array, used to re-use that slot when this is
316336 * disposed
@@ -332,6 +352,11 @@ public boolean availableToClaimed()
332352 return state .compareAndSet ( State .AVAILABLE , State .CLAIMED );
333353 }
334354
355+ public boolean availableToThreadLocalClaimed ()
356+ {
357+ return state .compareAndSet ( State .AVAILABLE , State .THREAD_LOCAL_CLAIMED );
358+ }
359+
335360 public boolean claimedToAvailable ()
336361 {
337362 updateUsageTimestamp ();
@@ -343,6 +368,36 @@ public boolean claimedToDisposed()
343368 return state .compareAndSet ( State .CLAIMED , State .DISPOSED );
344369 }
345370
371+ public boolean threadLocalClaimedToDisposed ()
372+ {
373+ return state .compareAndSet ( State .THREAD_LOCAL_CLAIMED , State .DISPOSED );
374+ }
375+
376+ public boolean threadLocalClaimedToClaimed ()
377+ {
378+ return state .compareAndSet ( State .THREAD_LOCAL_CLAIMED , State .CLAIMED );
379+ }
380+
381+ public boolean threadLocalClaimedToAvailable ()
382+ {
383+ return state .compareAndSet ( State .THREAD_LOCAL_CLAIMED , State .AVAILABLE );
384+ }
385+
386+ public void claimByThreadLocal ()
387+ {
388+ state .set ( State .THREAD_LOCAL_CLAIMED );
389+ }
390+
391+ public boolean isClaimedOrAvailableToClaimed ()
392+ {
393+ return availableToClaimed () || state .get () == State .CLAIMED ;
394+ }
395+
396+ public boolean disposed ()
397+ {
398+ return state .get () == State .DISPOSED ;
399+ }
400+
346401 public void updateUsageTimestamp ()
347402 {
348403 lastUsed = clock .millis ();
0 commit comments