1818 */
1919package org .neo4j .driver .internal .pool ;
2020
21+ import org .junit .Before ;
22+ import org .junit .Rule ;
23+ import org .junit .Test ;
24+ import org .junit .rules .ExpectedException ;
25+
26+ import java .lang .invoke .MethodHandle ;
27+ import java .lang .invoke .MethodHandles ;
28+ import java .lang .reflect .Field ;
2129import java .util .Arrays ;
2230import java .util .Collections ;
2331import java .util .LinkedList ;
2432import java .util .List ;
33+ import java .util .concurrent .BlockingQueue ;
34+ import java .util .concurrent .ExecutorService ;
35+ import java .util .concurrent .Executors ;
2536import java .util .concurrent .TimeUnit ;
37+ import java .util .concurrent .atomic .AtomicBoolean ;
2638import java .util .concurrent .atomic .AtomicInteger ;
2739
28- import org .junit .Before ;
29- import org .junit .Rule ;
30- import org .junit .Test ;
31- import org .junit .rules .ExpectedException ;
32-
3340import org .neo4j .driver .internal .util .Clock ;
3441import org .neo4j .driver .internal .util .Consumer ;
3542import org .neo4j .driver .v1 .exceptions .ClientException ;
3643
3744import static junit .framework .TestCase .fail ;
3845import static org .hamcrest .MatcherAssert .assertThat ;
46+ import static org .hamcrest .Matchers .empty ;
3947import static org .hamcrest .Matchers .equalTo ;
48+ import static org .hamcrest .Matchers .hasSize ;
4049import static org .junit .Assert .assertNull ;
50+ import static org .junit .Assert .assertTrue ;
4151
4252public class ThreadCachingPoolTest
4353{
4454 private final List <PooledObject > inUse = new LinkedList <>();
4555 private final List <PooledObject > inPool = new LinkedList <>();
4656 private final List <PooledObject > disposed = new LinkedList <>();
47-
57+ private final MethodHandle liveQueueGet = queueGetter ( "live" );
58+ private final MethodHandle disposedQueueGet = queueGetter ( "disposed" );
59+ private final ExecutorService executor = Executors .newFixedThreadPool ( 10 );
4860 private static AtomicInteger IDGEN = new AtomicInteger ();
4961
5062 @ Rule
@@ -127,7 +139,7 @@ public void shouldDisposeOfInvalidItems() throws Throwable
127139 {
128140 // Given
129141 ThreadCachingPool <PooledObject >
130- pool = new ThreadCachingPool <>( 4 , trackAllocator , invalidIfIdIs (0 ), Clock .SYSTEM );
142+ pool = new ThreadCachingPool <>( 4 , trackAllocator , invalidIfIdIs ( 0 ), Clock .SYSTEM );
131143
132144 // And given we've allocated/releasd object with id 0 once (no validation on first allocation)
133145 // TODO: Is that the right thing to do? I assume the allocator will allocate healthy objects..
@@ -137,8 +149,8 @@ public void shouldDisposeOfInvalidItems() throws Throwable
137149 pool .acquire ( 10 , TimeUnit .SECONDS );
138150
139151 // Then object with id 0 should've been disposed of, and we should have one live object with id 1
140- assertThat ( inPool , equalTo ( none () ) );
141- assertThat ( inUse , equalTo ( items ( 1 ) ) );
152+ assertThat ( inPool , equalTo ( none () ) );
153+ assertThat ( inUse , equalTo ( items ( 1 ) ) );
142154 assertThat ( disposed , equalTo ( items ( 0 ) ) );
143155 }
144156
@@ -171,8 +183,8 @@ public void shouldDisposeOfObjectsThatBecomeInvalidWhileInUse() throws Throwable
171183 val .invalidate ().release ();
172184
173185 // Then
174- assertThat ( inPool , equalTo ( none () ) );
175- assertThat ( inUse , equalTo ( none () ) );
186+ assertThat ( inPool , equalTo ( none () ) );
187+ assertThat ( inUse , equalTo ( none () ) );
176188 assertThat ( disposed , equalTo ( items ( val ) ) );
177189 }
178190
@@ -191,9 +203,9 @@ public void shouldRecoverFromItemCreationFailure() throws Throwable
191203 try
192204 {
193205 pool .acquire ( 10 , TimeUnit .SECONDS );
194- fail ("Should not succeed at allocating any item here." );
206+ fail ( "Should not succeed at allocating any item here." );
195207 }
196- catch ( ClientException e )
208+ catch ( ClientException e )
197209 {
198210 // Expected
199211 }
@@ -207,8 +219,8 @@ public void shouldRecoverFromItemCreationFailure() throws Throwable
207219 {
208220 pool .acquire ( 10 , TimeUnit .SECONDS );
209221 }
210- assertThat ( inPool , equalTo ( none () ) );
211- assertThat ( inUse , equalTo ( items ( 0 , 1 , 2 , 3 ) ) );
222+ assertThat ( inPool , equalTo ( none () ) );
223+ assertThat ( inUse , equalTo ( items ( 0 , 1 , 2 , 3 ) ) );
212224 assertThat ( disposed , equalTo ( none () ) ); // because allocation fails, onDispose is not called
213225 }
214226
@@ -256,10 +268,88 @@ public void shouldRecovedDisposedItemReallocationFailing() throws Throwable
256268 assertThat ( inPool , equalTo ( none () ) );
257269 assertThat ( inUse , equalTo ( items ( 2 , 3 ) ) );
258270 // only the first two items value onDispose called, since allocation fails after that
259- assertThat ( disposed , equalTo ( items ( 0 , 1 ) ) );
271+ assertThat ( disposed , equalTo ( items ( 0 , 1 ) ) );
272+ }
273+
274+ @ SuppressWarnings ( "unchecked" )
275+ @ Test
276+ public void shouldNotHaveReferenceAsBothLiveAndDisposed () throws Throwable
277+ {
278+ // Given
279+ final ThreadCachingPool <PooledObject >
280+ pool = new ThreadCachingPool <>( 4 , trackAllocator , checkInvalidateFlag , Clock .SYSTEM );
281+
282+ // This object will be cached in ThreadLocal
283+ final PooledObject obj1 = pool .acquire ( 10 , TimeUnit .SECONDS );
284+
285+ //This will add another object to the live queue
286+ assertTrue ( acquireInSeparateThread ( pool ) );
287+
288+ //Now we release the first object, meaning that it will be added
289+ //to the live queue (as well as being cached as ThreadLocal in this thread)
290+ obj1 .release ();
291+ //Now we invalidate the object
292+ obj1 .invalidate ();
293+
294+ // When
295+ //Now the cached object is invalidated, we should now pick the object
296+ //from the live objects created in the background thread
297+ PooledObject obj2 = pool .acquire ( 10 , TimeUnit .SECONDS );
298+
299+ //THEN
300+ assertThat ( obj1 .id , equalTo ( 0 ) );
301+ assertThat ( obj2 .id , equalTo ( 1 ) );
302+ BlockingQueue <Slot <PooledObject >> liveQueue = (BlockingQueue <Slot <PooledObject >>) liveQueueGet .invoke ( pool );
303+ BlockingQueue <Slot <PooledObject >> disposedQueue =
304+ (BlockingQueue <Slot <PooledObject >>) disposedQueueGet .invoke ( pool );
305+
306+ assertThat ( disposedQueue , empty () );
307+ assertThat ( liveQueue , hasSize ( 1 ) );
308+ assertThat ( liveQueue .poll ( 10 , TimeUnit .SECONDS ).value .id , equalTo ( 0 ) );
309+ }
310+
311+ private boolean acquireInSeparateThread ( final ThreadCachingPool <PooledObject > pool ) throws InterruptedException
312+ {
313+ final AtomicBoolean succeeded = new AtomicBoolean ( true );
314+ executor .execute ( new Runnable ()
315+ {
316+ @ Override
317+ public void run ()
318+ {
319+ try
320+ {
321+ PooledObject obj = pool .acquire ( 10 , TimeUnit .MINUTES );
322+ obj .release ();
323+ }
324+ catch ( InterruptedException e )
325+ {
326+ succeeded .set ( false );
327+ }
328+ }
329+ } );
330+ executor .awaitTermination ( 2 , TimeUnit .SECONDS );
331+ return succeeded .get ();
332+ }
333+
334+
335+ //This is terrible hack, but I really want to keep the queues private in
336+ //ThreadCachingPool
337+ private static MethodHandle queueGetter ( String name )
338+ {
339+ try
340+ {
341+ MethodHandles .Lookup lookup = MethodHandles .lookup ();
342+ Field value = ThreadCachingPool .class .getDeclaredField ( name );
343+ value .setAccessible ( true );
344+ return lookup .unreflectGetter ( value );
345+ }
346+ catch ( NoSuchFieldException | IllegalAccessException e )
347+ {
348+ throw new AssertionError ( e );
349+ }
260350 }
261351
262- private List <PooledObject > items ( int ... objects )
352+ private List <PooledObject > items ( int ... objects )
263353 {
264354 List <PooledObject > out = new LinkedList <>();
265355 for ( int id : objects )
@@ -269,9 +359,9 @@ private List<PooledObject> items( int ... objects )
269359 return out ;
270360 }
271361
272- private List <PooledObject > items ( PooledObject ... objects )
362+ private List <PooledObject > items ( PooledObject ... objects )
273363 {
274- return Arrays .asList (objects );
364+ return Arrays .asList ( objects );
275365 }
276366
277367 private List <PooledObject > none ()
@@ -305,7 +395,7 @@ private class PooledObject
305395
306396 public PooledObject ( Consumer <PooledObject > release )
307397 {
308- this (IDGEN .getAndIncrement (), release );
398+ this ( IDGEN .getAndIncrement (), release );
309399 }
310400
311401 public PooledObject ( int id , Consumer <PooledObject > release )
@@ -362,7 +452,7 @@ private class TestAllocator implements Allocator<PooledObject>
362452 @ Override
363453 public PooledObject allocate ( Consumer <PooledObject > release )
364454 {
365- if ( creationException != null )
455+ if ( creationException != null )
366456 {
367457 throw creationException ;
368458 }
0 commit comments