@@ -354,11 +354,6 @@ impl PendingChecks {
354354 if latest_announce. is_none ( ) ||
355355 latest_announce. as_ref ( ) . unwrap ( ) . timestamp ( ) < msg. timestamp
356356 {
357- // If the messages we got has a higher timestamp, just blindly
358- // assume the signatures on the new message are correct and drop
359- // the old message. This may cause us to end up dropping valid
360- // `node_announcement`s if a peer is malicious, but we should get
361- // the correct ones when the node updates them.
362357 * latest_announce = Some (
363358 if let Some ( msg) = full_msg { NodeAnnouncement :: Full ( msg. clone ( ) ) }
364359 else { NodeAnnouncement :: Unsigned ( msg. clone ( ) ) } ) ;
@@ -536,3 +531,313 @@ impl PendingChecks {
536531 }
537532 }
538533}
534+
535+ #[ cfg( test) ]
536+ mod tests {
537+ use super :: * ;
538+ use crate :: routing:: gossip:: tests:: * ;
539+ use crate :: util:: test_utils:: { TestChainSource , TestLogger } ;
540+ use crate :: ln:: msgs;
541+
542+ use bitcoin:: blockdata:: constants:: genesis_block;
543+ use bitcoin:: secp256k1:: { Secp256k1 , SecretKey } ;
544+
545+ use core:: sync:: atomic:: Ordering ;
546+
547+ fn get_network ( ) -> ( TestChainSource , NetworkGraph < Box < TestLogger > > ) {
548+ let logger = Box :: new ( TestLogger :: new ( ) ) ;
549+ let genesis_hash = genesis_block ( bitcoin:: Network :: Testnet ) . header . block_hash ( ) ;
550+ let chain_source = TestChainSource :: new ( bitcoin:: Network :: Testnet ) ;
551+ let network_graph = NetworkGraph :: new ( genesis_hash, logger) ;
552+
553+ ( chain_source, network_graph)
554+ }
555+
556+ fn get_test_objects ( ) -> ( msgs:: ChannelAnnouncement , TestChainSource ,
557+ NetworkGraph < Box < TestLogger > > , bitcoin:: Script , msgs:: NodeAnnouncement ,
558+ msgs:: NodeAnnouncement , msgs:: ChannelUpdate , msgs:: ChannelUpdate , msgs:: ChannelUpdate )
559+ {
560+ let secp_ctx = Secp256k1 :: new ( ) ;
561+
562+ let ( chain_source, network_graph) = get_network ( ) ;
563+
564+ let good_script = get_channel_script ( & secp_ctx) ;
565+ let node_1_privkey = & SecretKey :: from_slice ( & [ 42 ; 32 ] ) . unwrap ( ) ;
566+ let node_2_privkey = & SecretKey :: from_slice ( & [ 41 ; 32 ] ) . unwrap ( ) ;
567+ let valid_announcement = get_signed_channel_announcement ( |_| { } , node_1_privkey, node_2_privkey, & secp_ctx) ;
568+
569+ let node_a_announce = get_signed_node_announcement ( |_| { } , node_1_privkey, & secp_ctx) ;
570+ let node_b_announce = get_signed_node_announcement ( |_| { } , node_2_privkey, & secp_ctx) ;
571+
572+ // Note that we have to set the "direction" flag correctly on both messages
573+ let chan_update_a = get_signed_channel_update ( |msg| msg. flags = 0 , node_1_privkey, & secp_ctx) ;
574+ let chan_update_b = get_signed_channel_update ( |msg| msg. flags = 1 , node_2_privkey, & secp_ctx) ;
575+ let chan_update_c = get_signed_channel_update ( |msg| {
576+ msg. flags = 1 ; msg. timestamp += 1 ; } , node_2_privkey, & secp_ctx) ;
577+
578+ ( valid_announcement, chain_source, network_graph, good_script, node_a_announce,
579+ node_b_announce, chan_update_a, chan_update_b, chan_update_c)
580+ }
581+
582+ #[ test]
583+ fn test_fast_async_lookup ( ) {
584+ // Check that async lookups which resolve quicker than the future is returned to the
585+ // `get_utxo` call can read it still resolve properly.
586+ let ( valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects ( ) ;
587+
588+ let future = AccessFuture :: new ( ) ;
589+ future. resolve_without_forwarding ( & network_graph,
590+ Ok ( TxOut { value : 1_000_000 , script_pubkey : good_script } ) ) ;
591+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
592+
593+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap ( ) ;
594+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_some( ) ) ;
595+ }
596+
597+ #[ test]
598+ fn test_async_lookup ( ) {
599+ // Test a simple async lookup
600+ let ( valid_announcement, chain_source, network_graph, good_script,
601+ node_a_announce, node_b_announce, ..) = get_test_objects ( ) ;
602+
603+ let future = AccessFuture :: new ( ) ;
604+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
605+
606+ assert_eq ! (
607+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
608+ "Channel being checked async" ) ;
609+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
610+
611+ future. resolve_without_forwarding ( & network_graph,
612+ Ok ( TxOut { value : 0 , script_pubkey : good_script } ) ) ;
613+ network_graph. read_only ( ) . channels ( ) . get ( & valid_announcement. contents . short_channel_id ) . unwrap ( ) ;
614+ network_graph. read_only ( ) . channels ( ) . get ( & valid_announcement. contents . short_channel_id ) . unwrap ( ) ;
615+
616+ assert ! ( network_graph. read_only( ) . nodes( )
617+ . get( & NodeId :: from_pubkey( & valid_announcement. contents. node_id_1) ) . unwrap( )
618+ . announcement_info. is_none( ) ) ;
619+
620+ network_graph. update_node_from_announcement ( & node_a_announce) . unwrap ( ) ;
621+ network_graph. update_node_from_announcement ( & node_b_announce) . unwrap ( ) ;
622+
623+ assert ! ( network_graph. read_only( ) . nodes( )
624+ . get( & NodeId :: from_pubkey( & valid_announcement. contents. node_id_1) ) . unwrap( )
625+ . announcement_info. is_some( ) ) ;
626+ }
627+
628+ #[ test]
629+ fn test_invalid_async_lookup ( ) {
630+ // Test an async lookup which returns an incorrect script
631+ let ( valid_announcement, chain_source, network_graph, ..) = get_test_objects ( ) ;
632+
633+ let future = AccessFuture :: new ( ) ;
634+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
635+
636+ assert_eq ! (
637+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
638+ "Channel being checked async" ) ;
639+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
640+
641+ future. resolve_without_forwarding ( & network_graph,
642+ Ok ( TxOut { value : 1_000_000 , script_pubkey : bitcoin:: Script :: new ( ) } ) ) ;
643+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
644+ }
645+
646+ #[ test]
647+ fn test_failing_async_lookup ( ) {
648+ // Test an async lookup which returns an error
649+ let ( valid_announcement, chain_source, network_graph, ..) = get_test_objects ( ) ;
650+
651+ let future = AccessFuture :: new ( ) ;
652+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
653+
654+ assert_eq ! (
655+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
656+ "Channel being checked async" ) ;
657+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
658+
659+ future. resolve_without_forwarding ( & network_graph, Err ( ChainAccessError :: UnknownTx ) ) ;
660+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
661+ }
662+
663+ #[ test]
664+ fn test_updates_async_lookup ( ) {
665+ // Test async lookups will process pending channel_update/node_announcements once they
666+ // complete.
667+ let ( valid_announcement, chain_source, network_graph, good_script, node_a_announce,
668+ node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects ( ) ;
669+
670+ let future = AccessFuture :: new ( ) ;
671+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
672+
673+ assert_eq ! (
674+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
675+ "Channel being checked async" ) ;
676+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
677+
678+ assert_eq ! (
679+ network_graph. update_node_from_announcement( & node_a_announce) . unwrap_err( ) . err,
680+ "Awaiting channel_announcement validation to accept node_announcement" ) ;
681+ assert_eq ! (
682+ network_graph. update_node_from_announcement( & node_b_announce) . unwrap_err( ) . err,
683+ "Awaiting channel_announcement validation to accept node_announcement" ) ;
684+
685+ assert_eq ! ( network_graph. update_channel( & chan_update_a) . unwrap_err( ) . err,
686+ "Awaiting channel_announcement validation to accept channel_update" ) ;
687+ assert_eq ! ( network_graph. update_channel( & chan_update_b) . unwrap_err( ) . err,
688+ "Awaiting channel_announcement validation to accept channel_update" ) ;
689+
690+ future. resolve_without_forwarding ( & network_graph,
691+ Ok ( TxOut { value : 1_000_000 , script_pubkey : good_script } ) ) ;
692+
693+ assert ! ( network_graph. read_only( ) . channels( )
694+ . get( & valid_announcement. contents. short_channel_id) . unwrap( ) . one_to_two. is_some( ) ) ;
695+ assert ! ( network_graph. read_only( ) . channels( )
696+ . get( & valid_announcement. contents. short_channel_id) . unwrap( ) . two_to_one. is_some( ) ) ;
697+
698+ assert ! ( network_graph. read_only( ) . nodes( )
699+ . get( & NodeId :: from_pubkey( & valid_announcement. contents. node_id_1) ) . unwrap( )
700+ . announcement_info. is_some( ) ) ;
701+ assert ! ( network_graph. read_only( ) . nodes( )
702+ . get( & NodeId :: from_pubkey( & valid_announcement. contents. node_id_2) ) . unwrap( )
703+ . announcement_info. is_some( ) ) ;
704+ }
705+
706+ #[ test]
707+ fn test_latest_update_async_lookup ( ) {
708+ // Test async lookups will process the latest channel_update if two are received while
709+ // awaiting an async UTXO lookup.
710+ let ( valid_announcement, chain_source, network_graph, good_script, _,
711+ _, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects ( ) ;
712+
713+ let future = AccessFuture :: new ( ) ;
714+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
715+
716+ assert_eq ! (
717+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
718+ "Channel being checked async" ) ;
719+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
720+
721+ assert_eq ! ( network_graph. update_channel( & chan_update_a) . unwrap_err( ) . err,
722+ "Awaiting channel_announcement validation to accept channel_update" ) ;
723+ assert_eq ! ( network_graph. update_channel( & chan_update_b) . unwrap_err( ) . err,
724+ "Awaiting channel_announcement validation to accept channel_update" ) ;
725+ assert_eq ! ( network_graph. update_channel( & chan_update_c) . unwrap_err( ) . err,
726+ "Awaiting channel_announcement validation to accept channel_update" ) ;
727+
728+ future. resolve_without_forwarding ( & network_graph,
729+ Ok ( TxOut { value : 1_000_000 , script_pubkey : good_script } ) ) ;
730+
731+ assert_eq ! ( chan_update_a. contents. timestamp, chan_update_b. contents. timestamp) ;
732+ assert ! ( network_graph. read_only( ) . channels( )
733+ . get( & valid_announcement. contents. short_channel_id) . as_ref( ) . unwrap( )
734+ . one_to_two. as_ref( ) . unwrap( ) . last_update !=
735+ network_graph. read_only( ) . channels( )
736+ . get( & valid_announcement. contents. short_channel_id) . as_ref( ) . unwrap( )
737+ . two_to_one. as_ref( ) . unwrap( ) . last_update) ;
738+ }
739+
740+ #[ test]
741+ fn test_no_double_lookups ( ) {
742+ // Test that a pending async lookup will prevent a second async lookup from flying, but
743+ // only if the channel_announcement message is identical.
744+ let ( valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects ( ) ;
745+
746+ let future = AccessFuture :: new ( ) ;
747+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
748+
749+ assert_eq ! (
750+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
751+ "Channel being checked async" ) ;
752+ assert_eq ! ( chain_source. get_utxo_call_count. load( Ordering :: Relaxed ) , 1 ) ;
753+
754+ // If we make a second request with the same message, the call count doesn't increase...
755+ let future_b = AccessFuture :: new ( ) ;
756+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future_b. clone ( ) ) ;
757+ assert_eq ! (
758+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
759+ "Channel announcement is already being checked" ) ;
760+ assert_eq ! ( chain_source. get_utxo_call_count. load( Ordering :: Relaxed ) , 1 ) ;
761+
762+ // But if we make a third request with a tweaked message, we should get a second call
763+ // against our new future...
764+ let secp_ctx = Secp256k1 :: new ( ) ;
765+ let replacement_pk_1 = & SecretKey :: from_slice ( & [ 99 ; 32 ] ) . unwrap ( ) ;
766+ let replacement_pk_2 = & SecretKey :: from_slice ( & [ 98 ; 32 ] ) . unwrap ( ) ;
767+ let invalid_announcement = get_signed_channel_announcement ( |_| { } , replacement_pk_1, replacement_pk_2, & secp_ctx) ;
768+ assert_eq ! (
769+ network_graph. update_channel_from_announcement( & invalid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
770+ "Channel being checked async" ) ;
771+ assert_eq ! ( chain_source. get_utxo_call_count. load( Ordering :: Relaxed ) , 2 ) ;
772+
773+ // Still, if we resolve the original future, the original channel will be accepted.
774+ future. resolve_without_forwarding ( & network_graph,
775+ Ok ( TxOut { value : 1_000_000 , script_pubkey : good_script } ) ) ;
776+ assert ! ( !network_graph. read_only( ) . channels( )
777+ . get( & valid_announcement. contents. short_channel_id) . unwrap( )
778+ . announcement_message. as_ref( ) . unwrap( )
779+ . contents. features. supports_unknown_test_feature( ) ) ;
780+ }
781+
782+ #[ test]
783+ fn test_checks_backpressure ( ) {
784+ // Test that too_many_checks_pending returns true when there are many checks pending, and
785+ // returns false once they complete.
786+ let secp_ctx = Secp256k1 :: new ( ) ;
787+ let ( chain_source, network_graph) = get_network ( ) ;
788+
789+ // We cheat and use a single future for all the lookups to complete them all at once.
790+ let future = AccessFuture :: new ( ) ;
791+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( future. clone ( ) ) ;
792+
793+ let node_1_privkey = & SecretKey :: from_slice ( & [ 42 ; 32 ] ) . unwrap ( ) ;
794+ let node_2_privkey = & SecretKey :: from_slice ( & [ 41 ; 32 ] ) . unwrap ( ) ;
795+
796+ for i in 0 ..PendingChecks :: MAX_PENDING_LOOKUPS {
797+ let valid_announcement = get_signed_channel_announcement (
798+ |msg| msg. short_channel_id += 1 + i as u64 , node_1_privkey, node_2_privkey, & secp_ctx) ;
799+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap_err ( ) ;
800+ assert ! ( !network_graph. pending_checks. too_many_checks_pending( ) ) ;
801+ }
802+
803+ let valid_announcement = get_signed_channel_announcement (
804+ |_| { } , node_1_privkey, node_2_privkey, & secp_ctx) ;
805+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap_err ( ) ;
806+ assert ! ( network_graph. pending_checks. too_many_checks_pending( ) ) ;
807+
808+ // Once the future completes the "too many checks" flag should reset.
809+ future. resolve_without_forwarding ( & network_graph, Err ( ChainAccessError :: UnknownTx ) ) ;
810+ assert ! ( !network_graph. pending_checks. too_many_checks_pending( ) ) ;
811+ }
812+
813+ #[ test]
814+ fn test_checks_backpressure_drop ( ) {
815+ // Test that too_many_checks_pending returns true when there are many checks pending, and
816+ // returns false if we drop some of the futures without completion.
817+ let secp_ctx = Secp256k1 :: new ( ) ;
818+ let ( chain_source, network_graph) = get_network ( ) ;
819+
820+ // We cheat and use a single future for all the lookups to complete them all at once.
821+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Async ( AccessFuture :: new ( ) ) ;
822+
823+ let node_1_privkey = & SecretKey :: from_slice ( & [ 42 ; 32 ] ) . unwrap ( ) ;
824+ let node_2_privkey = & SecretKey :: from_slice ( & [ 41 ; 32 ] ) . unwrap ( ) ;
825+
826+ for i in 0 ..PendingChecks :: MAX_PENDING_LOOKUPS {
827+ let valid_announcement = get_signed_channel_announcement (
828+ |msg| msg. short_channel_id += 1 + i as u64 , node_1_privkey, node_2_privkey, & secp_ctx) ;
829+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap_err ( ) ;
830+ assert ! ( !network_graph. pending_checks. too_many_checks_pending( ) ) ;
831+ }
832+
833+ let valid_announcement = get_signed_channel_announcement (
834+ |_| { } , node_1_privkey, node_2_privkey, & secp_ctx) ;
835+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap_err ( ) ;
836+ assert ! ( network_graph. pending_checks. too_many_checks_pending( ) ) ;
837+
838+ // Once the future is drop'd (by resetting the `utxo_ret` value) the "too many checks" flag
839+ // should reset to false.
840+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = ChainAccessResult :: Sync ( Err ( ChainAccessError :: UnknownTx ) ) ;
841+ assert ! ( !network_graph. pending_checks. too_many_checks_pending( ) ) ;
842+ }
843+ }
0 commit comments