diff --git a/src/child.rs b/src/child.rs index 973af0c..24e5687 100644 --- a/src/child.rs +++ b/src/child.rs @@ -1,6 +1,8 @@ use crate::{info, warn}; use crate::{process, CmdResult, FunResult}; use os_pipe::PipeReader; +use std::any::Any; +use std::fmt::Display; use std::io::{BufRead, BufReader, Error, ErrorKind, Read, Result}; use std::process::{Child, ExitStatus}; use std::thread::JoinHandle; @@ -115,32 +117,58 @@ impl FunChildren { } } - /// Waits for the children processes to exit completely, pipe content will be processed by - /// provided function. - pub fn wait_with_pipe(&mut self, f: &mut dyn FnMut(Box)) -> CmdResult { - let child = self.children.pop().unwrap(); - let stderr_thread = - StderrThread::new(&child.cmd, &child.file, child.line, child.stderr, false); - match child.handle { - CmdChildHandle::Proc(mut proc) => { - if let Some(stdout) = child.stdout { - f(Box::new(stdout)); - let _ = proc.kill(); - } - } - CmdChildHandle::Thread(_) => { - if let Some(stdout) = child.stdout { - f(Box::new(stdout)); - } - } - CmdChildHandle::SyncFn => { - if let Some(stdout) = child.stdout { - f(Box::new(stdout)); + /// Pipes stdout from the last child in the pipeline to the given function, which runs in + /// the current thread, then waits for all of the children to exit. + /// + /// If the function returns early, without reading from stdout until the last child exits, + /// then the rest of stdout is automatically read and discarded to allow the child to finish. + pub fn wait_with_pipe(&mut self, f: &mut dyn FnMut(&mut Box)) -> CmdResult { + let mut last_child = self.children.pop().unwrap(); + let mut stderr_thread = StderrThread::new( + &last_child.cmd, + &last_child.file, + last_child.line, + last_child.stderr.take(), + false, + ); + let last_child_res = if let Some(stdout) = last_child.stdout { + let mut stdout: Box = Box::new(stdout); + f(&mut stdout); + // The provided function may have left some of stdout unread. + // Continue reading stdout on its behalf, until the child exits. + let mut buf = vec![0; 65536]; + let outcome: Box = loop { + match last_child.handle { + CmdChildHandle::Proc(ref mut child) => { + if let Some(result) = child.try_wait().transpose() { + break Box::new(ProcWaitOutcome::from(result)); + } + } + CmdChildHandle::Thread(ref mut join_handle) => { + if let Some(handle) = join_handle.take() { + if handle.is_finished() { + break Box::new(ThreadJoinOutcome::from(handle.join())); + } else { + join_handle.replace(handle); + } + } + } + CmdChildHandle::SyncFn => { + break Box::new(SyncFnOutcome); + } } - } + let _ = stdout.read(&mut buf); + }; + outcome.to_io_result(&last_child.cmd, &last_child.file, last_child.line) + } else { + last_child.wait(true) }; - drop(stderr_thread); - CmdChildren::wait_children(&mut self.children) + let other_children_res = CmdChildren::wait_children(&mut self.children); + let _ = stderr_thread.join(); + + self.ignore_error + .then_some(Ok(())) + .unwrap_or(last_child_res.and(other_children_res)) } /// Returns the OS-assigned process identifiers associated with these children processes. @@ -253,62 +281,108 @@ impl CmdChild { pub(crate) enum CmdChildHandle { Proc(Child), - Thread(JoinHandle), + Thread(Option>), SyncFn, } -impl CmdChildHandle { - fn wait(self, cmd: &str, file: &str, line: u32) -> CmdResult { - match self { - CmdChildHandle::Proc(mut proc) => { - let status = proc.wait(); - match status { - Err(e) => return Err(process::new_cmd_io_error(&e, cmd, file, line)), - Ok(status) => { - if !status.success() { - return Err(Self::status_to_io_error(status, cmd, file, line)); - } - } - } - } - CmdChildHandle::Thread(thread) => { - let status = thread.join(); - match status { - Ok(result) => { - if let Err(e) = result { - return Err(process::new_cmd_io_error(&e, cmd, file, line)); - } - } - Err(e) => { - return Err(Error::new( - ErrorKind::Other, - format!( - "Running [{cmd}] thread joined with error: {e:?} at {file}:{line}" - ), - )) - } +#[derive(Debug)] +struct ProcWaitOutcome(std::io::Result); +impl From> for ProcWaitOutcome { + fn from(result: std::io::Result) -> Self { + Self(result) + } +} +impl Display for ProcWaitOutcome { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &self.0 { + Ok(status) => { + if status.success() { + write!(f, "Command process succeeded") + } else if let Some(code) = status.code() { + write!(f, "Command process exited normally with status code {code}") + } else { + write!(f, "Command process exited abnormally: {status}") } } - CmdChildHandle::SyncFn => {} + Err(error) => write!(f, "Failed to wait for command process: {error:?}"), } - Ok(()) } - - fn status_to_io_error(status: ExitStatus, cmd: &str, file: &str, line: u32) -> Error { - if let Some(code) = status.code() { - Error::new( - ErrorKind::Other, - format!("Running [{cmd}] exited with error; status code: {code} at {file}:{line}"), - ) +} +#[derive(Debug)] +enum ThreadJoinOutcome { + Ok, + Err(std::io::Error), + Panic(Box), +} +impl From> for ThreadJoinOutcome { + fn from(result: std::thread::Result) -> Self { + match result { + Ok(Ok(())) => Self::Ok, + Ok(Err(error)) => Self::Err(error), + Err(panic) => Self::Panic(panic), + } + } +} +impl Display for ThreadJoinOutcome { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Ok => write!(f, "Command thread succeeded"), + Self::Err(error) => write!(f, "Command thread returned error: {error:?}"), + Self::Panic(panic) => write!(f, "Command thread panicked: {panic:?}"), + } + } +} +#[derive(Debug)] +struct SyncFnOutcome; +impl Display for SyncFnOutcome { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Command finished") + } +} +trait ChildOutcome: Display { + fn success(&self) -> bool; + fn to_io_result(&self, cmd: &str, file: &str, line: u32) -> std::io::Result<()> { + if self.success() { + Ok(()) } else { - Error::new( + Err(Error::new( ErrorKind::Other, - format!( - "Running [{cmd}] exited with error; terminated by {status} at {file}:{line}" - ), - ) + format!("Running [{cmd}] exited with error; {self} at {file}:{line}"), + )) } } +} +impl ChildOutcome for ProcWaitOutcome { + fn success(&self) -> bool { + self.0.as_ref().is_ok_and(|status| status.success()) + } +} +impl ChildOutcome for ThreadJoinOutcome { + fn success(&self) -> bool { + matches!(self, Self::Ok) + } +} +impl ChildOutcome for SyncFnOutcome { + fn success(&self) -> bool { + true + } +} + +impl CmdChildHandle { + fn wait(self, cmd: &str, file: &str, line: u32) -> CmdResult { + let outcome: Box = match self { + CmdChildHandle::Proc(mut proc) => Box::new(ProcWaitOutcome::from(proc.wait())), + CmdChildHandle::Thread(mut thread) => { + if let Some(thread) = thread.take() { + Box::new(ThreadJoinOutcome::from(thread.join())) + } else { + unreachable!() + } + } + CmdChildHandle::SyncFn => return Ok(()), + }; + outcome.to_io_result(cmd, file, line) + } fn kill(self, cmd: &str, file: &str, line: u32) -> CmdResult { match self { diff --git a/src/process.rs b/src/process.rs index e89caaa..a7551e1 100644 --- a/src/process.rs +++ b/src/process.rs @@ -460,7 +460,7 @@ impl Cmd { if pipe_out || with_output { let handle = thread::Builder::new().spawn(move || internal_cmd(&mut env))?; Ok(CmdChild::new( - CmdChildHandle::Thread(handle), + CmdChildHandle::Thread(Some(handle)), full_cmds, self.file, self.line, diff --git a/tests/test_macros.rs b/tests/test_macros.rs index ff4dbce..1ce95e6 100644 --- a/tests/test_macros.rs +++ b/tests/test_macros.rs @@ -217,6 +217,17 @@ fn test_pipe() { test_case!(true, true, ($macro $bang (ignore true | false)) $($after)*), test_case!(true, true, ($macro $bang (ignore false | true)) $($after)*), test_case!(true, true, ($macro $bang (ignore false | false)) $($after)*), + // Built-ins should work too, without locking up. + test_case!(true, true, ($macro $bang (echo)) $($after)*), + test_case!(true, true, ($macro $bang (echo | true)) $($after)*), + test_case!(false, false, ($macro $bang (echo | false)) $($after)*), + test_case!(true, true, ($macro $bang (true | echo)) $($after)*), + test_case!(false, true, ($macro $bang (false | echo)) $($after)*), + test_case!(true, true, ($macro $bang (cd /)) $($after)*), + test_case!(true, true, ($macro $bang (cd / | true)) $($after)*), + test_case!(false, false, ($macro $bang (cd / | false)) $($after)*), + test_case!(true, true, ($macro $bang (true | cd /)) $($after)*), + test_case!(false, true, ($macro $bang (false | cd /)) $($after)*), ] }; } @@ -233,10 +244,9 @@ fn test_pipe() { test_cases_for_entry_point!((spawn_with_output!(...)) .unwrap() .wait_with_raw_output(&mut vec![])), - // FIXME: wait_with_pipe() is currently busted - // test_cases_for_entry_point!((spawn_with_output!(...)) - // .unwrap() - // .wait_with_pipe(&mut |_stdout| {})), + test_cases_for_entry_point!((spawn_with_output!(...)) + .unwrap() + .wait_with_pipe(&mut |_stdout| {})), ]; macro_rules! check_eq {