Skip to content

Commit 02fad0d

Browse files
committed
join_stream done
Signed-off-by: Yoshua Wuyts <[email protected]>
1 parent d2fde7a commit 02fad0d

File tree

3 files changed

+58
-8
lines changed

3 files changed

+58
-8
lines changed

examples/main.rs

+5
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ fn main() {
1010
let c = future::ready(Ok(1u8));
1111

1212
assert_eq!(try_select!(a, b, c).await?, 1u8);
13+
14+
use async_macros::JoinStream;
15+
use futures::stream::{self, StreamExt};
16+
use futures::future::ready;
17+
1318
Ok(())
1419
}
1520
main().await.unwrap();

src/join_stream.rs

+53-7
Original file line numberDiff line numberDiff line change
@@ -8,29 +8,75 @@ use futures_core::Stream;
88
///
99
/// This stream is returned by `join!`.
1010
#[derive(Debug)]
11-
pub struct JoinStream<'a, L, R, T> {
12-
left: &'a mut L,
13-
right: &'a mut R,
11+
pub struct JoinStream<L, R, T> {
12+
left: L,
13+
right: R,
1414
_marker: PhantomData<T>,
1515
}
1616

17-
impl<L, R, T> Unpin for JoinStream<'_, L, R, T> {}
17+
impl<L, R, T> Unpin for JoinStream<L, R, T> {}
1818

19-
impl<'a, L, R, T> Stream for JoinStream<'a, L, R, T>
19+
impl<L, R, T> JoinStream<L, R, T> {
20+
#[doc(hidden)]
21+
pub fn new(left: L, right: R) -> Self {
22+
Self {
23+
left,
24+
right,
25+
_marker: PhantomData,
26+
}
27+
}
28+
}
29+
30+
impl<L, R, T> Stream for JoinStream<L, R, T>
2031
where
2132
L: Stream<Item = T> + Unpin,
2233
R: Stream<Item = T> + Unpin,
2334
{
2435
type Item = T;
2536

2637
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
27-
if let Poll::Ready(Some(item)) = Pin::new(&mut *self.left).poll_next(cx) {
38+
if let Poll::Ready(Some(item)) = Pin::new(&mut self.left).poll_next(cx) {
2839
// The first stream made progress. The JoinStream needs to be polled
2940
// again to check the progress of the second stream.
3041
cx.waker().wake_by_ref();
3142
Poll::Ready(Some(item))
3243
} else {
33-
Pin::new(&mut *self.right).poll_next(cx)
44+
Pin::new(&mut self.right).poll_next(cx)
3445
}
3546
}
3647
}
48+
49+
/// Combines multiple streams into a single stream of all their outputs.
50+
///
51+
/// This macro is only usable inside of async functions, closures, and blocks.
52+
///
53+
/// # Examples
54+
///
55+
/// ```
56+
/// # futures::executor::block_on(async {
57+
/// use async_macros::join_stream as join;
58+
/// use futures::stream::{self, StreamExt};
59+
/// use futures::future::ready;
60+
///
61+
/// let a = &mut stream::once(ready(1u8));
62+
/// let b = &mut stream::once(ready(2u8));
63+
/// let c = &mut stream::once(ready(3u8));
64+
///
65+
/// let mut s = join!(a, b, c);
66+
///
67+
/// assert_eq!(s.next().await, Some(1u8));
68+
/// assert_eq!(s.next().await, Some(2u8));
69+
/// assert_eq!(s.next().await, Some(3u8));
70+
/// assert_eq!(s.next().await, None);
71+
/// # });
72+
/// ```
73+
#[macro_export]
74+
macro_rules! join_stream {
75+
($stream1:ident, $stream2:ident, $($stream:ident),* $(,)?) => {{
76+
let joined = $crate::JoinStream::new($stream1, $stream2);
77+
$(
78+
let joined = $crate::JoinStream::new(joined, $stream);
79+
)*
80+
joined
81+
}};
82+
}

src/lib.rs

-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
//! # Examples
44
//!
55
//! ```
6-
//! #![feature(async_await)]
76
//! # futures::executor::block_on(async {
87
//! use async_macros::join;
98
//! use futures::future;

0 commit comments

Comments
 (0)