|
1 | | -use crate::task::{ArcWake, waker_ref}; |
| 1 | +use crate::task::{waker_ref, ArcWake}; |
2 | 2 | use futures_core::future::{FusedFuture, Future}; |
3 | 3 | use futures_core::task::{Context, Poll, Waker}; |
4 | 4 | use slab::Slab; |
@@ -54,13 +54,15 @@ unsafe impl<Fut> Send for Inner<Fut> |
54 | 54 | where |
55 | 55 | Fut: Future + Send, |
56 | 56 | Fut::Output: Send + Sync, |
57 | | -{} |
| 57 | +{ |
| 58 | +} |
58 | 59 |
|
59 | 60 | unsafe impl<Fut> Sync for Inner<Fut> |
60 | 61 | where |
61 | 62 | Fut: Future + Send, |
62 | 63 | Fut::Output: Send + Sync, |
63 | | -{} |
| 64 | +{ |
| 65 | +} |
64 | 66 |
|
65 | 67 | const IDLE: usize = 0; |
66 | 68 | const POLLING: usize = 1; |
@@ -125,27 +127,20 @@ where |
125 | 127 | fn record_waker(&self, waker_key: &mut usize, cx: &mut Context<'_>) { |
126 | 128 | let mut wakers_guard = self.notifier.wakers.lock().unwrap(); |
127 | 129 |
|
128 | | - let wakers = if let Some(wakers) = wakers_guard.as_mut() { |
129 | | - wakers |
130 | | - } else { |
131 | | - return; |
| 130 | + let wakers = match wakers_guard.as_mut() { |
| 131 | + Some(wakers) => wakers, |
| 132 | + None => return, |
132 | 133 | }; |
133 | 134 |
|
| 135 | + let new_waker = cx.waker(); |
| 136 | + |
134 | 137 | if *waker_key == NULL_WAKER_KEY { |
135 | | - *waker_key = wakers.insert(Some(cx.waker().clone())); |
| 138 | + *waker_key = wakers.insert(Some(new_waker.clone())); |
136 | 139 | } else { |
137 | | - let waker_slot = &mut wakers[*waker_key]; |
138 | | - let needs_replacement = if let Some(_old_waker) = waker_slot { |
139 | | - // If there's still an unwoken waker in the slot, only replace |
140 | | - // if the current one wouldn't wake the same task. |
141 | | - // TODO: This API is currently not available, so replace always |
142 | | - // !waker.will_wake_nonlocal(old_waker) |
143 | | - true |
144 | | - } else { |
145 | | - true |
146 | | - }; |
147 | | - if needs_replacement { |
148 | | - *waker_slot = Some(cx.waker().clone()); |
| 140 | + match wakers[*waker_key] { |
| 141 | + Some(ref old_waker) if new_waker.will_wake(old_waker) => {} |
| 142 | + // Could use clone_from here, but Waker doesn't specialize it. |
| 143 | + ref mut slot => *slot = Some(new_waker.clone()), |
149 | 144 | } |
150 | 145 | } |
151 | 146 | debug_assert!(*waker_key != NULL_WAKER_KEY); |
@@ -184,7 +179,10 @@ where |
184 | 179 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
185 | 180 | let this = &mut *self; |
186 | 181 |
|
187 | | - let inner = this.inner.take().expect("Shared future polled again after completion"); |
| 182 | + let inner = this |
| 183 | + .inner |
| 184 | + .take() |
| 185 | + .expect("Shared future polled again after completion"); |
188 | 186 |
|
189 | 187 | // Fast path for when the wrapped future has already completed |
190 | 188 | if inner.notifier.state.load(Acquire) == COMPLETE { |
@@ -262,8 +260,7 @@ where |
262 | 260 | }; |
263 | 261 |
|
264 | 262 | unsafe { |
265 | | - *inner.future_or_output.get() = |
266 | | - FutureOrOutput::Output(output); |
| 263 | + *inner.future_or_output.get() = FutureOrOutput::Output(output); |
267 | 264 | } |
268 | 265 |
|
269 | 266 | inner.notifier.state.store(COMPLETE, SeqCst); |
|
0 commit comments