diff --git a/src/net/driver/mod.rs b/src/net/driver/mod.rs index 7f33e8594..07ef2c7d2 100644 --- a/src/net/driver/mod.rs +++ b/src/net/driver/mod.rs @@ -16,10 +16,30 @@ struct Entry { token: mio::Token, /// Tasks that are blocked on reading from this I/O handle. - readers: Mutex>, + readers: Mutex, /// Thasks that are blocked on writing to this I/O handle. - writers: Mutex>, + writers: Mutex, +} + +/// The set of `Waker`s interested in read readiness. +#[derive(Debug)] +struct Readers { + /// Flag indicating read readiness. + /// (cf. `Watcher::poll_read_ready`) + ready: bool, + /// The `Waker`s blocked on reading. + wakers: Vec +} + +/// The set of `Waker`s interested in write readiness. +#[derive(Debug)] +struct Writers { + /// Flag indicating write readiness. + /// (cf. `Watcher::poll_write_ready`) + ready: bool, + /// The `Waker`s blocked on writing. + wakers: Vec } /// The state of a networking driver. @@ -68,8 +88,8 @@ impl Reactor { // Allocate an entry and insert it into the slab. let entry = Arc::new(Entry { token, - readers: Mutex::new(Vec::new()), - writers: Mutex::new(Vec::new()), + readers: Mutex::new(Readers { ready: false, wakers: Vec::new() }), + writers: Mutex::new(Writers { ready: false, wakers: Vec::new() }), }); vacant.insert(entry.clone()); @@ -144,14 +164,18 @@ fn main_loop() -> io::Result<()> { // Wake up reader tasks blocked on this I/O handle. if !(readiness & reader_interests()).is_empty() { - for w in entry.readers.lock().unwrap().drain(..) { + let mut readers = entry.readers.lock().unwrap(); + readers.ready = true; + for w in readers.wakers.drain(..) { w.wake(); } } // Wake up writer tasks blocked on this I/O handle. if !(readiness & writer_interests()).is_empty() { - for w in entry.writers.lock().unwrap().drain(..) { + let mut writers = entry.writers.lock().unwrap(); + writers.ready = true; + for w in writers.wakers.drain(..) { w.wake(); } } @@ -207,7 +231,7 @@ impl Watcher { } // Lock the waker list. - let mut list = self.entry.readers.lock().unwrap(); + let mut readers = self.entry.readers.lock().unwrap(); // Try running the operation again. match f(self.source.as_ref().unwrap()) { @@ -216,10 +240,12 @@ impl Watcher { } // Register the task if it isn't registered already. - if list.iter().all(|w| !w.will_wake(cx.waker())) { - list.push(cx.waker().clone()); + if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) { + readers.wakers.push(cx.waker().clone()); } + readers.ready = false; + Poll::Pending } @@ -242,7 +268,7 @@ impl Watcher { } // Lock the waker list. - let mut list = self.entry.writers.lock().unwrap(); + let mut writers = self.entry.writers.lock().unwrap(); // Try running the operation again. match f(self.source.as_ref().unwrap()) { @@ -251,10 +277,49 @@ impl Watcher { } // Register the task if it isn't registered already. - if list.iter().all(|w| !w.will_wake(cx.waker())) { - list.push(cx.waker().clone()); + if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) { + writers.wakers.push(cx.waker().clone()); } + writers.ready = false; + + Poll::Pending + } + + /// Polls the inner I/O source until a non-blocking read can be performed. + /// + /// If non-blocking reads are currently not possible, the `Waker` + /// will be saved and notified when it can read non-blocking + /// again. + #[allow(dead_code)] + pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<()> { + // Lock the waker list. + let mut readers = self.entry.readers.lock().unwrap(); + if readers.ready { + return Poll::Ready(()) + } + // Register the task if it isn't registered already. + if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) { + readers.wakers.push(cx.waker().clone()); + } + Poll::Pending + } + + /// Polls the inner I/O source until a non-blocking write can be performed. + /// + /// If non-blocking writes are currently not possible, the `Waker` + /// will be saved and notified when it can write non-blocking + /// again. + pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<()> { + // Lock the waker list. + let mut writers = self.entry.writers.lock().unwrap(); + if writers.ready { + return Poll::Ready(()) + } + // Register the task if it isn't registered already. + if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) { + writers.wakers.push(cx.waker().clone()); + } Poll::Pending } diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index ae8ca7dc8..537bd4cdc 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -6,8 +6,7 @@ use crate::future; use crate::io::{self, Read, Write}; use crate::net::driver::Watcher; use crate::net::ToSocketAddrs; -use crate::task::{spawn_blocking, Context, Poll}; -use crate::utils::Context as _; +use crate::task::{Context, Poll}; /// A TCP stream between a local and a remote socket. /// @@ -77,20 +76,24 @@ impl TcpStream { .await?; for addr in addrs { - let res = spawn_blocking(move || { - let std_stream = std::net::TcpStream::connect(addr) - .context(|| format!("could not connect to {}", addr))?; - let mio_stream = mio::net::TcpStream::from_stream(std_stream) - .context(|| format!("could not open async connection to {}", addr))?; - Ok(TcpStream { - watcher: Watcher::new(mio_stream), - }) - }) - .await; + // mio's TcpStream::connect is non-blocking and may just be in progress + // when it returns with `Ok`. We therefore wait for write readiness to + // be sure the connection has either been established or there was an + // error which we check for afterwards. + let watcher = match mio::net::TcpStream::connect(&addr) { + Ok(s) => Watcher::new(s), + Err(e) => { + last_err = Some(e); + continue + } + }; - match res { - Ok(stream) => return Ok(stream), - Err(err) => last_err = Some(err), + future::poll_fn(|cx| watcher.poll_write_ready(cx)).await; + + match watcher.get_ref().take_error() { + Ok(None) => return Ok(TcpStream { watcher }), + Ok(Some(e)) => last_err = Some(e), + Err(e) => last_err = Some(e) } }