Skip to content

Commit ef4e09a

Browse files
committed
join stream works!
Signed-off-by: Yoshua Wuyts <[email protected]>
1 parent 5fe130d commit ef4e09a

File tree

2 files changed

+38
-0
lines changed

2 files changed

+38
-0
lines changed

src/join_stream.rs

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use std::marker::PhantomData;
2+
use std::pin::Pin;
3+
use std::task::{Context, Poll};
4+
5+
use futures_core::Stream;
6+
7+
/// A stream joining two or more streams.
8+
///
9+
/// This stream is returned by `join!`.
10+
#[derive(Debug)]
11+
pub struct JoinStream<'a, L, R, T> {
12+
left: &'a mut L,
13+
right: &'a mut R,
14+
_marker: PhantomData<T>,
15+
}
16+
17+
impl<L, R, T> Unpin for JoinStream<'_, L, R, T> {}
18+
19+
impl<'a, L, R, T> Stream for JoinStream<'a, L, R, T>
20+
where
21+
L: Stream<Item = T> + Unpin,
22+
R: Stream<Item = T> + Unpin,
23+
{
24+
type Item = T;
25+
26+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
27+
if let Poll::Ready(Some(item)) = Pin::new(&mut *self.left).poll_next(cx) {
28+
// The first stream made progress. The JoinStream needs to be polled
29+
// again to check the progress of the second stream.
30+
cx.waker().wake_by_ref();
31+
Poll::Ready(Some(item))
32+
} else {
33+
Pin::new(&mut *self.right).poll_next(cx)
34+
}
35+
}
36+
}

src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,15 @@
2121
#![cfg_attr(test, deny(warnings))]
2222

2323
mod join;
24+
mod join_stream;
2425
mod maybe_done;
2526
mod poll_fn;
2627
mod ready;
2728
mod select;
2829
mod try_join;
2930
mod try_select;
3031

32+
pub use join_stream::JoinStream;
3133
pub use maybe_done::MaybeDone;
3234

3335
/// Helper re-exports for use in macros.

0 commit comments

Comments
 (0)