Skip to content

Commit b0bff71

Browse files
committed
Add stream::Peekable::{next_if, next_if_eq}
1 parent dddfc35 commit b0bff71

File tree

4 files changed

+246
-10
lines changed

4 files changed

+246
-10
lines changed

futures-util/src/stream/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
1919
mod stream;
2020
pub use self::stream::{
2121
Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
22-
Fuse, Inspect, Map, Next, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt,
22+
Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt,
2323
StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip,
2424
};
2525

futures-util/src/stream/stream/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ pub use self::select_next_some::SelectNextSome;
123123

124124
mod peek;
125125
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
126-
pub use self::peek::{Peek, Peekable};
126+
pub use self::peek::{Peek, Peekable, NextIf, NextIfEq};
127127

128128
mod skip;
129129
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411

futures-util/src/stream/stream/peek.rs

Lines changed: 218 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
use crate::fns::FnOnce1;
12
use crate::stream::{Fuse, StreamExt};
23
use core::fmt;
4+
use core::marker::PhantomData;
35
use core::pin::Pin;
46
use futures_core::future::{FusedFuture, Future};
57
use futures_core::ready;
@@ -44,10 +46,7 @@ impl<St: Stream> Peekable<St> {
4446
///
4547
/// This method polls the underlying stream and return either a reference
4648
/// to the next item if the stream is ready or passes through any errors.
47-
pub fn poll_peek(
48-
self: Pin<&mut Self>,
49-
cx: &mut Context<'_>,
50-
) -> Poll<Option<&St::Item>> {
49+
pub fn poll_peek(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<&St::Item>> {
5150
let mut this = self.project();
5251

5352
Poll::Ready(loop {
@@ -60,6 +59,94 @@ impl<St: Stream> Peekable<St> {
6059
}
6160
})
6261
}
62+
63+
/// Consume and return the next value of this stream if a condition is true.
64+
///
65+
/// If `func` returns `true` for the next value of this stream, consume and return it.
66+
/// Otherwise, return `None`.
67+
///
68+
/// # Examples
69+
///
70+
/// Consume a number if it's equal to 0.
71+
///
72+
/// ```
73+
/// # futures::executor::block_on(async {
74+
/// use futures::stream::{self, StreamExt};
75+
/// use futures::pin_mut;
76+
///
77+
/// let stream = stream::iter(0..5).peekable();
78+
/// pin_mut!(stream);
79+
/// // The first item of the stream is 0; consume it.
80+
/// assert_eq!(stream.as_mut().next_if(|&x| x == 0).await, Some(0));
81+
/// // The next item returned is now 1, so `consume` will return `false`.
82+
/// assert_eq!(stream.as_mut().next_if(|&x| x == 0).await, None);
83+
/// // `next_if` saves the value of the next item if it was not equal to `expected`.
84+
/// assert_eq!(stream.next().await, Some(1));
85+
/// # });
86+
/// ```
87+
///
88+
/// Consume any number less than 10.
89+
///
90+
/// ```
91+
/// # futures::executor::block_on(async {
92+
/// use futures::stream::{self, StreamExt};
93+
/// use futures::pin_mut;
94+
///
95+
/// let stream = stream::iter(1..20).peekable();
96+
/// pin_mut!(stream);
97+
/// // Consume all numbers less than 10
98+
/// while stream.as_mut().next_if(|&x| x < 10).await.is_some() {}
99+
/// // The next value returned will be 10
100+
/// assert_eq!(stream.next().await, Some(10));
101+
/// # });
102+
/// ```
103+
pub fn next_if<F>(self: Pin<&mut Self>, func: F) -> NextIf<'_, St, F>
104+
where
105+
F: FnOnce(&St::Item) -> bool,
106+
{
107+
NextIf {
108+
inner: Some((self, func)),
109+
}
110+
}
111+
112+
/// Consume and return the next item if it is equal to `expected`.
113+
///
114+
/// # Example
115+
///
116+
/// Consume a number if it's equal to 0.
117+
///
118+
/// ```
119+
/// # futures::executor::block_on(async {
120+
/// use futures::stream::{self, StreamExt};
121+
/// use futures::pin_mut;
122+
///
123+
/// let stream = stream::iter(0..5).peekable();
124+
/// pin_mut!(stream);
125+
/// // The first item of the stream is 0; consume it.
126+
/// assert_eq!(stream.as_mut().next_if_eq(&0).await, Some(0));
127+
/// // The next item returned is now 1, so `consume` will return `false`.
128+
/// assert_eq!(stream.as_mut().next_if_eq(&0).await, None);
129+
/// // `next_if_eq` saves the value of the next item if it was not equal to `expected`.
130+
/// assert_eq!(stream.next().await, Some(1));
131+
/// # });
132+
/// ```
133+
pub fn next_if_eq<'a, T>(self: Pin<&'a mut Self>, expected: &'a T) -> NextIfEq<'a, St, T>
134+
where
135+
T: ?Sized,
136+
St::Item: PartialEq<T>,
137+
{
138+
NextIfEq {
139+
inner: NextIf {
140+
inner: Some((
141+
self,
142+
NextIfEqFn {
143+
expected,
144+
_next: PhantomData,
145+
},
146+
)),
147+
},
148+
}
149+
}
63150
}
64151

65152
impl<St: Stream> FusedStream for Peekable<St> {
@@ -103,7 +190,7 @@ where
103190
}
104191

105192
pin_project! {
106-
/// Future for the [`Peekable::peek()`](self::Peekable::peek) function from [`Peekable`]
193+
/// Future for the [`Peekable::peek`](self::Peekable::peek) method.
107194
#[must_use = "futures do nothing unless polled"]
108195
pub struct Peek<'a, St: Stream> {
109196
inner: Option<Pin<&'a mut Peekable<St>>>,
@@ -116,9 +203,7 @@ where
116203
St::Item: fmt::Debug,
117204
{
118205
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119-
f.debug_struct("Peek")
120-
.field("inner", &self.inner)
121-
.finish()
206+
f.debug_struct("Peek").field("inner", &self.inner).finish()
122207
}
123208
}
124209

@@ -133,6 +218,7 @@ where
133218
St: Stream,
134219
{
135220
type Output = Option<&'a St::Item>;
221+
136222
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
137223
let inner = self.project().inner;
138224
if let Some(peekable) = inner {
@@ -144,3 +230,127 @@ where
144230
}
145231
}
146232
}
233+
234+
pin_project! {
235+
/// Future for the [`Peekable::next_if`](self::Peekable::next_if) method.
236+
#[must_use = "futures do nothing unless polled"]
237+
pub struct NextIf<'a, St: Stream, F> {
238+
inner: Option<(Pin<&'a mut Peekable<St>>, F)>,
239+
}
240+
}
241+
242+
impl<St, F> fmt::Debug for NextIf<'_, St, F>
243+
where
244+
St: Stream + fmt::Debug,
245+
St::Item: fmt::Debug,
246+
{
247+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248+
f.debug_struct("NextIf")
249+
.field("inner", &self.inner.as_ref().map(|(s, _f)| s))
250+
.finish()
251+
}
252+
}
253+
254+
#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
255+
impl<St, F> FusedFuture for NextIf<'_, St, F>
256+
where
257+
St: Stream,
258+
F: for<'a> FnOnce1<&'a St::Item, Output = bool>,
259+
{
260+
fn is_terminated(&self) -> bool {
261+
self.inner.is_none()
262+
}
263+
}
264+
265+
#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
266+
impl<St, F> Future for NextIf<'_, St, F>
267+
where
268+
St: Stream,
269+
F: for<'a> FnOnce1<&'a St::Item, Output = bool>,
270+
{
271+
type Output = Option<St::Item>;
272+
273+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
274+
let inner = self.project().inner;
275+
if let Some((peekable, _)) = inner {
276+
let res = ready!(peekable.as_mut().poll_next(cx));
277+
278+
let (peekable, func) = inner.take().unwrap();
279+
match res {
280+
Some(ref matched) if func.call_once(matched) => Poll::Ready(res),
281+
other => {
282+
let peekable = peekable.project();
283+
// Since we called `self.next()`, we consumed `self.peeked`.
284+
assert!(peekable.peeked.is_none());
285+
*peekable.peeked = other;
286+
Poll::Ready(None)
287+
}
288+
}
289+
} else {
290+
panic!("NextIf polled after completion")
291+
}
292+
}
293+
}
294+
295+
pin_project! {
296+
/// Future for the [`Peekable::next_if_eq`](self::Peekable::next_if_eq) method.
297+
#[must_use = "futures do nothing unless polled"]
298+
pub struct NextIfEq<'a, St: Stream, T: ?Sized> {
299+
#[pin]
300+
inner: NextIf<'a, St, NextIfEqFn<'a, T, St::Item>>,
301+
}
302+
}
303+
304+
impl<St, T> fmt::Debug for NextIfEq<'_, St, T>
305+
where
306+
St: Stream + fmt::Debug,
307+
St::Item: fmt::Debug,
308+
T: ?Sized,
309+
{
310+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
311+
f.debug_struct("NextIfEq")
312+
.field("inner", &self.inner.inner.as_ref().map(|(s, _f)| s))
313+
.finish()
314+
}
315+
}
316+
317+
impl<St, T> FusedFuture for NextIfEq<'_, St, T>
318+
where
319+
St: Stream,
320+
T: ?Sized,
321+
St::Item: PartialEq<T>,
322+
{
323+
fn is_terminated(&self) -> bool {
324+
self.inner.is_terminated()
325+
}
326+
}
327+
328+
impl<St, T> Future for NextIfEq<'_, St, T>
329+
where
330+
St: Stream,
331+
T: ?Sized,
332+
St::Item: PartialEq<T>,
333+
{
334+
type Output = Option<St::Item>;
335+
336+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
337+
self.project().inner.poll(cx)
338+
}
339+
}
340+
341+
struct NextIfEqFn<'a, T: ?Sized, Item> {
342+
expected: &'a T,
343+
_next: PhantomData<Item>,
344+
}
345+
346+
impl<T, Item> FnOnce1<&Item> for NextIfEqFn<'_, T, Item>
347+
where
348+
T: ?Sized,
349+
Item: PartialEq<T>,
350+
{
351+
type Output = bool;
352+
353+
fn call_once(self, next: &Item) -> Self::Output {
354+
next == self.expected
355+
}
356+
}

