Skip to content

Reflect the half-duplex nature of pipes in their types #19382

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/libstd/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ pub use self::net::ip::IpAddr;
pub use self::net::tcp::TcpListener;
pub use self::net::tcp::TcpStream;
pub use self::net::udp::UdpStream;
pub use self::pipe::PipeStream;
pub use self::pipe::{PipeReader,PipeWriter};
pub use self::process::{Process, Command};
pub use self::tempfile::TempDir;

Expand Down
148 changes: 104 additions & 44 deletions src/libstd/io/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,74 @@ use sys_common;
use sys;
use sys::fs::FileDesc as FileDesc;

/// A synchronous, in-memory pipe.
pub struct PipeStream {
#[deriving(Clone)]
struct PipeImpl {
inner: Arc<FileDesc>
}

/// The reading end of a pipe
#[deriving(Clone)]
pub struct PipeReader {
inner: PipeImpl
}

/// The writing end of a pipe
#[deriving(Clone)]
pub struct PipeWriter {
inner: PipeImpl
}

pub struct PipePair {
pub reader: PipeStream,
pub writer: PipeStream,
pub reader: PipeReader,
pub writer: PipeWriter,
}

impl PipePair {
/// Creates a pair of in-memory OS pipes for a unidirectional communication
/// stream.
///
/// The structure returned contains a reader and writer I/O object. Data
/// written to the writer can be read from the reader.
///
/// # Errors
///
/// This function can fail to succeed if the underlying OS has run out of
/// available resources to allocate a new pipe.
pub fn new() -> IoResult<PipePair> {
let (reader, writer) = try!(unsafe { sys::os::pipe() });
Ok(PipePair {
reader: PipeReader::from_filedesc(reader),
writer: PipeWriter::from_filedesc(writer),
})
}
}

impl PipeImpl {
fn open(fd: libc::c_int) -> IoResult<PipeImpl> {
Ok(PipeImpl::from_filedesc(FileDesc::new(fd, true)))
}

#[doc(hidden)]
fn from_filedesc(fd: FileDesc) -> PipeImpl {
PipeImpl { inner: Arc::new(fd) }
}
}

impl sys_common::AsInner<sys::fs::FileDesc> for PipeReader {
fn as_inner(&self) -> &sys::fs::FileDesc {
&*self.inner.inner
}
}

impl sys_common::AsInner<sys::fs::FileDesc> for PipeWriter {
fn as_inner(&self) -> &sys::fs::FileDesc {
&*self.inner.inner
}
}

impl PipeStream {
/// Consumes a file descriptor to return a pipe stream that will have
/// synchronous, but non-blocking reads/writes. This is useful if the file
impl PipeReader {
/// Consumes a file descriptor to return a pipe reader that will have
/// synchronous, but non-blocking reads. This is useful if the file
/// descriptor is acquired via means other than the standard methods.
///
/// This operation consumes ownership of the file descriptor and it will be
Expand All @@ -49,64 +104,69 @@ impl PipeStream {
/// # #![allow(unused_must_use)]
/// extern crate libc;
///
/// use std::io::pipe::PipeStream;
/// use std::io::pipe::PipeReader;
///
/// fn main() {
/// let mut pipe = PipeStream::open(libc::STDERR_FILENO);
/// pipe.write(b"Hello, stderr!");
/// let mut pipe = PipeReader::open(libc::STDIN_FILENO);
/// let mut buf = [0, ..10];
/// pipe.read(&mut buf).unwrap();
/// }
/// ```
pub fn open(fd: libc::c_int) -> IoResult<PipeStream> {
Ok(PipeStream::from_filedesc(FileDesc::new(fd, true)))
pub fn open(fd: libc::c_int) -> IoResult<PipeReader> {
PipeImpl::open(fd).map(|x| PipeReader { inner: x })
}

// FIXME: expose this some other way
/// Wrap a FileDesc directly, taking ownership.
#[doc(hidden)]
pub fn from_filedesc(fd: FileDesc) -> PipeStream {
PipeStream { inner: Arc::new(fd) }
pub fn from_filedesc(fd: FileDesc) -> PipeReader {
PipeReader { inner: PipeImpl::from_filedesc(fd) }
}

/// Creates a pair of in-memory OS pipes for a unidirectional communication
/// stream.
}

impl PipeWriter {
/// Consumes a file descriptor to return a pipe writer that will have
/// synchronous, but non-blocking writes. This is useful if the file
/// descriptor is acquired via means other than the standard methods.
///
/// The structure returned contains a reader and writer I/O object. Data
/// written to the writer can be read from the reader.
/// This operation consumes ownership of the file descriptor and it will be
/// closed once the object is deallocated.
///
/// # Errors
/// # Example
///
/// This function can fail to succeed if the underlying OS has run out of
/// available resources to allocate a new pipe.
pub fn pair() -> IoResult<PipePair> {
let (reader, writer) = try!(unsafe { sys::os::pipe() });
Ok(PipePair {
reader: PipeStream::from_filedesc(reader),
writer: PipeStream::from_filedesc(writer),
})
}
}

impl sys_common::AsInner<sys::fs::FileDesc> for PipeStream {
fn as_inner(&self) -> &sys::fs::FileDesc {
&*self.inner
/// ```{rust,no_run}
/// # #![allow(unused_must_use)]
/// extern crate libc;
///
/// use std::io::pipe::PipeWriter;
///
/// fn main() {
/// let mut pipe = PipeWriter::open(libc::STDERR_FILENO);
/// pipe.write(b"Hello, World!");
/// }
/// ```
pub fn open(fd: libc::c_int) -> IoResult<PipeWriter> {
PipeImpl::open(fd).map(|x| PipeWriter { inner: x })
}
}

impl Clone for PipeStream {
fn clone(&self) -> PipeStream {
PipeStream { inner: self.inner.clone() }
// FIXME: expose this some other way
/// Wrap a FileDesc directly, taking ownership.
#[doc(hidden)]
pub fn from_filedesc(fd: FileDesc) -> PipeWriter {
PipeWriter { inner: PipeImpl::from_filedesc(fd) }
}
}

impl Reader for PipeStream {
impl Reader for PipeReader {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
self.inner.read(buf)
self.inner.inner.read(buf)
}
}

impl Writer for PipeStream {
impl Writer for PipeWriter {
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
self.inner.write(buf)
self.inner.inner.write(buf)
}
}

Expand All @@ -117,11 +177,11 @@ mod test {
#[test]
fn partial_read() {
use os;
use io::pipe::PipeStream;
use io::pipe::{PipeReader,PipeWriter};

let os::Pipe { reader, writer } = unsafe { os::pipe().unwrap() };
let out = PipeStream::open(writer);
let mut input = PipeStream::open(reader);
let out = PipeWriter::open(writer);
let mut input = PipeReader::open(reader);
let (tx, rx) = channel();
spawn(proc() {
let mut out = out;
Expand Down
82 changes: 42 additions & 40 deletions src/libstd/io/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use collections::HashMap;
use hash::Hash;
#[cfg(windows)]
use std::hash::sip::SipState;
use io::pipe::{PipeStream, PipePair};
use io::pipe::{PipePair,PipeReader,PipeWriter};
use path::BytesContainer;

use sys;
Expand Down Expand Up @@ -82,15 +82,15 @@ pub struct Process {

/// Handle to the child's stdin, if the `stdin` field of this process's
/// `ProcessConfig` was `CreatePipe`. By default, this handle is `Some`.
pub stdin: Option<PipeStream>,
pub stdin: Option<PipeWriter>,

/// Handle to the child's stdout, if the `stdout` field of this process's
/// `ProcessConfig` was `CreatePipe`. By default, this handle is `Some`.
pub stdout: Option<PipeStream>,
pub stdout: Option<PipeReader>,

/// Handle to the child's stderr, if the `stderr` field of this process's
/// `ProcessConfig` was `CreatePipe`. By default, this handle is `Some`.
pub stderr: Option<PipeStream>,
pub stderr: Option<PipeReader>,
}

/// A representation of environment variable name
Expand Down Expand Up @@ -208,9 +208,9 @@ impl Command {
args: Vec::new(),
env: None,
cwd: None,
stdin: CreatePipe(true, false),
stdout: CreatePipe(false, true),
stderr: CreatePipe(false, true),
stdin: CreatePipe,
stdout: CreatePipe,
stderr: CreatePipe,
uid: None,
gid: None,
detach: false,
Expand Down Expand Up @@ -278,21 +278,21 @@ impl Command {
}

/// Configuration for the child process's stdin handle (file descriptor 0).
/// Defaults to `CreatePipe(true, false)` so the input can be written to.
/// Defaults to `CreatePipe` so the input can be written to.
pub fn stdin<'a>(&'a mut self, cfg: StdioContainer) -> &'a mut Command {
self.stdin = cfg;
self
}

/// Configuration for the child process's stdout handle (file descriptor 1).
/// Defaults to `CreatePipe(false, true)` so the output can be collected.
/// Defaults to `CreatePipe` so the output can be collected.
pub fn stdout<'a>(&'a mut self, cfg: StdioContainer) -> &'a mut Command {
self.stdout = cfg;
self
}

/// Configuration for the child process's stderr handle (file descriptor 2).
/// Defaults to `CreatePipe(false, true)` so the output can be collected.
/// Defaults to `CreatePipe` so the output can be collected.
pub fn stderr<'a>(&'a mut self, cfg: StdioContainer) -> &'a mut Command {
self.stderr = cfg;
self
Expand Down Expand Up @@ -323,9 +323,9 @@ impl Command {

/// Executes the command as a child process, which is returned.
pub fn spawn(&self) -> IoResult<Process> {
let (their_stdin, our_stdin) = try!(setup_io(self.stdin));
let (their_stdout, our_stdout) = try!(setup_io(self.stdout));
let (their_stderr, our_stderr) = try!(setup_io(self.stderr));
let (their_stdin, our_stdin) = try!(setup_io(Direction::Write, self.stdin));
let (our_stdout, their_stdout) = try!(setup_io(Direction::Read, self.stdout));
let (our_stderr, their_stderr) = try!(setup_io(Direction::Read, self.stderr));

match ProcessImp::spawn(self, their_stdin, their_stdout, their_stderr) {
Err(e) => Err(e),
Expand Down Expand Up @@ -396,30 +396,36 @@ impl fmt::Show for Command {
}
}

fn setup_io(io: StdioContainer) -> IoResult<(Option<PipeStream>, Option<PipeStream>)> {
let ours;
let theirs;
enum Direction{Read,Write}

fn setup_io(way:Direction, io: StdioContainer)
-> IoResult<(Option<PipeReader>, Option<PipeWriter>)> {
let reader;
let writer;
match io {
Ignored => {
theirs = None;
ours = None;
reader = None;
writer = None;
}
InheritFd(fd) => {
theirs = Some(PipeStream::from_filedesc(FileDesc::new(fd, false)));
ours = None;
}
CreatePipe(readable, _writable) => {
let PipePair { reader, writer } = try!(PipeStream::pair());
if readable {
theirs = Some(reader);
ours = Some(writer);
} else {
theirs = Some(writer);
ours = Some(reader);
match way {
Direction::Read => {
reader = Some(PipeReader::from_filedesc(FileDesc::new(fd, false)));
writer = None;
}
Direction::Write => {
reader = None;
writer = Some(PipeWriter::from_filedesc(FileDesc::new(fd, false)));
}
}
}
CreatePipe => {
let PipePair { reader:read, writer:write } = try!(PipePair::new());
reader = Some(read);
writer = Some(write);
}
}
Ok((theirs, ours))
Ok((reader, writer))
}

// Allow the sys module to get access to the Command state
Expand Down Expand Up @@ -473,11 +479,7 @@ pub enum StdioContainer {

/// Creates a pipe for the specified file descriptor which will be created
/// when the process is spawned.
///
/// The first boolean argument is whether the pipe is readable, and the
/// second is whether it is writable. These properties are from the view of
/// the *child* process, not the parent process.
CreatePipe(bool /* readable */, bool /* writable */),
CreatePipe,
}

/// Describes the result of a process after it has terminated.
Expand Down Expand Up @@ -686,7 +688,7 @@ impl Process {
/// fail.
pub fn wait_with_output(mut self) -> IoResult<ProcessOutput> {
drop(self.stdin.take());
fn read(stream: Option<io::PipeStream>) -> Receiver<IoResult<Vec<u8>>> {
fn read(stream: Option<PipeReader>) -> Receiver<IoResult<Vec<u8>>> {
let (tx, rx) = channel();
match stream {
Some(stream) => spawn(proc() {
Expand Down Expand Up @@ -809,7 +811,7 @@ mod tests {
#[test]
fn stdout_works() {
let mut cmd = Command::new("echo");
cmd.arg("foobar").stdout(CreatePipe(false, true));
cmd.arg("foobar").stdout(CreatePipe);
assert_eq!(run_output(cmd), "foobar\n".to_string());
}

Expand All @@ -819,7 +821,7 @@ mod tests {
let mut cmd = Command::new("/bin/sh");
cmd.arg("-c").arg("pwd")
.cwd(&Path::new("/"))
.stdout(CreatePipe(false, true));
.stdout(CreatePipe);
assert_eq!(run_output(cmd), "/\n".to_string());
}

Expand All @@ -828,8 +830,8 @@ mod tests {
fn stdin_works() {
let mut p = Command::new("/bin/sh")
.arg("-c").arg("read line; echo $line")
.stdin(CreatePipe(true, false))
.stdout(CreatePipe(false, true))
.stdin(CreatePipe)
.stdout(CreatePipe)
.spawn().unwrap();
p.stdin.as_mut().unwrap().write("foobar".as_bytes()).unwrap();
drop(p.stdin.take());
Expand Down
Loading