Skip to content

Support creating an Async with AsFd rather than AsRawFd #136

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ slab = "0.4.2"
socket2 = { version = "0.5.3", features = ["all"] }
waker-fn = "1.1.0"

[build-dependencies]
autocfg = "1"

[dev-dependencies]
async-channel = "1"
async-net = "1"
Expand All @@ -50,7 +47,7 @@ tempfile = "3"

[target.'cfg(target_os = "linux")'.dev-dependencies]
inotify = { version = "0.10.1", default-features = false }
timerfd = "1"
timerfd = "1.5"

[target.'cfg(windows)'.dev-dependencies]
uds_windows = "1"
16 changes: 0 additions & 16 deletions build.rs

This file was deleted.

10 changes: 8 additions & 2 deletions examples/unix-signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,21 @@

#[cfg(unix)]
fn main() -> std::io::Result<()> {
use std::os::unix::{io::AsRawFd, net::UnixStream};
use std::os::unix::{
io::{AsFd, AsRawFd},
net::UnixStream,
};

use async_io::Async;
use futures_lite::{future, prelude::*};

future::block_on(async {
// Create a Unix stream that receives a byte on each signal occurrence.
let (a, mut b) = Async::<UnixStream>::pair()?;
signal_hook::low_level::pipe::register_raw(signal_hook::consts::SIGINT, a.as_raw_fd())?;
signal_hook::low_level::pipe::register_raw(
signal_hook::consts::SIGINT,
a.as_fd().as_raw_fd(),
)?;
println!("Waiting for Ctrl-C...");

// Receive a byte that indicates the Ctrl-C signal occurred.
Expand Down
60 changes: 18 additions & 42 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,15 @@ use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};

#[cfg(all(not(async_io_no_io_safety), unix))]
use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd};
#[cfg(unix)]
use std::{
os::unix::io::{AsRawFd, RawFd},
os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd},
os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream},
path::Path,
};

#[cfg(windows)]
use std::os::windows::io::{AsRawSocket, RawSocket};
#[cfg(all(not(async_io_no_io_safety), windows))]
use std::os::windows::io::{AsSocket, BorrowedSocket, OwnedSocket};
use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket, OwnedSocket};

