Skip to content

Commit 8b5dbbb

Browse files
committed
Expose waitable handles in Windows
This commit allows waitable handles to be polled in Windows. This allows I/O constructs like processes, mutexes and waitable events be registered into the poller and be polled just like anything else. cc #25 Signed-off-by: John Nunley <[email protected]>
1 parent 6499077 commit 8b5dbbb

File tree

5 files changed

+287
-14
lines changed

5 files changed

+287
-14
lines changed

examples/windows-command.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
//! Runs a command using waitable handles on Windows.
2+
//!
3+
//! Run with:
4+
//!
5+
//! ```
6+
//! cargo run --example windows-command
7+
//! ```
8+
9+
#[cfg(windows)]
10+
fn main() -> std::io::Result<()> {
11+
use async_io::os::windows::Waitable;
12+
use std::process::Command;
13+
14+
futures_lite::future::block_on(async {
15+
// Spawn a process.
16+
let process = Command::new("cmd")
17+
.args(["/C", "echo hello"])
18+
.spawn()
19+
.expect("failed to spawn process");
20+
21+
// Wrap the process in an `Async` object that waits for it to exit.
22+
let process = Waitable::new(process)?;
23+
24+
// Wait for the process to exit.
25+
process.ready().await?;
26+
27+
Ok(())
28+
})
29+
}
30+
31+
#[cfg(not(windows))]
32+
fn main() {
33+
println!("This example is only supported on Windows.");
34+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
//! # std::io::Result::Ok(()) });
5454
//! ```
5555
56+
#![allow(clippy::needless_pass_by_ref_mut)]
5657
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
5758
#![doc(
5859
html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"

src/os.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,6 @@ pub mod unix;
1414
target_os = "dragonfly",
1515
))]
1616
pub mod kqueue;
17+
18+
#[cfg(windows)]
19+
pub mod windows;

