Skip to content

Commit 0d75975

Browse files
authored
Auto merge of #34724 - mitchmindtree:mpsc_receiver_try_recv, r=alexcrichton
Add a method to the mpsc::Receiver for producing a non-blocking iterator Currently, the `mpsc::Receiver` offers methods for receiving values in both blocking (`recv`) and non-blocking (`try_recv`) flavours. However only blocking iteration over values is supported. This PR adds a non-blocking iterator to complement the `try_recv` method, just as the blocking iterator complements the `recv` method. Use-case ------------- I predominantly use rust in my work on real-time systems and in particular real-time audio generation/processing. I use `mpsc::channel`s to communicate between threads in a purely non-blocking manner. I.e. I might send messages from the GUI thread to the audio thread to update the state of the dsp-graph, or from the audio thread to the GUI thread to display the RMS of each node. These are just a couple examples (I'm probably using 30+ channels across my various projects). I almost exclusively use the `mpsc::Receiver::try_recv` method to avoid blocking any of the real-time threads and causing unwanted glitching/stuttering. Now that I mention it, I can't think of a single time that I personally have used the `recv` method (though I can of course see why it would be useful, and perhaps the common case for many people). As a result of this experience, I can't help but feel there is a large hole in the `Receiver` API. | blocking | non-blocking | |------------|--------------------| | `recv` | `try_recv` | | `iter` | 🙀 | For the most part, I've been working around this using `while let Ok(v) = r.try_recv() { ... }`, however as nice as this is, it is clearly no match for the Iterator API. As an example, in the majority of my channel use cases I only want to check for *n* number of messages before breaking from the loop so that I don't miss the audio IO callback or hog the GUI thread for too long when an unexpectedly large number of messages are sent. Currently, I have to write something like this: ```rust let mut take = 100; while let Ok(msg) = rx.try_recv() { // Do stuff with msg if take == 0 { break; } take -= 1; } ``` or wrap the `try_recv` call in a `Range<usize>`/`FilterMap` iterator combo. On the other hand, this PR would allow for the following: ```rust for msg in rx.try_iter().take(100) { // Do stuff with msg } ``` I imagine this might also be useful to game devs, embedded or anyone doing message passing across real-time threads.
2 parents d46ed83 + 05af033 commit 0d75975

File tree

1 file changed

+56
-0
lines changed

1 file changed

+56
-0
lines changed

src/libstd/sync/mpsc/mod.rs

+56
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,17 @@ pub struct Iter<'a, T: 'a> {
311311
rx: &'a Receiver<T>
312312
}
313313

314+
/// An iterator that attempts to yield all pending values for a receiver.
315+
/// `None` will be returned when there are no pending values remaining or
316+
/// if the corresponding channel has hung up.
317+
///
318+
/// This Iterator will never block the caller in order to wait for data to
319+
/// become available. Instead, it will return `None`.
320+
#[unstable(feature = "receiver_try_iter", issue = "34931")]
321+
pub struct TryIter<'a, T: 'a> {
322+
rx: &'a Receiver<T>
323+
}
324+
314325
/// An owning iterator over messages on a receiver, this iterator will block
315326
/// whenever `next` is called, waiting for a new message, and `None` will be
316327
/// returned when the corresponding channel has hung up.
@@ -982,6 +993,16 @@ impl<T> Receiver<T> {
982993
pub fn iter(&self) -> Iter<T> {
983994
Iter { rx: self }
984995
}
996+
997+
/// Returns an iterator that will attempt to yield all pending values.
998+
/// It will return `None` if there are no more pending values or if the
999+
/// channel has hung up. The iterator will never `panic!` or block the
1000+
/// user by waiting for values.
1001+
#[unstable(feature = "receiver_try_iter", issue = "34931")]
1002+
pub fn try_iter(&self) -> TryIter<T> {
1003+
TryIter { rx: self }
1004+
}
1005+
9851006
}
9861007

9871008
impl<T> select::Packet for Receiver<T> {
@@ -1077,6 +1098,13 @@ impl<'a, T> Iterator for Iter<'a, T> {
10771098
fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
10781099
}
10791100

1101+
#[unstable(feature = "receiver_try_iter", issue = "34931")]
1102+
impl<'a, T> Iterator for TryIter<'a, T> {
1103+
type Item = T;
1104+
1105+
fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() }
1106+
}
1107+
10801108
#[stable(feature = "receiver_into_iter", since = "1.1.0")]
10811109
impl<'a, T> IntoIterator for &'a Receiver<T> {
10821110
type Item = T;
@@ -1814,6 +1842,34 @@ mod tests {
18141842
assert_eq!(count_rx.recv().unwrap(), 4);
18151843
}
18161844

1845+
#[test]
1846+
fn test_recv_try_iter() {
1847+
let (request_tx, request_rx) = channel();
1848+
let (response_tx, response_rx) = channel();
1849+
1850+
// Request `x`s until we have `6`.
1851+
let t = thread::spawn(move|| {
1852+
let mut count = 0;
1853+
loop {
1854+
for x in response_rx.try_iter() {
1855+
count += x;
1856+
if count == 6 {
1857+
return count;
1858+
}
1859+
}
1860+
request_tx.send(()).unwrap();
1861+
}
1862+
});
1863+
1864+
for _ in request_rx.iter() {
1865+
if response_tx.send(2).is_err() {
1866+
break;
1867+
}
1868+
}
1869+
1870+
assert_eq!(t.join().unwrap(), 6);
1871+
}
1872+
18171873
#[test]
18181874
fn test_recv_into_iter_owned() {
18191875
let mut iter = {

0 commit comments

Comments
 (0)