From 428c875ac33c1a8375d4e583ea708e230a324310 Mon Sep 17 00:00:00 2001 From: Julien Cretin Date: Sun, 12 Nov 2017 00:30:46 +0100 Subject: [PATCH 1/3] Add std::sync::mpsc::Receiver::recv_deadline() Essentially renames recv_max_until to recv_deadline (mostly copying recv_timeout documentation). This function is useful to avoid the often unnecessary call to Instant::now in recv_timeout (e.g. when the user already has a deadline). A concrete example would be something along those lines: ```rust use std::sync::mpsc::Receiver; use std::time::{Duration, Instant}; /// Reads a batch of elements /// /// Returns as soon as `max_size` elements have been received or `timeout` expires. fn recv_batch_timeout(receiver: &Receiver, timeout: Duration, max_size: usize) -> Vec { recv_batch_deadline(receiver, Instant::now() + timeout, max_size) } /// Reads a batch of elements /// /// Returns as soon as `max_size` elements have been received or `deadline` is reached. fn recv_batch_deadline(receiver: &Receiver, deadline: Instant, max_size: usize) -> Vec { let mut result = Vec::new(); while let Ok(x) = receiver.recv_deadline(deadline) { result.push(x); if result.len() == max_size { break; } } result } ``` --- src/libstd/sync/mpsc/mod.rs | 63 +++++++++++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 2 deletions(-) diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs index 45a26e594b065..6462c1b7976a9 100644 --- a/src/libstd/sync/mpsc/mod.rs +++ b/src/libstd/sync/mpsc/mod.rs @@ -1297,11 +1297,70 @@ impl Receiver { Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected), Err(TryRecvError::Empty) - => self.recv_max_until(Instant::now() + timeout) + => self.recv_deadline(Instant::now() + timeout) } } - fn recv_max_until(&self, deadline: Instant) -> Result { + /// Attempts to wait for a value on this receiver, returning an error if the + /// corresponding channel has hung up, or if `deadline` is reached. + /// + /// This function will always block the current thread if there is no data + /// available and it's possible for more data to be sent. Once a message is + /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this + /// receiver will wake up and return that message. + /// + /// If the corresponding [`Sender`] has disconnected, or it disconnects while + /// this call is blocking, this call will wake up and return [`Err`] to + /// indicate that no more messages can ever be received on this channel. + /// However, since channels are buffered, messages sent before the disconnect + /// will still be properly received. + /// + /// [`Sender`]: struct.Sender.html + /// [`SyncSender`]: struct.SyncSender.html + /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err + /// + /// # Examples + /// + /// Successfully receiving value before reaching deadline: + /// + /// ```no_run + /// use std::thread; + /// use std::time::{Duration, Instant}; + /// use std::sync::mpsc; + /// + /// let (send, recv) = mpsc::channel(); + /// + /// thread::spawn(move || { + /// send.send('a').unwrap(); + /// }); + /// + /// assert_eq!( + /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)), + /// Ok('a') + /// ); + /// ``` + /// + /// Receiving an error upon reaching deadline: + /// + /// ```no_run + /// use std::thread; + /// use std::time::{Duration, Instant}; + /// use std::sync::mpsc; + /// + /// let (send, recv) = mpsc::channel(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(800)); + /// send.send('a').unwrap(); + /// }); + /// + /// assert_eq!( + /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)), + /// Err(mpsc::RecvTimeoutError::Timeout) + /// ); + /// ``` + #[stable(feature = "mpsc_recv_deadline", since = "1.23.0")] + pub fn recv_deadline(&self, deadline: Instant) -> Result { use self::RecvTimeoutError::*; loop { From 8424cacff8cc59a8408bbdd6b83cab5b43802aad Mon Sep 17 00:00:00 2001 From: Julien Cretin Date: Mon, 27 Nov 2017 22:31:00 +0100 Subject: [PATCH 2/3] Use an unstable feature linked to #46316 --- src/libstd/sync/mpsc/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs index 6462c1b7976a9..10ece333c2c95 100644 --- a/src/libstd/sync/mpsc/mod.rs +++ b/src/libstd/sync/mpsc/mod.rs @@ -1359,7 +1359,7 @@ impl Receiver { /// Err(mpsc::RecvTimeoutError::Timeout) /// ); /// ``` - #[stable(feature = "mpsc_recv_deadline", since = "1.23.0")] + #[unstable(feature = "deadline_api", issue = "46316")] pub fn recv_deadline(&self, deadline: Instant) -> Result { use self::RecvTimeoutError::*; From 8e025d8009b661bde3397bf323aa088ba23e46d1 Mon Sep 17 00:00:00 2001 From: Julien Cretin Date: Tue, 28 Nov 2017 21:22:30 +0100 Subject: [PATCH 3/3] Fix doc test of previous commit --- src/libstd/sync/mpsc/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs index 10ece333c2c95..4bf420ba68a9b 100644 --- a/src/libstd/sync/mpsc/mod.rs +++ b/src/libstd/sync/mpsc/mod.rs @@ -1324,6 +1324,7 @@ impl Receiver { /// Successfully receiving value before reaching deadline: /// /// ```no_run + /// #![feature(deadline_api)] /// use std::thread; /// use std::time::{Duration, Instant}; /// use std::sync::mpsc; @@ -1343,6 +1344,7 @@ impl Receiver { /// Receiving an error upon reaching deadline: /// /// ```no_run + /// #![feature(deadline_api)] /// use std::thread; /// use std::time::{Duration, Instant}; /// use std::sync::mpsc;