@@ -173,6 +173,7 @@ uniffi::include_scaffolding!("ldk_node");
173173pub struct Node {
174174 runtime : Arc < RwLock < Option < tokio:: runtime:: Runtime > > > ,
175175 stop_sender : tokio:: sync:: watch:: Sender < ( ) > ,
176+ event_handling_stopped_sender : tokio:: sync:: watch:: Sender < ( ) > ,
176177 config : Arc < Config > ,
177178 wallet : Arc < Wallet > ,
178179 tx_sync : Arc < EsploraSyncClient < Arc < FilesystemLogger > > > ,
@@ -726,6 +727,7 @@ impl Node {
726727 } ;
727728
728729 let background_stop_logger = Arc :: clone ( & self . logger ) ;
730+ let event_handling_stopped_sender = self . event_handling_stopped_sender . clone ( ) ;
729731 runtime. spawn ( async move {
730732 process_events_async (
731733 background_persister,
@@ -746,6 +748,18 @@ impl Node {
746748 panic ! ( "Failed to process events" ) ;
747749 } ) ;
748750 log_trace ! ( background_stop_logger, "Events processing stopped." , ) ;
751+
752+ match event_handling_stopped_sender. send ( ( ) ) {
753+ Ok ( _) => ( ) ,
754+ Err ( e) => {
755+ log_error ! (
756+ background_stop_logger,
757+ "Failed to send 'events handling stopped' signal. This should never happen: {}" ,
758+ e
759+ ) ;
760+ debug_assert ! ( false ) ;
761+ } ,
762+ }
749763 } ) ;
750764
751765 if let Some ( liquidity_source) = self . liquidity_source . as_ref ( ) {
@@ -795,9 +809,55 @@ impl Node {
795809 } ,
796810 }
797811
798- // Stop disconnect peers.
812+ // Disconnect all peers.
799813 self . peer_manager . disconnect_all_peers ( ) ;
800814
815+ // Wait until event handling stopped, at least until a timeout is reached.
816+ let event_handling_stopped_logger = Arc :: clone ( & self . logger ) ;
817+ let mut event_handling_stopped_receiver = self . event_handling_stopped_sender . subscribe ( ) ;
818+
819+ // FIXME: For now, we wait up to 100 secs (BDK_WALLET_SYNC_TIMEOUT_SECS + 10) to allow
820+ // event handling to exit gracefully even if it was blocked on the BDK wallet syncing. We
821+ // should drop this considerably post upgrading to BDK 1.0.
822+ let timeout_res = runtime. block_on ( async {
823+ tokio:: time:: timeout (
824+ Duration :: from_secs ( 100 ) ,
825+ event_handling_stopped_receiver. changed ( ) ,
826+ )
827+ . await
828+ } ) ;
829+
830+ match timeout_res {
831+ Ok ( stop_res) => match stop_res {
832+ Ok ( ( ) ) => { } ,
833+ Err ( e) => {
834+ log_error ! (
835+ event_handling_stopped_logger,
836+ "Stopping event handling failed. This should never happen: {}" ,
837+ e
838+ ) ;
839+ panic ! ( "Stopping event handling failed. This should never happen." ) ;
840+ } ,
841+ } ,
842+ Err ( e) => {
843+ log_error ! (
844+ event_handling_stopped_logger,
845+ "Stopping event handling timed out: {}" ,
846+ e
847+ ) ;
848+ } ,
849+ }
850+
851+ #[ cfg( tokio_unstable) ]
852+ {
853+ log_trace ! (
854+ self . logger,
855+ "Active runtime tasks left prior to shutdown: {}" ,
856+ runtime. metrics( ) . active_tasks_count( )
857+ ) ;
858+ }
859+
860+ // Shutdown our runtime. By now ~no or only very few tasks should be left.
801861 runtime. shutdown_timeout ( Duration :: from_secs ( 10 ) ) ;
802862
803863 log_info ! ( self . logger, "Shutdown complete." ) ;
0 commit comments