Skip to content

Commit 658a16b

Browse files
committed
Adds stream map combinator
1 parent 33d2191 commit 658a16b

File tree

2 files changed

+74
-0
lines changed

2 files changed

+74
-0
lines changed

src/stream/stream/map.rs

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
use std::marker::PhantomData;
2+
use std::pin::Pin;
3+
4+
use crate::stream::Stream;
5+
use crate::task::{Context, Poll};
6+
7+
#[doc(hidden)]
8+
#[allow(missing_debug_implementations)]
9+
pub struct Map<S, F, T, B> {
10+
stream: S,
11+
f: F,
12+
__from: PhantomData<T>,
13+
__to: PhantomData<B>,
14+
}
15+
16+
impl<S, F, T, B> Map<S, F, T, B> {
17+
pin_utils::unsafe_pinned!(stream: S);
18+
pin_utils::unsafe_unpinned!(f: F);
19+
20+
pub(crate) fn new(stream: S, f: F) -> Self {
21+
Map {
22+
stream,
23+
f,
24+
__from: PhantomData,
25+
__to: PhantomData,
26+
}
27+
}
28+
}
29+
30+
impl<S, F, B> Stream for Map<S, F, S::Item, B>
31+
where
32+
S: Stream,
33+
F: FnMut(S::Item) -> B,
34+
{
35+
type Item = B;
36+
37+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
38+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
39+
Poll::Ready(next.map(self.as_mut().f()))
40+
}
41+
}

src/stream/stream/mod.rs

+33
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ mod find_map;
3232
mod fold;
3333
mod fuse;
3434
mod inspect;
35+
mod map;
3536
mod min_by;
3637
mod next;
3738
mod nth;
@@ -57,6 +58,7 @@ pub use chain::Chain;
5758
pub use filter::Filter;
5859
pub use fuse::Fuse;
5960
pub use inspect::Inspect;
61+
pub use map::Map;
6062
pub use scan::Scan;
6163
pub use skip::Skip;
6264
pub use skip_while::SkipWhile;
@@ -334,6 +336,37 @@ extension_trait! {
334336
Enumerate::new(self)
335337
}
336338

339+
#[doc = r#"
340+
Takes a closure and creates a stream that calls that closure on every element of this stream.
341+
342+
# Examples
343+
344+
```
345+
# fn main() { async_std::task::block_on(async {
346+
#
347+
use async_std::prelude::*;
348+
use std::collections::VecDeque;
349+
350+
let s: VecDeque<_> = vec![1, 2, 3].into_iter().collect();
351+
let mut s = s.map(|x| 2 * x);
352+
353+
assert_eq!(s.next().await, Some(2));
354+
assert_eq!(s.next().await, Some(4));
355+
assert_eq!(s.next().await, Some(6));
356+
assert_eq!(s.next().await, None);
357+
358+
#
359+
# }) }
360+
```
361+
"#]
362+
fn map<B, F>(self, f: F) -> Map<Self, F, Self::Item, B>
363+
where
364+
Self: Sized,
365+
F: FnMut(Self::Item) -> B,
366+
{
367+
Map::new(self, f)
368+
}
369+
337370
#[doc = r#"
338371
A combinator that does something with each element in the stream, passing the value
339372
on.

0 commit comments

Comments
 (0)