Skip to content

Commit 6096ba6

Browse files
committed
Misc improvements
1 parent 4091d66 commit 6096ba6

File tree

1 file changed

+60
-38
lines changed

1 file changed

+60
-38
lines changed

futures-util/src/stream/stream/flatten_unordered.rs

Lines changed: 60 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ const WAKING_INNER_STREAMS: u8 = 0b1000;
4343
/// The base stream is being woken at the moment.
4444
const WAKING_STREAM: u8 = 0b10000;
4545

46-
/// The base stream or inner streams are being woken at the moment.
46+
/// The base stream and inner streams are being woken at the moment.
4747
const WAKING_ALL: u8 = WAKING_STREAM | WAKING_INNER_STREAMS;
4848

4949
/// Determines what needs to be polled, and is stream being polled at the
@@ -61,7 +61,9 @@ impl SharedPollState {
6161

6262
/// Attempts to start polling, returning stored state in case of success.
6363
/// Returns `None` if some waker is waking at the moment.
64-
fn start_polling(&self) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState)>)> {
64+
fn start_polling(
65+
&self,
66+
) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
6567
self.state
6668
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
6769
if value & WAKING_ALL == NONE {
@@ -72,12 +74,9 @@ impl SharedPollState {
7274
})
7375
.ok()
7476
.map(|value| {
75-
(
76-
value,
77-
PollStateBomb::new(self, |state| {
78-
state.stop_polling(NEED_TO_POLL_ALL);
79-
}),
80-
)
77+
let bomb = PollStateBomb::new(self, |state| state.stop_polling(NEED_TO_POLL_ALL));
78+
79+
(value, bomb)
8180
})
8281
}
8382

@@ -86,39 +85,39 @@ impl SharedPollState {
8685
&self,
8786
to_poll: u8,
8887
waking: u8,
89-
) -> (u8, PollStateBomb<'_, impl FnOnce(&SharedPollState)>) {
88+
) -> (u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>) {
9089
let value = self.state.fetch_or(to_poll | waking, Ordering::SeqCst);
90+
let bomb = PollStateBomb::new(self, move |state| state.stop_waking(waking));
9191

92-
(
93-
value,
94-
PollStateBomb::new(self, move |state| {
95-
state.stop_waking(waking);
96-
}),
97-
)
92+
(value, bomb)
9893
}
9994

100-
/// Toggles state to non-waking, allowing to start polling.
101-
fn stop_waking(&self, waking: u8) -> u8 {
102-
self.state.fetch_and(!waking, Ordering::SeqCst)
103-
}
104-
105-
/// Sets current state to `!POLLING`, allowing to use wakers.
95+
/// Sets current state to `!POLLING` allowing to use wakers and `!WALING_ALL` as
96+
/// - Wakers called during the `POLLING` phase won't propagate their calls
97+
/// - `POLLING` phase can't start if some of the wakers are active
98+
///
99+
/// So no waker can touch the inner wakers, it's safe to poll again.
106100
fn stop_polling(&self, to_poll: u8) -> u8 {
107101
self.state
108102
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
109-
Some((value | to_poll) & !POLLING)
103+
Some((value | to_poll) & !POLLING & !WAKING_ALL)
110104
})
111105
.unwrap()
112106
}
107+
108+
/// Toggles state to non-waking, allowing to start polling.
109+
fn stop_waking(&self, waking: u8) -> u8 {
110+
self.state.fetch_and(!waking, Ordering::SeqCst)
111+
}
113112
}
114113

115114
/// Used to execute some function on the given state when dropped.
116-
struct PollStateBomb<'a, F: FnOnce(&SharedPollState)> {
115+
struct PollStateBomb<'a, F: FnOnce(&SharedPollState) -> u8> {
117116
state: &'a SharedPollState,
118117
drop: Option<F>,
119118
}
120119

