Skip to content

Fix wait_with_pipe() so it waits for last child and uses exit status #83

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 143 additions & 69 deletions src/child.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::{info, warn};
use crate::{process, CmdResult, FunResult};
use os_pipe::PipeReader;
use std::any::Any;
use std::fmt::Display;
use std::io::{BufRead, BufReader, Error, ErrorKind, Read, Result};
use std::process::{Child, ExitStatus};
use std::thread::JoinHandle;
Expand Down Expand Up @@ -115,32 +117,58 @@ impl FunChildren {
}
}

/// Waits for the children processes to exit completely, pipe content will be processed by
/// provided function.
pub fn wait_with_pipe(&mut self, f: &mut dyn FnMut(Box<dyn Read>)) -> CmdResult {
let child = self.children.pop().unwrap();
let stderr_thread =
StderrThread::new(&child.cmd, &child.file, child.line, child.stderr, false);
match child.handle {
CmdChildHandle::Proc(mut proc) => {
if let Some(stdout) = child.stdout {
f(Box::new(stdout));
let _ = proc.kill();
}
}
CmdChildHandle::Thread(_) => {
if let Some(stdout) = child.stdout {
f(Box::new(stdout));
}
}
CmdChildHandle::SyncFn => {
if let Some(stdout) = child.stdout {
f(Box::new(stdout));
/// Pipes stdout from the last child in the pipeline to the given function, which runs in
/// the current thread, then waits for all of the children to exit.
///
/// If the function returns early, without reading from stdout until the last child exits,
/// then the rest of stdout is automatically read and discarded to allow the child to finish.
pub fn wait_with_pipe(&mut self, f: &mut dyn FnMut(&mut Box<dyn Read>)) -> CmdResult {
let mut last_child = self.children.pop().unwrap();
let mut stderr_thread = StderrThread::new(
&last_child.cmd,
&last_child.file,
last_child.line,
last_child.stderr.take(),
false,
);
let last_child_res = if let Some(stdout) = last_child.stdout {
let mut stdout: Box<dyn Read> = Box::new(stdout);
f(&mut stdout);
// The provided function may have left some of stdout unread.
// Continue reading stdout on its behalf, until the child exits.
let mut buf = vec![0; 65536];
let outcome: Box<dyn ChildOutcome> = loop {
match last_child.handle {
CmdChildHandle::Proc(ref mut child) => {
if let Some(result) = child.try_wait().transpose() {
break Box::new(ProcWaitOutcome::from(result));
}
}
CmdChildHandle::Thread(ref mut join_handle) => {
if let Some(handle) = join_handle.take() {
if handle.is_finished() {
break Box::new(ThreadJoinOutcome::from(handle.join()));
} else {
join_handle.replace(handle);
}
}
}
CmdChildHandle::SyncFn => {
break Box::new(SyncFnOutcome);
}
}
}
let _ = stdout.read(&mut buf);
};
outcome.to_io_result(&last_child.cmd, &last_child.file, last_child.line)
} else {
last_child.wait(true)
};
drop(stderr_thread);
CmdChildren::wait_children(&mut self.children)
let other_children_res = CmdChildren::wait_children(&mut self.children);
let _ = stderr_thread.join();

self.ignore_error
.then_some(Ok(()))
.unwrap_or(last_child_res.and(other_children_res))
}

/// Returns the OS-assigned process identifiers associated with these children processes.
Expand Down Expand Up @@ -253,62 +281,108 @@ impl CmdChild {

pub(crate) enum CmdChildHandle {
Proc(Child),
Thread(JoinHandle<CmdResult>),
Thread(Option<JoinHandle<CmdResult>>),
SyncFn,
}

impl CmdChildHandle {
fn wait(self, cmd: &str, file: &str, line: u32) -> CmdResult {
match self {
CmdChildHandle::Proc(mut proc) => {
let status = proc.wait();
match status {
Err(e) => return Err(process::new_cmd_io_error(&e, cmd, file, line)),
Ok(status) => {
if !status.success() {
return Err(Self::status_to_io_error(status, cmd, file, line));
}
}
}
}
CmdChildHandle::Thread(thread) => {
let status = thread.join();
match status {
Ok(result) => {
if let Err(e) = result {
return Err(process::new_cmd_io_error(&e, cmd, file, line));
}
}
Err(e) => {
return Err(Error::new(
ErrorKind::Other,
format!(
"Running [{cmd}] thread joined with error: {e:?} at {file}:{line}"
),
))
}
#[derive(Debug)]
struct ProcWaitOutcome(std::io::Result<ExitStatus>);
impl From<std::io::Result<ExitStatus>> for ProcWaitOutcome {
fn from(result: std::io::Result<ExitStatus>) -> Self {
Self(result)
}
}
impl Display for ProcWaitOutcome {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.0 {
Ok(status) => {
if status.success() {
write!(f, "Command process succeeded")
} else if let Some(code) = status.code() {
write!(f, "Command process exited normally with status code {code}")
} else {
write!(f, "Command process exited abnormally: {status}")
}
}
CmdChildHandle::SyncFn => {}
Err(error) => write!(f, "Failed to wait for command process: {error:?}"),
}
Ok(())
}

fn status_to_io_error(status: ExitStatus, cmd: &str, file: &str, line: u32) -> Error {
if let Some(code) = status.code() {
Error::new(
ErrorKind::Other,
format!("Running [{cmd}] exited with error; status code: {code} at {file}:{line}"),
)
}
#[derive(Debug)]
enum ThreadJoinOutcome {
Ok,
Err(std::io::Error),
Panic(Box<dyn Any + Send + 'static>),
}
impl From<std::thread::Result<CmdResult>> for ThreadJoinOutcome {
fn from(result: std::thread::Result<CmdResult>) -> Self {
match result {
Ok(Ok(())) => Self::Ok,
Ok(Err(error)) => Self::Err(error),
Err(panic) => Self::Panic(panic),
}
}
}
impl Display for ThreadJoinOutcome {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Ok => write!(f, "Command thread succeeded"),
Self::Err(error) => write!(f, "Command thread returned error: {error:?}"),
Self::Panic(panic) => write!(f, "Command thread panicked: {panic:?}"),
}
}
}
#[derive(Debug)]
struct SyncFnOutcome;
impl Display for SyncFnOutcome {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Command finished")
}
}
trait ChildOutcome: Display {
fn success(&self) -> bool;
fn to_io_result(&self, cmd: &str, file: &str, line: u32) -> std::io::Result<()> {
if self.success() {
Ok(())
} else {
Error::new(
Err(Error::new(
ErrorKind::Other,
format!(
"Running [{cmd}] exited with error; terminated by {status} at {file}:{line}"
),
)
format!("Running [{cmd}] exited with error; {self} at {file}:{line}"),
))
}
}
}
impl ChildOutcome for ProcWaitOutcome {
fn success(&self) -> bool {
self.0.as_ref().is_ok_and(|status| status.success())
}
}
impl ChildOutcome for ThreadJoinOutcome {
fn success(&self) -> bool {
matches!(self, Self::Ok)
}
}
impl ChildOutcome for SyncFnOutcome {
fn success(&self) -> bool {
true
}
}

impl CmdChildHandle {
fn wait(self, cmd: &str, file: &str, line: u32) -> CmdResult {
let outcome: Box<dyn ChildOutcome> = match self {
CmdChildHandle::Proc(mut proc) => Box::new(ProcWaitOutcome::from(proc.wait())),
CmdChildHandle::Thread(mut thread) => {
if let Some(thread) = thread.take() {
Box::new(ThreadJoinOutcome::from(thread.join()))
} else {
unreachable!()
}
}
CmdChildHandle::SyncFn => return Ok(()),
};
outcome.to_io_result(cmd, file, line)
}

fn kill(self, cmd: &str, file: &str, line: u32) -> CmdResult {
match self {
Expand Down
2 changes: 1 addition & 1 deletion src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ impl Cmd {
if pipe_out || with_output {
let handle = thread::Builder::new().spawn(move || internal_cmd(&mut env))?;
Ok(CmdChild::new(
CmdChildHandle::Thread(handle),
CmdChildHandle::Thread(Some(handle)),
full_cmds,
self.file,
self.line,
Expand Down
18 changes: 14 additions & 4 deletions tests/test_macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,17 @@ fn test_pipe() {
test_case!(true, true, ($macro $bang (ignore true | false)) $($after)*),
test_case!(true, true, ($macro $bang (ignore false | true)) $($after)*),
test_case!(true, true, ($macro $bang (ignore false | false)) $($after)*),
// Built-ins should work too, without locking up.
test_case!(true, true, ($macro $bang (echo)) $($after)*),
test_case!(true, true, ($macro $bang (echo | true)) $($after)*),
test_case!(false, false, ($macro $bang (echo | false)) $($after)*),
test_case!(true, true, ($macro $bang (true | echo)) $($after)*),
test_case!(false, true, ($macro $bang (false | echo)) $($after)*),
test_case!(true, true, ($macro $bang (cd /)) $($after)*),
test_case!(true, true, ($macro $bang (cd / | true)) $($after)*),
test_case!(false, false, ($macro $bang (cd / | false)) $($after)*),
test_case!(true, true, ($macro $bang (true | cd /)) $($after)*),
test_case!(false, true, ($macro $bang (false | cd /)) $($after)*),
]
};
}
Expand All @@ -233,10 +244,9 @@ fn test_pipe() {
test_cases_for_entry_point!((spawn_with_output!(...))
.unwrap()
.wait_with_raw_output(&mut vec![])),
// FIXME: wait_with_pipe() is currently busted
// test_cases_for_entry_point!((spawn_with_output!(...))
// .unwrap()
// .wait_with_pipe(&mut |_stdout| {})),
test_cases_for_entry_point!((spawn_with_output!(...))
.unwrap()
.wait_with_pipe(&mut |_stdout| {})),
];

macro_rules! check_eq {
Expand Down