From f227ece44c2ce48b4910e73b58a659bd1bb99978 Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Thu, 23 Jan 2020 13:36:48 +0100 Subject: [PATCH 1/2] Use non-blocking connect for TcpStream. Instead of spawning a background thread which is unaware of any timeouts but continues to run until the TCP stack decides that the remote is not reachable we use mio's non-blocking connect. mio's `TcpStream::connect` returns immediately but the actual connection is usually just in progress and we have to be sure the socket is writeable before we can consider the connection as established. --- src/net/driver/mod.rs | 77 ++++++++++++++++++++++++++++++++++++++++--- src/net/tcp/stream.rs | 58 ++++++++++++++++++++++++-------- 2 files changed, 116 insertions(+), 19 deletions(-) diff --git a/src/net/driver/mod.rs b/src/net/driver/mod.rs index 7f33e8594..d86c4d9bb 100644 --- a/src/net/driver/mod.rs +++ b/src/net/driver/mod.rs @@ -59,6 +59,17 @@ impl Reactor { /// Registers an I/O event source and returns its associated entry. fn register(&self, source: &dyn Evented) -> io::Result> { + self.register_with(source, Interest::All, Vec::new(), Vec::new()) + } + + /// Registers an I/O event source and returns its associated entry. + fn register_with( + &self, + source: &dyn Evented, + interest: Interest, + readers: Vec, + writers: Vec + ) -> io::Result> { let mut entries = self.entries.lock().unwrap(); // Reserve a vacant spot in the slab and use its key as the token value. @@ -68,19 +79,23 @@ impl Reactor { // Allocate an entry and insert it into the slab. let entry = Arc::new(Entry { token, - readers: Mutex::new(Vec::new()), - writers: Mutex::new(Vec::new()), + readers: Mutex::new(readers), + writers: Mutex::new(writers), }); vacant.insert(entry.clone()); // Register the I/O event source in the poller. - let interest = mio::Ready::all(); let opts = mio::PollOpt::edge(); - self.poller.register(source, token, interest, opts)?; + self.poller.register(source, token, interest.to_ready_set(), opts)?; Ok(entry) } + /// Re-register a previously registered event source with the given interest. + fn reregister(&self, source: &dyn Evented, entry: &Entry, interest: Interest) -> io::Result<()> { + self.poller.reregister(source, entry.token, interest.to_ready_set(), mio::PollOpt::edge()) + } + /// Deregisters an I/O event source associated with an entry. fn deregister(&self, source: &dyn Evented, entry: &Entry) -> io::Result<()> { // Deregister the I/O object from the mio instance. @@ -161,6 +176,28 @@ fn main_loop() -> io::Result<()> { } } +/// The kind of readiness to be notified about. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum Interest { + /// Notify about read readiness. + Read, + /// Notify about write readiness. + Write, + /// Notify about general readiness. + All +} + +impl Interest { + /// Translate to mio's readiness set. + fn to_ready_set(self) -> mio::Ready { + match self { + Interest::Read => reader_interests(), + Interest::Write => writer_interests(), + Interest::All => mio::Ready::all() + } + } +} + /// An I/O handle powered by the networking driver. /// /// This handle wraps an I/O event source and exposes a "futurized" interface on top of it, @@ -171,6 +208,9 @@ pub struct Watcher { /// The I/O event source. source: Option, + + /// The interest of this watcher. + interest: Interest } impl Watcher { @@ -179,11 +219,27 @@ impl Watcher { /// The provided I/O event source will be kept registered inside the reactor's poller for the /// lifetime of the returned I/O handle. pub fn new(source: T) -> Watcher { + Watcher::new_with(source, Interest::All, None, None) + } + + /// Creates a new Watcher. + /// + /// The provided I/O event source will be kept registered inside the reactor's poller for the + /// lifetime of the returned Watcher. + pub fn new_with( + source: T, + interest: Interest, + reader: Option, + writer: Option + ) -> Watcher { + let readers = reader.map(|r| vec![r]).unwrap_or_default(); + let writers = writer.map(|w| vec![w]).unwrap_or_default(); Watcher { entry: REACTOR - .register(&source) + .register_with(&source, interest, readers, writers) .expect("cannot register an I/O event source"), source: Some(source), + interest } } @@ -192,6 +248,13 @@ impl Watcher { self.source.as_ref().unwrap() } + /// Change this Watcher's registration. + pub fn reconfigure(&mut self, interest: Interest) -> io::Result<()> { + REACTOR.reregister(self.get_ref(), self.entry.as_ref(), interest)?; + self.interest = interest; + Ok(()) + } + /// Polls the inner I/O source for a non-blocking read operation. /// /// If the operation returns an error of the `io::ErrorKind::WouldBlock` kind, the current task @@ -200,6 +263,8 @@ impl Watcher { where F: FnMut(&'a T) -> io::Result, { + debug_assert!(self.interest == Interest::Read || self.interest == Interest::All); + // If the operation isn't blocked, return its result. match f(self.source.as_ref().unwrap()) { Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} @@ -235,6 +300,8 @@ impl Watcher { where F: FnMut(&'a T) -> io::Result, { + debug_assert!(self.interest == Interest::Write || self.interest == Interest::All); + // If the operation isn't blocked, return its result. match f(self.source.as_ref().unwrap()) { Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index ae8ca7dc8..e2ce56e78 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -4,10 +4,9 @@ use std::pin::Pin; use crate::future; use crate::io::{self, Read, Write}; -use crate::net::driver::Watcher; +use crate::net::driver::{Interest, Watcher}; use crate::net::ToSocketAddrs; -use crate::task::{spawn_blocking, Context, Poll}; -use crate::utils::Context as _; +use crate::task::{Context, Poll}; /// A TCP stream between a local and a remote socket. /// @@ -77,20 +76,51 @@ impl TcpStream { .await?; for addr in addrs { - let res = spawn_blocking(move || { - let std_stream = std::net::TcpStream::connect(addr) - .context(|| format!("could not connect to {}", addr))?; - let mio_stream = mio::net::TcpStream::from_stream(std_stream) - .context(|| format!("could not open async connection to {}", addr))?; - Ok(TcpStream { - watcher: Watcher::new(mio_stream), - }) + let mut watcher = None; + let connected = future::poll_fn(|cx| { + match &mut watcher { + None => { + // mio's connect is non-blocking and may just be in progress when + // it returns with `Ok`. We therefore register our write interest + // and once writable we know the connection is either established + // or there was an error which we check for afterwards. + match mio::net::TcpStream::connect(&addr) { + Ok(s) => { + let waker = cx.waker().clone(); + watcher = Some(Watcher::new_with(s, Interest::Write, None, Some(waker))) + } + Err(e) => return Poll::Ready(Err(e)) + } + Poll::Pending + } + Some(w) => + if let Err(e) = w.reconfigure(Interest::All) { + Poll::Ready(Err(e)) + } else { + Poll::Ready(Ok(())) + } + } }) .await; - match res { - Ok(stream) => return Ok(stream), - Err(err) => last_err = Some(err), + if let Err(e) = connected { + last_err = Some(e); + continue + } + + debug_assert!(watcher.is_some()); + + let watcher = + if let Some(w) = watcher { + w + } else { + continue + }; + + match watcher.get_ref().take_error() { + Ok(None) => return Ok(TcpStream { watcher }), + Ok(Some(e)) => last_err = Some(e), + Err(e) => last_err = Some(e) } } From 9f7e23302e8588f187a260b9320b065795be622c Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Thu, 23 Jan 2020 22:33:34 +0100 Subject: [PATCH 2/2] Add Watcher::{poll_read_ready, poll_write_ready}. Following a suggestion of @stjepang we offer methods to check for read/write readiness of a `Watcher` instead of the previous approach to accept a set of `Waker`s when registering an event source. The changes relative to master are smaller and both methods look more useful in other contexts. Also the code is more robust w.r.t. wakeups of the `Waker` from clones outside the `Reactor`. I am not sure if we need to add protection mechanisms against spurious wakeups from mio. Currently we treat the `Poll::Ready(())` of `Watcher::poll_write_ready` as proof that the non-blocking connect has finished, but if the event from mio was a spurious one, it might still be ongoing. --- src/net/driver/mod.rs | 162 +++++++++++++++++++++--------------------- src/net/tcp/stream.rs | 51 ++++--------- 2 files changed, 92 insertions(+), 121 deletions(-) diff --git a/src/net/driver/mod.rs b/src/net/driver/mod.rs index d86c4d9bb..07ef2c7d2 100644 --- a/src/net/driver/mod.rs +++ b/src/net/driver/mod.rs @@ -16,10 +16,30 @@ struct Entry { token: mio::Token, /// Tasks that are blocked on reading from this I/O handle. - readers: Mutex>, + readers: Mutex, /// Thasks that are blocked on writing to this I/O handle. - writers: Mutex>, + writers: Mutex, +} + +/// The set of `Waker`s interested in read readiness. +#[derive(Debug)] +struct Readers { + /// Flag indicating read readiness. + /// (cf. `Watcher::poll_read_ready`) + ready: bool, + /// The `Waker`s blocked on reading. + wakers: Vec +} + +/// The set of `Waker`s interested in write readiness. +#[derive(Debug)] +struct Writers { + /// Flag indicating write readiness. + /// (cf. `Watcher::poll_write_ready`) + ready: bool, + /// The `Waker`s blocked on writing. + wakers: Vec } /// The state of a networking driver. @@ -59,17 +79,6 @@ impl Reactor { /// Registers an I/O event source and returns its associated entry. fn register(&self, source: &dyn Evented) -> io::Result> { - self.register_with(source, Interest::All, Vec::new(), Vec::new()) - } - - /// Registers an I/O event source and returns its associated entry. - fn register_with( - &self, - source: &dyn Evented, - interest: Interest, - readers: Vec, - writers: Vec - ) -> io::Result> { let mut entries = self.entries.lock().unwrap(); // Reserve a vacant spot in the slab and use its key as the token value. @@ -79,23 +88,19 @@ impl Reactor { // Allocate an entry and insert it into the slab. let entry = Arc::new(Entry { token, - readers: Mutex::new(readers), - writers: Mutex::new(writers), + readers: Mutex::new(Readers { ready: false, wakers: Vec::new() }), + writers: Mutex::new(Writers { ready: false, wakers: Vec::new() }), }); vacant.insert(entry.clone()); // Register the I/O event source in the poller. + let interest = mio::Ready::all(); let opts = mio::PollOpt::edge(); - self.poller.register(source, token, interest.to_ready_set(), opts)?; + self.poller.register(source, token, interest, opts)?; Ok(entry) } - /// Re-register a previously registered event source with the given interest. - fn reregister(&self, source: &dyn Evented, entry: &Entry, interest: Interest) -> io::Result<()> { - self.poller.reregister(source, entry.token, interest.to_ready_set(), mio::PollOpt::edge()) - } - /// Deregisters an I/O event source associated with an entry. fn deregister(&self, source: &dyn Evented, entry: &Entry) -> io::Result<()> { // Deregister the I/O object from the mio instance. @@ -159,14 +164,18 @@ fn main_loop() -> io::Result<()> { // Wake up reader tasks blocked on this I/O handle. if !(readiness & reader_interests()).is_empty() { - for w in entry.readers.lock().unwrap().drain(..) { + let mut readers = entry.readers.lock().unwrap(); + readers.ready = true; + for w in readers.wakers.drain(..) { w.wake(); } } // Wake up writer tasks blocked on this I/O handle. if !(readiness & writer_interests()).is_empty() { - for w in entry.writers.lock().unwrap().drain(..) { + let mut writers = entry.writers.lock().unwrap(); + writers.ready = true; + for w in writers.wakers.drain(..) { w.wake(); } } @@ -176,28 +185,6 @@ fn main_loop() -> io::Result<()> { } } -/// The kind of readiness to be notified about. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub enum Interest { - /// Notify about read readiness. - Read, - /// Notify about write readiness. - Write, - /// Notify about general readiness. - All -} - -impl Interest { - /// Translate to mio's readiness set. - fn to_ready_set(self) -> mio::Ready { - match self { - Interest::Read => reader_interests(), - Interest::Write => writer_interests(), - Interest::All => mio::Ready::all() - } - } -} - /// An I/O handle powered by the networking driver. /// /// This handle wraps an I/O event source and exposes a "futurized" interface on top of it, @@ -208,9 +195,6 @@ pub struct Watcher { /// The I/O event source. source: Option, - - /// The interest of this watcher. - interest: Interest } impl Watcher { @@ -219,27 +203,11 @@ impl Watcher { /// The provided I/O event source will be kept registered inside the reactor's poller for the /// lifetime of the returned I/O handle. pub fn new(source: T) -> Watcher { - Watcher::new_with(source, Interest::All, None, None) - } - - /// Creates a new Watcher. - /// - /// The provided I/O event source will be kept registered inside the reactor's poller for the - /// lifetime of the returned Watcher. - pub fn new_with( - source: T, - interest: Interest, - reader: Option, - writer: Option - ) -> Watcher { - let readers = reader.map(|r| vec![r]).unwrap_or_default(); - let writers = writer.map(|w| vec![w]).unwrap_or_default(); Watcher { entry: REACTOR - .register_with(&source, interest, readers, writers) + .register(&source) .expect("cannot register an I/O event source"), source: Some(source), - interest } } @@ -248,13 +216,6 @@ impl Watcher { self.source.as_ref().unwrap() } - /// Change this Watcher's registration. - pub fn reconfigure(&mut self, interest: Interest) -> io::Result<()> { - REACTOR.reregister(self.get_ref(), self.entry.as_ref(), interest)?; - self.interest = interest; - Ok(()) - } - /// Polls the inner I/O source for a non-blocking read operation. /// /// If the operation returns an error of the `io::ErrorKind::WouldBlock` kind, the current task @@ -263,8 +224,6 @@ impl Watcher { where F: FnMut(&'a T) -> io::Result, { - debug_assert!(self.interest == Interest::Read || self.interest == Interest::All); - // If the operation isn't blocked, return its result. match f(self.source.as_ref().unwrap()) { Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} @@ -272,7 +231,7 @@ impl Watcher { } // Lock the waker list. - let mut list = self.entry.readers.lock().unwrap(); + let mut readers = self.entry.readers.lock().unwrap(); // Try running the operation again. match f(self.source.as_ref().unwrap()) { @@ -281,10 +240,12 @@ impl Watcher { } // Register the task if it isn't registered already. - if list.iter().all(|w| !w.will_wake(cx.waker())) { - list.push(cx.waker().clone()); + if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) { + readers.wakers.push(cx.waker().clone()); } + readers.ready = false; + Poll::Pending } @@ -300,8 +261,6 @@ impl Watcher { where F: FnMut(&'a T) -> io::Result, { - debug_assert!(self.interest == Interest::Write || self.interest == Interest::All); - // If the operation isn't blocked, return its result. match f(self.source.as_ref().unwrap()) { Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} @@ -309,7 +268,7 @@ impl Watcher { } // Lock the waker list. - let mut list = self.entry.writers.lock().unwrap(); + let mut writers = self.entry.writers.lock().unwrap(); // Try running the operation again. match f(self.source.as_ref().unwrap()) { @@ -318,10 +277,49 @@ impl Watcher { } // Register the task if it isn't registered already. - if list.iter().all(|w| !w.will_wake(cx.waker())) { - list.push(cx.waker().clone()); + if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) { + writers.wakers.push(cx.waker().clone()); } + writers.ready = false; + + Poll::Pending + } + + /// Polls the inner I/O source until a non-blocking read can be performed. + /// + /// If non-blocking reads are currently not possible, the `Waker` + /// will be saved and notified when it can read non-blocking + /// again. + #[allow(dead_code)] + pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<()> { + // Lock the waker list. + let mut readers = self.entry.readers.lock().unwrap(); + if readers.ready { + return Poll::Ready(()) + } + // Register the task if it isn't registered already. + if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) { + readers.wakers.push(cx.waker().clone()); + } + Poll::Pending + } + + /// Polls the inner I/O source until a non-blocking write can be performed. + /// + /// If non-blocking writes are currently not possible, the `Waker` + /// will be saved and notified when it can write non-blocking + /// again. + pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<()> { + // Lock the waker list. + let mut writers = self.entry.writers.lock().unwrap(); + if writers.ready { + return Poll::Ready(()) + } + // Register the task if it isn't registered already. + if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) { + writers.wakers.push(cx.waker().clone()); + } Poll::Pending } diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index e2ce56e78..537bd4cdc 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -4,7 +4,7 @@ use std::pin::Pin; use crate::future; use crate::io::{self, Read, Write}; -use crate::net::driver::{Interest, Watcher}; +use crate::net::driver::Watcher; use crate::net::ToSocketAddrs; use crate::task::{Context, Poll}; @@ -76,46 +76,19 @@ impl TcpStream { .await?; for addr in addrs { - let mut watcher = None; - let connected = future::poll_fn(|cx| { - match &mut watcher { - None => { - // mio's connect is non-blocking and may just be in progress when - // it returns with `Ok`. We therefore register our write interest - // and once writable we know the connection is either established - // or there was an error which we check for afterwards. - match mio::net::TcpStream::connect(&addr) { - Ok(s) => { - let waker = cx.waker().clone(); - watcher = Some(Watcher::new_with(s, Interest::Write, None, Some(waker))) - } - Err(e) => return Poll::Ready(Err(e)) - } - Poll::Pending - } - Some(w) => - if let Err(e) = w.reconfigure(Interest::All) { - Poll::Ready(Err(e)) - } else { - Poll::Ready(Ok(())) - } + // mio's TcpStream::connect is non-blocking and may just be in progress + // when it returns with `Ok`. We therefore wait for write readiness to + // be sure the connection has either been established or there was an + // error which we check for afterwards. + let watcher = match mio::net::TcpStream::connect(&addr) { + Ok(s) => Watcher::new(s), + Err(e) => { + last_err = Some(e); + continue } - }) - .await; - - if let Err(e) = connected { - last_err = Some(e); - continue - } + }; - debug_assert!(watcher.is_some()); - - let watcher = - if let Some(w) = watcher { - w - } else { - continue - }; + future::poll_fn(|cx| watcher.poll_write_ready(cx)).await; match watcher.get_ref().take_error() { Ok(None) => return Ok(TcpStream { watcher }),