Skip to content

Commit b2cdc2c

Browse files
Merge pull request #3254 from TheBlueMatt/2024-08-flaky-test
Correct `peer_handler::test_process_events_multithreaded`
2 parents f89e963 + e34519b commit b2cdc2c

File tree

2 files changed

+33
-33
lines changed

2 files changed

+33
-33
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 26 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3452,48 +3452,44 @@ mod tests {
34523452
#[cfg(feature = "std")]
34533453
fn test_process_events_multithreaded() {
34543454
use std::time::{Duration, Instant};
3455-
// Test that `process_events` getting called on multiple threads doesn't generate too many
3456-
// loop iterations.
3455+
// `process_events` shouldn't block on another thread processing events and instead should
3456+
// simply signal the currently processing thread to go around the loop again.
3457+
// Here we test that this happens by spawning a few threads and checking that we see one go
3458+
// around again at least once.
3459+
//
34573460
// Each time `process_events` goes around the loop we call
3458-
// `get_and_clear_pending_msg_events`, which we count using the `TestMessageHandler`.
3459-
// Because the loop should go around once more after a call which fails to take the
3460-
// single-threaded lock, if we write zero to the counter before calling `process_events` we
3461-
// should never observe there having been more than 2 loop iterations.
3462-
// Further, because the last thread to exit will call `process_events` before returning, we
3463-
// should always have at least one count at the end.
3461+
// `get_and_clear_pending_msg_events`, which we count using the `TestMessageHandler`. Thus,
3462+
// to test we simply write zero to the counter before calling `process_events` and make
3463+
// sure we observe a value greater than one at least once.
34643464
let cfg = Arc::new(create_peermgr_cfgs(1));
34653465
// Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }.
34663466
let peer = Arc::new(create_network(1, unsafe { &*(&*cfg as *const _) as &'static _ }).pop().unwrap());
34673467

3468-
let exit_flag = Arc::new(AtomicBool::new(false));
3469-
macro_rules! spawn_thread { () => { {
3470-
let thread_cfg = Arc::clone(&cfg);
3468+
let end_time = Instant::now() + Duration::from_millis(100);
3469+
let observed_loop = Arc::new(AtomicBool::new(false));
3470+
let thread_fn = || {
34713471
let thread_peer = Arc::clone(&peer);
3472-
let thread_exit = Arc::clone(&exit_flag);
3473-
std::thread::spawn(move || {
3474-
while !thread_exit.load(Ordering::Acquire) {
3475-
thread_cfg[0].chan_handler.message_fetch_counter.store(0, Ordering::Release);
3472+
let thread_observed_loop = Arc::clone(&observed_loop);
3473+
move || {
3474+
while Instant::now() < end_time || !thread_observed_loop.load(Ordering::Acquire) {
3475+
test_utils::TestChannelMessageHandler::MESSAGE_FETCH_COUNTER.with(|val| val.store(0, Ordering::Relaxed));
34763476
thread_peer.process_events();
3477+
if test_utils::TestChannelMessageHandler::MESSAGE_FETCH_COUNTER.with(|val| val.load(Ordering::Relaxed)) > 1 {
3478+
thread_observed_loop.store(true, Ordering::Release);
3479+
return;
3480+
}
34773481
std::thread::sleep(Duration::from_micros(1));
34783482
}
3479-
})
3480-
} } }
3481-
3482-
let thread_a = spawn_thread!();
3483-
let thread_b = spawn_thread!();
3484-
let thread_c = spawn_thread!();
3485-
3486-
let start_time = Instant::now();
3487-
while start_time.elapsed() < Duration::from_millis(100) {
3488-
let val = cfg[0].chan_handler.message_fetch_counter.load(Ordering::Acquire);
3489-
assert!(val <= 2);
3490-
std::thread::yield_now(); // Winblowz seemingly doesn't ever interrupt threads?!
3491-
}
3483+
}
3484+
};
34923485

3493-
exit_flag.store(true, Ordering::Release);
3486+
let thread_a = std::thread::spawn(thread_fn());
3487+
let thread_b = std::thread::spawn(thread_fn());
3488+
let thread_c = std::thread::spawn(thread_fn());
3489+
thread_fn()();
34943490
thread_a.join().unwrap();
34953491
thread_b.join().unwrap();
34963492
thread_c.join().unwrap();
3497-
assert!(cfg[0].chan_handler.message_fetch_counter.load(Ordering::Acquire) >= 1);
3493+
assert!(observed_loop.load(Ordering::Acquire));
34983494
}
34993495
}

lightning/src/util/test_utils.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -759,17 +759,21 @@ pub struct TestChannelMessageHandler {
759759
pub pending_events: Mutex<Vec<events::MessageSendEvent>>,
760760
expected_recv_msgs: Mutex<Option<Vec<wire::Message<()>>>>,
761761
connected_peers: Mutex<HashSet<PublicKey>>,
762-
pub message_fetch_counter: AtomicUsize,
763762
chain_hash: ChainHash,
764763
}
765764

765+
impl TestChannelMessageHandler {
766+
thread_local! {
767+
pub static MESSAGE_FETCH_COUNTER: AtomicUsize = AtomicUsize::new(0);
768+
}
769+
}
770+
766771
impl TestChannelMessageHandler {
767772
pub fn new(chain_hash: ChainHash) -> Self {
768773
TestChannelMessageHandler {
769774
pending_events: Mutex::new(Vec::new()),
770775
expected_recv_msgs: Mutex::new(None),
771776
connected_peers: Mutex::new(new_hash_set()),
772-
message_fetch_counter: AtomicUsize::new(0),
773777
chain_hash,
774778
}
775779
}
@@ -940,7 +944,7 @@ impl msgs::ChannelMessageHandler for TestChannelMessageHandler {
940944

941945
impl events::MessageSendEventsProvider for TestChannelMessageHandler {
942946
fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
943-
self.message_fetch_counter.fetch_add(1, Ordering::AcqRel);
947+
Self::MESSAGE_FETCH_COUNTER.with(|val| val.fetch_add(1, Ordering::AcqRel));
944948
let mut pending_events = self.pending_events.lock().unwrap();
945949
let mut ret = Vec::new();
946950
mem::swap(&mut ret, &mut *pending_events);

0 commit comments

Comments
 (0)