Skip to content

Commit fe55019

Browse files
committed
Improve logic and docs
1 parent b39e690 commit fe55019

File tree

2 files changed

+124
-123
lines changed

2 files changed

+124
-123
lines changed

futures-util/src/stream/stream/flatten_unordered.rs

Lines changed: 87 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,49 @@
1-
use crate::stream::FuturesUnordered;
21
use alloc::sync::Arc;
3-
use core::cell::UnsafeCell;
4-
use core::fmt;
5-
use core::num::NonZeroUsize;
6-
use core::pin::Pin;
7-
use core::sync::atomic::{AtomicU8, Ordering};
8-
use futures_core::future::Future;
9-
use futures_core::stream::FusedStream;
10-
use futures_core::stream::Stream;
2+
use core::{
3+
cell::UnsafeCell,
4+
fmt,
5+
num::NonZeroUsize,
6+
pin::Pin,
7+
sync::atomic::{AtomicU8, Ordering},
8+
};
9+
10+
use pin_project_lite::pin_project;
11+
1112
use futures_core::{
13+
future::Future,
1214
ready,
15+
stream::{FusedStream, Stream},
1316
task::{Context, Poll, Waker},
1417
};
1518
#[cfg(feature = "sink")]
1619
use futures_sink::Sink;
1720
use futures_task::{waker, ArcWake};
18-
use pin_project_lite::pin_project;
21+
22+
use crate::stream::FuturesUnordered;
1923

2024
/// Indicates that there is nothing to poll and stream isn't being polled at
2125
/// the moment.
2226
const NONE: u8 = 0;
2327

24-
/// This indicates that inner streams need to be polled.
28+
/// Indicates that inner streams need to be polled.
2529
const NEED_TO_POLL_INNER_STREAMS: u8 = 1;
2630

27-
/// This indicates that stream needs to be polled.
31+
/// Indicates that stream needs to be polled.
2832
const NEED_TO_POLL_STREAM: u8 = 0b10;
2933

30-
/// This indicates that it needs to poll stream and inner streams.
34+
/// Indicates that it needs to poll stream and inner streams.
3135
const NEED_TO_POLL_ALL: u8 = NEED_TO_POLL_INNER_STREAMS | NEED_TO_POLL_STREAM;
3236

33-
/// This indicates that the current stream is being polled at the moment.
37+
/// Indicates that the current stream is being polled at the moment.
3438
const POLLING: u8 = 0b100;
3539

36-
/// This indicates that inner streams are being waked at the moment.
40+
/// Indicates that inner streams are being waked at the moment.
3741
const WAKING_INNER_STREAMS: u8 = 0b1000;
3842

39-
/// This indicates that the current stream is being waked at the moment.
43+
/// Indicates that the current stream is being waked at the moment.
4044
const WAKING_STREAM: u8 = 0b10000;
4145

42-
/// This indicates that the current stream or inner streams are being waked at the moment.
46+
/// Indicates that the current stream or inner streams are being waked at the moment.
4347
const WAKING_ANYTHING: u8 = WAKING_STREAM | WAKING_INNER_STREAMS;
4448

4549
/// Determines what needs to be polled, and is stream being polled at the
@@ -56,7 +60,7 @@ impl SharedPollState {
5660
}
5761

