Skip to content

Update asyncronous dependencies #23

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,27 @@ unstable = []

[dependencies]
failure = "0.1.3"
futures = "0.1.13"
futures-util = "0.3.5"
log = "0.4.6"
tokio = "0.1.11"
tokio-process = "0.2.3"
tokio = { version = "0.2.21", features = ["process", "time"] }
nix = "0.11.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
scopeguard = "1.0.0"
lazy_static = "1.0.0"
winapi = "0.3"
tempfile = "3.0.0"
reqwest = "0.9"
reqwest = { version = "0.10.4", features = ["blocking"] }
flate2 = "1"
tar = "0.4.0"
percent-encoding = "2.1.0"
walkdir = "2.2"
toml = "0.5"
fs2 = "0.4.3"
remove_dir_all = "0.5.2"
base64 = "0.10.1"
base64 = "0.11.0"
getrandom = { version = "0.1.12", features = ["std"] }

[dev-dependencies]
env_logger = "0.6.1"
tiny_http = "0.6.2"
tiny_http = "0.7.0"
134 changes: 81 additions & 53 deletions src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,34 @@ pub use sandbox::*;
use crate::native;
use crate::workspace::Workspace;
use failure::{Error, Fail};
use futures::{future, Future, Stream};
use futures_util::{
future::{self, FutureExt},
stream::{self, TryStreamExt},
};
use log::{error, info};
use process_lines_actions::InnerState;
use std::convert::AsRef;
use std::env::consts::EXE_SUFFIX;
use std::ffi::{OsStr, OsString};
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::process::{Command as StdCommand, ExitStatus, Stdio};
use std::process::{ExitStatus, Stdio};
use std::time::{Duration, Instant};
use tokio::{io::lines, runtime::current_thread::block_on_all, util::*};
use tokio_process::CommandExt;
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::Command as AsyncCommand,
runtime::Runtime,
stream::StreamExt,
time,
};

lazy_static::lazy_static! {
// TODO: Migrate to asynchronous code and remove runtime
pub(super) static ref RUNTIME: Runtime = Runtime::new().expect("Failed to construct tokio runtime");
}