src/os/windows.rs

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
//! Functionality that is only available on Windows.
2+
3+
use crate::reactor::{Reactor, Readable, Registration};
4+
use crate::Async;
5+
6+
use std::convert::TryFrom;
7+
use std::future::Future;
8+
use std::io::{self, Result};
9+
use std::os::windows::io::{AsHandle, AsRawHandle, BorrowedHandle, OwnedHandle, RawHandle};
10+
use std::pin::Pin;
11+
use std::task::{Context, Poll};
12+
13+
/// A waitable handle registered in the reactor.
14+
///
15+
/// Some handles in Windows are “waitable”, which means that they emit a “readiness” signal after some event occurs. This function can be used to wait for such events to occur on a handle. This function can be used in addition to regular socket polling.
16+
///
17+
/// Waitable objects include the following:
18+
///
19+
/// - Console inputs
20+
/// - Waitable events
21+
/// - Mutexes
22+
/// - Processes
23+
/// - Semaphores
24+
/// - Threads
25+
/// - Timer
26+
///
27+
/// This structure can be used to wait for any of these objects to become ready.
28+
///
29+
/// ## Implementation
30+
///
31+
/// The current implementation waits on the handle by registering it in the application-global
32+
/// Win32 threadpool. However, in the futur it may be possible to migrate to an implementation
33+
/// on Windows 10 that uses a mechanism similar to [`MsgWaitForMultipleObjectsEx`].
34+
///
35+
/// [`MsgWaitForMultipleObjectsEx`]: https://docs.microsoft.com/en-us/windows/win32/api/winuser/nf-winuser-msgwaitformultipleobjectsex
36+
///
37+
/// ## Caveats
38+
///
39+
/// Read the documentation for the [`Async`](crate::Async) type for more information regarding the
40+
/// abilities and caveats with using this type.
41+
#[derive(Debug)]
42+
pub struct Waitable<T>(Async<T>);
43+
44+
impl<T> AsRef<T> for Waitable<T> {
45+
fn as_ref(&self) -> &T {
46+
self.0.as_ref()
47+
}
48+
}
49+
50+
impl<T: AsHandle> Waitable<T> {
51+
/// Create a new [`Waitable`] around a waitable handle.
52+
///
53+
/// # Examples
54+
///
55+
/// ```no_run
56+
/// use std::process::Command;
57+
/// use async_io::os::windows::Waitable;
58+
///
59+
/// // Create a new process to wait for.
60+
/// let mut child = Command::new("sleep").arg("5").spawn().unwrap();
61+
///
62+
/// // Wrap the process in an `Async` object that waits for it to exit.
63+
/// let process = Waitable::new(child).unwrap();
64+
///
65+
/// // Wait for the process to exit.
66+
/// # async_io::block_on(async {
67+
/// process.ready().await.unwrap();
68+
/// # });
69+
/// ```
70+
pub fn new(handle: T) -> Result<Self> {
71+
Ok(Self(Async {
72+
source: Reactor::get().insert_io(unsafe { Registration::new_waitable(&handle) })?,
73+
io: Some(handle),
74+
}))
75+
}
76+
}
77+
78+
impl<T: AsRawHandle> AsRawHandle for Waitable<T> {
79+
fn as_raw_handle(&self) -> RawHandle {
80+
self.get_ref().as_raw_handle()
81+
}
82+
}
83+
84+
impl<T: AsHandle> AsHandle for Waitable<T> {
85+
fn as_handle(&self) -> BorrowedHandle<'_> {
86+
self.get_ref().as_handle()
87+
}
88+
}
89+
90+
impl<T: AsHandle + From<OwnedHandle>> TryFrom<OwnedHandle> for Waitable<T> {
91+
type Error = io::Error;
92+
93+
fn try_from(handle: OwnedHandle) -> Result<Self> {
94+
Self::new(handle.into())
95+
}
96+
}
97+
98+
impl<T: Into<OwnedHandle>> TryFrom<Waitable<T>> for OwnedHandle {
99+
type Error = io::Error;
100+
101+
fn try_from(value: Waitable<T>) -> std::result::Result<Self, Self::Error> {
102+
value.into_inner().map(|handle| handle.into())
103+
}
104+
}
105+
106+
impl<T> Waitable<T> {
107+
/// Get a reference to the inner handle.
108+
pub fn get_ref(&self) -> &T {
109+
self.0.get_ref()
110+
}
111+
112+
/// Get a mutable reference to the inner handle.
113+
///
114+
/// # Safety
115+
///
116+
/// The underlying I/O source must not be dropped or moved out using this function.
117+
pub unsafe fn get_mut(&mut self) -> &mut T {
118+
self.0.get_mut()
119+
}
120+
121+
/// Consumes the [`Waitable`], returning the inner handle.
122+
pub fn into_inner(self) -> Result<T> {
123+
self.0.into_inner()
124+
}
125+
126+
/// Waits until the [`Waitable`] object is ready.
127+
///
128+
/// This method completes when the underlying [`Waitable`] object has completed. See the documentation
129+
/// for the [`Waitable`] object for more information.
130+
///
131+
/// # Examples
132+
///
133+
/// ```no_run
134+
/// use std::process::Command;
135+
/// use async_io::os::windows::Waitable;
136+
///
137+
/// # futures_lite::future::block_on(async {
138+
/// let child = Command::new("sleep").arg("5").spawn()?;
139+
/// let process = Waitable::new(child)?;
140+
///
141+
/// // Wait for the process to exit.
142+
/// process.ready().await?;
143+
/// # std::io::Result::Ok(()) });
144+
/// ```
145+
pub fn ready(&self) -> Ready<'_, T> {
146+
Ready(self.0.readable())
147+
}
148+
149+
/// Polls the I/O handle for readiness.
150+
///
151+
/// When this method returns [`Poll::Ready`], that means that the OS has delivered a notification
152+
/// that the underlying [`Waitable`] object is ready. See the documentation for the [`Waitable`]
153+
/// object for more information.
154+
///
155+
/// # Caveats
156+
///
157+
/// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
158+
/// will just keep waking each other in turn, thus wasting CPU time.
159+
///
160+
/// # Examples
161+
///
162+
/// ```no_run
163+
/// use std::process::Command;
164+
/// use async_io::os::windows::Waitable;
165+
/// use futures_lite::future;
166+
///
167+
/// # futures_lite::future::block_on(async {
168+
/// let child = Command::new("sleep").arg("5").spawn()?;
169+
/// let process = Waitable::new(child)?;
170+
///
171+
/// // Wait for the process to exit.
172+
/// future::poll_fn(|cx| process.poll_ready(cx)).await?;
173+
/// # std::io::Result::Ok(()) });
174+
/// ```
175+
pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
176+
self.0.poll_readable(cx)
177+
}
178+
}
179+
180+
/// Future for [`Filter::ready`].
181+
#[must_use = "futures do nothing unless you `.await` or poll them"]
182+
#[derive(Debug)]
183+
pub struct Ready<'a, T>(Readable<'a, T>);
184+
185+
impl<T> Future for Ready<'_, T> {
186+
type Output = Result<()>;
187+
188+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
189+
Pin::new(&mut self.0).poll(cx)
190+
}
191+
}

src/reactor/windows.rs