121-
impl<'a, F: FnOnce(&SharedPollState)> PollStateBomb<'a, F> {
120+
impl<'a, F: FnOnce(&SharedPollState) -> u8> PollStateBomb<'a, F> {
122121
/// Constructs new bomb with the given state.
123122
fn new(state: &'a SharedPollState, drop: F) -> Self {
124123
Self { state, drop: Some(drop) }
@@ -128,9 +127,14 @@ impl<'a, F: FnOnce(&SharedPollState)> PollStateBomb<'a, F> {
128127
fn deactivate(mut self) {
129128
self.drop.take();
130129
}
130+
131+
/// Manually fires the bomb, returning supplied state.
132+
fn fire(mut self) -> Option<u8> {
133+
self.drop.take().map(|drop| (drop)(self.state))
134+
}
131135
}
132136

133-
impl<F: FnOnce(&SharedPollState)> Drop for PollStateBomb<'_, F> {
137+
impl<F: FnOnce(&SharedPollState) -> u8> Drop for PollStateBomb<'_, F> {
134138
fn drop(&mut self) {
135139
if let Some(drop) = self.drop.take() {
136140
(drop)(self.state);
@@ -163,27 +167,46 @@ impl InnerWaker {
163167
waker(self_arc.clone())
164168
}
165169

166-
// Flags state that waking is started for the waker with the given value.
167-
fn start_waking(&self) -> (u8, PollStateBomb<'_, impl FnOnce(&SharedPollState)>) {
168-
self.poll_state.start_waking(self.need_to_poll, self.need_to_poll << 3)
170+
/// Flags state that waking is started for the waker with the given value.
171+
fn start_waking(&self) -> (u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>) {
172+
self.poll_state.start_waking(self.need_to_poll, self.waking_state())
173+
}
174+
175+
/// Returns the corresponding waking state toggled by this waker.
176+
fn waking_state(&self) -> u8 {
177+
self.need_to_poll << 3
169178
}
170179
}
171180

172181
impl ArcWake for InnerWaker {
173182
fn wake_by_ref(self_arc: &Arc<Self>) {
174183
let (poll_state_value, state_bomb) = self_arc.start_waking();
175184

176-
// Only call waker if stream isn't being polled because of safety reasons.
177-
// Waker will be called at the end of polling if state was changed.
185+
// Only call waker if stream isn't being polled because of safety reasons
186+
//
187+
// Waker will be called at the end of polling if state was changed
178188
if poll_state_value & POLLING == NONE {
179-
if let Some(inner_waker) =
180-
unsafe { self_arc.inner_waker.get().as_ref().cloned().flatten() }
181-
{
182-
// First, stop waking to allow polling stream
183-
drop(state_bomb);
184-
// Wake up inner waker
185-
inner_waker.wake();
189+
// Safety: now state is not `POLLING`
190+
let waker_opt = unsafe { self_arc.inner_waker.get().as_ref().unwrap() };
191+
192+
if let Some(inner_waker) = waker_opt.clone() {
193+
// Stop waking to allow polling stream
194+
let poll_state_value = state_bomb.fire().unwrap();
195+
196+
// Here we want to optimize the case when two wakers are called at the same time
197+
//
198+
// In this case the best strategy will be to propagate only the latest waker's awake,
199+
// and then poll both entities in a single `poll_next` call
200+
201+
// Check if this is the latest waker being waking
202+
if poll_state_value & WAKING_ALL == self_arc.waking_state() {
203+
// Wake up inner waker
204+
inner_waker.wake();
205+
}
186206
}
207+
} else {
208+
// At the end of polling state will become `!WAKING_ALL`
209+
state_bomb.deactivate();
187210
}
188211
}
189212
}
@@ -426,7 +449,6 @@ where
426449
impl<St, Item> Sink<Item> for FlattenUnordered<St>
427450
where
428451
St: Stream + Sink<Item>,
429-
St::Item: Stream,
430452
{
431453
type Error = St::Error;
432454

0 commit comments

Comments
 (0)