@@ -173,6 +173,7 @@ uniffi::include_scaffolding!("ldk_node");
173173pub struct Node {
174174 runtime : Arc < RwLock < Option < tokio:: runtime:: Runtime > > > ,
175175 stop_sender : tokio:: sync:: watch:: Sender < ( ) > ,
176+ event_handling_stopped_sender : tokio:: sync:: watch:: Sender < ( ) > ,
176177 config : Arc < Config > ,
177178 wallet : Arc < Wallet > ,
178179 tx_sync : Arc < EsploraSyncClient < Arc < FilesystemLogger > > > ,
@@ -731,6 +732,7 @@ impl Node {
731732 } ;
732733
733734 let background_stop_logger = Arc :: clone ( & self . logger ) ;
735+ let event_handling_stopped_sender = self . event_handling_stopped_sender . clone ( ) ;
734736 runtime. spawn ( async move {
735737 process_events_async (
736738 background_persister,
@@ -751,6 +753,18 @@ impl Node {
751753 panic ! ( "Failed to process events" ) ;
752754 } ) ;
753755 log_trace ! ( background_stop_logger, "Events processing stopped." , ) ;
756+
757+ match event_handling_stopped_sender. send ( ( ) ) {
758+ Ok ( _) => ( ) ,
759+ Err ( e) => {
760+ log_error ! (
761+ background_stop_logger,
762+ "Failed to send 'events handling stopped' signal. This should never happen: {}" ,
763+ e
764+ ) ;
765+ debug_assert ! ( false ) ;
766+ } ,
767+ }
754768 } ) ;
755769
756770 if let Some ( liquidity_source) = self . liquidity_source . as_ref ( ) {
@@ -800,9 +814,55 @@ impl Node {
800814 } ,
801815 }
802816
803- // Stop disconnect peers.
817+ // Disconnect all peers.
804818 self . peer_manager . disconnect_all_peers ( ) ;
805819
820+ // Wait until event handling stopped, at least until a timeout is reached.
821+ let event_handling_stopped_logger = Arc :: clone ( & self . logger ) ;
822+ let mut event_handling_stopped_receiver = self . event_handling_stopped_sender . subscribe ( ) ;
823+
824+ // FIXME: For now, we wait up to 100 secs (BDK_WALLET_SYNC_TIMEOUT_SECS + 10) to allow
825+ // event handling to exit gracefully even if it was blocked on the BDK wallet syncing. We
826+ // should drop this considerably post upgrading to BDK 1.0.
827+ let timeout_res = runtime. block_on ( async {
828+ tokio:: time:: timeout (
829+ Duration :: from_secs ( 100 ) ,
830+ event_handling_stopped_receiver. changed ( ) ,
831+ )
832+ . await
833+ } ) ;
834+
835+ match timeout_res {
836+ Ok ( stop_res) => match stop_res {
837+ Ok ( ( ) ) => { } ,
838+ Err ( e) => {
839+ log_error ! (
840+ event_handling_stopped_logger,
841+ "Stopping event handling failed. This should never happen: {}" ,
842+ e
843+ ) ;
844+ panic ! ( "Stopping event handling failed. This should never happen." ) ;
845+ } ,
846+ } ,
847+ Err ( e) => {
848+ log_error ! (
849+ event_handling_stopped_logger,
850+ "Stopping event handling timed out: {}" ,
851+ e
852+ ) ;
853+ } ,
854+ }
855+
856+ #[ cfg( tokio_unstable) ]
857+ {
858+ log_trace ! (
859+ self . logger,
860+ "Active runtime tasks left prior to shutdown: {}" ,
861+ runtime. metrics( ) . active_tasks_count( )
862+ ) ;
863+ }
864+
865+ // Shutdown our runtime. By now ~no or only very few tasks should be left.
806866 runtime. shutdown_timeout ( Duration :: from_secs ( 10 ) ) ;
807867
808868 log_info ! ( self . logger, "Shutdown complete." ) ;
0 commit comments