use crate::convert::TryFrom;
use crate::ptr;
use crate::sync::atomic::{
    AtomicI8, AtomicUsize,
    Ordering::{Acquire, Relaxed, Release},
};
use crate::sys::{c, dur2timeout};
use crate::time::Duration;

pub struct Parker {
    state: AtomicI8,
}

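// The state values are chosen so that park()'s `fetch_sub(1)` turns
// NOTIFIED (1) into EMPTY (0) and EMPTY (0) into PARKED (-1) in a single
// atomic step.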
const PARKED: i8 = -1;
const EMPTY: i8 = 0;
const NOTIFIED: i8 = 1;

// Notes about memory ordering:
//
// Memory ordering is only relevant for the relative ordering of operations
// between different variables. Even Ordering::Relaxed guarantees a
// monotonic/consistent order when looking at just a single atomic variable.
//
// So, since this parker is just a single atomic variable, we only need to look
// at the ordering guarantees we need to provide to the 'outside world'.
//
// The only memory ordering guarantee that parking and unparking provide is
// that things which happened before unpark() are visible on the thread
// returning from park() afterwards. If that didn't hold, the thread would
// effectively have been unparked before unpark() was called, while still
// consuming the 'token'.
//
// In other words, unpark() needs to synchronize with the part of park() that
// consumes the token and returns.
//
// This is done with a release-acquire synchronization, by using
// Ordering::Release when writing NOTIFIED (the 'token') in unpark(), and using
// Ordering::Acquire when checking for this state in park().
impl Parker {
    pub fn new() -> Self {
        Self { state: AtomicI8::new(EMPTY) }
    }

    // Assumes this is only called by the thread that owns the Parker,
    // which means that `self.state != PARKED`.
    pub unsafe fn park(&self) {
        // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
        // first case.
        if self.state.fetch_sub(1, Acquire) == NOTIFIED {
            return;
        }

        loop {
            // Wait for something to happen.
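            // WaitOnAddress only blocks while the value at self.ptr() still
            // equals PARKED (the compare value passed in), and returns once
            // another thread calls WakeByAddressSingle on the same address.
            // When WaitOnAddress is unavailable (older Windows versions), we
            // fall back to a keyed event, keyed by the state's address, which
            // blocks until unpark() releases it.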
            if c::WaitOnAddress::is_available() {
                c::WaitOnAddress(self.ptr(), &PARKED as *const _ as c::LPVOID, 1, c::INFINITE);
            } else {
                c::NtWaitForKeyedEvent(keyed_event_handle(), self.ptr(), 0, ptr::null_mut());
            }
            // Change NOTIFIED=>EMPTY and return in that case.
            if self.state.compare_and_swap(NOTIFIED, EMPTY, Acquire) == NOTIFIED {
                return;
            } else {
                // Spurious wake up. We loop to try again.
            }
        }
    }

    // Assumes this is only called by the thread that owns the Parker,
    // which means that `self.state != PARKED`.
    pub unsafe fn park_timeout(&self, timeout: Duration) {
        // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
        // first case.
        if self.state.fetch_sub(1, Acquire) == NOTIFIED {
            return;
        }

        if c::WaitOnAddress::is_available() {
            // Wait for something to happen, assuming it's still set to PARKED.
            c::WaitOnAddress(self.ptr(), &PARKED as *const _ as c::LPVOID, 1, dur2timeout(timeout));
            // Change NOTIFIED=>EMPTY and return in that case.
            if self.state.swap(EMPTY, Acquire) == NOTIFIED {
                return;
            } else {
                // Timeout or spurious wake up.
                // We return either way, because we can't easily tell if it was the
                // timeout or not.
            }
        } else {
            // Need to wait for unpark() using NtWaitForKeyedEvent.
            let handle = keyed_event_handle();

            // NtWaitForKeyedEvent uses units of 100ns, and a negative value
            // indicates a relative timeout (measured against the monotonic
            // clock) rather than an absolute deadline.
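            // The `+ 99` rounds up to the next 100ns unit, so we never wait
            // shorter than requested, and a duration too large for an i64
            // saturates to i64::MIN, the longest possible relative wait.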
            let mut timeout = match i64::try_from((timeout.as_nanos() + 99) / 100) {
                Ok(t) => -t,
                Err(_) => i64::MIN,
            };

            // Wait for unpark() to produce this event.
            if c::NtWaitForKeyedEvent(handle, self.ptr(), 0, &mut timeout) == c::STATUS_SUCCESS {
                // Awoken by another thread.
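                // A swap with Acquire (rather than a plain store) is used so
                // that this read-modify-write synchronizes with the Release
                // write of NOTIFIED in unpark().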
                self.state.swap(EMPTY, Acquire);
            } else {
                // Not awoken by another thread (spurious or timeout).
                if self.state.swap(EMPTY, Acquire) == NOTIFIED {
                    // If the state is NOTIFIED, we *just* missed an unpark(),
                    // which is now waiting for us to wait for it.
                    // Wait for it to consume the event and unblock it.
                    c::NtWaitForKeyedEvent(handle, self.ptr(), 0, ptr::null_mut());
                }
            }
        }
    }

    pub fn unpark(&self) {
        // Change PARKED=>NOTIFIED, EMPTY=>NOTIFIED, or NOTIFIED=>NOTIFIED, and
        // wake the thread in the first case.
        //
        // Note that even NOTIFIED=>NOTIFIED results in a write. This is on
        // purpose, to make sure every unpark() has a release-acquire ordering
        // with park().
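        //
        // Only the PARKED case can have a thread blocked in park(), so a
        // wake-up only needs to be issued in that case.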
        if self.state.swap(NOTIFIED, Release) == PARKED {
            if c::WakeByAddressSingle::is_available() {
                unsafe {
                    c::WakeByAddressSingle(self.ptr());
                }
            } else {
                // If we run NtReleaseKeyedEvent before the waiting thread runs
                // NtWaitForKeyedEvent, this (shortly) blocks until we can wake it up.
                // If the waiting thread wakes up before we run NtReleaseKeyedEvent
                // (e.g. due to a timeout), this blocks until we do wake up a thread.
                // To prevent this thread from blocking indefinitely in that case,
                // park_timeout() will, after seeing the state set to NOTIFIED after
                // waking up, call NtWaitForKeyedEvent again to unblock us.
                unsafe {
                    c::NtReleaseKeyedEvent(keyed_event_handle(), self.ptr(), 0, ptr::null_mut());
                }
            }
        }
    }

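    // The address of the state variable doubles as the WaitOnAddress address
    // and as the keyed event key, so park() and unpark() always refer to the
    // same location.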
    fn ptr(&self) -> c::LPVOID {
        &self.state as *const _ as c::LPVOID
    }
}

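// Lazily creates the process-wide keyed event handle used by the fallback
// path. If several threads race to create it, exactly one handle wins and the
// others are closed.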
fn keyed_event_handle() -> c::HANDLE {
    const INVALID: usize = !0;
    static HANDLE: AtomicUsize = AtomicUsize::new(INVALID);
    match HANDLE.load(Relaxed) {
        INVALID => {
            let mut handle = c::INVALID_HANDLE_VALUE;
            unsafe {
                match c::NtCreateKeyedEvent(
                    &mut handle,
                    c::GENERIC_READ | c::GENERIC_WRITE,
                    ptr::null_mut(),
                    0,
                ) {
                    c::STATUS_SUCCESS => {}
                    r => panic!("Unable to create keyed event handle: error {}", r),
                }
            }
            match HANDLE.compare_exchange(INVALID, handle as usize, Relaxed, Relaxed) {
                Ok(_) => handle,
                Err(h) => {
                    // Lost the race to another thread initializing HANDLE before we did.
                    // Closing our handle and using theirs instead.
                    unsafe {
                        c::CloseHandle(handle);
                    }
                    h as c::HANDLE
                }
            }
        }
        handle => handle as c::HANDLE,
    }
}
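
For illustration, here is a minimal sketch (not part of this file) of the guarantee described in the memory-ordering notes above, using the public std::thread::park / Thread::unpark API that a Parker like this one backs: a Relaxed store made before unpark() is visible after the park() that consumes the token returns.

use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
use std::thread;

static FLAG: AtomicBool = AtomicBool::new(false);

fn main() {
    let waiter = thread::spawn(|| {
        // park() may also wake spuriously, so re-check the flag in a loop.
        while !FLAG.load(Relaxed) {
            thread::park();
        }
        // Getting here relies only on unpark() synchronizing with the park()
        // that consumed its token: the Relaxed store below happens-before
        // this point, so no extra ordering on FLAG itself is needed.
    });
    FLAG.store(true, Relaxed);
    waiter.thread().unpark();
    waiter.join().unwrap();
}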