Lines changed: 58 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,43 @@
11
// SPDX-License-Identifier: MIT OR Apache-2.0
22

3-
use polling::{Event, Poller};
3+
use polling::os::iocp::PollerIocpExt;
4+
use polling::{Event, PollMode, Poller};
45
use std::fmt;
56
use std::io::Result;
6-
use std::os::windows::io::{AsRawSocket, BorrowedSocket, RawSocket};
7+
use std::os::windows::io::{
8+
AsHandle, AsRawHandle, AsRawSocket, AsSocket, BorrowedHandle, BorrowedSocket, RawHandle,
9+
RawSocket,
10+
};
711

812
/// The raw registration into the reactor.
913
#[doc(hidden)]
10-
pub struct Registration {
14+
pub enum Registration {
1115
/// Raw socket handle on Windows.
1216
///
1317
/// # Invariant
1418
///
1519
/// This describes a valid socket that has not been `close`d. It will not be
1620
/// closed while this object is alive.
17-
raw: RawSocket,
21+
Socket(RawSocket),
22+
23+
/// Waitable handle for Windows.
24+
///
25+
/// # Invariant
26+
///
27+
/// This describes a valid waitable handle that has not been `close`d. It will not be
28+
/// closed while this object is alive.
29+
Handle(RawHandle),
1830
}
1931

32+
unsafe impl Send for Registration {}
33+
unsafe impl Sync for Registration {}
34+
2035
impl fmt::Debug for Registration {
2136
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
22-
fmt::Debug::fmt(&self.raw, f)
37+
match self {
38+
Self::Socket(raw) => fmt::Debug::fmt(raw, f),
39+
Self::Handle(handle) => fmt::Debug::fmt(handle, f),
40+
}
2341
}
2442
}
2543

@@ -29,32 +47,58 @@ impl Registration {
2947
/// # Safety
3048
///
3149
/// The provided file descriptor must be valid and not be closed while this object is alive.
32-
pub(crate) unsafe fn new(f: BorrowedSocket<'_>) -> Self {
33-
Self {
34-
raw: f.as_raw_socket(),
35-
}
50+
pub(crate) unsafe fn new(f: impl AsSocket) -> Self {
51+
Self::Socket(f.as_socket().as_raw_socket())
52+
}
53+
54+
/// Create a new [`Registration`] around a waitable handle.
55+
///
56+
/// # Safety
57+
///
58+
/// The provided handle must be valid and not be closed while this object is alive.
59+
pub(crate) unsafe fn new_waitable(f: impl AsHandle) -> Self {
60+
Self::Handle(f.as_handle().as_raw_handle())
3661
}
3762

3863
/// Registers the object into the reactor.
3964
#[inline]
4065
pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> {
4166
// SAFETY: This object's existence validates the invariants of Poller::add
42-
unsafe { poller.add(self.raw, Event::none(token)) }
67+
unsafe {
68+
match self {
69+
Self::Socket(raw) => poller.add(*raw, Event::none(token)),
70+
Self::Handle(handle) => {
71+
poller.add_waitable(*handle, Event::none(token), PollMode::Oneshot)
72+
}
73+
}
74+
}
4375
}
4476

4577
/// Re-registers the object into the reactor.
4678
#[inline]
4779
pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> {
4880
// SAFETY: self.raw is a valid file descriptor
49-
let fd = unsafe { BorrowedSocket::borrow_raw(self.raw) };
50-
poller.modify(fd, interest)
81+
match self {
82+
Self::Socket(raw) => {
83+
poller.modify(unsafe { BorrowedSocket::borrow_raw(*raw) }, interest)
84+
}
85+
Self::Handle(handle) => poller.modify_waitable(
86+
unsafe { BorrowedHandle::borrow_raw(*handle) },
87+
interest,
88+
PollMode::Oneshot,
89+
),
90+
}
5191
}
5292

5393
/// Deregisters the object from the reactor.
5494
#[inline]
5595
pub(crate) fn delete(&self, poller: &Poller) -> Result<()> {
5696
// SAFETY: self.raw is a valid file descriptor
57-
let fd = unsafe { BorrowedSocket::borrow_raw(self.raw) };
58-
poller.delete(fd)
97+
match self {
98+
Self::Socket(raw) => poller.delete(unsafe { BorrowedSocket::borrow_raw(*raw) }),
99+
Self::Handle(handle) => {
100+
poller.remove_waitable(unsafe { BorrowedHandle::borrow_raw(*handle) })
101+
}
102+
}
59103
}
60104
}

0 commit comments

Comments
 (0)