Skip to content

Commit 6648651

Browse files
committed
auto merge of #13723 : alexcrichton/rust/pipe-connect-timeout, r=brson
This adds support for connecting to a unix socket with a timeout (a named pipe on windows), and accepting a connection with a timeout. The goal is to bring unix pipes/named sockets back in line with TCP support for timeouts. Similarly to the TCP sockets, all methods are marked #[experimental] due to uncertainty about the type of the timeout argument. This internally involved a good bit of refactoring to share as much code as possible between TCP servers and pipe servers, but the core implementation did not change drastically as part of this commit. cc #13523
2 parents 07598fc + 6328f7c commit 6648651

File tree

12 files changed

+531
-420
lines changed

12 files changed

+531
-420
lines changed

src/liblibc/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ pub use funcs::bsd43::{shutdown};
225225
#[cfg(windows)] pub use consts::os::extra::{PIPE_UNLIMITED_INSTANCES, ERROR_ACCESS_DENIED};
226226
#[cfg(windows)] pub use consts::os::extra::{FILE_WRITE_ATTRIBUTES, FILE_READ_ATTRIBUTES};
227227
#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_BUSY, ERROR_IO_PENDING};
228-
#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED};
228+
#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED, WAIT_OBJECT_0};
229229
#[cfg(windows)] pub use types::os::common::bsd44::{SOCKET};
230230
#[cfg(windows)] pub use types::os::common::posix01::{stat, utimbuf};
231231
#[cfg(windows)] pub use types::os::arch::extra::{HANDLE, BOOL, LPSECURITY_ATTRIBUTES};

src/libnative/io/c_win32.rs

+2
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,6 @@ extern "system" {
5959
optname: libc::c_int,
6060
optval: *mut libc::c_char,
6161
optlen: *mut libc::c_int) -> libc::c_int;
62+
63+
pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL;
6264
}

src/libnative/io/mod.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ pub use self::process::Process;
4444
pub mod addrinfo;
4545
pub mod net;
4646
pub mod process;
47+
mod util;
4748

4849
#[cfg(unix)]
4950
#[path = "file_unix.rs"]
@@ -177,8 +178,9 @@ impl rtio::IoFactory for IoFactory {
177178
fn unix_bind(&mut self, path: &CString) -> IoResult<~RtioUnixListener:Send> {
178179
pipe::UnixListener::bind(path).map(|s| ~s as ~RtioUnixListener:Send)
179180
}
180-
fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe:Send> {
181-
pipe::UnixStream::connect(path).map(|s| ~s as ~RtioPipe:Send)
181+
fn unix_connect(&mut self, path: &CString,
182+
timeout: Option<u64>) -> IoResult<~RtioPipe:Send> {
183+
pipe::UnixStream::connect(path, timeout).map(|s| ~s as ~RtioPipe:Send)
182184
}
183185
fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
184186
hint: Option<ai::Hint>) -> IoResult<~[ai::Info]> {

src/libnative/io/net.rs

+6-122
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,12 @@ use std::cast;
1313
use std::io::net::ip;
1414
use std::io;
1515
use std::mem;
16-
use std::os;
17-
use std::ptr;
1816
use std::rt::rtio;
1917
use std::sync::arc::UnsafeArc;
2018

2119
use super::{IoResult, retry, keep_going};
2220
use super::c;
21+
use super::util;
2322

2423
////////////////////////////////////////////////////////////////////////////////
2524
// sockaddr and misc bindings
@@ -118,8 +117,8 @@ fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
118117
}
119118
}
120119