use futures_io::{AsyncRead, AsyncWrite};
use futures_lite::stream::{self, Stream};
Expand Down Expand Up @@ -619,14 +615,14 @@ pub struct Async<T> {
impl<T> Unpin for Async<T> {}

#[cfg(unix)]
impl<T: AsRawFd> Async<T> {
impl<T: AsFd> Async<T> {
/// Creates an async I/O handle.
///
/// This method will put the handle in non-blocking mode and register it in
/// [epoll]/[kqueue]/[event ports]/[IOCP].
///
/// On Unix systems, the handle must implement `AsRawFd`, while on Windows it must implement
/// `AsRawSocket`.
/// On Unix systems, the handle must implement `AsFd`, while on Windows it must implement
/// `AsSocket`.
///
/// [epoll]: https://en.wikipedia.org/wiki/Epoll
/// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
Expand All @@ -645,14 +641,9 @@ impl<T: AsRawFd> Async<T> {
/// # std::io::Result::Ok(()) });
/// ```
pub fn new(io: T) -> io::Result<Async<T>> {
let raw = io.as_raw_fd();
let fd = io.as_fd();

// Put the file descriptor in non-blocking mode.
//
// Safety: We assume `as_raw_fd()` returns a valid fd. When we can
// depend on Rust >= 1.63, where `AsFd` is stabilized, and when
// `TimerFd` implements it, we can remove this unsafe and simplify this.
let fd = unsafe { rustix::fd::BorrowedFd::borrow_raw(raw) };
cfg_if::cfg_if! {
// ioctl(FIONBIO) sets the flag atomically, but we use this only on Linux
// for now, as with the standard library, because it seems to behave
Expand All @@ -672,36 +663,29 @@ impl<T: AsRawFd> Async<T> {
}

Ok(Async {
source: Reactor::get().insert_io(raw)?,
source: Reactor::get().insert_io(fd.as_raw_fd())?,
io: Some(io),
})
}
}

#[cfg(unix)]
impl<T: AsRawFd> AsRawFd for Async<T> {
fn as_raw_fd(&self) -> RawFd {
self.get_ref().as_raw_fd()
}
}

#[cfg(all(not(async_io_no_io_safety), unix))]
impl<T: AsFd> AsFd for Async<T> {
fn as_fd(&self) -> BorrowedFd<'_> {
self.get_ref().as_fd()
}
}

#[cfg(all(not(async_io_no_io_safety), unix))]
impl<T: AsRawFd + From<OwnedFd>> TryFrom<OwnedFd> for Async<T> {
#[cfg(unix)]
impl<T: AsFd + From<OwnedFd>> TryFrom<OwnedFd> for Async<T> {
type Error = io::Error;

fn try_from(value: OwnedFd) -> Result<Self, Self::Error> {
Async::new(value.into())
}
}

#[cfg(all(not(async_io_no_io_safety), unix))]
#[cfg(unix)]
impl<T: Into<OwnedFd>> TryFrom<Async<T>> for OwnedFd {
type Error = io::Error;

Expand All @@ -711,14 +695,14 @@ impl<T: Into<OwnedFd>> TryFrom<Async<T>> for OwnedFd {
}

#[cfg(windows)]
impl<T: AsRawSocket> Async<T> {
impl<T: AsSocket> Async<T> {
/// Creates an async I/O handle.
///
/// This method will put the handle in non-blocking mode and register it in
/// [epoll]/[kqueue]/[event ports]/[IOCP].
///
/// On Unix systems, the handle must implement `AsRawFd`, while on Windows it must implement
/// `AsRawSocket`.
/// On Unix systems, the handle must implement `AsFd`, while on Windows it must implement
/// `AsSocket`.
///
/// [epoll]: https://en.wikipedia.org/wiki/Epoll
/// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
Expand All @@ -737,8 +721,7 @@ impl<T: AsRawSocket> Async<T> {
/// # std::io::Result::Ok(()) });
/// ```
pub fn new(io: T) -> io::Result<Async<T>> {
let sock = io.as_raw_socket();
let borrowed = unsafe { rustix::fd::BorrowedFd::borrow_raw(sock) };
let borrowed = io.as_socket();

// Put the socket in non-blocking mode.
//
Expand All @@ -748,36 +731,29 @@ impl<T: AsRawSocket> Async<T> {
rustix::io::ioctl_fionbio(borrowed, true)?;

Ok(Async {
source: Reactor::get().insert_io(sock)?,
source: Reactor::get().insert_io(borrowed.as_raw_socket())?,
io: Some(io),
})
}
}

#[cfg(windows)]
impl<T: AsRawSocket> AsRawSocket for Async<T> {
fn as_raw_socket(&self) -> RawSocket {
self.get_ref().as_raw_socket()
}
}

#[cfg(all(not(async_io_no_io_safety), windows))]
impl<T: AsSocket> AsSocket for Async<T> {
fn as_socket(&self) -> BorrowedSocket<'_> {
self.get_ref().as_socket()
}
}

#[cfg(all(not(async_io_no_io_safety), windows))]
impl<T: AsRawSocket + From<OwnedSocket>> TryFrom<OwnedSocket> for Async<T> {
#[cfg(windows)]
impl<T: AsSocket + From<OwnedSocket>> TryFrom<OwnedSocket> for Async<T> {
type Error = io::Error;

fn try_from(value: OwnedSocket) -> Result<Self, Self::Error> {
Async::new(value.into())
}
}

#[cfg(all(not(async_io_no_io_safety), windows))]
#[cfg(windows)]
impl<T: Into<OwnedSocket>> TryFrom<Async<T>> for OwnedSocket {
type Error = io::Error;

Expand Down