Skip to content

Commit 1fab5bf

Browse files
author
Pietro Albini
authored
Merge pull request #23 from Kixiron/master
2 parents 6b0abb8 + f82dca8 commit 1fab5bf

File tree

3 files changed

+89
-62
lines changed

3 files changed

+89
-62
lines changed

Cargo.toml

+5-6
Original file line numberDiff line numberDiff line change
@@ -21,28 +21,27 @@ unstable = []
2121

2222
[dependencies]
2323
failure = "0.1.3"
24-
futures = "0.1.13"
24+
futures-util = "0.3.5"
2525
log = "0.4.6"
26-
tokio = "0.1.11"
27-
tokio-process = "0.2.3"
26+
tokio = { version = "0.2.21", features = ["process", "time"] }
2827
nix = "0.11.0"
2928
serde = { version = "1.0", features = ["derive"] }
3029
serde_json = "1.0"
3130
scopeguard = "1.0.0"
3231
lazy_static = "1.0.0"
3332
winapi = "0.3"
3433
tempfile = "3.0.0"
35-
reqwest = "0.9"
34+
reqwest = { version = "0.10.4", features = ["blocking"] }
3635
flate2 = "1"
3736
tar = "0.4.0"
3837
percent-encoding = "2.1.0"
3938
walkdir = "2.2"
4039
toml = "0.5"
4140
fs2 = "0.4.3"
4241
remove_dir_all = "0.5.2"
43-
base64 = "0.10.1"
42+
base64 = "0.11.0"
4443
getrandom = { version = "0.1.12", features = ["std"] }
4544

4645
[dev-dependencies]
4746
env_logger = "0.6.1"
48-
tiny_http = "0.6.2"
47+
tiny_http = "0.7.0"

src/cmd/mod.rs

+81-53
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,34 @@ pub use sandbox::*;
99
use crate::native;
1010
use crate::workspace::Workspace;
1111
use failure::{Error, Fail};
12-
use futures::{future, Future, Stream};
12+
use futures_util::{
13+
future::{self, FutureExt},
14+
stream::{self, TryStreamExt},
15+
};
1316
use log::{error, info};
1417
use process_lines_actions::InnerState;
1518
use std::convert::AsRef;
1619
use std::env::consts::EXE_SUFFIX;
1720
use std::ffi::{OsStr, OsString};
18-
use std::io::BufReader;
1921
use std::path::{Path, PathBuf};
20-
use std::process::{Command as StdCommand, ExitStatus, Stdio};
22+
use std::process::{ExitStatus, Stdio};
2123
use std::time::{Duration, Instant};
22-
use tokio::{io::lines, runtime::current_thread::block_on_all, util::*};
23-
use tokio_process::CommandExt;
24+
use tokio::{
25+
io::{AsyncBufReadExt, BufReader},
26+
process::Command as AsyncCommand,
27+
runtime::Runtime,
28+
stream::StreamExt,
29+
time,
30+
};
31+
32+
lazy_static::lazy_static! {
33+
// TODO: Migrate to asynchronous code and remove runtime
34+
pub(super) static ref RUNTIME: Runtime = Runtime::new().expect("Failed to construct tokio runtime");
35+
}
2436