5862
/// Attempts to start polling, returning stored state in case of success.
59-
/// Returns `None` if state some waker is waking at the moment.
63+
/// Returns `None` if some waker is waking at the moment.
6064
fn start_polling(&self) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState)>)> {
6165
self.state
6266
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
@@ -70,7 +74,7 @@ impl SharedPollState {
7074
.map(|value| {
7175
(
7276
value,
73-
PollStateBomb::new(self, move |state| {
77+
PollStateBomb::new(self, |state| {
7478
state.stop_polling(NEED_TO_POLL_ALL);
7579
}),
7680
)
@@ -108,8 +112,34 @@ impl SharedPollState {
108112
}
109113
}
110114

111-
/// Waker which will update `poll_state` with `need_to_poll` value on
112-
/// `wake_by_ref` call and then, if there is a need, call `inner_waker`.
115+
/// Used to execute some function on the given state when dropped.
116+
struct PollStateBomb<'a, F: FnOnce(&SharedPollState)> {
117+
state: &'a SharedPollState,
118+
drop: Option<F>,
119+
}
120+
121+
impl<'a, F: FnOnce(&SharedPollState)> PollStateBomb<'a, F> {
122+
/// Constructs new bomb with the given state.
123+
fn new(state: &'a SharedPollState, drop: F) -> Self {
124+
Self { state, drop: Some(drop) }
125+
}
126+
127+
/// Deactivates bomb, forces it to not call provided function when dropped.
128+
fn deactivate(mut self) {
129+
self.drop.take();
130+
}
131+
}
132+
133+
impl<F: FnOnce(&SharedPollState)> Drop for PollStateBomb<'_, F> {
134+
fn drop(&mut self) {
135+
if let Some(drop) = self.drop.take() {
136+
(drop)(self.state);
137+
}
138+
}
139+
}
140+
141+
/// Will update state with the provided value on `wake_by_ref` call
142+
/// and then, if there is a need, call `inner_waker`.
113143
struct InnerWaker {
114144
inner_waker: UnsafeCell<Option<Waker>>,
115145
poll_state: SharedPollState,
@@ -139,30 +169,6 @@ impl InnerWaker {
139169
}
140170
}
141171

142-
///
143-
struct PollStateBomb<'a, F: FnOnce(&SharedPollState)> {
144-
state: &'a SharedPollState,
145-
drop: Option<F>,
146-
}
147-
148-
impl<'a, F: FnOnce(&SharedPollState)> PollStateBomb<'a, F> {
149-
fn new(state: &'a SharedPollState, drop: F) -> Self {
150-
Self { state, drop: Some(drop) }
151-
}
152-
153-
fn deactivate(mut self) {
154-
self.drop.take();
155-
}
156-
}
157-
158-
impl<F: FnOnce(&SharedPollState)> Drop for PollStateBomb<'_, F> {
159-
fn drop(&mut self) {
160-
if let Some(drop) = self.drop.take() {
161-
(drop)(&self.state);
162-
}
163-
}
164-
}
165-
166172
impl ArcWake for InnerWaker {
167173
fn wake_by_ref(self_arc: &Arc<Self>) {
168174
let (poll_state_value, state_bomb) = self_arc.start_waking();
@@ -175,7 +181,7 @@ impl ArcWake for InnerWaker {
175181
{
176182
// First, stop waking to allow polling stream
177183
drop(state_bomb);
178-
// Wake inner waker
184+
// Wake up inner waker
179185
inner_waker.wake();
180186
}
181187
}
@@ -221,8 +227,8 @@ impl<St: Stream + Unpin> Future for PollStreamFut<St> {
221227
pin_project! {
222228
/// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered)
223229
/// method.
224-
#[must_use = "streams do nothing unless polled"]
225230
#[project = FlattenUnorderedProj]
231+
#[must_use = "streams do nothing unless polled"]
226232
pub struct FlattenUnordered<St, U> {
227233
#[pin]
228234
inner_streams: FuturesUnordered<PollStreamFut<U>>,
@@ -325,45 +331,36 @@ where
325331
}
326332
};
327333

328-
let mut polling_with_two_wakers =
329-
!this.is_exceeded_limit() && poll_state_value & NEED_TO_POLL_ALL == NEED_TO_POLL_ALL;
330-
331334
if poll_state_value & NEED_TO_POLL_STREAM != NONE {
335+
// Safety: now state is `POLLING`.
336+
let stream_waker = unsafe { InnerWaker::replace_waker(this.stream_waker, cx) };
337+
338+
// Here we need to poll the inner stream.
339+
//
340+
// To improve performance, we will attempt to place as many items as we can
341+
// to the `FuturesUnordered` bucket before polling inner streams
332342
loop {
333343
if this.is_exceeded_limit() || *this.is_stream_done {
334-
polling_with_two_wakers = false;
335-
need_to_poll_next |= NEED_TO_POLL_STREAM;
344+
// We either exceeded the limit or the stream is exhausted
345+
if !*this.is_stream_done {
346+
// The stream needs to be polled in the next iteration
347+
need_to_poll_next |= NEED_TO_POLL_STREAM;
348+
}
336349

337350
break;
338351
} else {
339-
match if polling_with_two_wakers {
340-
// Safety: now state is `POLLING`.
341-
let waker = unsafe { InnerWaker::replace_waker(this.stream_waker, cx) };
342-
let mut cx = Context::from_waker(&waker);
343-
this.stream.as_mut().poll_next(&mut cx)
344-
} else {
345-
this.stream.as_mut().poll_next(cx)
346-
} {
352+
match this.stream.as_mut().poll_next(&mut Context::from_waker(&stream_waker)) {
347353
Poll::Ready(Some(inner_stream)) => {
354+
// Add new stream to the inner streams bucket
348355
this.inner_streams.as_mut().push(PollStreamFut::new(inner_stream));
349-
need_to_poll_next |= NEED_TO_POLL_STREAM;
350-
// Polling inner streams in current iteration with the same context
351-
// is ok because we already received `Poll::Ready` from
352-
// stream
356+
// Inner streams must be polled afterward
353357
poll_state_value |= NEED_TO_POLL_INNER_STREAMS;
354-
*this.is_stream_done = false;
355358
}
356359
Poll::Ready(None) => {
357-
// Polling inner streams in current iteration with the same context
358-
// is ok because we already received `Poll::Ready` from
359-
// stream
360+
// Mark the stream as done
360361
*this.is_stream_done = true;
361362
}
362363
Poll::Pending => {
363-
if !polling_with_two_wakers {
364-
need_to_poll_next |= NEED_TO_POLL_STREAM;
365-
}
366-
*this.is_stream_done = false;
367364
break;
368365
}
369366
}
@@ -372,41 +369,43 @@ where
372369
}
373370

374371
if poll_state_value & NEED_TO_POLL_INNER_STREAMS != NONE {
375-
match if polling_with_two_wakers {
376-
// Safety: now state is `POLLING`.
377-
let waker = unsafe { InnerWaker::replace_waker(this.inner_streams_waker, cx) };
378-
let mut cx = Context::from_waker(&waker);
379-
this.inner_streams.as_mut().poll_next(&mut cx)
380-
} else {
381-
this.inner_streams.as_mut().poll_next(cx)
382-
} {
372+
// Safety: now state is `POLLING`.
373+
let inner_streams_waker =
374+
unsafe { InnerWaker::replace_waker(this.inner_streams_waker, cx) };
375+
376+
match this
377+
.inner_streams
378+
.as_mut()
379+
.poll_next(&mut Context::from_waker(&inner_streams_waker))
380+
{
383381
Poll::Ready(Some(Some((item, next_item_fut)))) => {
382+
// Push next inner stream item future to the list of inner streams futures
384383
this.inner_streams.as_mut().push(next_item_fut);
384+
// Take the received item
385385
next_item = Some(item);
386+
// On the next iteration, inner streams must be polled again
386387
need_to_poll_next |= NEED_TO_POLL_INNER_STREAMS;
387388
}
388389
Poll::Ready(Some(None)) => {
390+
// On the next iteration, inner streams must be polled again
389391
need_to_poll_next |= NEED_TO_POLL_INNER_STREAMS;
390392
}
391-
Poll::Pending => {
392-
if !polling_with_two_wakers {
393-
need_to_poll_next |= NEED_TO_POLL_INNER_STREAMS;
394-
}
395-
}
396-
Poll::Ready(None) => {
397-
need_to_poll_next &= !NEED_TO_POLL_INNER_STREAMS;
398-
}
393+
_ => {}
399394
}
400395
}
401396

397+
// We didn't have any `poll_next` panic, so it's time to deactivate the bomb
402398
state_bomb.deactivate();
399+
// Stop polling and swap the latest state
403400
poll_state_value = this.poll_state.stop_polling(need_to_poll_next);
404401
let is_done = *this.is_stream_done && this.inner_streams.is_empty();
405402

406403
if next_item.is_some() || is_done {
407404
Poll::Ready(next_item)
408405
} else {
406+
// We need to call the waker if state was changed during the polling phase
409407
if poll_state_value & NEED_TO_POLL_ALL != NONE
408+
// or we need to poll the stream and didn't reach the limit yet
410409
|| !this.is_exceeded_limit() && need_to_poll_next & NEED_TO_POLL_STREAM != NONE
411410
{
412411
cx.waker().wake_by_ref();

0 commit comments

Comments
 (0)