
Commit d7de357

Allow events processing without holding total_consistency_lock
Unfortunately, the RAII types used by `RwLock` are not `Send`, which is why they can't be held over `await` boundaries. In order to allow asynchronous event processing in multi-threaded environments, we here allow events to be processed without holding the `total_consistency_lock`.
1 parent 36bf817 commit d7de357
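
For context on the `Send` problem the commit message describes, here is a minimal, self-contained sketch. This is not LDK code; `process` and `some_async_work` are hypothetical stand-ins for the event-processing future and an async event handler.

```rust
use std::sync::RwLock;

// Hypothetical async work; stands in for handling a single event.
async fn some_async_work() {}

// Holding the RAII read guard across the `.await` below would make this
// future `!Send`, so it could not be handed to a multi-threaded executor
// (e.g. `tokio::spawn`, which requires `Future + Send + 'static`):
//
//     let _read_guard = lock.read().unwrap();
//     some_async_work().await; // guard still alive across the await point
//
// Restricting the guard to a synchronous scope keeps the future `Send`:
async fn process(lock: &RwLock<()>) {
    {
        let _read_guard = lock.read().unwrap();
        // ...synchronous, consistency-sensitive work only...
    } // guard dropped here, before any await
    some_async_work().await;
}
```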

2 files changed: +50 −33 lines changed

lightning-background-processor/src/lib.rs

+12 −10

```diff
@@ -1480,10 +1480,9 @@ mod tests {
 			})
 		}, false,
 		);
-		// TODO: Drop _local and simply spawn after #2003
-		let local_set = tokio::task::LocalSet::new();
-		local_set.spawn_local(bp_future);
-		local_set.spawn_local(async move {
+
+		let t1 = tokio::spawn(bp_future);
+		let t2 = tokio::spawn(async move {
 			do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, {
 				let mut i = 0;
 				loop {
@@ -1495,7 +1494,9 @@
 			}, tokio::time::sleep(Duration::from_millis(1)).await);
 			exit_sender.send(()).unwrap();
 		});
-		local_set.await;
+		let (r1, r2) = tokio::join!(t1, t2);
+		r1.unwrap().unwrap();
+		r2.unwrap()
 	}
 
 	macro_rules! do_test_payment_path_scoring {
@@ -1649,13 +1650,14 @@ mod tests {
 			})
 		}, false,
 		);
-		// TODO: Drop _local and simply spawn after #2003
-		let local_set = tokio::task::LocalSet::new();
-		local_set.spawn_local(bp_future);
-		local_set.spawn_local(async move {
+		let t1 = tokio::spawn(bp_future);
+		let t2 = tokio::spawn(async move {
 			do_test_payment_path_scoring!(nodes, receiver.recv().await);
 			exit_sender.send(()).unwrap();
 		});
-		local_set.await;
+
+		let (r1, r2) = tokio::join!(t1, t2);
+		r1.unwrap().unwrap();
+		r2.unwrap()
 	}
 }
```

lightning/src/ln/channelmanager.rs

+38 −23

```diff
@@ -72,7 +72,7 @@ use core::{cmp, mem};
 use core::cell::RefCell;
 use crate::io::Read;
 use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock, LockTestExt, LockHeldState};
-use core::sync::atomic::{AtomicUsize, Ordering};
+use core::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
 use core::time::Duration;
 use core::ops::Deref;
 
@@ -926,6 +926,8 @@ where
 
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
 	pending_events: Mutex<Vec<events::Event>>,
+	/// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
+	pending_events_processor: AtomicBool,
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
 	pending_background_events: Mutex<Vec<BackgroundEvent>>,
 	/// Used when we have to take a BIG lock to make sure everything is self-consistent.
@@ -1680,30 +1682,42 @@ macro_rules! handle_new_monitor_update {
 
 macro_rules! process_events_body {
 	($self: expr, $event_to_handle: expr, $handle_event: expr) => {
-		// We'll acquire our total consistency lock until the returned future completes so that
-		// we can be sure no other persists happen while processing events.
-		let _read_guard = $self.total_consistency_lock.read().unwrap();
+		let mut processed_all_events = false;
+		while !processed_all_events {
+			if $self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
+				return;
+			}
 
-		let mut result = NotifyOption::SkipPersist;
+			let mut result = NotifyOption::SkipPersist;
 
-		// TODO: This behavior should be documented. It's unintuitive that we query
-		// ChannelMonitors when clearing other events.
-		if $self.process_pending_monitor_events() {
-			result = NotifyOption::DoPersist;
-		}
+			// TODO: This behavior should be documented. It's unintuitive that we query
+			// ChannelMonitors when clearing other events.
+			if $self.process_pending_monitor_events() {
+				result = NotifyOption::DoPersist;
+			}
 
-		let pending_events = mem::replace(&mut *$self.pending_events.lock().unwrap(), vec![]);
-		if !pending_events.is_empty() {
-			result = NotifyOption::DoPersist;
-		}
+			let pending_events = $self.pending_events.lock().unwrap().clone();
+			let num_events = pending_events.len();
+			if !pending_events.is_empty() {
+				result = NotifyOption::DoPersist;
+			}
 
-		for event in pending_events {
-			$event_to_handle = event;
-			$handle_event;
-		}
+			for event in pending_events {
+				$event_to_handle = event;
+				$handle_event;
+			}
 
-		if result == NotifyOption::DoPersist {
-			$self.persistence_notifier.notify();
+			{
+				let mut pending_events = $self.pending_events.lock().unwrap();
+				pending_events.drain(..num_events);
+				processed_all_events = pending_events.is_empty();
+			}
+
+			if result == NotifyOption::DoPersist {
+				$self.persistence_notifier.notify();
+			}
+
+			$self.pending_events_processor.store(false, Ordering::Release);
 		}
 	}
 }
@@ -1771,6 +1785,7 @@ where
 			per_peer_state: FairRwLock::new(HashMap::new()),
 
 			pending_events: Mutex::new(Vec::new()),
+			pending_events_processor: AtomicBool::new(false),
 			pending_background_events: Mutex::new(Vec::new()),
 			total_consistency_lock: RwLock::new(()),
 			persistence_notifier: Notifier::new(),
@@ -4369,8 +4384,6 @@ where
 	}
 
 	fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64, counterparty_node_id: Option<&PublicKey>) {
-		debug_assert!(self.total_consistency_lock.try_write().is_err()); // Caller holds read lock
-
 		let counterparty_node_id = match counterparty_node_id {
 			Some(cp_id) => cp_id.clone(),
 			None => {
@@ -5312,7 +5325,8 @@ where
 
 	/// Process pending events from the [`chain::Watch`], returning whether any events were processed.
 	fn process_pending_monitor_events(&self) -> bool {
-		debug_assert!(self.total_consistency_lock.try_write().is_err()); // Caller holds read lock
+		debug_assert!(self.total_consistency_lock.try_write().is_err() ||
+			self.pending_events_processor.load(Ordering::Relaxed)); // Caller holds read lock or processes events asynchronously.
 
 		let mut failed_channels = Vec::new();
 		let mut pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
@@ -7916,6 +7930,7 @@ where
 			per_peer_state: FairRwLock::new(per_peer_state),
 
 			pending_events: Mutex::new(pending_events_read),
+			pending_events_processor: AtomicBool::new(false),
 			pending_background_events: Mutex::new(pending_background_events),
 			total_consistency_lock: RwLock::new(()),
 			persistence_notifier: Notifier::new(),
```
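
The reworked `process_events_body` above boils down to a claim/snapshot/drain pattern. The following rough standalone sketch shows the same shape; `EventQueue` and `handle_event` are hypothetical stand-ins rather than LDK types, and the persistence-notification bookkeeping is omitted.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

struct EventQueue {
    pending_events: Mutex<Vec<u32>>,      // stand-in for `pending_events`
    pending_events_processor: AtomicBool, // stand-in for the new atomic flag
}

impl EventQueue {
    async fn handle_event(&self, _ev: u32) { /* async handler would run here */ }

    async fn process_pending_events(&self) {
        let mut processed_all_events = false;
        while !processed_all_events {
            // Only one task may process events at a time; others return early.
            if self.pending_events_processor
                .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
                .is_err()
            {
                return;
            }

            // Clone a snapshot so no MutexGuard is held across an `.await`.
            let pending_events = self.pending_events.lock().unwrap().clone();
            let num_events = pending_events.len();
            for event in pending_events {
                self.handle_event(event).await;
            }

            // Drop only the events we handled; anything queued while we were
            // awaiting stays, and the loop runs again to pick it up.
            {
                let mut pending_events = self.pending_events.lock().unwrap();
                pending_events.drain(..num_events);
                processed_all_events = pending_events.is_empty();
            }

            self.pending_events_processor.store(false, Ordering::Release);
        }
    }
}
```

Because the flag is a plain `AtomicBool` rather than an RAII guard, the future stays `Send`; concurrent callers return early instead of blocking, and the outer loop re-checks the queue so events pushed while a handler was awaited are not lost.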