2537
pub(crate) mod container_dirs {
26-
use std::path::{Path, PathBuf};
27-
2838
use lazy_static::lazy_static;
39+
use std::path::{Path, PathBuf};
2940

3041
#[cfg(windows)]
3142
lazy_static! {
@@ -371,7 +382,7 @@ impl<'w, 'pl> Command<'w, 'pl> {
371382
),
372383
Binary::__NonExaustive => panic!("do not create __NonExaustive variants manually"),
373384
};
374-
let mut cmd = StdCommand::new(crate::utils::normalize_path(&binary));
385+
let mut cmd = AsyncCommand::new(crate::utils::normalize_path(&binary));
375386

376387
cmd.args(&self.args);
377388

@@ -411,18 +422,21 @@ impl<'w, 'pl> Command<'w, 'pl> {
411422
if self.log_command {
412423
info!("running `{}`", cmdstr);
413424
}
414-
let out = log_command(
415-
cmd,
416-
self.process_lines,
417-
capture,
418-
self.timeout,
419-
self.no_output_timeout,
420-
self.log_output,
421-
)
422-
.map_err(|e| {
423-
error!("error running command: {}", e);
424-
e
425-
})?;
425+
426+
let out = RUNTIME
427+
.handle()
428+
.block_on(log_command(
429+
cmd,
430+
self.process_lines,
431+
capture,
432+
self.timeout,
433+
self.no_output_timeout,
434+
self.log_output,
435+
))
436+
.map_err(|e| {
437+
error!("error running command: {}", e);
438+
e
439+
})?;
426440

427441
if out.status.success() {
428442
Ok(out.into())
@@ -481,8 +495,8 @@ impl OutputKind {
481495
}
482496
}
483497

484-
fn log_command(
485-
mut cmd: StdCommand,
498+
async fn log_command(
499+
mut cmd: AsyncCommand,
486500
mut process_lines: Option<&mut dyn FnMut(&str, &mut ProcessLinesActions)>,
487501
capture: bool,
488502
timeout: Option<Duration>,
@@ -503,34 +517,35 @@ fn log_command(
503517
timeout
504518
};
505519

506-
let mut child = cmd
507-
.stdout(Stdio::piped())
508-
.stderr(Stdio::piped())
509-
.spawn_async()?;
520+
let mut child = cmd.stdout(Stdio::piped()).stderr(Stdio::piped()).spawn()?;
510521
let child_id = child.id();
511522

512-
let stdout = lines(BufReader::new(child.stdout().take().unwrap()))
523+
let stdout = BufReader::new(child.stdout.take().unwrap())
524+
.lines()
513525
.map(|line| (OutputKind::Stdout, line));
514-
let stderr = lines(BufReader::new(child.stderr().take().unwrap()))
526+
let stderr = BufReader::new(child.stderr.take().unwrap())
527+
.lines()
515528
.map(|line| (OutputKind::Stderr, line));
516529

517530
let start = Instant::now();
518531
let mut actions = ProcessLinesActions::new();
519532

520-
let output = stdout
521-
.select(stderr)
533+
let output = stream::select(stdout, stderr)
522534
.timeout(no_output_timeout)
523-
.map_err(move |err| {
524-
if err.is_elapsed() {
525-
match native::kill_process(child_id) {
526-
Ok(()) => Error::from(CommandError::NoOutputFor(no_output_timeout.as_secs())),
527-
Err(err) => err,
528-
}
529-
} else {
530-
Error::from(err)
531-
}
535+
.map(move |result| match result {
536+
// If the timeout elapses, kill the process
537+
Err(_timeout) => Err(match native::kill_process(child_id) {
538+
Ok(()) => Error::from(CommandError::NoOutputFor(no_output_timeout.as_secs())),
539+
Err(err) => err,
540+
}),
541+
542+
// If an error occurred reading the line, flatten the error
543+
Ok((_, Err(read_err))) => Err(Error::from(read_err)),
544+
545+
// If the read was successful, return the `OutputKind` and the read line
546+
Ok((out_kind, Ok(line))) => Ok((out_kind, line)),
532547
})
533-
.and_then(move |(kind, line)| {
548+
.and_then(move |(kind, line): (OutputKind, String)| {
534549
// If the process is in a tight output loop the timeout on the process might fail to
535550
// be executed, so this extra check prevents the process to run without limits.
536551
if start.elapsed() > timeout {
@@ -555,31 +570,44 @@ fn log_command(
555570

556571
future::ok((kind, lines))
557572
})
558-
.fold(
559-
(Vec::new(), Vec::new()),
560-
move |mut res, (kind, mut lines)| -> Result<_, Error> {
573+
.try_fold(
574+
(Vec::<String>::new(), Vec::<String>::new()),
575+
move |(mut stdout, mut stderr), (kind, mut lines)| async move {
576+
// If stdio/stdout is supposed to be captured, append it to
577+
// the accumulated stdio/stdout
561578
if capture {
562579
match kind {
563-
OutputKind::Stdout => res.0.append(&mut lines),
564-
OutputKind::Stderr => res.1.append(&mut lines),
580+
OutputKind::Stdout => stdout.append(&mut lines),
581+
OutputKind::Stderr => stderr.append(&mut lines),
565582
}
566583
}
567-
Ok(res)
584+
585+
Ok((stdout, stderr))
568586
},
569587
);
570588

571-
let child = child.timeout(timeout).map_err(move |err| {
572-
if err.is_elapsed() {
573-
match native::kill_process(child_id) {
589+
let child = time::timeout(timeout, child).map(move |result| {
590+
match result {
591+
// If the timeout elapses, kill the process
592+
Err(_timeout) => Err(match native::kill_process(child_id) {
574593
Ok(()) => Error::from(CommandError::Timeout(timeout.as_secs())),
575594
Err(err) => err,
576-
}
577-
} else {
578-
Error::from(err)
595+
}),
596+
597+
// If an error occurred with the child
598+
Ok(Err(err)) => Err(Error::from(err)),
599+
600+
// If the read was successful, return the process's exit status
601+
Ok(Ok(exit_status)) => Ok(exit_status),
579602
}
580603
});
581604

582-
let ((stdout, stderr), status) = block_on_all(output.join(child))?;
605+
let ((stdout, stderr), status) = {
606+
let (output, child) = future::join(output, child).await;
607+
let (stdout, stderr) = output?;
608+
609+
((stdout, stderr), child?)
610+
};
583611

584612
Ok(InnerProcessOutput {
585613
status,

src/workspace.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ impl WorkspaceBuilder {
154154

155155
let mut headers = reqwest::header::HeaderMap::new();
156156
headers.insert(reqwest::header::USER_AGENT, self.user_agent.parse()?);
157-
let http = reqwest::ClientBuilder::new()
157+
let http = reqwest::blocking::ClientBuilder::new()
158158
.default_headers(headers)
159159
.build()?;
160160

@@ -183,7 +183,7 @@ impl WorkspaceBuilder {
183183
}
184184

185185
struct WorkspaceInner {
186-
http: reqwest::Client,
186+
http: reqwest::blocking::Client,
187187
path: PathBuf,
188188
sandbox_image: SandboxImage,
189189
command_timeout: Option<Duration>,
@@ -244,7 +244,7 @@ impl Workspace {
244244
crate::toolchain::list_installed_toolchains(&self.rustup_home())
245245
}
246246

247-
pub(crate) fn http_client(&self) -> &reqwest::Client {
247+
pub(crate) fn http_client(&self) -> &reqwest::blocking::Client {
248248
&self.inner.http
249249
}
250250

0 commit comments

Comments
 (0)