futures/tests/stream_peekable.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,29 @@ fn peekable() {
1111
assert_eq!(peekable.collect::<Vec<u8>>().await, vec![1, 2, 3]);
1212
});
1313
}
14+
15+
#[test]
16+
fn peekable_next_if_eq() {
17+
block_on(async {
18+
// first, try on references
19+
let s = stream::iter(vec!["Heart", "of", "Gold"]).peekable();
20+
pin_mut!(s);
21+
// try before `peek()`
22+
assert_eq!(s.as_mut().next_if_eq(&"trillian").await, None);
23+
assert_eq!(s.as_mut().next_if_eq(&"Heart").await, Some("Heart"));
24+
// try after peek()
25+
assert_eq!(s.as_mut().peek().await, Some(&"of"));
26+
assert_eq!(s.as_mut().next_if_eq(&"of").await, Some("of"));
27+
assert_eq!(s.as_mut().next_if_eq(&"zaphod").await, None);
28+
// make sure `next()` still behaves
29+
assert_eq!(s.next().await, Some("Gold"));
30+
31+
// make sure comparison works for owned values
32+
let s = stream::iter(vec![String::from("Ludicrous"), "speed".into()]).peekable();
33+
pin_mut!(s);
34+
// make sure basic functionality works
35+
assert_eq!(s.as_mut().next_if_eq("Ludicrous").await, Some("Ludicrous".into()));
36+
assert_eq!(s.as_mut().next_if_eq("speed").await, Some("speed".into()));
37+
assert_eq!(s.as_mut().next_if_eq("").await, None);
38+
});
39+
}

0 commit comments

Comments
 (0)