diff --git a/Cargo.toml b/Cargo.toml index 52c74cb..1f59b87 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,10 +21,9 @@ unstable = [] [dependencies] failure = "0.1.3" -futures = "0.1.13" +futures-util = "0.3.5" log = "0.4.6" -tokio = "0.1.11" -tokio-process = "0.2.3" +tokio = { version = "0.2.21", features = ["process", "time"] } nix = "0.11.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" @@ -32,7 +31,7 @@ scopeguard = "1.0.0" lazy_static = "1.0.0" winapi = "0.3" tempfile = "3.0.0" -reqwest = "0.9" +reqwest = { version = "0.10.4", features = ["blocking"] } flate2 = "1" tar = "0.4.0" percent-encoding = "2.1.0" @@ -40,9 +39,9 @@ walkdir = "2.2" toml = "0.5" fs2 = "0.4.3" remove_dir_all = "0.5.2" -base64 = "0.10.1" +base64 = "0.11.0" getrandom = { version = "0.1.12", features = ["std"] } [dev-dependencies] env_logger = "0.6.1" -tiny_http = "0.6.2" +tiny_http = "0.7.0" diff --git a/src/cmd/mod.rs b/src/cmd/mod.rs index 8183fca..a64ff37 100644 --- a/src/cmd/mod.rs +++ b/src/cmd/mod.rs @@ -9,23 +9,34 @@ pub use sandbox::*; use crate::native; use crate::workspace::Workspace; use failure::{Error, Fail}; -use futures::{future, Future, Stream}; +use futures_util::{ + future::{self, FutureExt}, + stream::{self, TryStreamExt}, +}; use log::{error, info}; use process_lines_actions::InnerState; use std::convert::AsRef; use std::env::consts::EXE_SUFFIX; use std::ffi::{OsStr, OsString}; -use std::io::BufReader; use std::path::{Path, PathBuf}; -use std::process::{Command as StdCommand, ExitStatus, Stdio}; +use std::process::{ExitStatus, Stdio}; use std::time::{Duration, Instant}; -use tokio::{io::lines, runtime::current_thread::block_on_all, util::*}; -use tokio_process::CommandExt; +use tokio::{ + io::{AsyncBufReadExt, BufReader}, + process::Command as AsyncCommand, + runtime::Runtime, + stream::StreamExt, + time, +}; + +lazy_static::lazy_static! { + // TODO: Migrate to asynchronous code and remove runtime + pub(super) static ref RUNTIME: Runtime = Runtime::new().expect("Failed to construct tokio runtime"); +} pub(crate) mod container_dirs { - use std::path::{Path, PathBuf}; - use lazy_static::lazy_static; + use std::path::{Path, PathBuf}; #[cfg(windows)] lazy_static! { @@ -371,7 +382,7 @@ impl<'w, 'pl> Command<'w, 'pl> { ), Binary::__NonExaustive => panic!("do not create __NonExaustive variants manually"), }; - let mut cmd = StdCommand::new(crate::utils::normalize_path(&binary)); + let mut cmd = AsyncCommand::new(crate::utils::normalize_path(&binary)); cmd.args(&self.args); @@ -411,18 +422,21 @@ impl<'w, 'pl> Command<'w, 'pl> { if self.log_command { info!("running `{}`", cmdstr); } - let out = log_command( - cmd, - self.process_lines, - capture, - self.timeout, - self.no_output_timeout, - self.log_output, - ) - .map_err(|e| { - error!("error running command: {}", e); - e - })?; + + let out = RUNTIME + .handle() + .block_on(log_command( + cmd, + self.process_lines, + capture, + self.timeout, + self.no_output_timeout, + self.log_output, + )) + .map_err(|e| { + error!("error running command: {}", e); + e + })?; if out.status.success() { Ok(out.into()) @@ -481,8 +495,8 @@ impl OutputKind { } } -fn log_command( - mut cmd: StdCommand, +async fn log_command( + mut cmd: AsyncCommand, mut process_lines: Option<&mut dyn FnMut(&str, &mut ProcessLinesActions)>, capture: bool, timeout: Option, @@ -503,34 +517,35 @@ fn log_command( timeout }; - let mut child = cmd - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn_async()?; + let mut child = cmd.stdout(Stdio::piped()).stderr(Stdio::piped()).spawn()?; let child_id = child.id(); - let stdout = lines(BufReader::new(child.stdout().take().unwrap())) + let stdout = BufReader::new(child.stdout.take().unwrap()) + .lines() .map(|line| (OutputKind::Stdout, line)); - let stderr = lines(BufReader::new(child.stderr().take().unwrap())) + let stderr = BufReader::new(child.stderr.take().unwrap()) + .lines() .map(|line| (OutputKind::Stderr, line)); let start = Instant::now(); let mut actions = ProcessLinesActions::new(); - let output = stdout - .select(stderr) + let output = stream::select(stdout, stderr) .timeout(no_output_timeout) - .map_err(move |err| { - if err.is_elapsed() { - match native::kill_process(child_id) { - Ok(()) => Error::from(CommandError::NoOutputFor(no_output_timeout.as_secs())), - Err(err) => err, - } - } else { - Error::from(err) - } + .map(move |result| match result { + // If the timeout elapses, kill the process + Err(_timeout) => Err(match native::kill_process(child_id) { + Ok(()) => Error::from(CommandError::NoOutputFor(no_output_timeout.as_secs())), + Err(err) => err, + }), + + // If an error occurred reading the line, flatten the error + Ok((_, Err(read_err))) => Err(Error::from(read_err)), + + // If the read was successful, return the `OutputKind` and the read line + Ok((out_kind, Ok(line))) => Ok((out_kind, line)), }) - .and_then(move |(kind, line)| { + .and_then(move |(kind, line): (OutputKind, String)| { // If the process is in a tight output loop the timeout on the process might fail to // be executed, so this extra check prevents the process to run without limits. if start.elapsed() > timeout { @@ -555,31 +570,44 @@ fn log_command( future::ok((kind, lines)) }) - .fold( - (Vec::new(), Vec::new()), - move |mut res, (kind, mut lines)| -> Result<_, Error> { + .try_fold( + (Vec::::new(), Vec::::new()), + move |(mut stdout, mut stderr), (kind, mut lines)| async move { + // If stdio/stdout is supposed to be captured, append it to + // the accumulated stdio/stdout if capture { match kind { - OutputKind::Stdout => res.0.append(&mut lines), - OutputKind::Stderr => res.1.append(&mut lines), + OutputKind::Stdout => stdout.append(&mut lines), + OutputKind::Stderr => stderr.append(&mut lines), } } - Ok(res) + + Ok((stdout, stderr)) }, ); - let child = child.timeout(timeout).map_err(move |err| { - if err.is_elapsed() { - match native::kill_process(child_id) { + let child = time::timeout(timeout, child).map(move |result| { + match result { + // If the timeout elapses, kill the process + Err(_timeout) => Err(match native::kill_process(child_id) { Ok(()) => Error::from(CommandError::Timeout(timeout.as_secs())), Err(err) => err, - } - } else { - Error::from(err) + }), + + // If an error occurred with the child + Ok(Err(err)) => Err(Error::from(err)), + + // If the read was successful, return the process's exit status + Ok(Ok(exit_status)) => Ok(exit_status), } }); - let ((stdout, stderr), status) = block_on_all(output.join(child))?; + let ((stdout, stderr), status) = { + let (output, child) = future::join(output, child).await; + let (stdout, stderr) = output?; + + ((stdout, stderr), child?) + }; Ok(InnerProcessOutput { status, diff --git a/src/workspace.rs b/src/workspace.rs index 587a389..c4065fa 100644 --- a/src/workspace.rs +++ b/src/workspace.rs @@ -154,7 +154,7 @@ impl WorkspaceBuilder { let mut headers = reqwest::header::HeaderMap::new(); headers.insert(reqwest::header::USER_AGENT, self.user_agent.parse()?); - let http = reqwest::ClientBuilder::new() + let http = reqwest::blocking::ClientBuilder::new() .default_headers(headers) .build()?; @@ -183,7 +183,7 @@ impl WorkspaceBuilder { } struct WorkspaceInner { - http: reqwest::Client, + http: reqwest::blocking::Client, path: PathBuf, sandbox_image: SandboxImage, command_timeout: Option, @@ -244,7 +244,7 @@ impl Workspace { crate::toolchain::list_installed_toolchains(&self.rustup_home()) } - pub(crate) fn http_client(&self) -> &reqwest::Client { + pub(crate) fn http_client(&self) -> &reqwest::blocking::Client { &self.inner.http }