Skip to content

Commit 3bbfefe

Browse files
committed
StreamFut => PollStreamFut with updated constructor
1 parent aba9d26 commit 3bbfefe

File tree

1 file changed

+14
-16
lines changed

1 file changed

+14
-16
lines changed

futures-util/src/stream/stream/flat_map_unordered.rs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ const NEED_TO_POLL_FUTURES: u8 = 0b1;
2424
/// Indicates that `stream` needs to be polled.
2525
const NEED_TO_POLL_STREAM: u8 = 0b10;
2626

27-
/// Indicates that we need to poll something.
27+
/// Indicates that it needs to poll something.
2828
const NEED_TO_POLL: u8 = NEED_TO_POLL_FUTURES | NEED_TO_POLL_STREAM;
2929

3030
/// Indicates that current stream is polled at the moment.
@@ -76,7 +76,7 @@ struct PollWaker {
7676
impl ArcWake for PollWaker {
7777
fn wake_by_ref(self_arc: &Arc<Self>) {
7878
let poll_state_value = self_arc.poll_state.set_or(self_arc.need_to_poll);
79-
// Only call waker if we're not polling because we will call it at the end
79+
// Only call waker if stream isn't polled because it will called at the end
8080
// of polling if it needs to poll something.
8181
if poll_state_value & POLLING == NONE {
8282
self_arc.inner_waker.wake_by_ref();
@@ -90,25 +90,25 @@ impl ArcWake for PollWaker {
9090
/// If `poll_next` will return `Poll::Pending`, it will be forwared to
9191
/// the future, and current task will be notified by waker.
9292
#[must_use = "futures do nothing unless you `.await` or poll them"]
93-
struct StreamFut<St> {
93+
struct PollStreamFut<St> {
9494
stream: Option<St>,
9595
}
9696

97-
impl<St> StreamFut<St> {
97+
impl<St> PollStreamFut<St> {
9898
unsafe_pinned!(stream: Option<St>);
9999

100-
/// Constructs new `StreamFut` using given `stream`.
101-
fn new(stream: St) -> Self {
100+
/// Constructs new `PollStreamFut` using given `stream`.
101+
fn new(stream: impl Into<Option<St>>) -> Self {
102102
Self {
103103
stream: stream.into(),
104104
}
105105
}
106106
}
107107

108-
impl<St: Stream + Unpin> Unpin for StreamFut<St> {}
108+
impl<St: Stream + Unpin> Unpin for PollStreamFut<St> {}
109109

110-
impl<St: Stream> Future for StreamFut<St> {
111-
type Output = Option<(St::Item, StreamFut<St>)>;
110+
impl<St: Stream> Future for PollStreamFut<St> {
111+
type Output = Option<(St::Item, PollStreamFut<St>)>;
112112

113113
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
114114
let item = if let Some(stream) = self.as_mut().stream().as_pin_mut() {
@@ -120,9 +120,7 @@ impl<St: Stream> Future for StreamFut<St> {
120120
Poll::Ready(item.map(|item| {
121121
(
122122
item,
123-
StreamFut {
124-
stream: unsafe { self.get_unchecked_mut().stream.take() },
125-
},
123+
PollStreamFut::new(unsafe { self.get_unchecked_mut().stream.take() }),
126124
)
127125
}))
128126
}
@@ -133,7 +131,7 @@ impl<St: Stream> Future for StreamFut<St> {
133131
#[must_use = "streams do nothing unless polled"]
134132
pub struct FlatMapUnordered<St: Stream, U: Stream, F: FnMut(St::Item) -> U> {
135133
poll_state: SharedPollState,
136-
futures: FuturesUnordered<StreamFut<U>>,
134+
futures: FuturesUnordered<PollStreamFut<U>>,
137135
stream: Map<St, F>,
138136
limit: Option<NonZeroUsize>,
139137
is_stream_done: bool,
@@ -170,15 +168,15 @@ where
170168
U: Stream,
171169
F: FnMut(St::Item) -> U,
172170
{
173-
unsafe_pinned!(futures: FuturesUnordered<StreamFut<U>>);
171+
unsafe_pinned!(futures: FuturesUnordered<PollStreamFut<U>>);
174172
unsafe_pinned!(stream: Map<St, F>);
175173
unsafe_unpinned!(is_stream_done: bool);
176174
unsafe_unpinned!(limit: Option<NonZeroUsize>);
177175
unsafe_unpinned!(poll_state: SharedPollState);
178176

179177
pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> FlatMapUnordered<St, U, F> {
180178
FlatMapUnordered {
181-
// Because to create first future, we need to get inner
179+
// Because to create first future, it needs to get inner
182180
// stream from `stream`
183181
poll_state: SharedPollState::new(NEED_TO_POLL_STREAM),
184182
futures: FuturesUnordered::new(),
@@ -290,7 +288,7 @@ where
290288
self.as_mut().stream().poll_next(ctx)
291289
} {
292290
Poll::Ready(Some(inner_stream)) => {
293-
self.as_mut().futures().push(StreamFut::new(inner_stream));
291+
self.as_mut().futures().push(PollStreamFut::new(inner_stream));
294292
need_to_poll_next |= NEED_TO_POLL_STREAM;
295293
// Polling futures in current iteration with the same context
296294
// is ok because we already received `Poll::Ready` from

0 commit comments

Comments
 (0)