Skip to content

Commit 0c8151d

Browse files
committed
Remove dependency on pipe, unless parallel
1 parent 2b52daf commit 0c8151d

File tree

7 files changed

+210
-315
lines changed

7 files changed

+210
-315
lines changed

Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ rust-version = "1.53"
2222
[target.'cfg(unix)'.dependencies]
2323
# Don't turn on the feature "std" for this, see https://github.com/rust-lang/cargo/issues/4866
2424
# which is still an issue with `resolver = "1"`.
25-
libc = { version = "0.2.62", default-features = false }
25+
libc = { version = "0.2.62", default-features = false, optional = true }
2626

2727
[features]
28-
parallel = []
28+
parallel = ["libc"]
2929

3030
[dev-dependencies]
3131
tempfile = "3"

src/command_helpers.rs

+172-88
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,12 @@ use std::{
44
collections::hash_map,
55
ffi::OsString,
66
fmt::Display,
7-
fs::{self, File},
7+
fs,
88
hash::Hasher,
9-
io::{self, BufRead, BufReader, Read, Write},
9+
io::{self, Read, Write},
1010
path::Path,
11-
process::{Child, Command, Stdio},
11+
process::{Child, ChildStderr, Command, Stdio},
1212
sync::Arc,
13-
thread::{self, JoinHandle},
1413
};
1514

1615
use crate::{Error, ErrorKind, Object};
@@ -41,83 +40,175 @@ impl CargoOutput {
4140
}
4241
}
4342

44-
pub(crate) fn print_thread(&self) -> Result<Option<PrintThread>, Error> {
45-
self.warnings.then(PrintThread::new).transpose()
43+
fn stdio_for_warnings(&self) -> Stdio {
44+
if self.warnings {
45+
Stdio::piped()
46+
} else {
47+
Stdio::null()
48+
}
4649
}
4750
}
4851

