From c644a7f248079fc62095d9670604e8326d1e1586 Mon Sep 17 00:00:00 2001 From: arheard Date: Mon, 30 Sep 2019 22:19:44 -0400 Subject: [PATCH 1/5] Adds partial_cmp.rs file and partial_cmp signature to mod.rs --- src/stream/stream/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 0e563f6d3..bae76be9d 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1186,6 +1186,7 @@ extension_trait! { } #[doc = r#" +<<<<<<< HEAD Combines multiple streams into a single stream of all their outputs. Items are yielded as soon as they're received, and the stream continues yield until both @@ -1231,6 +1232,7 @@ extension_trait! { # use async_std::prelude::*; use std::collections::VecDeque; + use std::cmp::Ordering; let s1 = VecDeque::from(vec![1]); let s2 = VecDeque::from(vec![1, 2]); From 49532972b7c7ee736fd416c7b5d1cc8851451bb8 Mon Sep 17 00:00:00 2001 From: arheard Date: Wed, 2 Oct 2019 17:54:21 -0400 Subject: [PATCH 2/5] adds tests that compare streams of same length --- src/stream/stream/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index bae76be9d..ba2bc73e1 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1186,7 +1186,6 @@ extension_trait! { } #[doc = r#" -<<<<<<< HEAD Combines multiple streams into a single stream of all their outputs. Items are yielded as soon as they're received, and the stream continues yield until both From 338d16751a0ab66d0adba52cb0478580a65020a5 Mon Sep 17 00:00:00 2001 From: arheard Date: Sun, 6 Oct 2019 20:20:10 -0400 Subject: [PATCH 3/5] Adds Stream::ge --- src/stream/stream/ge.rs | 47 ++++++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 41 +++++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+) create mode 100644 src/stream/stream/ge.rs diff --git a/src/stream/stream/ge.rs b/src/stream/stream/ge.rs new file mode 100644 index 000000000..eb9786b5f --- /dev/null +++ b/src/stream/stream/ge.rs @@ -0,0 +1,47 @@ +use std::cmp::Ordering; +use std::pin::Pin; + +use super::partial_cmp::PartialCmpFuture; +use crate::future::Future; +use crate::prelude::*; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +// Determines if the elements of this `Stream` are lexicographically +// greater than or equal to those of another. +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct GeFuture { + partial_cmp: PartialCmpFuture, +} + +impl GeFuture +where + L::Item: PartialOrd, +{ + pin_utils::unsafe_pinned!(partial_cmp: PartialCmpFuture); + + pub(super) fn new(l: L, r: R) -> Self { + GeFuture { + partial_cmp: l.partial_cmp(r), + } + } +} + +impl Future for GeFuture +where + L: Stream + Sized, + R: Stream + Sized, + L::Item: PartialOrd, +{ + type Output = bool; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let result = futures_core::ready!(self.as_mut().partial_cmp().poll(cx)); + + match result { + Some(Ordering::Greater) | Some(Ordering::Equal) => Poll::Ready(true), + _ => Poll::Ready(false), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index ba2bc73e1..7d53e2fee 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -32,6 +32,7 @@ mod find_map; mod fold; mod for_each; mod fuse; +mod ge; mod inspect; mod map; mod min_by; @@ -55,6 +56,7 @@ use find::FindFuture; use find_map::FindMapFuture; use fold::FoldFuture; use for_each::ForEachFuture; +use ge::GeFuture; use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; @@ -1257,6 +1259,45 @@ extension_trait! { { PartialCmpFuture::new(self, other) } + + #[doc = r#" + Determines if the elements of this `Stream` are lexicographically + greater than or equal to those of another. + + # Examples + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + + let single: VecDeque = vec![1].into_iter().collect(); + let single_gt: VecDeque = vec![10].into_iter().collect(); + let multi: VecDeque = vec![1,2].into_iter().collect(); + let multi_gt: VecDeque = vec![1,5].into_iter().collect(); + + assert_eq!(single.clone().ge(single.clone()).await, true); + assert_eq!(single_gt.clone().ge(single.clone()).await, true); + + assert_eq!(multi.clone().ge(single_gt.clone()).await, false); + assert_eq!(multi_gt.clone().ge(multi.clone()).await, true); + + + # + # }) } + ``` + "#] + fn ge( + self, + other: S + ) -> impl Future + '_ [GeFuture] + where + Self: Sized + Stream, + S: Stream, + Self::Item: PartialOrd, + { + GeFuture::new(self, other) + } } impl Stream for Box { From 1a7aa4c9eda433f8966663c3c8128e74f6fa1aac Mon Sep 17 00:00:00 2001 From: assemblaj Date: Tue, 15 Oct 2019 10:24:47 -0400 Subject: [PATCH 4/5] cargo fmt --- src/stream/stream/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 7d53e2fee..b04f7cf21 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1290,14 +1290,14 @@ extension_trait! { fn ge( self, other: S - ) -> impl Future + '_ [GeFuture] + ) -> impl Future + '_ [GeFuture] where Self: Sized + Stream, - S: Stream, + S: Stream, Self::Item: PartialOrd, { GeFuture::new(self, other) - } + } } impl Stream for Box { From 804fbe6a6e0278bfaf24e8f8565a3ea65612564b Mon Sep 17 00:00:00 2001 From: assemblaj Date: Tue, 15 Oct 2019 10:39:47 -0400 Subject: [PATCH 5/5] fixes rustdoc error --- src/stream/stream/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index b04f7cf21..a4c6f9e72 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1290,11 +1290,11 @@ extension_trait! { fn ge( self, other: S - ) -> impl Future + '_ [GeFuture] + ) -> impl Future [GeFuture] where Self: Sized + Stream, S: Stream, - Self::Item: PartialOrd, + ::Item: PartialOrd, { GeFuture::new(self, other) }