121-
fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
122-
val: libc::c_int) -> IoResult<T> {
120+
pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
121+
val: libc::c_int) -> IoResult<T> {
123122
unsafe {
124123
let mut slot: T = mem::init();
125124
let mut len = mem::size_of::<T>() as libc::socklen_t;
@@ -145,21 +144,6 @@ fn last_error() -> io::IoError {
145144
super::last_error()
146145
}
147146

148-
fn ms_to_timeval(ms: u64) -> libc::timeval {
149-
libc::timeval {
150-
tv_sec: (ms / 1000) as libc::time_t,
151-
tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t,
152-
}
153-
}
154-
155-
fn timeout(desc: &'static str) -> io::IoError {
156-
io::IoError {
157-
kind: io::TimedOut,
158-
desc: desc,
159-
detail: None,
160-
}
161-
}
162-
163147
#[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
164148
#[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
165149

@@ -270,7 +254,7 @@ impl TcpStream {
270254
let addrp = &addr as *_ as *libc::sockaddr;
271255
match timeout {
272256
Some(timeout) => {
273-
try!(TcpStream::connect_timeout(fd, addrp, len, timeout));
257+
try!(util::connect_timeout(fd, addrp, len, timeout));
274258
Ok(ret)
275259
},
276260
None => {
@@ -282,84 +266,6 @@ impl TcpStream {
282266
}
283267
}
284268

285-
// See http://developerweb.net/viewtopic.php?id=3196 for where this is
286-
// derived from.
287-
fn connect_timeout(fd: sock_t,
288-
addrp: *libc::sockaddr,
289-
len: libc::socklen_t,
290-
timeout_ms: u64) -> IoResult<()> {
291-
#[cfg(unix)] use INPROGRESS = libc::EINPROGRESS;
292-
#[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS;
293-
#[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK;
294-
#[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK;
295-
296-
// Make sure the call to connect() doesn't block
297-
try!(set_nonblocking(fd, true));
298-
299-
let ret = match unsafe { libc::connect(fd, addrp, len) } {
300-
// If the connection is in progress, then we need to wait for it to
301-
// finish (with a timeout). The current strategy for doing this is
302-
// to use select() with a timeout.
303-
-1 if os::errno() as int == INPROGRESS as int ||
304-
os::errno() as int == WOULDBLOCK as int => {
305-
let mut set: c::fd_set = unsafe { mem::init() };
306-
c::fd_set(&mut set, fd);
307-
match await(fd, &mut set, timeout_ms) {
308-
0 => Err(timeout("connection timed out")),
309-
-1 => Err(last_error()),
310-
_ => {
311-
let err: libc::c_int = try!(
312-
getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR));
313-
if err == 0 {
314-
Ok(())
315-
} else {
316-
Err(io::IoError::from_errno(err as uint, true))
317-
}
318-
}
319-
}
320-
}
321-
322-
-1 => Err(last_error()),
323-
_ => Ok(()),
324-
};
325-
326-
// be sure to turn blocking I/O back on
327-
try!(set_nonblocking(fd, false));
328-
return ret;
329-
330-
#[cfg(unix)]
331-
fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
332-
let set = nb as libc::c_int;
333-
super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) }))
334-
}
335-
#[cfg(windows)]
336-
fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
337-
let mut set = nb as libc::c_ulong;
338-
if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } {
339-
Err(last_error())
340-
} else {
341-
Ok(())
342-
}
343-
}
344-
345-
#[cfg(unix)]
346-
fn await(fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
347-
let start = ::io::timer::now();
348-
retry(|| unsafe {
349-
// Recalculate the timeout each iteration (it is generally
350-
// undefined what the value of the 'tv' is after select
351-
// returns EINTR).
352-
let tv = ms_to_timeval(timeout - (::io::timer::now() - start));
353-
c::select(fd + 1, ptr::null(), &*set, ptr::null(), &tv)
354-
})
355-
}
356-
#[cfg(windows)]
357-
fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
358-
let tv = ms_to_timeval(timeout);
359-
unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) }
360-
}
361-
}
362-
363269
pub fn fd(&self) -> sock_t {
364270
// This unsafety is fine because it's just a read-only arc
365271
unsafe { (*self.inner.get()).fd }
@@ -533,7 +439,7 @@ impl TcpAcceptor {
533439

534440
pub fn native_accept(&mut self) -> IoResult<TcpStream> {
535441
if self.deadline != 0 {
536-
try!(self.accept_deadline());
442+
try!(util::accept_deadline(self.fd(), self.deadline));
537443
}
538444
unsafe {
539445
let mut storage: libc::sockaddr_storage = mem::init();
@@ -550,25 +456,6 @@ impl TcpAcceptor {
550456
}
551457
}
552458
}
553-
554-
fn accept_deadline(&mut self) -> IoResult<()> {
555-
let mut set: c::fd_set = unsafe { mem::init() };
556-
c::fd_set(&mut set, self.fd());
557-
558-
match retry(|| {
559-
// If we're past the deadline, then pass a 0 timeout to select() so
560-
// we can poll the status of the socket.
561-
let now = ::io::timer::now();
562-
let ms = if self.deadline > now {0} else {self.deadline - now};
563-
let tv = ms_to_timeval(ms);
564-
let n = if cfg!(windows) {1} else {self.fd() as libc::c_int + 1};
565-
unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) }
566-
}) {
567-
-1 => Err(last_error()),
568-
0 => Err(timeout("accept timed out")),
569-
_ => return Ok(()),
570-
}
571-
}
572459
}
573460

574461
impl rtio::RtioSocket for TcpAcceptor {
@@ -585,10 +472,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
585472
fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
586473
fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
587474
fn set_timeout(&mut self, timeout: Option<u64>) {
588-
self.deadline = match timeout {
589-
None => 0,
590-
Some(t) => ::io::timer::now() + t,
591-
};
475+
self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
592476
}
593477
}
594478

0 commit comments

Comments
 (0)