Skip to content

Commit 724a9f4

Browse files
author
Stjepan Glavina
committed
Add Stream::poll_next
1 parent 2c02037 commit 724a9f4

File tree

6 files changed

+104
-62
lines changed

6 files changed

+104
-62
lines changed

src/stream/stream/all.rs

+15-22
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,36 @@
1-
use crate::future::Future;
2-
use crate::task::{Context, Poll};
3-
41
use std::marker::PhantomData;
52
use std::pin::Pin;
63

7-
#[derive(Debug)]
8-
pub struct AllFuture<'a, S, F, T>
9-
where
10-
F: FnMut(T) -> bool,
11-
{
4+
use crate::future::Future;
5+
use crate::stream::Stream;
6+
use crate::task::{Context, Poll};
7+
8+
#[doc(hidden)]
9+
#[allow(missing_debug_implementations)]
10+
pub struct AllFuture<'a, S, F, T> {
1211
pub(crate) stream: &'a mut S,
1312
pub(crate) f: F,
1413
pub(crate) result: bool,
15-
pub(crate) __item: PhantomData<T>,
14+
pub(crate) _marker: PhantomData<T>,
1615
}
1716

18-
impl<'a, S, F, T> AllFuture<'a, S, F, T>
19-
where
20-
F: FnMut(T) -> bool,
21-
{
22-
pin_utils::unsafe_pinned!(stream: &'a mut S);
23-
pin_utils::unsafe_unpinned!(result: bool);
24-
pin_utils::unsafe_unpinned!(f: F);
25-
}
17+
impl<S: Unpin, F, T> Unpin for AllFuture<'_, S, F, T> {}
2618

2719
impl<S, F> Future for AllFuture<'_, S, F, S::Item>
2820
where
29-
S: futures_core::stream::Stream + Unpin + Sized,
21+
S: Stream + Unpin + Sized,
3022
F: FnMut(S::Item) -> bool,
3123
{
3224
type Output = bool;
3325

3426
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
35-
use futures_core::stream::Stream;
36-
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
27+
let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx));
28+
3729
match next {
3830
Some(v) => {
39-
let result = (self.as_mut().f())(v);
40-
*self.as_mut().result() = result;
31+
let result = (&mut self.f)(v);
32+
self.result = result;
33+
4134
if result {
4235
// don't forget to wake this task again to pull the next item from stream
4336
cx.waker().wake_by_ref();

src/stream/stream/any.rs

+15-22
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,36 @@
1-
use crate::future::Future;
2-
use crate::task::{Context, Poll};
3-
41
use std::marker::PhantomData;
52
use std::pin::Pin;
63

7-
#[derive(Debug)]
8-
pub struct AnyFuture<'a, S, F, T>
9-
where
10-
F: FnMut(T) -> bool,
11-
{
4+
use crate::future::Future;
5+
use crate::stream::Stream;
6+
use crate::task::{Context, Poll};
7+
8+
#[doc(hidden)]
9+
#[allow(missing_debug_implementations)]
10+
pub struct AnyFuture<'a, S, F, T> {
1211
pub(crate) stream: &'a mut S,
1312
pub(crate) f: F,
1413
pub(crate) result: bool,
15-
pub(crate) __item: PhantomData<T>,
14+
pub(crate) _marker: PhantomData<T>,
1615
}
1716

18-
impl<'a, S, F, T> AnyFuture<'a, S, F, T>
19-
where
20-
F: FnMut(T) -> bool,
21-
{
22-
pin_utils::unsafe_pinned!(stream: &'a mut S);
23-
pin_utils::unsafe_unpinned!(result: bool);
24-
pin_utils::unsafe_unpinned!(f: F);
25-
}
17+
impl<S: Unpin, F, T> Unpin for AnyFuture<'_, S, F, T> {}
2618

2719
impl<S, F> Future for AnyFuture<'_, S, F, S::Item>
2820
where
29-
S: futures_core::stream::Stream + Unpin + Sized,
21+
S: Stream + Unpin + Sized,
3022
F: FnMut(S::Item) -> bool,
3123
{
3224
type Output = bool;
3325

3426
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
35-
use futures_core::stream::Stream;
36-
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
27+
let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx));
28+
3729
match next {
3830
Some(v) => {
39-
let result = (self.as_mut().f())(v);
40-
*self.as_mut().result() = result;
31+
let result = (&mut self.f)(v);
32+
self.result = result;
33+
4134
if result {
4235
Poll::Ready(true)
4336
} else {

src/stream/stream/min_by.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ use crate::stream::Stream;
66
use crate::task::{Context, Poll};
77

88
/// A future that yields the minimum item in a stream by a given comparison function.
9-
#[derive(Clone, Debug)]
9+
#[doc(hidden)]
10+
#[allow(missing_debug_implementations)]
1011
pub struct MinByFuture<S: Stream, F> {
1112
stream: S,
1213
compare: F,
@@ -27,7 +28,7 @@ impl<S: Stream + Unpin, F> MinByFuture<S, F> {
2728

2829
impl<S, F> Future for MinByFuture<S, F>
2930
where
30-
S: futures_core::stream::Stream + Unpin,
31+
S: Stream + Unpin,
3132
S::Item: Copy,
3233
F: FnMut(&S::Item, &S::Item) -> Ordering,
3334
{

src/stream/stream/mod.rs

+62-10
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,12 @@ use next::NextFuture;
3636

3737
use std::cmp::Ordering;
3838
use std::marker::PhantomData;
39+
use std::pin::Pin;
3940

4041
use cfg_if::cfg_if;
4142

43+
use crate::task::{Context, Poll};
44+
4245
cfg_if! {
4346
if #[cfg(feature = "docs")] {
4447
#[doc(hidden)]
@@ -73,6 +76,55 @@ pub trait Stream {
7376
/// The type of items yielded by this stream.
7477
type Item;
7578

79+
/// Attempts to receive the next item from the stream.
80+
///
81+
/// There are several possible return values:
82+
///
83+
/// * `Poll::Pending` means this stream's next value is not ready yet.
84+
/// * `Poll::Ready(None)` means this stream has been exhausted.
85+
/// * `Poll::Ready(Some(item))` means `item` was received out of the stream.
86+
///
87+
/// # Examples
88+
///
89+
/// ```
90+
/// # fn main() { async_std::task::block_on(async {
91+
/// #
92+
/// use std::pin::Pin;
93+
///
94+
/// use async_std::prelude::*;
95+
/// use async_std::stream;
96+
/// use async_std::task::{Context, Poll};
97+
///
98+
/// fn increment(s: impl Stream<Item = i32> + Unpin) -> impl Stream<Item = i32> + Unpin {
99+
/// struct Increment<S>(S);
100+
///
101+
/// impl<S: Stream<Item = i32> + Unpin> Stream for Increment<S> {
102+
/// type Item = S::Item;
103+
///
104+
/// fn poll_next(
105+
/// mut self: Pin<&mut Self>,
106+
/// cx: &mut Context<'_>,
107+
/// ) -> Poll<Option<Self::Item>> {
108+
/// match Pin::new(&mut self.0).poll_next(cx) {
109+
/// Poll::Pending => Poll::Pending,
110+
/// Poll::Ready(None) => Poll::Ready(None),
111+
/// Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)),
112+
/// }
113+
/// }
114+
/// }
115+
///
116+
/// Increment(s)
117+
/// }
118+
///
119+
/// let mut s = increment(stream::once(7));
120+
///
121+
/// assert_eq!(s.next().await, Some(8));
122+
/// assert_eq!(s.next().await, None);
123+
/// #
124+
/// # }) }
125+
/// ```
126+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
127+
76128
/// Advances the stream and returns the next value.
77129
///
78130
/// Returns [`None`] when iteration is finished. Individual stream implementations may
@@ -98,7 +150,10 @@ pub trait Stream {
98150
/// ```
99151
fn next(&mut self) -> ret!('_, NextFuture, Option<Self::Item>)
100152
where
101-
Self: Unpin;
153+
Self: Unpin,
154+
{
155+
NextFuture { stream: self }
156+
}
102157

103158
/// Creates a stream that yields its first `n` elements.
104159
///
@@ -207,13 +262,13 @@ pub trait Stream {
207262
#[inline]
208263
fn all<F>(&mut self, f: F) -> ret!('_, AllFuture, bool, F, Self::Item)
209264
where
210-
Self: Sized,
265+
Self: Unpin + Sized,
211266
F: FnMut(Self::Item) -> bool,
212267
{
213268
AllFuture {
214269
stream: self,
215270
result: true, // the default if the empty stream
216-
__item: PhantomData,
271+
_marker: PhantomData,
217272
f,
218273
}
219274
}
@@ -264,13 +319,13 @@ pub trait Stream {
264319
#[inline]
265320
fn any<F>(&mut self, f: F) -> ret!('_, AnyFuture, bool, F, Self::Item)
266321
where
267-
Self: Sized,
322+
Self: Unpin + Sized,
268323
F: FnMut(Self::Item) -> bool,
269324
{
270325
AnyFuture {
271326
stream: self,
272327
result: false, // the default if the empty stream
273-
__item: PhantomData,
328+
_marker: PhantomData,
274329
f,
275330
}
276331
}
@@ -279,10 +334,7 @@ pub trait Stream {
279334
impl<T: futures_core::stream::Stream + Unpin + ?Sized> Stream for T {
280335
type Item = <Self as futures_core::stream::Stream>::Item;
281336

282-
fn next(&mut self) -> ret!('_, NextFuture, Option<Self::Item>)
283-
where
284-
Self: Unpin,
285-
{
286-
NextFuture { stream: self }
337+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
338+
futures_core::stream::Stream::poll_next(self, cx)
287339
}
288340
}

src/stream/stream/next.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
1+
use std::pin::Pin;
2+
13
use crate::future::Future;
4+
use crate::stream::Stream;
25
use crate::task::{Context, Poll};
3-
use std::pin::Pin;
46

57
#[doc(hidden)]
68
#[allow(missing_debug_implementations)]
79
pub struct NextFuture<'a, T: Unpin + ?Sized> {
810
pub(crate) stream: &'a mut T,
911
}
1012

11-
impl<T: futures_core::stream::Stream + Unpin + ?Sized> Future for NextFuture<'_, T> {
13+
impl<T: Stream + Unpin + ?Sized> Future for NextFuture<'_, T> {
1214
type Output = Option<T::Item>;
1315

1416
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

src/stream/stream/take.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
use crate::task::{Context, Poll};
2-
31
use std::pin::Pin;
42

3+
use crate::stream::Stream;
4+
use crate::task::{Context, Poll};
5+
56
/// A stream that yields the first `n` items of another stream.
67
#[derive(Clone, Debug)]
78
pub struct Take<S> {
@@ -11,12 +12,12 @@ pub struct Take<S> {
1112

1213
impl<S: Unpin> Unpin for Take<S> {}
1314

14-
impl<S: futures_core::stream::Stream> Take<S> {
15+
impl<S: Stream> Take<S> {
1516
pin_utils::unsafe_pinned!(stream: S);
1617
pin_utils::unsafe_unpinned!(remaining: usize);
1718
}
1819

19-
impl<S: futures_core::stream::Stream> futures_core::stream::Stream for Take<S> {
20+
impl<S: Stream> futures_core::stream::Stream for Take<S> {
2021
type Item = S::Item;
2122

2223
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {

0 commit comments

Comments
 (0)