From ad7201c56b5339a8046f967366980bcce78b613f Mon Sep 17 00:00:00 2001 From: Roger Coll Date: Sat, 12 Oct 2024 15:52:22 +0200 Subject: [PATCH 1/5] refactor: remove child ownership from Exit filter Pid is used instead --- src/os/kqueue.rs | 12 ++++++------ src/reactor/kqueue.rs | 18 +++++++++--------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/os/kqueue.rs b/src/os/kqueue.rs index bb7237b..d989b81 100644 --- a/src/os/kqueue.rs +++ b/src/os/kqueue.rs @@ -7,9 +7,9 @@ use crate::Async; use std::future::Future; use std::io::{Error, Result}; +use std::num::NonZeroI32; use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd}; use std::pin::Pin; -use std::process::Child; use std::task::{Context, Poll}; /// A wrapper around a queueable object that waits until it is ready. @@ -233,23 +233,23 @@ impl QueueableSealed for Signal { } impl Queueable for Signal {} -/// Wait for a child process to exit. +/// Wait for a process to exit. /// /// When registered into [`Async`](crate::Async) via [`with_filter`](AsyncKqueueExt::with_filter), /// it will return a [`readable`](crate::Async::readable) event when the child process exits. #[derive(Debug)] -pub struct Exit(Option); +pub struct Exit(NonZeroI32); impl Exit { /// Create a new `Exit` object. - pub fn new(child: Child) -> Self { - Self(Some(child)) + pub fn new(pid: NonZeroI32) -> Self { + Self(pid) } } impl QueueableSealed for Exit { fn registration(&mut self) -> Registration { - Registration::Process(self.0.take().expect("Cannot reregister child")) + Registration::Process(self.0) } } impl Queueable for Exit {} diff --git a/src/reactor/kqueue.rs b/src/reactor/kqueue.rs index c1fa0b2..2bfc14b 100644 --- a/src/reactor/kqueue.rs +++ b/src/reactor/kqueue.rs @@ -7,8 +7,8 @@ use polling::{Event, PollMode, Poller}; use std::fmt; use std::io::Result; +use std::num::NonZeroI32; use std::os::unix::io::{AsRawFd, BorrowedFd, RawFd}; -use std::process::Child; /// The raw registration into the reactor. /// @@ -27,8 +27,8 @@ pub enum Registration { /// Raw signal number for signal delivery. Signal(Signal), - /// Process for process termination. - Process(Child), + /// Pid for process termination. + Process(NonZeroI32), } impl fmt::Debug for Registration { @@ -62,8 +62,8 @@ impl Registration { Self::Signal(signal) => { poller.add_filter(PollSignal(signal.0), token, PollMode::Oneshot) } - Self::Process(process) => poller.add_filter( - unsafe { Process::new(process, ProcessOps::Exit) }, + Self::Process(pid) => poller.add_filter( + unsafe { Process::from_pid(*pid, ProcessOps::Exit) }, token, PollMode::Oneshot, ), @@ -82,8 +82,8 @@ impl Registration { Self::Signal(signal) => { poller.modify_filter(PollSignal(signal.0), interest.key, PollMode::Oneshot) } - Self::Process(process) => poller.modify_filter( - unsafe { Process::new(process, ProcessOps::Exit) }, + Self::Process(pid) => poller.modify_filter( + unsafe { Process::from_pid(*pid, ProcessOps::Exit) }, interest.key, PollMode::Oneshot, ), @@ -100,8 +100,8 @@ impl Registration { poller.delete(fd) } Self::Signal(signal) => poller.delete_filter(PollSignal(signal.0)), - Self::Process(process) => { - poller.delete_filter(unsafe { Process::new(process, ProcessOps::Exit) }) + Self::Process(pid) => { + poller.delete_filter(unsafe { Process::from_pid(*pid, ProcessOps::Exit) }) } } } From 374ebbbdb4e3c0665a981b1dd8bdec81b96a5cc9 Mon Sep 17 00:00:00 2001 From: Roger Coll Date: Sat, 12 Oct 2024 16:09:36 +0200 Subject: [PATCH 2/5] fix: use pid for exit example --- examples/kqueue-process.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/examples/kqueue-process.rs b/examples/kqueue-process.rs index 9cdae1c..aa7fb51 100644 --- a/examples/kqueue-process.rs +++ b/examples/kqueue-process.rs @@ -14,24 +14,34 @@ target_os = "dragonfly", ))] fn main() -> std::io::Result<()> { - use std::process::Command; + use std::{num::NonZeroI32, process::Command}; use async_io::os::kqueue::{Exit, Filter}; use futures_lite::future; future::block_on(async { // Spawn a process. - let process = Command::new("sleep") + let mut process = Command::new("sleep") .arg("3") .spawn() .expect("failed to spawn process"); // Wrap the process in an `Async` object that waits for it to exit. - let process = Filter::new(Exit::new(process))?; + let process_handle = Filter::new(Exit::new( + NonZeroI32::new(process.id().try_into().expect("invalid process pid")) + .expect("non zero pid"), + ))?; // Wait for the process to exit. - process.ready().await?; + process_handle.ready().await?; + println!( + "Process exit code {:?}", + process + .try_wait() + .expect("error while waiting process") + .expect("process did not exit yet") + ); Ok(()) }) } From 1fedf0378a31e125abbeb8c7a8e69638183a7092 Mon Sep 17 00:00:00 2001 From: Roger Coll Date: Sat, 12 Oct 2024 16:29:28 +0200 Subject: [PATCH 3/5] docs: fix Exit examples --- src/os/kqueue.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/os/kqueue.rs b/src/os/kqueue.rs index d989b81..efb7eed 100644 --- a/src/os/kqueue.rs +++ b/src/os/kqueue.rs @@ -41,13 +41,14 @@ impl Filter { /// /// ```no_run /// use std::process::Command; + /// use std::num::NonZeroI32; /// use async_io::os::kqueue::{Exit, Filter}; /// /// // Create a new process to wait for. /// let mut child = Command::new("sleep").arg("5").spawn().unwrap(); /// /// // Wrap the process in an `Async` object that waits for it to exit. - /// let process = Filter::new(Exit::new(child)).unwrap(); + /// let mut process = Filter::new(Exit::new(NonZeroI32::new(child.id().try_into().unwrap()).unwrap())).unwrap(); /// /// // Wait for the process to exit. /// # async_io::block_on(async { @@ -97,10 +98,11 @@ impl Filter { /// /// ``` /// use async_io::os::kqueue::{Exit, Filter}; + /// use std::num::NonZeroI32; /// /// # futures_lite::future::block_on(async { /// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap(); - /// let process = Filter::new(Exit::new(child)).unwrap(); + /// let mut process = Filter::new(Exit::new(NonZeroI32::new(child.id().try_into().unwrap()).unwrap())).unwrap(); /// let inner = process.get_ref(); /// # }); /// ``` @@ -117,10 +119,11 @@ impl Filter { /// /// ``` /// use async_io::os::kqueue::{Exit, Filter}; + /// use std::num::NonZeroI32; /// /// # futures_lite::future::block_on(async { /// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap(); - /// let mut process = Filter::new(Exit::new(child)).unwrap(); + /// let mut process = Filter::new(Exit::new(NonZeroI32::new(child.id().try_into().unwrap()).unwrap())).unwrap(); /// let inner = process.get_mut(); /// # }); /// ``` @@ -134,10 +137,11 @@ impl Filter { /// /// ``` /// use async_io::os::kqueue::{Exit, Filter}; + /// use std::num::NonZeroI32; /// /// # futures_lite::future::block_on(async { /// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap(); - /// let process = Filter::new(Exit::new(child)).unwrap(); + /// let mut process = Filter::new(Exit::new(NonZeroI32::new(child.id().try_into().unwrap()).unwrap())).unwrap(); /// let inner = process.into_inner().unwrap(); /// # }); /// ``` @@ -153,12 +157,13 @@ impl Filter { /// # Examples /// /// ```no_run + /// use std::num::NonZeroI32; /// use std::process::Command; /// use async_io::os::kqueue::{Exit, Filter}; /// /// # futures_lite::future::block_on(async { /// let child = Command::new("sleep").arg("5").spawn()?; - /// let process = Filter::new(Exit::new(child))?; + /// let process = Filter::new(Exit::new(NonZeroI32::new(child.id().try_into().unwrap()).unwrap())).unwrap(); /// /// // Wait for the process to exit. /// process.ready().await?; @@ -182,13 +187,14 @@ impl Filter { /// # Examples /// /// ```no_run + /// use std::num::NonZeroI32; /// use std::process::Command; /// use async_io::os::kqueue::{Exit, Filter}; /// use futures_lite::future; /// /// # futures_lite::future::block_on(async { /// let child = Command::new("sleep").arg("5").spawn()?; - /// let process = Filter::new(Exit::new(child))?; + /// let process = Filter::new(Exit::new(NonZeroI32::new(child.id().try_into().unwrap()).unwrap())).unwrap(); /// /// // Wait for the process to exit. /// future::poll_fn(|cx| process.poll_ready(cx)).await?; From 430846412d260388f5375f4e2aec94aa907444c6 Mon Sep 17 00:00:00 2001 From: Roger Coll Date: Sun, 13 Oct 2024 11:59:29 +0200 Subject: [PATCH 4/5] feat: add new from_pid constructor --- examples/kqueue-process.rs | 10 ++++++---- src/os/kqueue.rs | 35 +++++++++++++++++++++-------------- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/examples/kqueue-process.rs b/examples/kqueue-process.rs index aa7fb51..9c1d96d 100644 --- a/examples/kqueue-process.rs +++ b/examples/kqueue-process.rs @@ -27,10 +27,12 @@ fn main() -> std::io::Result<()> { .expect("failed to spawn process"); // Wrap the process in an `Async` object that waits for it to exit. - let process_handle = Filter::new(Exit::new( - NonZeroI32::new(process.id().try_into().expect("invalid process pid")) - .expect("non zero pid"), - ))?; + let process_handle = unsafe { + Filter::new(Exit::from_pid( + NonZeroI32::new(process.id().try_into().expect("invalid process pid")) + .expect("non zero pid"), + ))? + }; // Wait for the process to exit. process_handle.ready().await?; diff --git a/src/os/kqueue.rs b/src/os/kqueue.rs index efb7eed..8318363 100644 --- a/src/os/kqueue.rs +++ b/src/os/kqueue.rs @@ -10,6 +10,7 @@ use std::io::{Error, Result}; use std::num::NonZeroI32; use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd}; use std::pin::Pin; +use std::process::Child; use std::task::{Context, Poll}; /// A wrapper around a queueable object that waits until it is ready. @@ -41,14 +42,13 @@ impl Filter { /// /// ```no_run /// use std::process::Command; - /// use std::num::NonZeroI32; /// use async_io::os::kqueue::{Exit, Filter}; /// /// // Create a new process to wait for. /// let mut child = Command::new("sleep").arg("5").spawn().unwrap(); /// /// // Wrap the process in an `Async` object that waits for it to exit. - /// let mut process = Filter::new(Exit::new(NonZeroI32::new(child.id().try_into().unwrap()).unwrap())).unwrap(); + /// let process = Filter::new(Exit::new(child)).unwrap(); /// /// // Wait for the process to exit. /// # async_io::block_on(async { @@ -98,11 +98,10 @@ impl Filter { /// /// ``` /// use async_io::os::kqueue::{Exit, Filter}; - /// use std::num::NonZeroI32; /// /// # futures_lite::future::block_on(async { /// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap(); - /// let mut process = Filter::new(Exit::new(NonZeroI32::new(child.id().try_into().unwrap()).unwrap())).unwrap(); + /// let process = Filter::new(Exit::new(child)).unwrap(); /// let inner = process.get_ref(); /// # }); /// ``` @@ -119,11 +118,10 @@ impl Filter { /// /// ``` /// use async_io::os::kqueue::{Exit, Filter}; - /// use std::num::NonZeroI32; /// /// # futures_lite::future::block_on(async { /// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap(); - /// let mut process = Filter::new(Exit::new(NonZeroI32::new(child.id().try_into().unwrap()).unwrap())).unwrap(); + /// let mut process = Filter::new(Exit::new(child)).unwrap(); /// let inner = process.get_mut(); /// # }); /// ``` @@ -137,11 +135,10 @@ impl Filter { /// /// ``` /// use async_io::os::kqueue::{Exit, Filter}; - /// use std::num::NonZeroI32; /// /// # futures_lite::future::block_on(async { /// let child = std::process::Command::new("sleep").arg("5").spawn().unwrap(); - /// let mut process = Filter::new(Exit::new(NonZeroI32::new(child.id().try_into().unwrap()).unwrap())).unwrap(); + /// let process = Filter::new(Exit::new(child)).unwrap(); /// let inner = process.into_inner().unwrap(); /// # }); /// ``` @@ -157,13 +154,12 @@ impl Filter { /// # Examples /// /// ```no_run - /// use std::num::NonZeroI32; /// use std::process::Command; /// use async_io::os::kqueue::{Exit, Filter}; /// /// # futures_lite::future::block_on(async { /// let child = Command::new("sleep").arg("5").spawn()?; - /// let process = Filter::new(Exit::new(NonZeroI32::new(child.id().try_into().unwrap()).unwrap())).unwrap(); + /// let process = Filter::new(Exit::new(child))?; /// /// // Wait for the process to exit. /// process.ready().await?; @@ -187,14 +183,13 @@ impl Filter { /// # Examples /// /// ```no_run - /// use std::num::NonZeroI32; /// use std::process::Command; /// use async_io::os::kqueue::{Exit, Filter}; /// use futures_lite::future; /// /// # futures_lite::future::block_on(async { /// let child = Command::new("sleep").arg("5").spawn()?; - /// let process = Filter::new(Exit::new(NonZeroI32::new(child.id().try_into().unwrap()).unwrap())).unwrap(); + /// let process = Filter::new(Exit::new(child))?; /// /// // Wait for the process to exit. /// future::poll_fn(|cx| process.poll_ready(cx)).await?; @@ -239,7 +234,7 @@ impl QueueableSealed for Signal { } impl Queueable for Signal {} -/// Wait for a process to exit. +/// Wait for a child process to exit. /// /// When registered into [`Async`](crate::Async) via [`with_filter`](AsyncKqueueExt::with_filter), /// it will return a [`readable`](crate::Async::readable) event when the child process exits. @@ -248,7 +243,19 @@ pub struct Exit(NonZeroI32); impl Exit { /// Create a new `Exit` object. - pub fn new(pid: NonZeroI32) -> Self { + pub fn new(child: Child) -> Self { + Self( + NonZeroI32::new(child.id().try_into().expect("unable to parse pid")) + .expect("cannot register pid with zero value"), + ) + } + + /// Create a new `Exit` object from a PID. + /// + /// # Safety + /// + /// The PID must be tied to an actual child process. + pub unsafe fn from_pid(pid: NonZeroI32) -> Self { Self(pid) } } From 610773a62c852404514c797884696bc1f4201c91 Mon Sep 17 00:00:00 2001 From: Roger Coll Date: Mon, 14 Oct 2024 08:29:45 +0200 Subject: [PATCH 5/5] rollback to basic kqueue example --- examples/kqueue-process.rs | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/examples/kqueue-process.rs b/examples/kqueue-process.rs index 9c1d96d..9cdae1c 100644 --- a/examples/kqueue-process.rs +++ b/examples/kqueue-process.rs @@ -14,36 +14,24 @@ target_os = "dragonfly", ))] fn main() -> std::io::Result<()> { - use std::{num::NonZeroI32, process::Command}; + use std::process::Command; use async_io::os::kqueue::{Exit, Filter}; use futures_lite::future; future::block_on(async { // Spawn a process. - let mut process = Command::new("sleep") + let process = Command::new("sleep") .arg("3") .spawn() .expect("failed to spawn process"); // Wrap the process in an `Async` object that waits for it to exit. - let process_handle = unsafe { - Filter::new(Exit::from_pid( - NonZeroI32::new(process.id().try_into().expect("invalid process pid")) - .expect("non zero pid"), - ))? - }; + let process = Filter::new(Exit::new(process))?; // Wait for the process to exit. - process_handle.ready().await?; + process.ready().await?; - println!( - "Process exit code {:?}", - process - .try_wait() - .expect("error while waiting process") - .expect("process did not exit yet") - ); Ok(()) }) }