From 5b720ab1e26cf644f55f953522e34df736be7424 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Wed, 11 Sep 2019 09:54:25 +0300 Subject: [PATCH 1/4] adds stream::fold combinator --- src/stream/stream/fold.rs | 61 +++++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 30 +++++++++++++++++++ 2 files changed, 91 insertions(+) create mode 100644 src/stream/stream/fold.rs diff --git a/src/stream/stream/fold.rs b/src/stream/stream/fold.rs new file mode 100644 index 000000000..0e3dd6744 --- /dev/null +++ b/src/stream/stream/fold.rs @@ -0,0 +1,61 @@ +use std::marker::PhantomData; +use std::pin::Pin; + +use crate::future::Future; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct FoldFuture { + stream: S, + f: F, + acc: Option, + __t: PhantomData, +} + +impl FoldFuture { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(f: F); + pin_utils::unsafe_unpinned!(acc: Option); + + pub(super) fn new(stream: S, init: B, f: F) -> Self { + FoldFuture { + stream, + f, + acc: Some(init), + __t: PhantomData, + } + } +} + +impl Future for FoldFuture +where + S: futures_core::stream::Stream + Unpin + Sized, + F: FnMut(B, S::Item) -> B, +{ + type Output = B; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(v) => { + cx.waker().wake_by_ref(); + let old = self + .as_mut() + .acc() + .take() + .expect("FoldFuture should never contain None"); + let new = (self.as_mut().f())(old, v); + *self.as_mut().acc() = Some(new); + Poll::Pending + } + None => Poll::Ready( + self.as_mut() + .acc() + .take() + .expect("FoldFuture should never contain None"), + ), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index eddafe286..78253969e 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -25,6 +25,7 @@ mod all; mod any; mod filter_map; mod find_map; +mod fold; mod min_by; mod next; mod nth; @@ -36,6 +37,7 @@ use all::AllFuture; use any::AnyFuture; use filter_map::FilterMap; use find_map::FindMapFuture; +use fold::FoldFuture; use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; @@ -344,6 +346,34 @@ pub trait Stream { FindMapFuture::new(self, f) } + /// A combinator that applies a function to every element in a stream + /// producing a single, final value. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use std::collections::VecDeque; + /// + /// let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// let sum = s.fold(0, |acc, x| acc + x).await; + /// + /// assert_eq!(sum, 6); + /// # + /// # }) } + /// ``` + fn fold(self, init: B, f: F) -> FoldFuture + where + Self: Sized, + F: FnMut(B, Self::Item) -> B, + { + FoldFuture::new(self, init, f) + } + /// Tests if any element of the stream matches a predicate. /// /// `any()` takes a closure that returns `true` or `false`. It applies From 6c3f8af62d2b6899aa16867164a48b5c4184f27c Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Wed, 11 Sep 2019 22:07:20 +0300 Subject: [PATCH 2/4] fixes after #145 --- src/stream/stream/fold.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/stream/stream/fold.rs b/src/stream/stream/fold.rs index 0e3dd6744..91a1c8c87 100644 --- a/src/stream/stream/fold.rs +++ b/src/stream/stream/fold.rs @@ -2,6 +2,7 @@ use std::marker::PhantomData; use std::pin::Pin; use crate::future::Future; +use crate::stream::Stream; use crate::task::{Context, Poll}; #[doc(hidden)] @@ -30,7 +31,7 @@ impl FoldFuture { impl Future for FoldFuture where - S: futures_core::stream::Stream + Unpin + Sized, + S: Stream + Unpin + Sized, F: FnMut(B, S::Item) -> B, { type Output = B; From 0080a0da8ceda7b7b00af0a512cffa3d9dee8b09 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Thu, 12 Sep 2019 18:15:20 +0300 Subject: [PATCH 3/4] change expect to unwrap --- src/stream/stream/fold.rs | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/src/stream/stream/fold.rs b/src/stream/stream/fold.rs index 91a1c8c87..05d493711 100644 --- a/src/stream/stream/fold.rs +++ b/src/stream/stream/fold.rs @@ -42,21 +42,12 @@ where match next { Some(v) => { cx.waker().wake_by_ref(); - let old = self - .as_mut() - .acc() - .take() - .expect("FoldFuture should never contain None"); + let old = self.as_mut().acc().take().unwrap(); let new = (self.as_mut().f())(old, v); *self.as_mut().acc() = Some(new); Poll::Pending } - None => Poll::Ready( - self.as_mut() - .acc() - .take() - .expect("FoldFuture should never contain None"), - ), + None => Poll::Ready(self.as_mut().acc().take().unwrap()), } } } From efe351659fb7bb439a12e5c9017836bcddbc1cc3 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Tue, 17 Sep 2019 12:25:02 +0300 Subject: [PATCH 4/4] Fixes review issues --- src/stream/stream/fold.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/stream/stream/fold.rs b/src/stream/stream/fold.rs index 05d493711..18ddcd815 100644 --- a/src/stream/stream/fold.rs +++ b/src/stream/stream/fold.rs @@ -31,23 +31,23 @@ impl FoldFuture { impl Future for FoldFuture where - S: Stream + Unpin + Sized, + S: Stream + Sized, F: FnMut(B, S::Item) -> B, { type Output = B; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + loop { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); - match next { - Some(v) => { - cx.waker().wake_by_ref(); - let old = self.as_mut().acc().take().unwrap(); - let new = (self.as_mut().f())(old, v); - *self.as_mut().acc() = Some(new); - Poll::Pending + match next { + Some(v) => { + let old = self.as_mut().acc().take().unwrap(); + let new = (self.as_mut().f())(old, v); + *self.as_mut().acc() = Some(new); + } + None => return Poll::Ready(self.as_mut().acc().take().unwrap()), } - None => Poll::Ready(self.as_mut().acc().take().unwrap()), } } }