49-
pub(crate) struct PrintThread {
50-
handle: Option<JoinHandle<()>>,
51-
pipe_writer: Option<File>,
52+
pub(crate) struct StderrForwarder {
53+
inner: Option<(ChildStderr, Vec<u8>)>,
54+
#[cfg(feature = "parallel")]
55+
is_non_blocking: bool,
5256
}
5357

54-
impl PrintThread {
55-
pub(crate) fn new() -> Result<Self, Error> {
56-
let (pipe_reader, pipe_writer) = crate::os_pipe::pipe()?;
57-
58-
// Capture the standard error coming from compilation, and write it out
59-
// with cargo:warning= prefixes. Note that this is a bit wonky to avoid
60-
// requiring the output to be UTF-8, we instead just ship bytes from one
61-
// location to another.
62-
let print = thread::spawn(move || {
63-
let mut stderr = BufReader::with_capacity(4096, pipe_reader);
64-
let mut line = Vec::with_capacity(20);
65-
let stdout = io::stdout();
58+
#[cfg(all(feature = "parallel", not(unix), not(windows)))]
59+
compile_error!("Only unix and windows support non-blocking pipes! For other OSes, disable the parallel feature.");
60+
61+
const MIN_BUFFER_CAPACITY: usize = 100;
62+
63+
impl StderrForwarder {
64+
pub(crate) fn new(child: &mut Child) -> Self {
65+
Self {
66+
inner: child
67+
.stderr
68+
.take()
69+
.map(|stderr| (stderr, Vec::with_capacity(MIN_BUFFER_CAPACITY))),
70+
#[cfg(feature = "parallel")]
71+
is_non_blocking: false,
72+
}
73+
}
6674

67-
// read_until returns 0 on Eof
68-
while stderr.read_until(b'\n', &mut line).unwrap() != 0 {
69-
{
70-
let mut stdout = stdout.lock();
75+
fn forward_available(&mut self) -> bool {
76+
if let Some((stderr, buffer)) = self.inner.as_mut() {
77+
let stdout = io::stdout();
78+
let write_warning = move |line: &[u8]| {
79+
let mut stdout = stdout.lock();
80+
stdout.write_all(b"cargo:warning=").unwrap();
81+
stdout.write_all(line).unwrap();
82+
stdout.write_all(b"\n").unwrap();
83+
};
7184

72-
stdout.write_all(b"cargo:warning=").unwrap();
73-
stdout.write_all(&line).unwrap();
74-
stdout.write_all(b"\n").unwrap();
85+
#[cfg(all(windows, feature = "parallel"))]
86+
let is_non_blocking = self.is_non_blocking;
87+
let mut read_stderr = move |buf: &mut [u8]| -> Result<usize, io::Error> {
88+
// On Unix, the pipe is non-blocking, so we can just read.
89+
// On Windows, take a peek at the pipe to see if there's data.
90+
#[cfg(all(windows, feature = "parallel"))]
91+
if is_non_blocking {
92+
use crate::windows::windows_sys::PeekNamedPipe;
93+
use std::os::windows::io::AsRawHandle;
94+
use std::ptr::null_mut;
95+
let mut bytes_available = 0;
96+
unsafe {
97+
if PeekNamedPipe(
98+
stderr.as_raw_handle(),
99+
null_mut(),
100+
0,
101+
null_mut(),
102+
&mut bytes_available,
103+
null_mut(),
104+
) == 0
105+
{
106+
return Err(io::Error::last_os_error());
107+
}
108+
}
109+
if bytes_available == 0 {
110+
return Err(io::Error::new(
111+
io::ErrorKind::WouldBlock,
112+
"The pipe is empty",
113+
));
114+
}
75115
}
76116

77-
// read_until does not clear the buffer
78-
line.clear();
79-
}
80-
});
117+
stderr.read(buf)
118+
};
81119

82-
Ok(Self {
83-
handle: Some(print),
84-
pipe_writer: Some(pipe_writer),
85-
})
120+
loop {
121+
buffer.reserve(MIN_BUFFER_CAPACITY);
122+
123+
let old_data_end = buffer.len();
124+
buffer.resize(buffer.capacity(), 0);
125+
match read_stderr(&mut buffer[old_data_end..]) {
126+
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
127+
// No data currently, yield back.
128+
buffer.truncate(old_data_end);
129+
return false;
130+
}
131+
Err(err) if err.kind() == std::io::ErrorKind::Interrupted => {
132+
// Interrupted, try again.
133+
buffer.truncate(old_data_end);
134+
}
135+
Ok(0) | Err(_) => {
136+
// End of stream: flush remaining data and bail.
137+
if old_data_end > 0 {
138+
write_warning(&buffer[..old_data_end]);
139+
}
140+
return true;
141+
}
142+
Ok(bytes_read) => {
143+
buffer.truncate(old_data_end + bytes_read);
144+
let mut consumed = 0;
145+
for line in buffer.split_inclusive(|&b| b == b'\n') {
146+
// Only forward complete lines, leave the rest in the buffer.
147+
if let Some((b'\n', line)) = line.split_last() {
148+
consumed += line.len() + 1;
149+
write_warning(line);
150+
}
151+
}
152+
buffer.drain(..consumed);
153+
}
154+
}
155+
}
156+
} else {
157+
true
158+
}
86159
}
87160

88-
/// # Panics
89-
///
90-
/// Will panic if the pipe writer has already been taken.
91-
pub(crate) fn take_pipe_writer(&mut self) -> File {
92-
self.pipe_writer.take().unwrap()
93-
}
161+
#[cfg(feature = "parallel")]
162+
pub(crate) fn set_non_blocking(&mut self) -> Result<(), Error> {
163+
assert!(!self.is_non_blocking);
164+
165+
// On Unix, switch the pipe to non-blocking mode.
166+
// On Windows, we have a different way to be non-blocking.
167+
#[cfg(unix)]
168+
if let Some((stderr, _)) = self.inner.as_mut() {
169+
use std::os::unix::io::AsRawFd;
170+
let fd = stderr.as_raw_fd();
171+
let flags = unsafe { libc::fcntl(fd, libc::F_GETFL, 0) };
172+
if flags < 0 {
173+
return Err(Error::new(
174+
ErrorKind::IOError,
175+
format!(
176+
"Failed to get flags for child stderr: {}",
177+
io::Error::last_os_error()
178+
),
179+
));
180+
}
94181

95-
/// # Panics
96-
///
97-
/// Will panic if the pipe writer has already been taken.
98-
pub(crate) fn clone_pipe_writer(&self) -> Result<File, Error> {
99-
self.try_clone_pipe_writer().map(Option::unwrap)
100-
}
182+
if unsafe { libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) } != 0 {
183+
return Err(Error::new(
184+
ErrorKind::IOError,
185+
format!(
186+
"Failed to set flags for child stderr: {}",
187+
io::Error::last_os_error()
188+
),
189+
));
190+
}
191+
}
101192

102-
pub(crate) fn try_clone_pipe_writer(&self) -> Result<Option<File>, Error> {
103-
self.pipe_writer
104-
.as_ref()
105-
.map(File::try_clone)
106-
.transpose()
107-
.map_err(From::from)
193+
self.is_non_blocking = true;
194+
Ok(())
108195
}
109-
}
110196

111-
impl Drop for PrintThread {
112-
fn drop(&mut self) {
113-
// Drop pipe_writer first to avoid deadlock
114-
self.pipe_writer.take();
197+
#[cfg(feature = "parallel")]
198+
fn forward_all(&mut self) {
199+
while !self.forward_available() {}
200+
}
115201

116-
self.handle.take().unwrap().join().unwrap();
202+
#[cfg(not(feature = "parallel"))]
203+
fn forward_all(&mut self) {
204+
let forward_result = self.forward_available();
205+
assert!(forward_result, "Should have consumed all data");
117206
}
118207
}
119208

120209
fn wait_on_child(cmd: &Command, program: &str, child: &mut Child) -> Result<(), Error> {
210+
StderrForwarder::new(child).forward_all();
211+
121212
let status = match child.wait() {
122213
Ok(s) => s,
123214
Err(e) => {
@@ -193,20 +284,13 @@ pub(crate) fn objects_from_files(files: &[Arc<Path>], dst: &Path) -> Result<Vec<
193284
Ok(objects)
194285
}
195286

196-
fn run_inner(cmd: &mut Command, program: &str, pipe_writer: Option<File>) -> Result<(), Error> {
197-
let mut child = spawn(cmd, program, pipe_writer)?;
198-
wait_on_child(cmd, program, &mut child)
199-
}
200-
201287
pub(crate) fn run(
202288
cmd: &mut Command,
203289
program: &str,
204-
print: Option<&PrintThread>,
290+
cargo_output: &CargoOutput,
205291
) -> Result<(), Error> {
206-
let pipe_writer = print.map(PrintThread::clone_pipe_writer).transpose()?;
207-
run_inner(cmd, program, pipe_writer)?;
208-
209-
Ok(())
292+
let mut child = spawn(cmd, program, cargo_output)?;
293+
wait_on_child(cmd, program, &mut child)
210294
}
211295

212296
pub(crate) fn run_output(
@@ -216,12 +300,7 @@ pub(crate) fn run_output(
216300
) -> Result<Vec<u8>, Error> {
217301
cmd.stdout(Stdio::piped());
218302

219-
let mut print = cargo_output.print_thread()?;
220-
let mut child = spawn(
221-
cmd,
222-
program,
223-
print.as_mut().map(PrintThread::take_pipe_writer),
224-
)?;
303+
let mut child = spawn(cmd, program, cargo_output)?;
225304

226305
let mut stdout = vec![];
227306
child
@@ -239,7 +318,7 @@ pub(crate) fn run_output(
239318
pub(crate) fn spawn(
240319
cmd: &mut Command,
241320
program: &str,
242-
pipe_writer: Option<File>,
321+
cargo_output: &CargoOutput,
243322
) -> Result<Child, Error> {
244323
struct ResetStderr<'cmd>(&'cmd mut Command);
245324

@@ -254,10 +333,7 @@ pub(crate) fn spawn(
254333
println!("running: {:?}", cmd);
255334

256335
let cmd = ResetStderr(cmd);
257-
let child = cmd
258-
.0
259-
.stderr(pipe_writer.map_or_else(Stdio::null, Stdio::from))
260-
.spawn();
336+
let child = cmd.0.stderr(cargo_output.stdio_for_warnings()).spawn();
261337
match child {
262338
Ok(child) => Ok(child),
263339
Err(ref e) if e.kind() == io::ErrorKind::NotFound => {
@@ -307,9 +383,14 @@ pub(crate) fn try_wait_on_child(
307383
program: &str,
308384
child: &mut Child,
309385
stdout: &mut dyn io::Write,
386+
stderr_forwarder: &mut StderrForwarder,
310387
) -> Result<Option<()>, Error> {
388+
stderr_forwarder.forward_available();
389+
311390
match child.try_wait() {
312391
Ok(Some(status)) => {
392+
stderr_forwarder.forward_all();
393+
313394
let _ = writeln!(stdout, "{}", status);
314395

315396
if status.success() {
@@ -325,12 +406,15 @@ pub(crate) fn try_wait_on_child(
325406
}
326407
}
327408
Ok(None) => Ok(None),
328-
Err(e) => Err(Error::new(
329-
ErrorKind::ToolExecError,
330-
format!(
331-
"Failed to wait on spawned child process, command {:?} with args {:?}: {}.",
332-
cmd, program, e
333-
),
334-
)),
409+
Err(e) => {
410+
stderr_forwarder.forward_all();
411+
Err(Error::new(
412+
ErrorKind::ToolExecError,
413+
format!(
414+
"Failed to wait on spawned child process, command {:?} with args {:?}: {}.",
415+
cmd, program, e
416+
),
417+
))
418+
}
335419
}
336420
}

0 commit comments

Comments
 (0)