9191import java .util .Iterator ;
9292import java .util .List ;
9393import java .util .Map ;
94+ import java .util .concurrent .CompletableFuture ;
9495import java .util .concurrent .CountDownLatch ;
9596import java .util .concurrent .ExecutionException ;
9697import java .util .concurrent .Semaphore ;
@@ -1387,11 +1388,10 @@ public void writeAndReadVectorEmbeddings() throws ExecutionException, Interrupte
13871388
13881389 @ Test
13891390 public void listenToDocumentsWithVectors () throws Throwable {
1390- final Semaphore semaphore = new Semaphore ( 0 );
1391+ CompletableFuture < Void > listen = new CompletableFuture <>( );
13911392 ListenerRegistration registration = null ;
13921393 DocumentReference ref = randomColl .document ();
1393- AtomicReference <Throwable > failureMessage = new AtomicReference (null );
1394- int totalPermits = 5 ;
1394+ AtomicInteger snapshotCount = new AtomicInteger ();
13951395
13961396 try {
13971397 registration =
@@ -1403,7 +1403,7 @@ public void listenToDocumentsWithVectors() throws Throwable {
14031403 DocumentSnapshot docSnap =
14041404 value .isEmpty () ? null : value .getDocuments ().get (0 );
14051405
1406- switch (semaphore . availablePermits ()) {
1406+ switch (snapshotCount . getAndIncrement ()) {
14071407 case 0 :
14081408 assertNull (docSnap );
14091409 ref .create (
@@ -1472,41 +1472,35 @@ public void listenToDocumentsWithVectors() throws Throwable {
14721472 break ;
14731473 case 4 :
14741474 assertNull (docSnap );
1475+ listen .complete (null );
14751476 break ;
14761477 }
14771478 } catch (Throwable t ) {
1478- failureMessage .set (t );
1479- semaphore .release (totalPermits );
1479+ listen .completeExceptionally (t );
14801480 }
1481-
1482- semaphore .release ();
14831481 });
14841482
1485- semaphore . acquire ( totalPermits );
1483+ listen . get ( );
14861484 } finally {
14871485 if (registration != null ) {
14881486 registration .remove ();
14891487 }
1490-
1491- if (failureMessage .get () != null ) {
1492- throw failureMessage .get ();
1493- }
14941488 }
14951489 }
14961490
14971491 @ Test
14981492 public void documentWatch () throws Exception {
1499- final DocumentReference documentReference = randomColl .document ();
1500-
1501- final Semaphore semaphore = new Semaphore (0 );
1493+ CompletableFuture <Void > listen = new CompletableFuture <>();
1494+ DocumentReference documentReference = randomColl .document ();
15021495 ListenerRegistration registration = null ;
1496+ AtomicInteger snapshotCount = new AtomicInteger ();
15031497
15041498 try {
15051499 registration =
15061500 documentReference .addSnapshotListener (
15071501 (value , error ) -> {
15081502 try {
1509- switch (semaphore . availablePermits ()) {
1503+ switch (snapshotCount . getAndIncrement ()) {
15101504 case 0 :
15111505 assertFalse (value .exists ());
15121506 documentReference .set (map ("foo" , "foo" ));
@@ -1525,15 +1519,14 @@ public void documentWatch() throws Exception {
15251519 break ;
15261520 case 3 :
15271521 assertFalse (value .exists ());
1522+ listen .complete (null );
15281523 break ;
15291524 }
15301525 } catch (Exception e ) {
1531- fail ( e . getMessage () );
1526+ listen . completeExceptionally ( e );
15321527 }
1533- semaphore .release ();
15341528 });
1535-
1536- semaphore .acquire (4 );
1529+ listen .get ();
15371530 } finally {
15381531 if (registration != null ) {
15391532 registration .remove ();
0 commit comments