From d474c4ec56c1b229effde7319cfa167ba8ac6700 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Fri, 13 Dec 2019 15:43:54 +0100 Subject: [PATCH 1/6] Fix broken doc tests --- src/stream/double_ended_stream/mod.rs | 29 ++++++++++++++++----------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/src/stream/double_ended_stream/mod.rs b/src/stream/double_ended_stream/mod.rs index dc2a45c9f..f9b2b1d67 100644 --- a/src/stream/double_ended_stream/mod.rs +++ b/src/stream/double_ended_stream/mod.rs @@ -105,9 +105,10 @@ pub trait DoubleEndedStream: Stream { ``` # fn main() { async_std::task::block_on(async { # - use async_std::stream::double_ended_stream::{self, DoubleEndedStream}; + use async_std::stream::double_ended_stream::DoubleEndedStream; + use async_std::stream::from_iter; - let mut s = double_ended_stream::from_iter(vec![7u8]); + let mut s = from_iter(vec![7u8]); assert_eq!(s.next_back().await, Some(7)); assert_eq!(s.next_back().await, None); @@ -132,9 +133,10 @@ pub trait DoubleEndedStream: Stream { ``` # fn main() { async_std::task::block_on(async { # - use async_std::stream::double_ended_stream::{self, DoubleEndedStream}; + use async_std::stream::double_ended_stream::DoubleEndedStream; + use async_std::stream::from_iter; - let mut s = double_ended_stream::from_iter(vec![1u8, 2, 3, 4, 5]); + let mut s = from_iter(vec![1u8, 2, 3, 4, 5]); let second = s.nth_back(1).await; assert_eq!(second, Some(4)); @@ -159,9 +161,10 @@ pub trait DoubleEndedStream: Stream { ``` # fn main() { async_std::task::block_on(async { # - use async_std::stream::double_ended_stream::{self, DoubleEndedStream}; + use async_std::stream::double_ended_stream::DoubleEndedStream; + use async_std::stream::from_iter; - let mut s = double_ended_stream::from_iter(vec![1u8, 2, 3, 4, 5]); + let mut s = from_iter(vec![1u8, 2, 3, 4, 5]); let second = s.rfind(|v| v % 2 == 0).await; assert_eq!(second, Some(4)); @@ -185,11 +188,12 @@ pub trait DoubleEndedStream: Stream { ``` # fn main() { async_std::task::block_on(async { # - use async_std::stream::double_ended_stream::{self, DoubleEndedStream}; + use async_std::stream::double_ended_stream::DoubleEndedStream; + use async_std::stream::from_iter; - let s = double_ended_stream::from_iter(vec![1u8, 2, 3, 4, 5]); + let s = from_iter(vec![1u8, 2, 3, 4, 5]); - let second = s.rfold(0, |acc, v| v + acc).await; + let second = s.rfold(0u8, |acc, v| v + acc).await; assert_eq!(second, 15); # @@ -215,10 +219,11 @@ pub trait DoubleEndedStream: Stream { ``` # fn main() { async_std::task::block_on(async { # - use async_std::stream::double_ended_stream::{self, DoubleEndedStream}; + use async_std::stream::double_ended_stream::DoubleEndedStream; + use async_std::stream::from_iter; - let s = double_ended_stream::from_iter(vec![1u8, 2, 3, 4, 5]); - let sum = s.try_rfold(0, |acc, v| { + let s = from_iter(vec![1u8, 2, 3, 4, 5]); + let sum = s.try_rfold(0u8, |acc, v| { if (acc+v) % 2 == 1 { Ok(v+3) } else { From 9c67118d43a8073b1d0e44e77f4c6f3e784331e2 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Fri, 13 Dec 2019 15:53:48 +0100 Subject: [PATCH 2/6] Don't expose the double_ended_stream sub-module --- src/stream/double_ended_stream/mod.rs | 10 +++++----- src/stream/mod.rs | 3 +-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/stream/double_ended_stream/mod.rs b/src/stream/double_ended_stream/mod.rs index f9b2b1d67..5492db163 100644 --- a/src/stream/double_ended_stream/mod.rs +++ b/src/stream/double_ended_stream/mod.rs @@ -105,7 +105,7 @@ pub trait DoubleEndedStream: Stream { ``` # fn main() { async_std::task::block_on(async { # - use async_std::stream::double_ended_stream::DoubleEndedStream; + use async_std::prelude::*; use async_std::stream::from_iter; let mut s = from_iter(vec![7u8]); @@ -133,7 +133,7 @@ pub trait DoubleEndedStream: Stream { ``` # fn main() { async_std::task::block_on(async { # - use async_std::stream::double_ended_stream::DoubleEndedStream; + use async_std::prelude::*; use async_std::stream::from_iter; let mut s = from_iter(vec![1u8, 2, 3, 4, 5]); @@ -161,7 +161,7 @@ pub trait DoubleEndedStream: Stream { ``` # fn main() { async_std::task::block_on(async { # - use async_std::stream::double_ended_stream::DoubleEndedStream; + use async_std::prelude::*; use async_std::stream::from_iter; let mut s = from_iter(vec![1u8, 2, 3, 4, 5]); @@ -188,7 +188,7 @@ pub trait DoubleEndedStream: Stream { ``` # fn main() { async_std::task::block_on(async { # - use async_std::stream::double_ended_stream::DoubleEndedStream; + use async_std::prelude::*; use async_std::stream::from_iter; let s = from_iter(vec![1u8, 2, 3, 4, 5]); @@ -219,7 +219,7 @@ pub trait DoubleEndedStream: Stream { ``` # fn main() { async_std::task::block_on(async { # - use async_std::stream::double_ended_stream::DoubleEndedStream; + use async_std::prelude::*; use async_std::stream::from_iter; let s = from_iter(vec![1u8, 2, 3, 4, 5]); diff --git a/src/stream/mod.rs b/src/stream/mod.rs index ebce3a36b..bbafe6f1f 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -318,8 +318,7 @@ mod repeat; mod repeat_with; cfg_unstable! { - #[doc(hidden)] - pub mod double_ended_stream; + mod double_ended_stream; mod exact_size_stream; mod extend; mod from_stream; From a900cf3ecf5780f063b49d4da87ea0f28e2be510 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Fri, 13 Dec 2019 16:35:33 +0100 Subject: [PATCH 3/6] Temporarily disable tests --- src/stream/double_ended_stream/mod.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/stream/double_ended_stream/mod.rs b/src/stream/double_ended_stream/mod.rs index 5492db163..8440595ec 100644 --- a/src/stream/double_ended_stream/mod.rs +++ b/src/stream/double_ended_stream/mod.rs @@ -36,7 +36,7 @@ pub trait DoubleEndedStream: Stream { # Examples - ``` + ```ignore # fn main() { async_std::task::block_on(async { # use std::pin::Pin; @@ -102,7 +102,7 @@ pub trait DoubleEndedStream: Stream { # Examples - ``` + ```ignore # fn main() { async_std::task::block_on(async { # use async_std::prelude::*; @@ -130,7 +130,7 @@ pub trait DoubleEndedStream: Stream { Basic usage: - ``` + ```ignore # fn main() { async_std::task::block_on(async { # use async_std::prelude::*; @@ -158,7 +158,7 @@ pub trait DoubleEndedStream: Stream { Basic usage: - ``` + ```ignore # fn main() { async_std::task::block_on(async { # use async_std::prelude::*; @@ -185,7 +185,7 @@ pub trait DoubleEndedStream: Stream { Basic usage: - ``` + ```ignore # fn main() { async_std::task::block_on(async { # use async_std::prelude::*; @@ -216,7 +216,7 @@ pub trait DoubleEndedStream: Stream { Basic usage: - ``` + ```ignore # fn main() { async_std::task::block_on(async { # use async_std::prelude::*; From 3d98d49fe1c4f9286dcb892f065ae4d67b27dc53 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Fri, 13 Dec 2019 16:43:41 +0100 Subject: [PATCH 4/6] Revert "Temporarily disable tests" This reverts commit a900cf3ecf5780f063b49d4da87ea0f28e2be510. --- src/stream/double_ended_stream/mod.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/stream/double_ended_stream/mod.rs b/src/stream/double_ended_stream/mod.rs index 8440595ec..5492db163 100644 --- a/src/stream/double_ended_stream/mod.rs +++ b/src/stream/double_ended_stream/mod.rs @@ -36,7 +36,7 @@ pub trait DoubleEndedStream: Stream { # Examples - ```ignore + ``` # fn main() { async_std::task::block_on(async { # use std::pin::Pin; @@ -102,7 +102,7 @@ pub trait DoubleEndedStream: Stream { # Examples - ```ignore + ``` # fn main() { async_std::task::block_on(async { # use async_std::prelude::*; @@ -130,7 +130,7 @@ pub trait DoubleEndedStream: Stream { Basic usage: - ```ignore + ``` # fn main() { async_std::task::block_on(async { # use async_std::prelude::*; @@ -158,7 +158,7 @@ pub trait DoubleEndedStream: Stream { Basic usage: - ```ignore + ``` # fn main() { async_std::task::block_on(async { # use async_std::prelude::*; @@ -185,7 +185,7 @@ pub trait DoubleEndedStream: Stream { Basic usage: - ```ignore + ``` # fn main() { async_std::task::block_on(async { # use async_std::prelude::*; @@ -216,7 +216,7 @@ pub trait DoubleEndedStream: Stream { Basic usage: - ```ignore + ``` # fn main() { async_std::task::block_on(async { # use async_std::prelude::*; From f918874896d838d63e66f32bd530bc8c03c7bacb Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Fri, 13 Dec 2019 16:46:47 +0100 Subject: [PATCH 5/6] Try bumping the delay to 300 to see if it would eventually pass --- src/stream/stream/mod.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index f6822d241..0124d6d86 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -587,13 +587,16 @@ extension_trait! { assert_eq!(s.next().await, Some(1)); // There will be no delay after the first time. - assert!(start.elapsed().as_millis() <= 210); + println!("Elapsed time 1: {}", start.elapsed().as_millis()); + assert!(start.elapsed().as_millis() <= 300); assert_eq!(s.next().await, Some(2)); - assert!(start.elapsed().as_millis() <= 210); + println!("Elapsed time 2: {}", start.elapsed().as_millis()); + assert!(start.elapsed().as_millis() <= 300); assert_eq!(s.next().await, None); - assert!(start.elapsed().as_millis() <= 210); + println!("Elapsed time 3: {}", start.elapsed().as_millis()); + assert!(start.elapsed().as_millis() <= 300); # # }) } ``` From 04de31c7a22032b2e2f497036a4cc79380130acc Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Fri, 13 Dec 2019 17:00:16 +0100 Subject: [PATCH 6/6] Revert "Merge pull request #562 from felipesere/double_ended_ext" This reverts commit 3d3bf914eae0fe9f6ea068e5732778bf13555714, reversing changes made to 63b6a2b961746480c82b48cfe7fc4986f5ea0b0b. Conflicts: src/stream/double_ended_stream/mod.rs --- src/stream/double_ended_stream.rs | 24 ++ src/stream/double_ended_stream/mod.rs | 246 -------------------- src/stream/double_ended_stream/next_back.rs | 19 -- src/stream/double_ended_stream/nth_back.rs | 41 ---- src/stream/double_ended_stream/rfind.rs | 41 ---- src/stream/double_ended_stream/rfold.rs | 52 ----- src/stream/double_ended_stream/try_rfold.rs | 56 ----- src/stream/from_iter.rs | 9 - src/stream/mod.rs | 2 +- src/stream/once.rs | 10 - 10 files changed, 25 insertions(+), 475 deletions(-) create mode 100644 src/stream/double_ended_stream.rs delete mode 100644 src/stream/double_ended_stream/mod.rs delete mode 100644 src/stream/double_ended_stream/next_back.rs delete mode 100644 src/stream/double_ended_stream/nth_back.rs delete mode 100644 src/stream/double_ended_stream/rfind.rs delete mode 100644 src/stream/double_ended_stream/rfold.rs delete mode 100644 src/stream/double_ended_stream/try_rfold.rs diff --git a/src/stream/double_ended_stream.rs b/src/stream/double_ended_stream.rs new file mode 100644 index 000000000..129bb1cdf --- /dev/null +++ b/src/stream/double_ended_stream.rs @@ -0,0 +1,24 @@ +use crate::stream::Stream; + +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A stream able to yield elements from both ends. +/// +/// Something that implements `DoubleEndedStream` has one extra capability +/// over something that implements [`Stream`]: the ability to also take +/// `Item`s from the back, as well as the front. +/// +/// [`Stream`]: trait.Stream.html +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +pub trait DoubleEndedStream: Stream { + /// Removes and returns an element from the end of the stream. + /// + /// Returns `None` when there are no more elements. + /// + /// The [trait-level] docs contain more details. + /// + /// [trait-level]: trait.DoubleEndedStream.html + fn poll_next_back(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; +} diff --git a/src/stream/double_ended_stream/mod.rs b/src/stream/double_ended_stream/mod.rs deleted file mode 100644 index 5492db163..000000000 --- a/src/stream/double_ended_stream/mod.rs +++ /dev/null @@ -1,246 +0,0 @@ -use crate::stream::Stream; - -use std::pin::Pin; -use std::task::{Context, Poll}; - -mod next_back; -mod nth_back; -mod rfind; -mod rfold; -mod try_rfold; - -use next_back::NextBackFuture; -use nth_back::NthBackFuture; -use rfind::RFindFuture; -use rfold::RFoldFuture; -use try_rfold::TryRFoldFuture; - -/// A stream able to yield elements from both ends. -/// -/// Something that implements `DoubleEndedStream` has one extra capability -/// over something that implements [`Stream`]: the ability to also take -/// `Item`s from the back, as well as the front. -/// -/// [`Stream`]: trait.Stream.html -#[cfg(feature = "unstable")] -#[cfg_attr(feature = "docs", doc(cfg(unstable)))] -pub trait DoubleEndedStream: Stream { - #[doc = r#" - Attempts to receive the next item from the back of the stream. - - There are several possible return values: - - * `Poll::Pending` means this stream's next_back value is not ready yet. - * `Poll::Ready(None)` means this stream has been exhausted. - * `Poll::Ready(Some(item))` means `item` was received out of the stream. - - # Examples - - ``` - # fn main() { async_std::task::block_on(async { - # - use std::pin::Pin; - - use async_std::prelude::*; - use async_std::stream; - use async_std::task::{Context, Poll}; - - fn increment( - s: impl DoubleEndedStream + Unpin, - ) -> impl DoubleEndedStream + Unpin { - struct Increment(S); - - impl + Unpin> Stream for Increment { - type Item = S::Item; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - match Pin::new(&mut self.0).poll_next(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(None) => Poll::Ready(None), - Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)), - } - } - } - - impl + Unpin> DoubleEndedStream for Increment { - fn poll_next_back( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - match Pin::new(&mut self.0).poll_next_back(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(None) => Poll::Ready(None), - Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)), - } - } - } - - Increment(s) - } - - let mut s = increment(stream::once(7)); - - assert_eq!(s.next_back().await, Some(8)); - assert_eq!(s.next_back().await, None); - # - # }) } - ``` - "#] - fn poll_next_back(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; - - #[doc = r#" - Advances the stream and returns the next value. - - Returns [`None`] when iteration is finished. Individual stream implementations may - choose to resume iteration, and so calling `next()` again may or may not eventually - start returning more values. - - [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None - - # Examples - - ``` - # fn main() { async_std::task::block_on(async { - # - use async_std::prelude::*; - use async_std::stream::from_iter; - - let mut s = from_iter(vec![7u8]); - - assert_eq!(s.next_back().await, Some(7)); - assert_eq!(s.next_back().await, None); - # - # }) } - ``` - "#] - fn next_back(&mut self) -> NextBackFuture<'_, Self> - where - Self: Unpin, - { - NextBackFuture { stream: self } - } - - #[doc = r#" - Returns the nth element from the back of the stream. - - # Examples - - Basic usage: - - ``` - # fn main() { async_std::task::block_on(async { - # - use async_std::prelude::*; - use async_std::stream::from_iter; - - let mut s = from_iter(vec![1u8, 2, 3, 4, 5]); - - let second = s.nth_back(1).await; - assert_eq!(second, Some(4)); - # - # }) } - ``` - "#] - fn nth_back(&mut self, n: usize) -> NthBackFuture<'_, Self> - where - Self: Unpin + Sized, - { - NthBackFuture::new(self, n) - } - - #[doc = r#" - Returns the the frist element from the right that matches the predicate. - - # Examples - - Basic usage: - - ``` - # fn main() { async_std::task::block_on(async { - # - use async_std::prelude::*; - use async_std::stream::from_iter; - - let mut s = from_iter(vec![1u8, 2, 3, 4, 5]); - - let second = s.rfind(|v| v % 2 == 0).await; - assert_eq!(second, Some(4)); - # - # }) } - ``` - "#] - fn rfind

(&mut self, p: P) -> RFindFuture<'_, Self, P> - where - Self: Unpin + Sized, - P: FnMut(&Self::Item) -> bool, - { - RFindFuture::new(self, p) - } - - #[doc = r#" - # Examples - - Basic usage: - - ``` - # fn main() { async_std::task::block_on(async { - # - use async_std::prelude::*; - use async_std::stream::from_iter; - - let s = from_iter(vec![1u8, 2, 3, 4, 5]); - - let second = s.rfold(0u8, |acc, v| v + acc).await; - - assert_eq!(second, 15); - # - # }) } - ``` - "#] - fn rfold(self, accum: B, f: F) -> RFoldFuture - where - Self: Sized, - F: FnMut(B, Self::Item) -> B, - { - RFoldFuture::new(self, accum, f) - } - - #[doc = r#" - A combinator that applies a function as long as it returns successfully, producing a single, final value. - Immediately returns the error when the function returns unsuccessfully. - - # Examples - - Basic usage: - - ``` - # fn main() { async_std::task::block_on(async { - # - use async_std::prelude::*; - use async_std::stream::from_iter; - - let s = from_iter(vec![1u8, 2, 3, 4, 5]); - let sum = s.try_rfold(0u8, |acc, v| { - if (acc+v) % 2 == 1 { - Ok(v+3) - } else { - Err("fail") - } - }).await; - - assert_eq!(sum, Err("fail")); - # - # }) } - ``` - "#] - fn try_rfold(self, accum: B, f: F) -> TryRFoldFuture - where - Self: Sized, - F: FnMut(B, Self::Item) -> Result, - { - TryRFoldFuture::new(self, accum, f) - } -} diff --git a/src/stream/double_ended_stream/next_back.rs b/src/stream/double_ended_stream/next_back.rs deleted file mode 100644 index aa642d094..000000000 --- a/src/stream/double_ended_stream/next_back.rs +++ /dev/null @@ -1,19 +0,0 @@ -use std::pin::Pin; -use std::future::Future; - -use crate::stream::DoubleEndedStream; -use crate::task::{Context, Poll}; - -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct NextBackFuture<'a, T: Unpin + ?Sized> { - pub(crate) stream: &'a mut T, -} - -impl Future for NextBackFuture<'_, T> { - type Output = Option; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut *self.stream).poll_next_back(cx) - } -} diff --git a/src/stream/double_ended_stream/nth_back.rs b/src/stream/double_ended_stream/nth_back.rs deleted file mode 100644 index e32a28fd6..000000000 --- a/src/stream/double_ended_stream/nth_back.rs +++ /dev/null @@ -1,41 +0,0 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use crate::stream::DoubleEndedStream; - -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct NthBackFuture<'a, S> { - stream: &'a mut S, - n: usize, -} - -impl<'a, S> NthBackFuture<'a, S> { - pub(crate) fn new(stream: &'a mut S, n: usize) -> Self { - NthBackFuture { stream, n } - } -} - -impl<'a, S> Future for NthBackFuture<'a, S> -where - S: DoubleEndedStream + Sized + Unpin, -{ - type Output = Option; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next_back(cx)); - match next { - Some(v) => match self.n { - 0 => Poll::Ready(Some(v)), - _ => { - self.n -= 1; - cx.waker().wake_by_ref(); - Poll::Pending - } - }, - None => Poll::Ready(None), - } - } -} - diff --git a/src/stream/double_ended_stream/rfind.rs b/src/stream/double_ended_stream/rfind.rs deleted file mode 100644 index 947269342..000000000 --- a/src/stream/double_ended_stream/rfind.rs +++ /dev/null @@ -1,41 +0,0 @@ -use std::task::{Context, Poll}; -use std::future::Future; -use std::pin::Pin; - -use crate::stream::DoubleEndedStream; - -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct RFindFuture<'a, S, P> { - stream: &'a mut S, - p: P, -} - -impl<'a, S, P> RFindFuture<'a, S, P> { - pub(super) fn new(stream: &'a mut S, p: P) -> Self { - RFindFuture { stream, p } - } -} - -impl Unpin for RFindFuture<'_, S, P> {} - -impl<'a, S, P> Future for RFindFuture<'a, S, P> -where - S: DoubleEndedStream + Unpin + Sized, - P: FnMut(&S::Item) -> bool, -{ - type Output = Option; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let item = futures_core::ready!(Pin::new(&mut *self.stream).poll_next_back(cx)); - - match item { - Some(v) if (&mut self.p)(&v) => Poll::Ready(Some(v)), - Some(_) => { - cx.waker().wake_by_ref(); - Poll::Pending - } - None => Poll::Ready(None), - } - } -} diff --git a/src/stream/double_ended_stream/rfold.rs b/src/stream/double_ended_stream/rfold.rs deleted file mode 100644 index 9002f8d9e..000000000 --- a/src/stream/double_ended_stream/rfold.rs +++ /dev/null @@ -1,52 +0,0 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use pin_project_lite::pin_project; - -use crate::stream::DoubleEndedStream; - -pin_project! { - #[doc(hidden)] - #[allow(missing_debug_implementations)] - pub struct RFoldFuture { - #[pin] - stream: S, - f: F, - acc: Option, - } -} - -impl RFoldFuture { - pub(super) fn new(stream: S, init: B, f: F) -> Self { - RFoldFuture { - stream, - f, - acc: Some(init), - } - } -} - -impl Future for RFoldFuture -where - S: DoubleEndedStream + Sized, - F: FnMut(B, S::Item) -> B, -{ - type Output = B; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); - loop { - let next = futures_core::ready!(this.stream.as_mut().poll_next_back(cx)); - - match next { - Some(v) => { - let old = this.acc.take().unwrap(); - let new = (this.f)(old, v); - *this.acc = Some(new); - } - None => return Poll::Ready(this.acc.take().unwrap()), - } - } - } -} diff --git a/src/stream/double_ended_stream/try_rfold.rs b/src/stream/double_ended_stream/try_rfold.rs deleted file mode 100644 index 9e6295a74..000000000 --- a/src/stream/double_ended_stream/try_rfold.rs +++ /dev/null @@ -1,56 +0,0 @@ -use crate::future::Future; -use std::pin::Pin; -use crate::task::{Context, Poll}; - -use pin_project_lite::pin_project; - -use crate::stream::DoubleEndedStream; - -pin_project! { - #[doc(hidden)] - #[allow(missing_debug_implementations)] - pub struct TryRFoldFuture { - #[pin] - stream: S, - f: F, - acc: Option, - } -} - -impl TryRFoldFuture { - pub(super) fn new(stream: S, init: T, f: F) -> Self { - TryRFoldFuture { - stream, - f, - acc: Some(init), - } - } -} - -impl Future for TryRFoldFuture -where - S: DoubleEndedStream + Unpin, - F: FnMut(T, S::Item) -> Result, -{ - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); - loop { - let next = futures_core::ready!(this.stream.as_mut().poll_next_back(cx)); - - match next { - Some(v) => { - let old = this.acc.take().unwrap(); - let new = (this.f)(old, v); - - match new { - Ok(o) => *this.acc = Some(o), - Err(e) => return Poll::Ready(Err(e)), - } - } - None => return Poll::Ready(Ok(this.acc.take().unwrap())), - } - } - } -} diff --git a/src/stream/from_iter.rs b/src/stream/from_iter.rs index 705d15048..d7a31d6c4 100644 --- a/src/stream/from_iter.rs +++ b/src/stream/from_iter.rs @@ -3,8 +3,6 @@ use std::pin::Pin; use pin_project_lite::pin_project; use crate::stream::Stream; -#[cfg(feature = "unstable")] -use crate::stream::double_ended_stream::DoubleEndedStream; use crate::task::{Context, Poll}; pin_project! { @@ -53,10 +51,3 @@ impl Stream for FromIter { Poll::Ready(self.iter.next()) } } - -#[cfg(feature = "unstable")] -impl DoubleEndedStream for FromIter { - fn poll_next_back(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(self.iter.next_back()) - } -} diff --git a/src/stream/mod.rs b/src/stream/mod.rs index bbafe6f1f..d8b96ec22 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -308,7 +308,7 @@ pub use repeat::{repeat, Repeat}; pub use repeat_with::{repeat_with, RepeatWith}; pub use stream::*; -pub mod stream; +pub(crate) mod stream; mod empty; mod from_fn; diff --git a/src/stream/once.rs b/src/stream/once.rs index 939722d9e..e4ac682cc 100644 --- a/src/stream/once.rs +++ b/src/stream/once.rs @@ -5,9 +5,6 @@ use pin_project_lite::pin_project; use crate::stream::Stream; use crate::task::{Context, Poll}; -#[cfg(feature = "unstable")] -use crate::stream::DoubleEndedStream; - /// Creates a stream that yields a single item. /// /// # Examples @@ -49,10 +46,3 @@ impl Stream for Once { Poll::Ready(self.project().value.take()) } } - -#[cfg(feature = "unstable")] -impl DoubleEndedStream for Once { - fn poll_next_back(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(self.project().value.take()) - } -}