Skip to content

Commit 4e5828e

Browse files
committed
add stream::max_by method
1 parent ec23632 commit 4e5828e

File tree

2 files changed

+97
-0
lines changed

2 files changed

+97
-0
lines changed

src/stream/stream/max_by.rs

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
use std::cmp::Ordering;
2+
use std::pin::Pin;
3+
4+
use crate::future::Future;
5+
use crate::stream::Stream;
6+
use crate::task::{Context, Poll};
7+
8+
#[doc(hidden)]
9+
#[allow(missing_debug_implementations)]
10+
pub struct MaxByFuture<S, F, T> {
11+
stream: S,
12+
compare: F,
13+
max: Option<T>,
14+
}
15+
16+
impl<S, F, T> MaxByFuture<S, F, T> {
17+
pin_utils::unsafe_pinned!(stream: S);
18+
pin_utils::unsafe_unpinned!(compare: F);
19+
pin_utils::unsafe_unpinned!(max: Option<T>);
20+
21+
pub(super) fn new(stream: S, compare: F) -> Self {
22+
MaxByFuture {
23+
stream,
24+
compare,
25+
max: None,
26+
}
27+
}
28+
}
29+
30+
impl<S, F> Future for MaxByFuture<S, F, S::Item>
31+
where
32+
S: Stream + Unpin + Sized,
33+
S::Item: Copy,
34+
F: FnMut(&S::Item, &S::Item) -> Ordering,
35+
{
36+
type Output = Option<S::Item>;
37+
38+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
39+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
40+
41+
match next {
42+
Some(new) => {
43+
cx.waker().wake_by_ref();
44+
match self.as_mut().max().take() {
45+
None => *self.as_mut().max() = Some(new),
46+
Some(old) => match (&mut self.as_mut().compare())(&new, &old) {
47+
Ordering::Greater => *self.as_mut().max() = Some(new),
48+
_ => *self.as_mut().max() = Some(old),
49+
},
50+
}
51+
Poll::Pending
52+
}
53+
None => Poll::Ready(self.max),
54+
}
55+
}
56+
}

src/stream/stream/mod.rs

+41
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ mod last;
4040
mod le;
4141
mod lt;
4242
mod map;
43+
mod max_by;
4344
mod min_by;
4445
mod next;
4546
mod nth;
@@ -68,6 +69,7 @@ use gt::GtFuture;
6869
use last::LastFuture;
6970
use le::LeFuture;
7071
use lt::LtFuture;
72+
use max_by::MaxByFuture;
7173
use min_by::MinByFuture;
7274
use next::NextFuture;
7375
use nth::NthFuture;
@@ -639,6 +641,45 @@ extension_trait! {
639641
MinByFuture::new(self, compare)
640642
}
641643

644+
#[doc = r#"
645+
Returns the element that gives the minimum value with respect to the
646+
specified comparison function. If several elements are equally minimum,
647+
the first element is returned. If the stream is empty, `None` is returned.
648+
649+
# Examples
650+
651+
```
652+
# fn main() { async_std::task::block_on(async {
653+
#
654+
use std::collections::VecDeque;
655+
656+
use async_std::prelude::*;
657+
658+
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
659+
660+
let max = s.clone().max_by(|x, y| x.cmp(y)).await;
661+
assert_eq!(max, Some(3));
662+
663+
let max = s.max_by(|x, y| y.cmp(x)).await;
664+
assert_eq!(max, Some(1));
665+
666+
let max = VecDeque::<usize>::new().max_by(|x, y| x.cmp(y)).await;
667+
assert_eq!(max, None);
668+
#
669+
# }) }
670+
```
671+
"#]
672+
fn max_by<F>(
673+
self,
674+
compare: F,
675+
) -> impl Future<Output = Option<Self::Item>> [MaxByFuture<Self, F, Self::Item>]
676+
where
677+
Self: Sized,
678+
F: FnMut(&Self::Item, &Self::Item) -> Ordering,
679+
{
680+
MaxByFuture::new(self, compare)
681+
}
682+
642683
#[doc = r#"
643684
Returns the nth element of the stream.
644685

0 commit comments

Comments
 (0)