Skip to content

Commit e402c5f

Browse files
committed
Poll both in case if ther's nothing to poll (poll was invoked independently from wakers) + update benchmark
1 parent 03a3814 commit e402c5f

File tree

2 files changed

+23
-15
lines changed

2 files changed

+23
-15
lines changed

futures-util/benches/flatten_unordered.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ use std::collections::VecDeque;
1212
use std::thread;
1313

1414
#[bench]
15-
fn inner_oneshots(b: &mut Bencher) {
16-
const STREAM_COUNT: usize = 1_000;
17-
const STREAM_ITEM_COUNT: usize = 10;
15+
fn oneshot_streams(b: &mut Bencher) {
16+
const STREAM_COUNT: usize = 10_000;
17+
const STREAM_ITEM_COUNT: usize = 1;
1818

1919
b.iter(|| {
2020
let mut txs = VecDeque::with_capacity(STREAM_COUNT);
@@ -27,10 +27,10 @@ fn inner_oneshots(b: &mut Bencher) {
2727
}
2828

2929
thread::spawn(move || {
30-
let mut m = 1;
30+
let mut last = 1;
3131
while let Some(tx) = txs.pop_front() {
32-
let _ = tx.send(stream::iter(m..m + STREAM_ITEM_COUNT));
33-
m += STREAM_ITEM_COUNT;
32+
let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT));
33+
last += STREAM_ITEM_COUNT;
3434
}
3535
});
3636

@@ -52,11 +52,14 @@ fn inner_oneshots(b: &mut Bencher) {
5252
loop {
5353
match flatten.poll_next_unpin(cx) {
5454
Poll::Ready(None) => break,
55-
Poll::Ready(Some(_)) => count += 1,
55+
Poll::Ready(Some(_)) => {
56+
count += 1;
57+
}
5658
_ => {}
5759
}
5860
}
5961
assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT);
62+
6063
Poll::Ready(())
6164
}))
6265
});

futures-util/src/stream/stream/flatten_unordered.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ const NEED_TO_POLL_FUTURES: u8 = 1;
2525
const NEED_TO_POLL_STREAM: u8 = 0b10;
2626

2727
/// Indicates that it needs to poll something.
28-
const NEED_TO_POLL: u8 = NEED_TO_POLL_FUTURES | NEED_TO_POLL_STREAM;
28+
const NEED_TO_POLL_BOTH: u8 = NEED_TO_POLL_FUTURES | NEED_TO_POLL_STREAM;
2929

3030
/// Indicates that current stream is polled at the moment.
3131
const POLLING: u8 = 0b100;
@@ -63,7 +63,7 @@ impl SharedPollState {
6363
/// with non-`POLLING` state, and returns disjunction result.
6464
fn end_polling(&self, mut to_poll: u8) -> u8 {
6565
to_poll |= self.state.swap(!POLLING & !WOKEN, Ordering::SeqCst);
66-
self.state.fetch_and(to_poll & !POLLING & !WOKEN, Ordering::SeqCst);
66+
self.state.swap(to_poll & !POLLING & !WOKEN, Ordering::SeqCst);
6767
to_poll
6868
}
6969
}
@@ -101,8 +101,8 @@ impl ArcWake for PollWaker {
101101
// Only call waker if stream isn't being polled because it will be called
102102
// at the end of polling if state was changed.
103103
if poll_state_value & (POLLING | WOKEN) == NONE {
104-
self_arc.poll_state.set_or(WOKEN);
105104
if let Some(Some(inner_waker)) = unsafe { self_arc.inner_waker.get().as_ref() } {
105+
self_arc.poll_state.set_or(WOKEN);
106106
inner_waker.wake_by_ref();
107107
}
108108
}
@@ -248,7 +248,12 @@ where
248248
let mut this = self.project();
249249

250250
let mut poll_state_value = this.poll_state.begin_polling();
251-
let mut polling_with_two_wakers = poll_state_value & NEED_TO_POLL == NEED_TO_POLL && !stream_will_be_woken;
251+
252+
if poll_state_value & NEED_TO_POLL_BOTH == NONE {
253+
poll_state_value = NEED_TO_POLL_STREAM | if this.futures.is_empty() { NEED_TO_POLL_FUTURES } else { NONE };
254+
}
255+
256+
let mut polling_with_two_wakers = poll_state_value & NEED_TO_POLL_BOTH == NEED_TO_POLL_BOTH && !stream_will_be_woken;
252257

253258
if poll_state_value & NEED_TO_POLL_STREAM != NONE {
254259
if !stream_will_be_woken {
@@ -323,18 +328,18 @@ where
323328

324329
let is_done = *this.is_stream_done && this.futures.is_empty();
325330

326-
if !is_done && poll_state_value & WOKEN == NONE && poll_state_value & NEED_TO_POLL != NONE
331+
if !is_done && poll_state_value & WOKEN == NONE && poll_state_value & NEED_TO_POLL_BOTH != NONE
327332
&& (polling_with_two_wakers
328333
|| poll_state_value & NEED_TO_POLL_FUTURES != NONE && !futures_will_be_woken
329334
|| poll_state_value & NEED_TO_POLL_STREAM != NONE && !stream_will_be_woken)
330335
{
331336
ctx.waker().wake_by_ref();
332337
}
333338

334-
if next_item.is_none() && !is_done {
335-
Poll::Pending
336-
} else {
339+
if next_item.is_some() || is_done {
337340
Poll::Ready(next_item)
341+
} else {
342+
Poll::Pending
338343
}
339344
}
340345
}

0 commit comments

Comments
 (0)