Skip to content

Commit 5bd6acd

Browse files
authored
Merge pull request #263 from montekki/fs-stream-map
Adds stream map combinator
2 parents 6bae6b1 + 658a16b commit 5bd6acd

File tree

2 files changed

+74
-0
lines changed

2 files changed

+74
-0
lines changed

src/stream/stream/map.rs

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
use std::marker::PhantomData;
2+
use std::pin::Pin;
3+
4+
use crate::stream::Stream;
5+
use crate::task::{Context, Poll};
6+
7+
#[doc(hidden)]
8+
#[allow(missing_debug_implementations)]
9+
pub struct Map<S, F, T, B> {
10+
stream: S,
11+
f: F,
12+
__from: PhantomData<T>,
13+
__to: PhantomData<B>,
14+
}
15+
16+
impl<S, F, T, B> Map<S, F, T, B> {
17+
pin_utils::unsafe_pinned!(stream: S);
18+
pin_utils::unsafe_unpinned!(f: F);
19+
20+
pub(crate) fn new(stream: S, f: F) -> Self {
21+
Map {
22+
stream,
23+
f,
24+
__from: PhantomData,
25+
__to: PhantomData,
26+
}
27+
}
28+
}
29+
30+
impl<S, F, B> Stream for Map<S, F, S::Item, B>
31+
where
32+
S: Stream,
33+
F: FnMut(S::Item) -> B,
34+
{
35+
type Item = B;
36+
37+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
38+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
39+
Poll::Ready(next.map(self.as_mut().f()))
40+
}
41+
}

src/stream/stream/mod.rs

+33
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ mod fold;
3333
mod for_each;
3434
mod fuse;
3535
mod inspect;
36+
mod map;
3637
mod min_by;
3738
mod next;
3839
mod nth;
@@ -61,6 +62,7 @@ pub use chain::Chain;
6162
pub use filter::Filter;
6263
pub use fuse::Fuse;
6364
pub use inspect::Inspect;
65+
pub use map::Map;
6466
pub use scan::Scan;
6567
pub use skip::Skip;
6668
pub use skip_while::SkipWhile;
@@ -338,6 +340,37 @@ extension_trait! {
338340
Enumerate::new(self)
339341
}
340342

343+
#[doc = r#"
344+
Takes a closure and creates a stream that calls that closure on every element of this stream.
345+
346+
# Examples
347+
348+
```
349+
# fn main() { async_std::task::block_on(async {
350+
#
351+
use async_std::prelude::*;
352+
use std::collections::VecDeque;
353+
354+
let s: VecDeque<_> = vec![1, 2, 3].into_iter().collect();
355+
let mut s = s.map(|x| 2 * x);
356+
357+
assert_eq!(s.next().await, Some(2));
358+
assert_eq!(s.next().await, Some(4));
359+
assert_eq!(s.next().await, Some(6));
360+
assert_eq!(s.next().await, None);
361+
362+
#
363+
# }) }
364+
```
365+
"#]
366+
fn map<B, F>(self, f: F) -> Map<Self, F, Self::Item, B>
367+
where
368+
Self: Sized,
369+
F: FnMut(Self::Item) -> B,
370+
{
371+
Map::new(self, f)
372+
}
373+
341374
#[doc = r#"
342375
A combinator that does something with each element in the stream, passing the value
343376
on.

0 commit comments

Comments
 (0)