Skip to content

Commit 23beab4

Browse files
montekkiStjepan Glavina
authored and
Stjepan Glavina
committed
Adds a from_fn stream implementation (#277)
* Adds a from_fn stream implementation * Update src/stream/from_fn.rs Co-Authored-By: Yoshua Wuyts <[email protected]> * Fix review nits * Use async_std Mutex
1 parent e938527 commit 23beab4

File tree

2 files changed

+102
-0
lines changed

2 files changed

+102
-0
lines changed

src/stream/from_fn.rs

+100
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
use std::marker::PhantomData;
2+
use std::pin::Pin;
3+
4+
use crate::future::Future;
5+
use crate::stream::Stream;
6+
use crate::task::{Context, Poll};
7+
8+
/// A stream that yields elements by calling a closure.
9+
///
10+
/// This stream is constructed by [`from_fn`] function.
11+
///
12+
/// [`from_fn`]: fn.from_fn.html
13+
#[derive(Debug)]
14+
pub struct FromFn<F, Fut, T> {
15+
f: F,
16+
future: Option<Fut>,
17+
__t: PhantomData<T>,
18+
}
19+
20+
/// Creates a new stream where to produce each new element a provided closure is called.
21+
///
22+
/// This allows creating a custom stream with any behaviour without using the more verbose
23+
/// syntax of creating a dedicated type and implementing a `Stream` trait for it.
24+
///
25+
/// # Examples
26+
///
27+
/// ```
28+
/// # fn main() { async_std::task::block_on(async {
29+
/// #
30+
/// use async_std::prelude::*;
31+
/// use async_std::sync::Mutex;
32+
/// use std::sync::Arc;
33+
/// use async_std::stream;
34+
///
35+
/// let count = Arc::new(Mutex::new(0u8));
36+
/// let s = stream::from_fn(|| {
37+
/// let count = Arc::clone(&count);
38+
///
39+
/// async move {
40+
/// *count.lock().await += 1;
41+
///
42+
/// if *count.lock().await > 3 {
43+
/// None
44+
/// } else {
45+
/// Some(*count.lock().await)
46+
/// }
47+
/// }
48+
/// });
49+
///
50+
/// pin_utils::pin_mut!(s);
51+
/// assert_eq!(s.next().await, Some(1));
52+
/// assert_eq!(s.next().await, Some(2));
53+
/// assert_eq!(s.next().await, Some(3));
54+
/// assert_eq!(s.next().await, None);
55+
/// #
56+
/// # }) }
57+
///
58+
/// ```
59+
pub fn from_fn<T, F, Fut>(f: F) -> FromFn<F, Fut, T>
60+
where
61+
F: FnMut() -> Fut,
62+
Fut: Future<Output = Option<T>>,
63+
{
64+
FromFn {
65+
f,
66+
future: None,
67+
__t: PhantomData,
68+
}
69+
}
70+
71+
impl<F, Fut, T> FromFn<F, Fut, T> {
72+
pin_utils::unsafe_unpinned!(f: F);
73+
pin_utils::unsafe_pinned!(future: Option<Fut>);
74+
}
75+
76+
impl<F, Fut, T> Stream for FromFn<F, Fut, T>
77+
where
78+
F: FnMut() -> Fut,
79+
Fut: Future<Output = Option<T>>,
80+
{
81+
type Item = T;
82+
83+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
84+
loop {
85+
match &self.future {
86+
Some(_) => {
87+
let next =
88+
futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx));
89+
self.as_mut().future().set(None);
90+
91+
return Poll::Ready(next);
92+
}
93+
None => {
94+
let fut = (self.as_mut().f())();
95+
self.as_mut().future().set(Some(fut));
96+
}
97+
}
98+
}
99+
}
100+
}

src/stream/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
use cfg_if::cfg_if;
2525

2626
pub use empty::{empty, Empty};
27+
pub use from_fn::{from_fn, FromFn};
2728
pub use once::{once, Once};
2829
pub use repeat::{repeat, Repeat};
2930
pub use stream::{
@@ -33,6 +34,7 @@ pub use stream::{
3334
pub(crate) mod stream;
3435

3536
mod empty;
37+
mod from_fn;
3638
mod once;
3739
mod repeat;
3840

0 commit comments

Comments
 (0)