Skip to content

Update this crate to use the new polling breaking changes #142

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,13 @@ concurrent-queue = "2.2.0"
futures-io = { version = "0.3.28", default-features = false, features = ["std"] }
futures-lite = { version = "1.11.0", default-features = false }
parking = "2.0.0"
polling = "2.6.0"
polling = "3.0.0"
rustix = { version = "0.38.2", default-features = false, features = ["std", "fs"] }
slab = "0.4.2"
socket2 = { version = "0.5.3", features = ["all"] }
tracing = { version = "0.1.37", default-features = false }
waker-fn = "1.1.0"

[build-dependencies]
autocfg = "1"

[dev-dependencies]
async-channel = "1"
async-net = "1"
Expand All @@ -54,3 +51,6 @@ timerfd = "1"

[target.'cfg(windows)'.dev-dependencies]
uds_windows = "1"

[patch.crates-io]
async-io = { path = "." }
16 changes: 0 additions & 16 deletions build.rs

This file was deleted.

14 changes: 9 additions & 5 deletions examples/linux-inotify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,21 @@ fn main() -> std::io::Result<()> {
future::block_on(async {
// Watch events in the current directory.
let mut inotify = Async::new(Inotify::init()?)?;
inotify
.get_mut()
.watches()
.add(".", WatchMask::ALL_EVENTS)?;

// SAFETY: We do not move the inner file descriptor out.
unsafe {
inotify
.get_mut()
.watches()
.add(".", WatchMask::ALL_EVENTS)?;
}
println!("Watching for filesystem events in the current directory...");
println!("Try opening a file to trigger some events.");
println!();

// Wait for events in a loop and print them on the screen.
loop {
for event in inotify.read_with_mut(read_op).await? {
for event in unsafe { inotify.read_with_mut(read_op).await? } {
println!("{:?}", event);
}
}
Expand Down
80 changes: 74 additions & 6 deletions examples/windows-uds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,88 @@

#[cfg(windows)]
fn main() -> std::io::Result<()> {
use std::ops::Deref;
use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket};
use std::path::PathBuf;

use async_io::Async;
use blocking::Unblock;
use futures_lite::{future, io, prelude::*};
use futures_lite::{future, prelude::*};
use std::io;
use tempfile::tempdir;
use uds_windows::{UnixListener, UnixStream};

// n.b.: notgull: uds_windows does not support I/O safety uet, hence the wrapper types

struct UnixListener(uds_windows::UnixListener);

impl From<uds_windows::UnixListener> for UnixListener {
fn from(ul: uds_windows::UnixListener) -> Self {
Self(ul)
}
}

impl Deref for UnixListener {
type Target = uds_windows::UnixListener;

fn deref(&self) -> &uds_windows::UnixListener {
&self.0
}
}

impl AsSocket for UnixListener {
fn as_socket(&self) -> BorrowedSocket<'_> {
unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) }
}
}

struct UnixStream(uds_windows::UnixStream);

impl From<uds_windows::UnixStream> for UnixStream {
fn from(ul: uds_windows::UnixStream) -> Self {
Self(ul)
}
}

impl Deref for UnixStream {
type Target = uds_windows::UnixStream;

fn deref(&self) -> &uds_windows::UnixStream {
&self.0
}
}

impl AsSocket for UnixStream {
fn as_socket(&self) -> BorrowedSocket<'_> {
unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) }
}
}

impl io::Read for UnixStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
io::Read::read(&mut self.0, buf)
}
}

impl io::Write for UnixStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
io::Write::write(&mut self.0, buf)
}

fn flush(&mut self) -> io::Result<()> {
io::Write::flush(&mut self.0)
}
}

unsafe impl async_io::IoSafe for UnixStream {}

async fn client(addr: PathBuf) -> io::Result<()> {
// Connect to the address.
let stream = Async::new(UnixStream::connect(addr)?)?;
let stream = Async::new(UnixStream::from(uds_windows::UnixStream::connect(addr)?))?;
println!("Connected to {:?}", stream.get_ref().peer_addr()?);

// Pipe the stream to stdout.
let mut stdout = Unblock::new(std::io::stdout());
io::copy(&stream, &mut stdout).await?;
futures_lite::io::copy(stream, &mut stdout).await?;
Ok(())
}

Expand All @@ -32,7 +98,7 @@ fn main() -> std::io::Result<()> {

future::block_on(async {
// Create a listener.
let listener = Async::new(UnixListener::bind(&path)?)?;
let listener = Async::new(UnixListener::from(uds_windows::UnixListener::bind(&path)?))?;
println!("Listening on {:?}", listener.get_ref().local_addr()?);

future::try_zip(
Expand All @@ -42,7 +108,9 @@ fn main() -> std::io::Result<()> {
println!("Accepted a client");

// Send a message, drop the stream, and wait for the client.
Async::new(stream)?.write_all(b"Hello!\n").await?;
Async::new(UnixStream::from(stream))?
.write_all(b"Hello!\n")
.await?;
Ok(())
},
client(path),
Expand Down
Loading