From 5a403bdb137bd4324da5a2d03b618db4fdd5a2cb Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 26 Jan 2021 15:38:19 -0500 Subject: [PATCH 1/4] Update tokio to 1.0 This requires ensuring TcpStreams are set in nonblocking mode as tokio doesn't handle this for us anymore, so we adapt the public API to just accept std TcpStreams instead of an extra conversion hop. Luckily converting them is cheap. --- lightning-net-tokio/Cargo.toml | 4 ++-- lightning-net-tokio/src/lib.rs | 33 +++++++++++++++++++++------------ 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/lightning-net-tokio/Cargo.toml b/lightning-net-tokio/Cargo.toml index 50634bd32df..9165388066d 100644 --- a/lightning-net-tokio/Cargo.toml +++ b/lightning-net-tokio/Cargo.toml @@ -12,7 +12,7 @@ For Rust-Lightning clients which wish to make direct connections to Lightning P2 [dependencies] bitcoin = "0.24" lightning = { version = "0.0.12", path = "../lightning" } -tokio = { version = ">=0.2.12", features = [ "io-util", "macros", "rt-core", "sync", "tcp", "time" ] } +tokio = { version = "1.0", features = [ "io-util", "macros", "rt", "sync", "net", "time" ] } [dev-dependencies] -tokio = { version = ">=0.2.12", features = [ "io-util", "macros", "rt-core", "rt-threaded", "sync", "tcp", "time" ] } +tokio = { version = "1.0", features = [ "io-util", "macros", "rt", "rt-multi-thread", "sync", "net", "time" ] } diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 8e5885ca9bf..b8b33318f99 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -24,7 +24,7 @@ //! The call site should, thus, look something like this: //! ``` //! use tokio::sync::mpsc; -//! use tokio::net::TcpStream; +//! use std::net::TcpStream; //! use bitcoin::secp256k1::key::PublicKey; //! use lightning::util::events::EventsProvider; //! use std::net::SocketAddr; @@ -86,6 +86,7 @@ use lightning::util::logger::Logger; use std::{task, thread}; use std::net::SocketAddr; +use std::net::TcpStream as StdTcpStream; use std::sync::{Arc, Mutex, MutexGuard}; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; @@ -218,7 +219,7 @@ impl Connection { } } - fn new(event_notify: mpsc::Sender<()>, stream: TcpStream) -> (io::ReadHalf, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc>) { + fn new(event_notify: mpsc::Sender<()>, stream: StdTcpStream) -> (io::ReadHalf, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc>) { // We only ever need a channel of depth 1 here: if we returned a non-full write to the // PeerManager, we will eventually get notified that there is room in the socket to write // new bytes, which will generate an event. That event will be popped off the queue before @@ -229,7 +230,8 @@ impl Connection { // we shove a value into the channel which comes after we've reset the read_paused bool to // false. let (read_waker, read_receiver) = mpsc::channel(1); - let (reader, writer) = io::split(stream); + stream.set_nonblocking(true).unwrap(); + let (reader, writer) = io::split(TcpStream::from_std(stream).unwrap()); (reader, write_receiver, read_receiver, Arc::new(Mutex::new(Self { @@ -248,7 +250,7 @@ impl Connection { /// not need to poll the provided future in order to make progress. /// /// See the module-level documentation for how to handle the event_notify mpsc::Sender. -pub fn setup_inbound(peer_manager: Arc, Arc, Arc>>, event_notify: mpsc::Sender<()>, stream: TcpStream) -> impl std::future::Future where +pub fn setup_inbound(peer_manager: Arc, Arc, Arc>>, event_notify: mpsc::Sender<()>, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static, RMH: RoutingMessageHandler + 'static, L: Logger + 'static + ?Sized { @@ -290,7 +292,7 @@ pub fn setup_inbound(peer_manager: Arc(peer_manager: Arc, Arc, Arc>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) -> impl std::future::Future where +pub fn setup_outbound(peer_manager: Arc, Arc, Arc>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static, RMH: RoutingMessageHandler + 'static, L: Logger + 'static + ?Sized { @@ -366,7 +368,7 @@ pub async fn connect_outbound(peer_manager: Arc; - let mut sender = unsafe { (*sender_ptr).clone() }; + let sender = unsafe { (*sender_ptr).clone() }; let _ = sender.try_send(()); } fn drop_socket_waker(orig_ptr: *const ()) { @@ -512,6 +514,7 @@ mod tests { use tokio::sync::mpsc; use std::mem; + use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -526,6 +529,7 @@ mod tests { expected_pubkey: PublicKey, pubkey_connected: mpsc::Sender<()>, pubkey_disconnected: mpsc::Sender<()>, + disconnected_flag: AtomicBool, msg_events: Mutex>, } impl RoutingMessageHandler for MsgHandler { @@ -559,6 +563,7 @@ mod tests { fn handle_announcement_signatures(&self, _their_node_id: &PublicKey, _msg: &AnnouncementSignatures) {} fn peer_disconnected(&self, their_node_id: &PublicKey, _no_connection_possible: bool) { if *their_node_id == self.expected_pubkey { + self.disconnected_flag.store(true, Ordering::SeqCst); self.pubkey_disconnected.clone().try_send(()).unwrap(); } } @@ -591,6 +596,7 @@ mod tests { expected_pubkey: b_pub, pubkey_connected: a_connected_sender, pubkey_disconnected: a_disconnected_sender, + disconnected_flag: AtomicBool::new(false), msg_events: Mutex::new(Vec::new()), }); let a_manager = Arc::new(PeerManager::new(MessageHandler { @@ -604,6 +610,7 @@ mod tests { expected_pubkey: a_pub, pubkey_connected: b_connected_sender, pubkey_disconnected: b_disconnected_sender, + disconnected_flag: AtomicBool::new(false), msg_events: Mutex::new(Vec::new()), }); let b_manager = Arc::new(PeerManager::new(MessageHandler { @@ -624,8 +631,8 @@ mod tests { } else { panic!("Failed to bind to v4 localhost on common ports"); }; let (sender, _receiver) = mpsc::channel(2); - let fut_a = super::setup_outbound(Arc::clone(&a_manager), sender.clone(), b_pub, tokio::net::TcpStream::from_std(conn_a).unwrap()); - let fut_b = super::setup_inbound(b_manager, sender, tokio::net::TcpStream::from_std(conn_b).unwrap()); + let fut_a = super::setup_outbound(Arc::clone(&a_manager), sender.clone(), b_pub, conn_a); + let fut_b = super::setup_inbound(b_manager, sender, conn_b); tokio::time::timeout(Duration::from_secs(10), a_connected.recv()).await.unwrap(); tokio::time::timeout(Duration::from_secs(1), b_connected.recv()).await.unwrap(); @@ -633,18 +640,20 @@ mod tests { a_handler.msg_events.lock().unwrap().push(MessageSendEvent::HandleError { node_id: b_pub, action: ErrorAction::DisconnectPeer { msg: None } }); - assert!(a_disconnected.try_recv().is_err()); - assert!(b_disconnected.try_recv().is_err()); + assert!(!a_handler.disconnected_flag.load(Ordering::SeqCst)); + assert!(!b_handler.disconnected_flag.load(Ordering::SeqCst)); a_manager.process_events(); tokio::time::timeout(Duration::from_secs(10), a_disconnected.recv()).await.unwrap(); tokio::time::timeout(Duration::from_secs(1), b_disconnected.recv()).await.unwrap(); + assert!(a_handler.disconnected_flag.load(Ordering::SeqCst)); + assert!(b_handler.disconnected_flag.load(Ordering::SeqCst)); fut_a.await; fut_b.await; } - #[tokio::test(threaded_scheduler)] + #[tokio::test(flavor = "multi_thread")] async fn basic_threaded_connection_test() { do_basic_connection_test().await; } From 07aff06f67299ec502d91ff56d083286a219595d Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 26 Jan 2021 16:21:03 -0500 Subject: [PATCH 2/4] Bump MSRV for net-tokio to 1.45 as tokio 1.0 req very recent rustc --- .github/workflows/build.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index bafdd29fffd..92eb6c87b83 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -13,8 +13,8 @@ jobs: 1.30.0, # 1.34.2 is Debian stable 1.34.2, - # 1.39.0 is MSRV for lightning-net-tokio and generates coverage - 1.39.0] + # 1.45.2 is MSRV for lightning-net-tokio and generates coverage + 1.45.2] include: - toolchain: stable build-net-tokio: true @@ -26,7 +26,7 @@ jobs: build-net-tokio: true - toolchain: beta build-net-tokio: true - - toolchain: 1.39.0 + - toolchain: 1.45.2 build-net-tokio: true coverage: true runs-on: ${{ matrix.platform }} @@ -180,7 +180,7 @@ jobs: linting: runs-on: ubuntu-latest env: - TOOLCHAIN: 1.39.0 + TOOLCHAIN: 1.45.2 steps: - name: Checkout source code uses: actions/checkout@v2 From 11f5e23357d74f2b322c29620c383953c70de5e6 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 26 Jan 2021 17:17:01 -0500 Subject: [PATCH 3/4] Fix codecov by calling the new binary paths from rust 1.45 Rustc 1.45 moved the paths to test binaries, so we need to update our CI scripts to run the correct ones under kcov. The solution to this was pointed out by Val at https://github.com/rust-bitcoin/rust-lightning/pull/774#issuecomment-763250623 --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 92eb6c87b83..84fc509462f 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -75,7 +75,7 @@ jobs: - name: Generate coverage report if: matrix.coverage run: | - for file in target/debug/lightning-*; do + for file in target/debug/deps/lightning*; do [ -x "${file}" ] || continue; mkdir -p "target/cov/$(basename $file)"; ./kcov-build/usr/local/bin/kcov --exclude-pattern=/.cargo,/usr/lib --verify "target/cov/$(basename $file)" "$file"; From 32abba7201f4045d31bc7c7df43683028989b4aa Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 27 Jan 2021 23:01:37 -0500 Subject: [PATCH 4/4] Drop spurious semicolon that new rustc complains about --- lightning-net-tokio/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index b8b33318f99..80f7e055283 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -157,7 +157,7 @@ impl Connection { // In this case, we do need to call peer_manager.socket_disconnected() to inform // Rust-Lightning that the socket is gone. PeerDisconnected - }; + } let disconnect_type = loop { macro_rules! shutdown_socket { ($err: expr, $need_disconnect: expr) => { {