
Sweeper async change destination source fetching #3734


Open · wants to merge 3 commits into main

Conversation

joostjager
Contributor

@joostjager joostjager commented Apr 14, 2025

This PR converts OutputSweeper to take an async ChangeDestinationSource implementation. This allows a (remote) address fetch call to run without blocking chain notifications.

Furthermore, the changes demonstrate how LDK could be written in a natively async way while still allowing usage from a sync context via wrappers.

Part of #3540
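A hedged sketch of the pattern this PR applies (names are illustrative, not the actual LDK API): an async trait for the address fetch, a sync counterpart, and a wrapper that adapts sync implementations by returning an already-resolved future which a sync caller can poll once with a no-op waker:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Hypothetical async trait: fetching a change address may involve remote IO.
pub trait ChangeDestinationSource {
    fn get_change_destination_script(
        &self,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, ()>> + Send>>;
}

// Sync counterpart for implementations that don't need a runtime.
pub trait ChangeDestinationSourceSync {
    fn get_change_destination_script(&self) -> Result<Vec<u8>, ()>;
}

// Wrapper adapting a sync source to the async trait: the blocking call runs
// first, so the returned future is immediately ready.
pub struct SyncWrapper<T: ChangeDestinationSourceSync>(pub T);

impl<T: ChangeDestinationSourceSync> ChangeDestinationSource for SyncWrapper<T> {
    fn get_change_destination_script(
        &self,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, ()>> + Send>> {
        let res = ChangeDestinationSourceSync::get_change_destination_script(&self.0);
        Box::pin(async move { res })
    }
}

// A no-op waker in the spirit of LDK's dummy_waker, letting a sync caller
// poll a future exactly once and expect it to be Ready.
fn dummy_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn no_op(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, no_op, no_op, no_op);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn poll_once_ready<T>(mut fut: Pin<Box<dyn Future<Output = T> + Send>>) -> T {
    let waker = dummy_waker();
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => v,
        // A sync-wrapped future must resolve on the first poll.
        Poll::Pending => unreachable!("sync-wrapped future returned Pending"),
    }
}

struct FixedSource;
impl ChangeDestinationSourceSync for FixedSource {
    fn get_change_destination_script(&self) -> Result<Vec<u8>, ()> {
        Ok(vec![0x00, 0x14]) // stand-in bytes for a change script
    }
}

fn main() {
    let source = SyncWrapper(FixedSource);
    let script = poll_once_ready(source.get_change_destination_script()).unwrap();
    assert_eq!(script, vec![0x00, 0x14]);
    println!("fetched change script: {} bytes", script.len());
}
```

Since the wrapped future completes before it is ever polled, the sync wrapper never needs a real executor — which is why the real code can panic (or `unreachable!`) on `Pending` rather than block.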

@ldk-reviews-bot

ldk-reviews-bot commented Apr 14, 2025

👋 Thanks for assigning @tnull as a reviewer!
I'll wait for their review and will help manage the review process.
Once they submit their review, I'll check if a second reviewer would be helpful.

}

Ok(())
self.persist_state(&*state_lock).map_err(|e| {
Contributor Author

No more sweeping in this method, all moved to a timer.

if let Some(spending_tx) = spending_tx_opt {
    self.broadcaster.broadcast_transactions(&[&spending_tx]);
}
let _ = self.persist_state(&*state_lock).map_err(|e| {
Contributor Author

No more sweeping in this event handler.

if respend_descriptors.is_empty() {
    // Nothing to do.
    return None;
}
let change_destination_script = change_destination_script_result?;
Contributor Author

Todo: address risk of getting a tx with a new address every block.

Contributor Author

Todo: investigate what BDK does here?

Contributor Author

Traced through BDK a bit. It seems there is only inflation if we actually use the address in a tx; it won't blindly regenerate addresses when called. But I will double-check this.

@joostjager joostjager force-pushed the async-sweep branch 2 times, most recently from c3420ab to d6051d9 Compare April 16, 2025 09:28
@joostjager joostjager force-pushed the async-sweep branch 2 times, most recently from 90d7104 to 370d677 Compare April 16, 2025 16:32
sweeper: OutputSweeper<B, D, E, F, K, L, O>,
}

impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
Contributor Author

@joostjager joostjager Apr 16, 2025

What might be good about this wrapper is that it eliminates the possibility of someone implementing async logic in combination with future poll/ready checking. This wrapper only accepts a sync trait.

@joostjager joostjager force-pushed the async-sweep branch 4 times, most recently from a9812ea to df47e8d Compare April 18, 2025 10:23

codecov bot commented Apr 21, 2025

Codecov Report

Attention: Patch coverage is 80.08130% with 49 lines in your changes missing coverage. Please review.

Project coverage is 90.14%. Comparing base (46cb5ff) to head (a93b09a).
Report is 46 commits behind head on main.

Files with missing lines Patch % Lines
lightning/src/util/sweep.rs 75.41% 33 Missing and 11 partials ⚠️
lightning/src/util/async_poll.rs 57.14% 3 Missing ⚠️
lightning-background-processor/src/lib.rs 96.22% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3734      +/-   ##
==========================================
+ Coverage   89.11%   90.14%   +1.02%     
==========================================
  Files         156      156              
  Lines      123435   132005    +8570     
  Branches   123435   132005    +8570     
==========================================
+ Hits       109995   118990    +8995     
+ Misses      10758    10444     -314     
+ Partials     2682     2571     -111     


@joostjager joostjager force-pushed the async-sweep branch 6 times, most recently from f3e911e to 6cd735c Compare April 21, 2025 11:35
@joostjager joostjager marked this pull request as ready for review April 21, 2025 14:34
@joostjager joostjager requested review from tnull and TheBlueMatt April 21, 2025 14:34
@joostjager joostjager added the weekly goal Someone wants to land this this week label Apr 21, 2025
Collaborator

@TheBlueMatt TheBlueMatt left a comment

Basically LGTM, were there any major questions remaining you wanted resolved?

@@ -755,6 +782,11 @@ where
}
}

struct RuntimeSweeperState {
Collaborator

Is this just to avoid hitting sweep_pending in the serialization logic for SweeperState? That can be avoided by writing it as (sweep_pending, static_value, false).
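Macro syntax aside, the idea can be sketched generically (hypothetical field names, not the LDK serialization macros): persist the state but always write a fixed value for the runtime-only flag, and reset it on read:

```rust
// Generic sketch of persisting state while pinning a runtime-only flag to a
// static value, so `sweep_pending` never round-trips as `true`.
struct SweeperState {
    best_block_height: u32,
    sweep_pending: bool, // runtime-only; meaningless once deserialized
}

impl SweeperState {
    fn serialize(&self) -> Vec<u8> {
        let mut out = self.best_block_height.to_be_bytes().to_vec();
        out.push(0u8); // static value: always written as `false`
        out
    }

    fn deserialize(bytes: &[u8]) -> Option<Self> {
        let height = u32::from_be_bytes(bytes.get(0..4)?.try_into().ok()?);
        // Ignore the stored flag; a freshly loaded state has no sweep pending.
        Some(SweeperState { best_block_height: height, sweep_pending: false })
    }
}

fn main() {
    let state = SweeperState { best_block_height: 840_000, sweep_pending: true };
    let restored = SweeperState::deserialize(&state.serialize()).unwrap();
    assert_eq!(restored.best_block_height, 840_000);
    assert!(!restored.sweep_pending); // the flag never survives a round trip
    println!("ok");
}
```

This keeps a single in-memory struct while the serialization layer alone decides what is persisted, matching the "treat everything in memory as the runtime state" convention mentioned below.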

Contributor Author

@tnull commented #3734 (comment) though.

Either way is fine for me, although I have a slight preference for the least amount of code (your suggestion).

Collaborator

I don't feel particularly strongly, but in general we've tended to treat everything in memory as the "runtime state" and only consider the difference at the serialization layer.

Contributor

Tbh. I'm not quite sure why sweep_pending can't just be an AtomicBool that lives in OutputSweeper directly? That would also allow us to check whether we're pending without acquiring (and potentially blocking on) the state mutex?

Contributor Author

I tried that, but it raised questions for me about how these two locks (the atomic bool and the mutex) work together. As usual with multiple locks, it gets complex.

Contributor Author

We are accessing sweep_pending inside the mutex. Locks inside locks, it's just hard to reason about?

Well, it does mean we might block the background processor on chain syncing/block connection and state persisting IO though. Surely there are worse things, but not sure why it can't simply be avoided.

The persisting is still sync indeed, but we'll solve that soon.

Contributor

We are accessing sweep_pending inside the mutex. Locks inside locks, it's just hard to reason about?

An AtomicBool is not a lock? You would -- just like currently -- skip if the bool would be true? There isn't even a lock order as it's only read once in a single method really.

The persisting is still sync indeed, but we'll solve that soon.

Right, but there is also no need now to wait until it finishes just to possibly discover pending_sweep is true and we need to skip anyways?
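A minimal standalone sketch of the guard under discussion (illustrative, not the PR's exact code): `compare_exchange` lets exactly one caller enter the sweep while concurrent callers skip, with no lock ordering involved:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

struct Sweeper {
    pending_sweep: AtomicBool,
}

impl Sweeper {
    // Returns true if this call performed the sweep, false if it skipped
    // because another sweep was already in flight.
    fn regenerate_and_broadcast_if_necessary(&self) -> bool {
        // Atomically flip false -> true; a concurrent caller sees Err and skips.
        if self
            .pending_sweep
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            return false;
        }
        // ... the actual (possibly async) sweep work would happen here ...
        self.pending_sweep.store(false, Ordering::Release);
        true
    }
}

fn main() {
    let sweeper = Sweeper { pending_sweep: AtomicBool::new(false) };

    // Simulate a sweep already in flight: the next attempt is skipped.
    sweeper.pending_sweep.store(true, Ordering::Release);
    assert!(!sweeper.regenerate_and_broadcast_if_necessary());

    // Once the flag clears, a sweep can run again.
    sweeper.pending_sweep.store(false, Ordering::Release);
    assert!(sweeper.regenerate_and_broadcast_if_necessary());
    println!("ok");
}
```

The flag is only ever read at one entry point, so there is no lock order to reason about; the main discipline is remembering to store `false` on every exit path.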

Contributor Author

@joostjager joostjager Apr 30, 2025

It's not a lock indeed, but it does complicate the reasoning about concurrent behavior. I've added a try-out commit to this PR where I implemented it, and I found myself trying to go through cases in which the lock, atomicbool and threads can interact.

Maybe I should have wrapped the whole method in another method that handles the atomicbool, was that your idea?

Contributor

@tnull tnull Apr 30, 2025

It's not a lock indeed, but it does complicate the reasoning about concurrent behavior. I've added a try-out commit to this PR where I implemented it, and I found myself trying to go through cases in which the lock, atomicbool and threads can interact.

I don't think it's super complicated? FWIW, you could simplify it further if you directly compare_exchange in the beginning of the method, essentially just ensuring we'd never run concurrently. To make sure that the possible interaction is only scoped on the method, you can make it a local static variable:

diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs
index aedb150c3..adfd97105 100644
--- a/lightning/src/util/sweep.rs
+++ b/lightning/src/util/sweep.rs
@@ -352,7 +352,6 @@ where
        O::Target: OutputSpender,
 {
        sweeper_state: Mutex<SweeperState>,
-       pending_sweep: AtomicBool,
        broadcaster: B,
        fee_estimator: E,
        chain_data_source: Option<F>,
@@ -385,7 +384,6 @@ where
                let sweeper_state = Mutex::new(SweeperState { outputs, best_block });
                Self {
                        sweeper_state,
-                       pending_sweep: AtomicBool::new(false),
                        broadcaster,
                        fee_estimator,
                        chain_data_source,
@@ -462,6 +460,12 @@ where

        /// Regenerates and broadcasts the spending transaction for any outputs that are pending
        pub async fn regenerate_and_broadcast_spend_if_necessary(&self) -> Result<(), ()> {
+               static PENDING_SWEEP: AtomicBool = AtomicBool::new(false);
+
+               if PENDING_SWEEP.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
+                       return Ok(());
+               }
+
                let filter_fn = |o: &TrackedSpendableOutput, cur_height: u32| {
                        if o.status.is_confirmed() {
                                // Don't rebroadcast confirmed txs.
@@ -481,11 +485,6 @@ where
                        true
                };

-               // Prevent concurrent sweeps.
-               if self.pending_sweep.load(Ordering::Relaxed) {
-                       return Ok(());
-               }
-
                // See if there is anything to sweep before requesting a change address.
                {
                        let sweeper_state = self.sweeper_state.lock().unwrap();
@@ -493,19 +492,11 @@ where
                        let cur_height = sweeper_state.best_block.height;
                        let has_respends = sweeper_state.outputs.iter().any(|o| filter_fn(o, cur_height));
                        if !has_respends {
+                               PENDING_SWEEP.store(false, Ordering::Release);
                                return Ok(());
                        }
                }

-               // Mark sweep pending, if no other thread did so already.
-               if self
-                       .pending_sweep
-                       .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
-                       .is_err()
-               {
-                       return Ok(());
-               }
-
                // Request a new change address outside of the mutex to avoid the mutex crossing await.
                let change_destination_script_result =
                        self.change_destination_source.get_change_destination_script().await;
@@ -529,9 +520,8 @@ where
                                .collect();

                        if respend_descriptors.is_empty() {
-                               self.pending_sweep.store(false, Ordering::Release);
-
                                // It could be that a tx confirmed and there is now nothing to sweep anymore.
+                               PENDING_SWEEP.store(false, Ordering::Release);
                                return Ok(());
                        }
@@ -549,9 +539,7 @@ where
                                        spending_tx
                                },
                                Err(e) => {
-                                       self.pending_sweep.store(false, Ordering::Release);
-
                                        log_error!(self.logger, "Error spending outputs: {:?}", e);
+                                       PENDING_SWEEP.store(false, Ordering::Release);
                                        return Ok(());
                                },
                        };
@@ -571,12 +559,13 @@ where

                        self.persist_state(&sweeper_state).map_err(|e| {
                                log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
+                               PENDING_SWEEP.store(false, Ordering::Release);
                        })?;

                        self.broadcaster.broadcast_transactions(&[&spending_tx]);
                }

-               self.pending_sweep.store(false, Ordering::Release);
+               PENDING_SWEEP.store(false, Ordering::Release);

                Ok(())
        }

@@ -855,7 +844,6 @@ where
                let sweeper_state = Mutex::new(state);
                Ok(Self {
                        sweeper_state,
-                       pending_sweep: AtomicBool::new(false),
                        broadcaster,
                        fee_estimator,
                        chain_data_source,
@@ -906,7 +894,6 @@ where
                        best_block,
                        OutputSweeper {
                                sweeper_state,
-                               pending_sweep: AtomicBool::new(false),
                                broadcaster,
                                fee_estimator,
                                chain_data_source,

Maybe I should have wrapped the whole method in another method that handles the atomicbool, was that your idea?

You could do that if you want to make sure we always remember to store false before exiting. Could also reach something similar with a closure, or just leave as is. No strong opinion here, as adding a few stores is not too bad, IMO.

Contributor Author

Wrapping it around all of it is definitely simpler. Not a fan of a global variable though, will leave it a sweeper field. My preference is still to use a single synchronization primitive by adding it to the state without a separate struct and not serializing it (what I had in the very first revision), but can go along with your suggestion too.

pub fn sweeper_async(
&self,
) -> Arc<OutputSweeper<B, Arc<ChangeDestinationSourceSyncWrapper<D>>, E, F, K, L, O>> {
self.sweeper.clone()
Collaborator

Bleh, it's a bit weird to wrap the sweeper in an Arc just so that we can expose it here. Would returning a reference suffice?

Contributor Author

@joostjager joostjager Apr 22, 2025

I don't think it is possible, because we would then need to move that reference into the async bp thread? Of course there is always uncertainty - for me - around these kinds of statements in Rust.
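The constraint being alluded to can be shown with plain std threads (`tokio::spawn` imposes the same `'static` bound): a borrowed reference cannot move into a spawned task, while an `Arc` clone can. Names here are illustrative only:

```rust
use std::sync::Arc;
use std::thread;

struct Sweeper {
    name: String,
}

fn main() {
    let sweeper = Arc::new(Sweeper { name: "async-sweep".to_string() });

    // `thread::spawn` (like `tokio::spawn`) requires a 'static closure, so a
    // plain `&sweeper` could not be moved in here -- an owned Arc clone can.
    let for_task = Arc::clone(&sweeper);
    let handle = thread::spawn(move || for_task.name.len());

    assert_eq!(handle.join().unwrap(), 11); // "async-sweep" is 11 bytes
    // The original Arc is still usable on this thread.
    println!("still usable: {}", sweeper.name);
}
```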

Collaborator

Hmm, it's definitely possible, but it's not quite as clean as I was hoping due to the tokio::spawns in the BP tests:

diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs
index e52affecf..da3df6ecb 100644
--- a/lightning-background-processor/src/lib.rs
+++ b/lightning-background-processor/src/lib.rs
@@ -2062,5 +2062,5 @@ mod tests {
                        nodes[0].rapid_gossip_sync(),
                        nodes[0].peer_manager.clone(),
-                       Some(nodes[0].sweeper.sweeper_async()),
+                       Some(OutputSweeperSync::sweeper_async(Arc::clone(&nodes[0].sweeper))),
                        nodes[0].logger.clone(),
                        Some(nodes[0].scorer.clone()),
@@ -2562,5 +2562,5 @@ mod tests {
                        nodes[0].rapid_gossip_sync(),
                        nodes[0].peer_manager.clone(),
-                       Some(nodes[0].sweeper.sweeper_async()),
+                       Some(OutputSweeperSync::sweeper_async(Arc::clone(&nodes[0].sweeper))),
                        nodes[0].logger.clone(),
                        Some(nodes[0].scorer.clone()),
@@ -2776,5 +2776,5 @@ mod tests {
                        nodes[0].no_gossip_sync(),
                        nodes[0].peer_manager.clone(),
-                       Some(nodes[0].sweeper.sweeper_async()),
+                       Some(OutputSweeperSync::sweeper_async(Arc::clone(&nodes[0].sweeper))),
                        nodes[0].logger.clone(),
                        Some(nodes[0].scorer.clone()),
diff --git a/lightning/src/sign/mod.rs b/lightning/src/sign/mod.rs
index e0d455109..cbc186e08 100644
--- a/lightning/src/sign/mod.rs
+++ b/lightning/src/sign/mod.rs
@@ -1029,4 +1029,20 @@ where
 }

+/// Because this wrapper is used by the sweeper to hold an underlying change destination in a
+/// generic which requires `Deref`, we implement a dummy `Deref` here.
+///
+/// This is borderline bad practice and can occasionally result in spurious compiler errors due to
+/// infinite auto-deref recursion, but it avoids a more complicated indirection and the type is not
+/// public, so there's not really any harm.
+impl<T: Deref> Deref for ChangeDestinationSourceSyncWrapper<T>
+where
+       T::Target: ChangeDestinationSourceSync,
+{
+       type Target = Self;
+       fn deref(&self) -> &Self {
+               &self
+       }
+}
+
 mod sealed {
        use bitcoin::secp256k1::{Scalar, SecretKey};
diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs
index 3b0ce5e5e..f8f4626ac 100644
--- a/lightning/src/util/sweep.rs
+++ b/lightning/src/util/sweep.rs
@@ -928,5 +928,5 @@ where
        O::Target: OutputSpender,
 {
-       sweeper: Arc<OutputSweeper<B, Arc<ChangeDestinationSourceSyncWrapper<D>>, E, F, K, L, O>>,
+       sweeper: OutputSweeper<B, ChangeDestinationSourceSyncWrapper<D>, E, F, K, L, O>,
 }

@@ -948,5 +948,5 @@ where
        ) -> Self {
                let change_destination_source =
-                       Arc::new(ChangeDestinationSourceSyncWrapper::new(change_destination_source));
+                       ChangeDestinationSourceSyncWrapper::new(change_destination_source);

                let sweeper = OutputSweeper::new(
@@ -960,5 +960,5 @@ where
                        logger,
                );
-               Self { sweeper: Arc::new(sweeper) }
+               Self { sweeper }
        }

@@ -997,9 +997,13 @@ where

        /// Returns the inner async sweeper for testing purposes.
+       ///
+       /// Note that this leaks the provided `Arc`, keeping this sweeper in memory forever.
        #[cfg(any(test, feature = "_test_utils"))]
        pub fn sweeper_async(
-               &self,
-       ) -> Arc<OutputSweeper<B, Arc<ChangeDestinationSourceSyncWrapper<D>>, E, F, K, L, O>> {
-               self.sweeper.clone()
+               us: Arc<Self>,
+       ) -> &'static OutputSweeper<B, ChangeDestinationSourceSyncWrapper<D>, E, F, K, L, O> {
+               let res = unsafe { core::mem::transmute(&us.sweeper) };
+               core::mem::forget(us);
+               res
        }
 }

Contributor Author

Interesting solution, but for simplicity I'd think the previous Arc<> solution may be the better option? Both introduce some dirtiness that isn't publicly visible.

Contributor

If this is only necessary for tests, we could just also feature-gate the Arc-wrapping in on _test_utils, no? This way we could use the approach you proposed in test, while not actually doing it in production?

Contributor Author

What is here to gain in a practical sense? It's not making the code easier to understand, at least not to me.

Collaborator

It just avoids the Arc indirection which reads weird and moves yet another thing to the heap that doesn't need to be there, but in practice it doesn't matter all that much. Making it a cfg flag would read even worse, I think, so not entirely sure that's the right tradeoff either.

Contributor Author

Will leave it as it is for now, then.

@joostjager
Contributor Author

joostjager commented Apr 22, 2025

Basically LGTM, were there any major questions remaining you wanted resolved?

Few things:

  1. We are breaking the sweeper api in this PR. Are there specific things to check/double-check now? Update ldk-node perhaps to see if it works? Not sure if much more can be done, given that rust-lightning is an open-source library.

  2. The address inflation risk should have remained the same as before. We are calling sweeper on a timer now, but the filter function is still checking latest_broadcast_height. Or did I miss anything there?

  3. Futures flag: Sweeper async change destination source fetching #3734 (comment)

@joostjager joostjager requested a review from TheBlueMatt April 22, 2025 06:38
@TheBlueMatt
Collaborator

We are breaking the sweeper api in this PR. Are there specific things to check/double-check now? Update ldk-node perhaps to see if it works? Not sure if much more can be done, given that rust-lightning is an open-source library.

In general we don't care too much about API breaks between versions. Obviously we don't want to require downstream projects completely overhaul their integration with LDK just for the sake of it, but this change should basically just require adding "Sync" in a few places, so it should be trivial.

The address inflation risk should have remained the same as before. We are calling sweeper on a timer now, but the filter function is still checking latest_broadcast_height. Or did I miss anything there?

Sounds correct to me.

Futures flag: Sweeper async change destination source fetching #3734 (comment)

Indeed, I see no reason not to remove the futures feature from BP.

Collaborator

@TheBlueMatt TheBlueMatt left a comment

Happy to see the futures flag removed in a followup or here, up to you.

@ldk-reviews-bot

🔔 1st Reminder

Hey @tnull! This PR has been waiting for your review.
Please take a look when you have a chance. If you're unable to review, please let us know so we can find another reviewer.

@joostjager
Contributor Author

Rebased after merge of #3509

@Copilot Copilot AI left a comment

Pull Request Overview

This PR refactors the OutputSweeper logic to support asynchronous change destination source fetching while still enabling synchronous usage via a new wrapper. Key changes include:

  • Converting OutputSweeper to use an async implementation for fetching change addresses.
  • Introducing a RuntimeSweeperState to support concurrent sweeper operations.
  • Adding an OutputSweeperSync wrapper to allow synchronous usage of the async API.

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
lightning/src/util/sweep.rs Refactored OutputSweeper APIs to support async operations and state management.
lightning/src/util/async_poll.rs Updated dummy_waker implementation to work with a complete Waker vtable.
lightning/src/sign/mod.rs Introduced async alias and a synchronous wrapper (ChangeDestinationSourceSyncWrapper) for change destination fetching.
lightning/src/lib.rs Removed the forbid(unsafe_code) attribute.
lightning-background-processor/src/lib.rs Integrated the new sweeper APIs into background processing flows.
Comments suppressed due to low confidence (1)

lightning/src/lib.rs:32

  • The removal of the 'forbid(unsafe_code)' attribute now permits unsafe code in production. If this removal was not intentional, consider re-enabling it or ensuring that any introduced unsafe code is thoroughly reviewed.
#![cfg_attr(not(any(test, fuzzing, feature = "_test_utils")), forbid(unsafe_code))]

Comment on lines 974 to 975
// In a sync context, we can't wait for the future to complete.
panic!("task not ready");
Copilot AI Apr 25, 2025

[nitpick] Using a panic in the synchronous wrapper when the future is not immediately ready may lead to abrupt termination. Consider returning an error or implementing a blocking wait so that the caller can handle the situation gracefully.

Suggested change
// In a sync context, we can't wait for the future to complete.
panic!("task not ready");
// In a sync context, block until the future is ready.
return futures::executor::block_on(fut);


Contributor Author

I think we don't want to be tolerant in this case, because everything lives in the wrapper.

Contributor

Hmm, I'm a bit on the fence about this. But if we indeed want to panic here, using the unreachable! macro would be the most idiomatic way to do it.

Contributor Author

Made it unreachable!

Collaborator


We absolutely don't want to take on a dependency just to run a future in a fresh executor in cases where it's unreachable :)

Contributor Author


AI is going to AI
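The poll-once behavior debated above can be sketched outside LDK. This is a hypothetical, self-contained illustration (the names `AsyncResult`, `resolve_sync`, and `noop_waker` are assumptions, not the PR's actual code): the sync wrapper polls the boxed future exactly once with a no-op waker and treats `Pending` as unreachable, since the wrapped synchronous source always completes immediately.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Hypothetical mirror of the PR's boxed-future alias.
type AsyncResult<'a, T> = Pin<Box<dyn Future<Output = Result<T, ()>> + 'a + Send>>;

// A waker that does nothing: we never expect to be woken, because the
// future is required to be immediately ready.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

// Sync wrapper: polls exactly once. A Pending result is a logic error,
// because the wrapped sync implementation never suspends.
fn resolve_sync<T>(mut fut: AsyncResult<'_, T>) -> Result<T, ()> {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(res) => res,
        // In a sync context we can't wait for the future to complete.
        Poll::Pending => unreachable!("task not ready"),
    }
}

fn main() {
    let fut: AsyncResult<'static, u32> = Box::pin(async { Ok::<u32, ()>(42) });
    println!("{:?}", resolve_sync(fut)); // prints Ok(42)
}
```

This is why pulling in `futures::executor::block_on` would be overkill here: the `Pending` arm exists only to document an invariant, not to handle a real suspension.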

@ldk-reviews-bot

🔔 2nd Reminder

Hey @tnull! This PR has been waiting for your review.
Please take a look when you have a chance. If you're unable to review, please let us know so we can find another reviewer.


Contributor

@tnull tnull left a comment


Pretty much LGTM, mod some comments.

I think the main thing I'd prefer to be changed is the (rather ugly) RuntimeSweeperState wrapping.

) -> Result<(), lightning::io::Error>
where
UL::Target: 'static + UtxoLookup,
CF::Target: 'static + chain::Filter,
CF::Target: 'static + chain::Filter + Sync + Send,
Contributor


Still not quite sure why/whether these bounds are necessary now?

Contributor Author


Tried to remove them, but that gives me a compiler error:

`<CF as Deref>::Target` cannot be shared between threads safely
the trait `std::marker::Sync` is not implemented for `<CF as Deref>::Target`

On the `OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>` bound.
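The compiler error above can be reproduced in miniature. This is a hypothetical illustration (the helper name is an assumption, not LDK code): handing an `Arc<T>` to a background thread requires `T: Send + Sync`, so any generic parameter that ends up inside a shared sweeper must carry those bounds explicitly.

```rust
use std::sync::Arc;
use std::thread;

// Moving an Arc<T> into a spawned thread needs T: Send + Sync; dropping
// the `Sync` bound here reproduces "`T` cannot be shared between
// threads safely".
fn share_with_background_task<T: Send + Sync + 'static>(shared: Arc<T>) {
    let clone = Arc::clone(&shared);
    thread::spawn(move || {
        // The background task only reads through the Arc.
        let _val = &*clone;
    })
    .join()
    .unwrap();
}

fn main() {
    share_with_background_task(Arc::new(vec![1u8, 2, 3]));
    println!("ok");
}
```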

@@ -1269,7 +1322,7 @@ mod tests {
Arc<test_utils::TestBroadcaster>,
Arc<TestWallet>,
Arc<test_utils::TestFeeEstimator>,
Arc<dyn Filter + Sync + Send>,
Arc<test_utils::TestChainSource>,
Contributor


Why do we need this change?

Contributor Author


This was so much fighting and trying. If I revert, I get:

type mismatch resolving `<Arc<OutputSweeper<..., Arc<dyn Filter + Send + Sync>, ...>> as Deref>::Target == OutputSweeper<...>`
expected struct `OutputSweeper<_, _, _, Arc<...>, _, _, _>`
found struct `OutputSweeper<_, _, _, Arc<dyn lightning::chain::Filter + Send + std::marker::Sync>, _, _, _>`
note: `TestChainSource` implements `Filter`, so you could change the expected type to `Box<...>`

@@ -755,6 +782,11 @@ where
}
}

struct RuntimeSweeperState {
Contributor


Tbh. I'm not quite sure why sweep_pending can't just be an AtomicBool that lives in OutputSweeper directly? That would also allow us to check whether we're pending without acquiring (and potentially blocking on) the state mutex?
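The reviewer's suggestion can be sketched concretely. This is a hypothetical illustration (the `SweeperFlags` type and method names are assumptions, not the PR's code): an `AtomicBool` on the sweeper itself lets callers check for an in-flight sweep without acquiring the state mutex, and `compare_exchange` ensures only one caller starts a sweep at a time.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// sweep_pending as a lock-free flag living directly on the sweeper.
struct SweeperFlags {
    sweep_pending: AtomicBool,
}

impl SweeperFlags {
    fn new() -> Self {
        Self { sweep_pending: AtomicBool::new(false) }
    }

    // Returns true only for the caller that wins the race to start a sweep.
    fn try_begin_sweep(&self) -> bool {
        self.sweep_pending
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    fn finish_sweep(&self) {
        self.sweep_pending.store(false, Ordering::Release);
    }
}

fn main() {
    let flags = SweeperFlags::new();
    assert!(flags.try_begin_sweep()); // first caller starts the sweep
    assert!(!flags.try_begin_sweep()); // a concurrent caller backs off
    flags.finish_sweep();
    assert!(flags.try_begin_sweep()); // available again after completion
    println!("ok");
}
```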

@@ -975,17 +978,56 @@ pub trait SignerProvider {
fn get_shutdown_scriptpubkey(&self) -> Result<ShutdownScript, ()>;
}

/// A type alias for a future that returns a result of type `T`.
pub type AsyncResult<'a, T> = Pin<Box<dyn Future<Output = Result<T, ()>> + 'a + Send>>;
Contributor


This seems a more general type related to future-polling. I wonder if it should rather live in utils/wakers.rs or async_poll.rs?

Contributor Author


Moved to async_poll.rs
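The point of the boxed-future alias is that trait methods can return it while the trait stays object-safe, with no `async fn` in the trait. A hypothetical, self-contained sketch (the simplified `ChangeDestinationSource` signature and `FixedScriptSource` type are assumptions for illustration, not LDK's actual definitions):

```rust
use std::future::Future;
use std::pin::Pin;

// Hypothetical mirror of the alias now living in async_poll.rs.
pub type AsyncResult<'a, T> = Pin<Box<dyn Future<Output = Result<T, ()>> + 'a + Send>>;

// An async trait expressed via the alias: still usable as `dyn`.
trait ChangeDestinationSource {
    fn get_change_destination_script(&self) -> AsyncResult<'_, Vec<u8>>;
}

struct FixedScriptSource(Vec<u8>);

impl ChangeDestinationSource for FixedScriptSource {
    fn get_change_destination_script(&self) -> AsyncResult<'_, Vec<u8>> {
        let script = self.0.clone();
        Box::pin(async move { Ok::<Vec<u8>, ()>(script) })
    }
}

fn main() {
    // Trait objects work because the method returns a boxed future.
    let src: Box<dyn ChangeDestinationSource> =
        Box::new(FixedScriptSource(vec![0x00, 0x14]));
    let _fut = src.get_change_destination_script();
    println!("ok");
}
```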


pub fn sweeper_async(
&self,
) -> Arc<OutputSweeper<B, Arc<ChangeDestinationSourceSyncWrapper<D>>, E, F, K, L, O>> {
self.sweeper.clone()
Contributor


If this is only necessary for tests, we could just feature-gate the Arc-wrapping on _test_utils, no? This way we could use the approach you proposed in tests, while not actually doing it in production?

To prepare for asynchronous processing of the sweep, we need to decouple
the spending from the chain notifications. These notifications run in a
sync context and wouldn't allow calls into an async trait.

Instead we now periodically call into the sweeper, to open up the
possibility to do so from an async context if desired.
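The timer-driven design in the commit message above can be sketched with standard-library primitives. This is a hypothetical illustration (the `run_sweeper_timer` helper and its shutdown channel are assumptions, not LDK's background-processor code): instead of sweeping inside chain-notification handlers, a loop ticks at an interval and invokes the sweep callback until it is told to stop.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Periodically invoke `sweep` until a stop signal arrives (or the
// sender side is dropped). recv_timeout doubles as the interval timer.
fn run_sweeper_timer<F: FnMut()>(mut sweep: F, stop: mpsc::Receiver<()>, interval: Duration) {
    loop {
        match stop.recv_timeout(interval) {
            Ok(()) | Err(mpsc::RecvTimeoutError::Disconnected) => break,
            Err(mpsc::RecvTimeoutError::Timeout) => sweep(),
        }
    }
}

fn main() {
    let (stop_tx, stop_rx) = mpsc::channel();
    let mut sweeps = 0u32;
    // Ask the loop to stop shortly after it starts.
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        let _ = stop_tx.send(());
    });
    run_sweeper_timer(|| sweeps += 1, stop_rx, Duration::from_millis(10));
    println!("ran {} sweeps", sweeps);
}
```

In an async context, the same shape falls out naturally by replacing the blocking `recv_timeout` with a select over a sleep future and a shutdown signal.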
@joostjager
Contributor Author

joostjager commented May 1, 2025

Futures flag removed

Changed my mind. Will do in a follow-up.

@joostjager joostjager force-pushed the async-sweep branch 2 times, most recently from d317ae3 to 493b686 Compare May 1, 2025 10:48
@joostjager joostjager requested a review from tnull May 1, 2025 12:32
Labels
weekly goal Someone wants to land this this week