diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index bafdd29fffd..84fc509462f 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -13,8 +13,8 @@ jobs: 1.30.0, # 1.34.2 is Debian stable 1.34.2, - # 1.39.0 is MSRV for lightning-net-tokio and generates coverage - 1.39.0] + # 1.45.2 is MSRV for lightning-net-tokio and generates coverage + 1.45.2] include: - toolchain: stable build-net-tokio: true @@ -26,7 +26,7 @@ jobs: build-net-tokio: true - toolchain: beta build-net-tokio: true - - toolchain: 1.39.0 + - toolchain: 1.45.2 build-net-tokio: true coverage: true runs-on: ${{ matrix.platform }} @@ -75,7 +75,7 @@ jobs: - name: Generate coverage report if: matrix.coverage run: | - for file in target/debug/lightning-*; do + for file in target/debug/deps/lightning*; do [ -x "${file}" ] || continue; mkdir -p "target/cov/$(basename $file)"; ./kcov-build/usr/local/bin/kcov --exclude-pattern=/.cargo,/usr/lib --verify "target/cov/$(basename $file)" "$file"; @@ -180,7 +180,7 @@ jobs: linting: runs-on: ubuntu-latest env: - TOOLCHAIN: 1.39.0 + TOOLCHAIN: 1.45.2 steps: - name: Checkout source code uses: actions/checkout@v2 diff --git a/lightning-net-tokio/Cargo.toml b/lightning-net-tokio/Cargo.toml index 50634bd32df..9165388066d 100644 --- a/lightning-net-tokio/Cargo.toml +++ b/lightning-net-tokio/Cargo.toml @@ -12,7 +12,7 @@ For Rust-Lightning clients which wish to make direct connections to Lightning P2 [dependencies] bitcoin = "0.24" lightning = { version = "0.0.12", path = "../lightning" } -tokio = { version = ">=0.2.12", features = [ "io-util", "macros", "rt-core", "sync", "tcp", "time" ] } +tokio = { version = "1.0", features = [ "io-util", "macros", "rt", "sync", "net", "time" ] } [dev-dependencies] -tokio = { version = ">=0.2.12", features = [ "io-util", "macros", "rt-core", "rt-threaded", "sync", "tcp", "time" ] } +tokio = { version = "1.0", features = [ "io-util", "macros", "rt", "rt-multi-thread", "sync", "net", "time" ] } diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 8e5885ca9bf..80f7e055283 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -24,7 +24,7 @@ //! The call site should, thus, look something like this: //! ``` //! use tokio::sync::mpsc; -//! use tokio::net::TcpStream; +//! use std::net::TcpStream; //! use bitcoin::secp256k1::key::PublicKey; //! use lightning::util::events::EventsProvider; //! use std::net::SocketAddr; @@ -86,6 +86,7 @@ use lightning::util::logger::Logger; use std::{task, thread}; use std::net::SocketAddr; +use std::net::TcpStream as StdTcpStream; use std::sync::{Arc, Mutex, MutexGuard}; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; @@ -156,7 +157,7 @@ impl Connection { // In this case, we do need to call peer_manager.socket_disconnected() to inform // Rust-Lightning that the socket is gone. PeerDisconnected - }; + } let disconnect_type = loop { macro_rules! shutdown_socket { ($err: expr, $need_disconnect: expr) => { { @@ -218,7 +219,7 @@ impl Connection { } } - fn new(event_notify: mpsc::Sender<()>, stream: TcpStream) -> (io::ReadHalf, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc>) { + fn new(event_notify: mpsc::Sender<()>, stream: StdTcpStream) -> (io::ReadHalf, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc>) { // We only ever need a channel of depth 1 here: if we returned a non-full write to the // PeerManager, we will eventually get notified that there is room in the socket to write // new bytes, which will generate an event. That event will be popped off the queue before @@ -229,7 +230,8 @@ impl Connection { // we shove a value into the channel which comes after we've reset the read_paused bool to // false. let (read_waker, read_receiver) = mpsc::channel(1); - let (reader, writer) = io::split(stream); + stream.set_nonblocking(true).unwrap(); + let (reader, writer) = io::split(TcpStream::from_std(stream).unwrap()); (reader, write_receiver, read_receiver, Arc::new(Mutex::new(Self { @@ -248,7 +250,7 @@ impl Connection { /// not need to poll the provided future in order to make progress. /// /// See the module-level documentation for how to handle the event_notify mpsc::Sender. -pub fn setup_inbound(peer_manager: Arc, Arc, Arc>>, event_notify: mpsc::Sender<()>, stream: TcpStream) -> impl std::future::Future where +pub fn setup_inbound(peer_manager: Arc, Arc, Arc>>, event_notify: mpsc::Sender<()>, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static, RMH: RoutingMessageHandler + 'static, L: Logger + 'static + ?Sized { @@ -290,7 +292,7 @@ pub fn setup_inbound(peer_manager: Arc(peer_manager: Arc, Arc, Arc>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) -> impl std::future::Future where +pub fn setup_outbound(peer_manager: Arc, Arc, Arc>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static, RMH: RoutingMessageHandler + 'static, L: Logger + 'static + ?Sized { @@ -366,7 +368,7 @@ pub async fn connect_outbound(peer_manager: Arc; - let mut sender = unsafe { (*sender_ptr).clone() }; + let sender = unsafe { (*sender_ptr).clone() }; let _ = sender.try_send(()); } fn drop_socket_waker(orig_ptr: *const ()) { @@ -512,6 +514,7 @@ mod tests { use tokio::sync::mpsc; use std::mem; + use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -526,6 +529,7 @@ mod tests { expected_pubkey: PublicKey, pubkey_connected: mpsc::Sender<()>, pubkey_disconnected: mpsc::Sender<()>, + disconnected_flag: AtomicBool, msg_events: Mutex>, } impl RoutingMessageHandler for MsgHandler { @@ -559,6 +563,7 @@ mod tests { fn handle_announcement_signatures(&self, _their_node_id: &PublicKey, _msg: &AnnouncementSignatures) {} fn peer_disconnected(&self, their_node_id: &PublicKey, _no_connection_possible: bool) { if *their_node_id == self.expected_pubkey { + self.disconnected_flag.store(true, Ordering::SeqCst); self.pubkey_disconnected.clone().try_send(()).unwrap(); } } @@ -591,6 +596,7 @@ mod tests { expected_pubkey: b_pub, pubkey_connected: a_connected_sender, pubkey_disconnected: a_disconnected_sender, + disconnected_flag: AtomicBool::new(false), msg_events: Mutex::new(Vec::new()), }); let a_manager = Arc::new(PeerManager::new(MessageHandler { @@ -604,6 +610,7 @@ mod tests { expected_pubkey: a_pub, pubkey_connected: b_connected_sender, pubkey_disconnected: b_disconnected_sender, + disconnected_flag: AtomicBool::new(false), msg_events: Mutex::new(Vec::new()), }); let b_manager = Arc::new(PeerManager::new(MessageHandler { @@ -624,8 +631,8 @@ mod tests { } else { panic!("Failed to bind to v4 localhost on common ports"); }; let (sender, _receiver) = mpsc::channel(2); - let fut_a = super::setup_outbound(Arc::clone(&a_manager), sender.clone(), b_pub, tokio::net::TcpStream::from_std(conn_a).unwrap()); - let fut_b = super::setup_inbound(b_manager, sender, tokio::net::TcpStream::from_std(conn_b).unwrap()); + let fut_a = super::setup_outbound(Arc::clone(&a_manager), sender.clone(), b_pub, conn_a); + let fut_b = super::setup_inbound(b_manager, sender, conn_b); tokio::time::timeout(Duration::from_secs(10), a_connected.recv()).await.unwrap(); tokio::time::timeout(Duration::from_secs(1), b_connected.recv()).await.unwrap(); @@ -633,18 +640,20 @@ mod tests { a_handler.msg_events.lock().unwrap().push(MessageSendEvent::HandleError { node_id: b_pub, action: ErrorAction::DisconnectPeer { msg: None } }); - assert!(a_disconnected.try_recv().is_err()); - assert!(b_disconnected.try_recv().is_err()); + assert!(!a_handler.disconnected_flag.load(Ordering::SeqCst)); + assert!(!b_handler.disconnected_flag.load(Ordering::SeqCst)); a_manager.process_events(); tokio::time::timeout(Duration::from_secs(10), a_disconnected.recv()).await.unwrap(); tokio::time::timeout(Duration::from_secs(1), b_disconnected.recv()).await.unwrap(); + assert!(a_handler.disconnected_flag.load(Ordering::SeqCst)); + assert!(b_handler.disconnected_flag.load(Ordering::SeqCst)); fut_a.await; fut_b.await; } - #[tokio::test(threaded_scheduler)] + #[tokio::test(flavor = "multi_thread")] async fn basic_threaded_connection_test() { do_basic_connection_test().await; }