@@ -406,7 +406,6 @@ async fn migrate_sset(
406406pub mod tests {
407407 use std:: time:: Duration ;
408408
409- use assert_matches:: assert_matches;
410409 use chrono:: Utc ;
411410 use redis:: { streams:: StreamReadReply , AsyncCommands as _, Direction } ;
412411 use tokio:: time:: timeout;
@@ -415,7 +414,7 @@ pub mod tests {
415414 use crate :: {
416415 cfg:: Configuration ,
417416 core:: types:: { ApplicationId , EndpointId , MessageAttemptTriggerType , MessageId } ,
418- queue:: { MessageTask , QueueTask , TaskQueueConsumer , TaskQueueProducer } ,
417+ queue:: { MessageTask , QueueTask } ,
419418 redis:: RedisManager ,
420419 } ;
421420
@@ -492,18 +491,12 @@ pub mod tests {
492491 assert_eq ! ( should_be_none, vec![ ] ) ;
493492 }
494493
495- /// Reads and acknowledges all items in the queue with the given name for clearing out entries
496- /// from previous test runs
497- async fn flush_stale_queue_items ( _p : TaskQueueProducer , c : & mut TaskQueueConsumer ) {
498- while let Ok ( recv) = timeout (
499- Duration :: from_millis ( 100 ) ,
500- c. receive_all ( TEST_RECV_DEADLINE ) ,
501- )
502- . await
503- {
504- let recv = recv. unwrap ( ) . pop ( ) . unwrap ( ) ;
505- recv. ack ( ) . await . unwrap ( ) ;
506- }
494+ async fn cleanup ( pool : & RedisManager , q1 : & str , q2 : & str , q3 : & str ) {
495+ let mut conn = pool
496+ . get ( )
497+ . await
498+ . expect ( "Error retrieving connection from Redis pool" ) ;
499+ let _: ( ) = conn. del ( & [ q1, q2, q3] ) . await . unwrap ( ) ;
507500 }
508501
509502 #[ tokio:: test]
@@ -512,19 +505,16 @@ pub mod tests {
512505 let cfg = crate :: cfg:: load ( ) . unwrap ( ) ;
513506 let pool = get_pool ( & cfg) . await ;
514507
515- let ( p, mut c) = new_pair_inner (
516- & cfg,
517- Duration :: from_millis ( 100 ) ,
518- "" ,
519- "{test}_idle_period" ,
520- "{test}_idle_period_delayed" ,
521- "{test}_idle_period_delayed_lock" ,
522- "{test}_dlq" ,
523- )
524- . await ;
508+ let main_queue = "{test}_idle_period" ;
509+ let delayed = "{test}_idle_period_delayed" ;
510+ let lock = "{test}_idle_period_delayed_lock" ;
511+ let dlq = "{test}_dlq" ;
512+
513+ let delay = Duration :: from_millis ( 100 ) ;
525514
526- tokio:: time:: sleep ( Duration :: from_millis ( 150 ) ) . await ;
527- flush_stale_queue_items ( p. clone ( ) , & mut c) . await ;
515+ cleanup ( & pool, main_queue, delayed, lock) . await ;
516+
517+ let ( p, mut c) = new_pair_inner ( & cfg, delay, "" , main_queue, delayed, lock, dlq) . await ;
528518
529519 let mt = QueueTask :: MessageV1 ( MessageTask {
530520 msg_id : MessageId ( "test" . to_owned ( ) ) ,
@@ -540,9 +530,9 @@ pub mod tests {
540530 . expect ( "`c.receive()` has timed out" ) ;
541531 assert_eq ! ( * recv. unwrap( ) [ 0 ] . task, mt) ;
542532
543- tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
533+ tokio:: time:: sleep ( delay ) . await ;
544534
545- let recv = timeout ( Duration :: from_secs ( 5 ) , c. receive_all ( TEST_RECV_DEADLINE ) )
535+ let recv = timeout ( Duration :: from_secs ( 1 ) , c. receive_all ( TEST_RECV_DEADLINE ) )
546536 . await
547537 . expect ( "`c.receive()` has timed out" ) ;
548538 let recv = recv. unwrap ( ) . pop ( ) . unwrap ( ) ;
@@ -555,12 +545,12 @@ pub mod tests {
555545 . get ( )
556546 . await
557547 . expect ( "Error retrieving connection from Redis pool" ) ;
558- let keys = conn
559- . xread :: < _ , _ , StreamReadReply > ( & [ "{test}_ack" ] , & [ 0 ] )
548+ assert ! ( conn
549+ . xread:: <_, _, StreamReadReply >( & [ main_queue ] , & [ 0 ] )
560550 . await
561551 . unwrap( )
562- . keys ;
563- assert_matches ! ( keys . as_slice ( ) , [ ] ) ;
552+ . keys
553+ . is_empty ( ) ) ;
564554 }
565555
566556 #[ tokio:: test]
@@ -569,30 +559,16 @@ pub mod tests {
569559 let cfg = crate :: cfg:: load ( ) . unwrap ( ) ;
570560 let pool = get_pool ( & cfg) . await ;
571561
572- // Delete the keys used in this test to ensure nothing pollutes the output
573- let mut conn = pool
574- . get ( )
575- . await
576- . expect ( "Error retrieving connection from Redis pool" ) ;
577- let _: ( ) = conn
578- . del ( & [
579- "{test}_ack" ,
580- "{test}_ack_delayed" ,
581- "{test}_ack_delayed_lock" ,
582- ] )
583- . await
584- . unwrap ( ) ;
562+ let main_queue = "{test}_ack" ;
563+ let delayed = "{test}_ack_delayed" ;
564+ let lock = "{test}_ack_delayed_lock" ;
565+ let dlq = "{test}_dlq" ;
585566
586- let ( p, mut c) = new_pair_inner (
587- & cfg,
588- Duration :: from_millis ( 5000 ) ,
589- "" ,
590- "{test}_ack" ,
591- "{test}_ack_delayed" ,
592- "{test}_ack_delayed_lock" ,
593- "{test}_dlq" ,
594- )
595- . await ;
567+ cleanup ( & pool, main_queue, delayed, lock) . await ;
568+
569+ let delay = Duration :: from_millis ( 100 ) ;
570+
571+ let ( p, mut c) = new_pair_inner ( & cfg, delay, "" , main_queue, delayed, lock, dlq) . await ;
596572
597573 let mt = QueueTask :: MessageV1 ( MessageTask {
598574 msg_id : MessageId ( "test2" . to_owned ( ) ) ,
@@ -612,38 +588,39 @@ pub mod tests {
612588 assert_eq ! ( * recv. task, mt) ;
613589 recv. ack ( ) . await . unwrap ( ) ;
614590
615- if let Ok ( recv) = timeout ( Duration :: from_secs ( 1 ) , c. receive_all ( TEST_RECV_DEADLINE ) ) . await {
591+ if let Ok ( recv) = timeout ( delay , c. receive_all ( TEST_RECV_DEADLINE ) ) . await {
616592 panic ! ( "Received unexpected QueueTask {:?}" , recv. unwrap( ) [ 0 ] . task) ;
617593 }
618594
595+ let mut conn = pool
596+ . get ( )
597+ . await
598+ . expect ( "Error retrieving connection from Redis pool" ) ;
619599 // And assert that the task has been deleted
620- let keys = conn
621- . xread :: < _ , _ , StreamReadReply > ( & [ "{test}_ack" ] , & [ 0 ] )
600+ assert ! ( conn
601+ . xread:: <_, _, StreamReadReply >( & [ main_queue ] , & [ 0 ] )
622602 . await
623603 . unwrap( )
624- . keys ;
625- assert_matches ! ( keys . as_slice ( ) , [ ] ) ;
604+ . keys
605+ . is_empty ( ) ) ;
626606 }
627607
628608 #[ tokio:: test]
629609 #[ ignore]
630610 async fn test_nack ( ) {
631611 let cfg = crate :: cfg:: load ( ) . unwrap ( ) ;
612+ let pool = get_pool ( & cfg) . await ;
632613
633- let ( p, mut c) = new_pair_inner (
634- & cfg,
635- Duration :: from_millis ( 500 ) ,
636- "" ,
637- "{test}_nack" ,
638- "{test}_nack_delayed" ,
639- "{test}_nack_delayed_lock" ,
640- "{test}_dlq" ,
641- )
642- . await ;
614+ let main_queue = "{test}_nack" ;
615+ let delayed = "{test}_nack_delayed" ;
616+ let lock = "{test}_nack_delayed_lock" ;
617+ let dlq = "{test}_nack_delayed_dlq" ;
643618
644- tokio :: time :: sleep ( Duration :: from_millis ( 550 ) ) . await ;
619+ cleanup ( & pool , main_queue , delayed , lock ) . await ;
645620
646- flush_stale_queue_items ( p. clone ( ) , & mut c) . await ;
621+ let delay = Duration :: from_millis ( 100 ) ;
622+
623+ let ( p, mut c) = new_pair_inner ( & cfg, delay, "" , main_queue, delayed, lock, dlq) . await ;
647624
648625 let mt = QueueTask :: MessageV1 ( MessageTask {
649626 msg_id : MessageId ( "test" . to_owned ( ) ) ,
@@ -663,31 +640,30 @@ pub mod tests {
663640 assert_eq ! ( * recv. task, mt) ;
664641 recv. nack ( ) . await . unwrap ( ) ;
665642
666- let recv = timeout ( Duration :: from_secs ( 1 ) , c. receive_all ( TEST_RECV_DEADLINE ) )
667- . await
668- . expect ( "Expected QueueTask" ) ;
643+ let recv = timeout (
644+ Duration :: from_millis ( 500 ) + delay,
645+ c. receive_all ( TEST_RECV_DEADLINE ) ,
646+ )
647+ . await
648+ . expect ( "Expected QueueTask" ) ;
669649 assert_eq ! ( * recv. unwrap( ) . pop( ) . unwrap( ) . task, mt) ;
670650 }
671651
672652 #[ tokio:: test]
673653 #[ ignore]
674654 async fn test_delay ( ) {
675655 let cfg = crate :: cfg:: load ( ) . unwrap ( ) ;
656+ let pool = get_pool ( & cfg) . await ;
676657
677- let ( p, mut c) = new_pair_inner (
678- & cfg,
679- Duration :: from_millis ( 500 ) ,
680- "" ,
681- "{test}_delay" ,
682- "{test}_delay_delayed" ,
683- "{test}_delay_delayed_lock" ,
684- "{test}_dlq" ,
685- )
686- . await ;
658+ let main_queue = "{test}_delay" ;
659+ let delayed = "{test}_delay_delayed" ;
660+ let lock = "{test}_delay_delayed_lock" ;
661+ let dlq = "{test}_delay_delayed_dlq" ;
687662
688- tokio :: time :: sleep ( Duration :: from_millis ( 550 ) ) . await ;
663+ cleanup ( & pool , main_queue , delayed , lock ) . await ;
689664
690- flush_stale_queue_items ( p. clone ( ) , & mut c) . await ;
665+ let delay = Duration :: from_millis ( 500 ) ;
666+ let ( p, mut c) = new_pair_inner ( & cfg, delay, "" , main_queue, delayed, lock, dlq) . await ;
691667
692668 let mt1 = QueueTask :: MessageV1 ( MessageTask {
693669 msg_id : MessageId ( "test1" . to_owned ( ) ) ,
@@ -709,20 +685,20 @@ pub mod tests {
709685 . unwrap ( ) ;
710686 p. send ( & mt2, None ) . await . unwrap ( ) ;
711687
712- let [ recv2] = c
688+ let recv2 = c
713689 . receive_all ( TEST_RECV_DEADLINE )
714690 . await
715691 . unwrap ( )
716- . try_into ( )
692+ . pop ( )
717693 . unwrap ( ) ;
718694 assert_eq ! ( * recv2. task, mt2) ;
719695 recv2. ack ( ) . await . unwrap ( ) ;
720696
721- let [ recv1] = c
697+ let recv1 = c
722698 . receive_all ( TEST_RECV_DEADLINE )
723699 . await
724700 . unwrap ( )
725- . try_into ( )
701+ . pop ( )
726702 . unwrap ( ) ;
727703 assert_eq ! ( * recv1. task, mt1) ;
728704 recv1. ack ( ) . await . unwrap ( ) ;
0 commit comments