Skip to content

Commit d600a6a

Browse files
committed
Tweaks
1 parent 69f60cc commit d600a6a

File tree

1 file changed

+42
-32
lines changed

1 file changed

+42
-32
lines changed

futures-util/src/stream/stream/flatten_unordered.rs

Lines changed: 42 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use alloc::sync::Arc;
22
use core::{
33
cell::UnsafeCell,
4+
convert::identity,
45
fmt,
56
num::NonZeroUsize,
67
pin::Pin,
@@ -67,41 +68,48 @@ impl SharedPollState {
6768
fn start_polling(
6869
&self,
6970
) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
70-
self.state
71+
let value = self
72+
.state
7173
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
7274
if value & WAKING_ALL == NONE {
7375
Some(POLLING)
7476
} else {
7577
None
7678
}
7779
})
78-
.ok()
79-
.map(|value| {
80-
let bomb = PollStateBomb::new(self, SharedPollState::reset);
80+
.ok()?;
81+
let bomb = PollStateBomb::new(self, SharedPollState::reset);
8182

82-
(value, bomb)
83-
})
83+
Some((value, bomb))
8484
}
8585

8686
/// Starts the waking process and performs bitwise or with the given value.
8787
fn start_waking(
8888
&self,
8989
to_poll: u8,
9090
waking: u8,
91-
) -> (u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>) {
91+
) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
9292
let value = self
9393
.state
9494
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
95+
let next_value = value | to_poll;
96+
9597
if value & (WOKEN | POLLING) == NONE {
96-
Some(value | to_poll | waking)
98+
Some(next_value | waking)
99+
} else if next_value != value {
100+
Some(next_value)
97101
} else {
98-
Some(value | to_poll)
102+
None
99103
}
100104
})
101-
.unwrap();
105+
.ok()?;
102106
let bomb = PollStateBomb::new(self, move |state| state.stop_waking(waking));
103107

104-
(value, bomb)
108+
if value & (WOKEN | POLLING) == NONE {
109+
Some((value, bomb))
110+
} else {
111+
None
112+
}
105113
}
106114

107115
/// Sets current state to
@@ -129,13 +137,23 @@ impl SharedPollState {
129137
/// Toggles state to non-waking, allowing to start polling.
130138
fn stop_waking(&self, waking: u8) -> u8 {
131139
self.state
132-
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| Some(value & !waking | WOKEN))
133-
.unwrap()
140+
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
141+
let next_value = value & !waking;
142+
143+
if value & WAKING_ALL == waking {
144+
Some(next_value | WOKEN)
145+
} else if next_value != value {
146+
Some(next_value)
147+
} else {
148+
None
149+
}
150+
})
151+
.unwrap_or_else(identity)
134152
}
135153

136154
/// Resets current state allowing to poll the stream and wake up wakers.
137155
fn reset(&self) -> u8 {
138-
self.state.swap(NEED_TO_POLL_ALL, Ordering::SeqCst)
156+
self.state.swap(NEED_TO_POLL_ALL, Ordering::AcqRel)
139157
}
140158
}
141159

@@ -195,8 +213,9 @@ impl InnerWaker {
195213
waker(self_arc.clone())
196214
}
197215

198-
/// Flags state that waking is started for the waker with the given value.
199-
fn start_waking(&self) -> (u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>) {
216+
/// Attempts to start the waking process for the waker with the given value.
217+
/// If succeeded, then the stream isn't yet woken and not being polled at the moment.
218+
fn start_waking(&self) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
200219
self.poll_state.start_waking(self.need_to_poll, self.waking_state())
201220
}
202221

@@ -208,33 +227,24 @@ impl InnerWaker {
208227

209228
impl ArcWake for InnerWaker {
210229
fn wake_by_ref(self_arc: &Arc<Self>) {
211-
let (poll_state_value, state_bomb) = self_arc.start_waking();
212-
213-
// Only call waker if the stream
214-
// - isn't woken already
215-
// - isn't being polled at the moment because of safety reasons
216-
//
217-
// Waker will be called at the end of polling if state was changed
218-
if poll_state_value & (WOKEN | POLLING) == NONE {
230+
if let Some((_, state_bomb)) = self_arc.start_waking() {
219231
// Safety: now state is not `POLLING`
220232
let waker_opt = unsafe { self_arc.inner_waker.get().as_ref().unwrap() };
221233

222234
if let Some(inner_waker) = waker_opt.clone() {
223235
// Stop waking to allow polling stream
224236
let poll_state_value = state_bomb.fire().unwrap();
225237

226-
// Check if the stream isn't woken yet
227-
if poll_state_value & WOKEN == NONE {
238+
// Here we want to call waker only if stream isn't woken yet and
239+
// also to optimize the case when two wakers are called at the same time.
240+
//
241+
// In this case the best strategy will be to propagate only the latest waker's awake,
242+
// and then poll both entities in a single `poll_next` call
243+
if poll_state_value & (WOKEN | WAKING_ALL) == self_arc.waking_state() {
228244
// Wake up inner waker
229245
inner_waker.wake();
230246
}
231247
}
232-
} else {
233-
// At the end of polling state will become `!WAKING_ALL` if stream is being polled
234-
//
235-
// And it's already non-waking if it's woken
236-
237-
state_bomb.deactivate();
238248
}
239249
}
240250
}

0 commit comments

Comments
 (0)