pub(crate) mod container_dirs {
use std::path::{Path, PathBuf};

use lazy_static::lazy_static;
use std::path::{Path, PathBuf};

#[cfg(windows)]
lazy_static! {
Expand Down Expand Up @@ -371,7 +382,7 @@ impl<'w, 'pl> Command<'w, 'pl> {
),
Binary::__NonExaustive => panic!("do not create __NonExaustive variants manually"),
};
let mut cmd = StdCommand::new(crate::utils::normalize_path(&binary));
let mut cmd = AsyncCommand::new(crate::utils::normalize_path(&binary));

cmd.args(&self.args);

Expand Down Expand Up @@ -411,18 +422,21 @@ impl<'w, 'pl> Command<'w, 'pl> {
if self.log_command {
info!("running `{}`", cmdstr);
}
let out = log_command(
cmd,
self.process_lines,
capture,
self.timeout,
self.no_output_timeout,
self.log_output,
)
.map_err(|e| {
error!("error running command: {}", e);
e
})?;

let out = RUNTIME
.handle()
.block_on(log_command(
cmd,
self.process_lines,
capture,
self.timeout,
self.no_output_timeout,
self.log_output,
))
.map_err(|e| {
error!("error running command: {}", e);
e
})?;

if out.status.success() {
Ok(out.into())
Expand Down Expand Up @@ -481,8 +495,8 @@ impl OutputKind {
}
}

fn log_command(
mut cmd: StdCommand,
async fn log_command(
mut cmd: AsyncCommand,
mut process_lines: Option<&mut dyn FnMut(&str, &mut ProcessLinesActions)>,
capture: bool,
timeout: Option<Duration>,
Expand All @@ -503,34 +517,35 @@ fn log_command(
timeout
};

let mut child = cmd
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn_async()?;
let mut child = cmd.stdout(Stdio::piped()).stderr(Stdio::piped()).spawn()?;
let child_id = child.id();

let stdout = lines(BufReader::new(child.stdout().take().unwrap()))
let stdout = BufReader::new(child.stdout.take().unwrap())
.lines()
.map(|line| (OutputKind::Stdout, line));
let stderr = lines(BufReader::new(child.stderr().take().unwrap()))
let stderr = BufReader::new(child.stderr.take().unwrap())
.lines()
.map(|line| (OutputKind::Stderr, line));

let start = Instant::now();
let mut actions = ProcessLinesActions::new();

let output = stdout
.select(stderr)
let output = stream::select(stdout, stderr)
.timeout(no_output_timeout)
.map_err(move |err| {
if err.is_elapsed() {
match native::kill_process(child_id) {
Ok(()) => Error::from(CommandError::NoOutputFor(no_output_timeout.as_secs())),
Err(err) => err,
}
} else {
Error::from(err)
}
.map(move |result| match result {
// If the timeout elapses, kill the process
Err(_timeout) => Err(match native::kill_process(child_id) {
Ok(()) => Error::from(CommandError::NoOutputFor(no_output_timeout.as_secs())),
Err(err) => err,
}),

// If an error occurred reading the line, flatten the error
Ok((_, Err(read_err))) => Err(Error::from(read_err)),

// If the read was successful, return the `OutputKind` and the read line
Ok((out_kind, Ok(line))) => Ok((out_kind, line)),
})
.and_then(move |(kind, line)| {
.and_then(move |(kind, line): (OutputKind, String)| {
// If the process is in a tight output loop the timeout on the process might fail to
// be executed, so this extra check prevents the process to run without limits.
if start.elapsed() > timeout {
Expand All @@ -555,31 +570,44 @@ fn log_command(

future::ok((kind, lines))
})
.fold(
(Vec::new(), Vec::new()),
move |mut res, (kind, mut lines)| -> Result<_, Error> {
.try_fold(
(Vec::<String>::new(), Vec::<String>::new()),
move |(mut stdout, mut stderr), (kind, mut lines)| async move {
// If stdio/stdout is supposed to be captured, append it to
// the accumulated stdio/stdout
if capture {
match kind {
OutputKind::Stdout => res.0.append(&mut lines),
OutputKind::Stderr => res.1.append(&mut lines),
OutputKind::Stdout => stdout.append(&mut lines),
OutputKind::Stderr => stderr.append(&mut lines),
}
}
Ok(res)

Ok((stdout, stderr))
},
);

let child = child.timeout(timeout).map_err(move |err| {
if err.is_elapsed() {
match native::kill_process(child_id) {
let child = time::timeout(timeout, child).map(move |result| {
match result {
// If the timeout elapses, kill the process
Err(_timeout) => Err(match native::kill_process(child_id) {
Ok(()) => Error::from(CommandError::Timeout(timeout.as_secs())),
Err(err) => err,
}
} else {
Error::from(err)
}),

// If an error occurred with the child
Ok(Err(err)) => Err(Error::from(err)),

// If the read was successful, return the process's exit status
Ok(Ok(exit_status)) => Ok(exit_status),
}
});

let ((stdout, stderr), status) = block_on_all(output.join(child))?;
let ((stdout, stderr), status) = {
let (output, child) = future::join(output, child).await;
let (stdout, stderr) = output?;

((stdout, stderr), child?)
};

Ok(InnerProcessOutput {
status,
Expand Down
6 changes: 3 additions & 3 deletions src/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl WorkspaceBuilder {

let mut headers = reqwest::header::HeaderMap::new();
headers.insert(reqwest::header::USER_AGENT, self.user_agent.parse()?);
let http = reqwest::ClientBuilder::new()
let http = reqwest::blocking::ClientBuilder::new()
.default_headers(headers)
.build()?;

Expand Down Expand Up @@ -183,7 +183,7 @@ impl WorkspaceBuilder {
}

struct WorkspaceInner {
http: reqwest::Client,
http: reqwest::blocking::Client,
path: PathBuf,
sandbox_image: SandboxImage,
command_timeout: Option<Duration>,
Expand Down Expand Up @@ -244,7 +244,7 @@ impl Workspace {
crate::toolchain::list_installed_toolchains(&self.rustup_home())
}

pub(crate) fn http_client(&self) -> &reqwest::Client {
pub(crate) fn http_client(&self) -> &reqwest::blocking::Client {
&self.inner.http
}

Expand Down