diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c4c4b2b..a10d225 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -25,7 +25,15 @@ jobs: toolchain: nightly override: true - - name: tests + - name: tests async-io uses: actions-rs/cargo@v1 with: command: test + args: --features async-io + + - name: tests tokio + uses: actions-rs/cargo@v1 + with: + command: test + args: --features tokio + diff --git a/Cargo.toml b/Cargo.toml index 8ec3215..4e2e6db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,20 @@ repository = "https://github.com/async-rs/stop-token" description = "Experimental cooperative cancellation for async-std" +[package.metadata.docs.rs] +features = ["docs"] +rustdoc-args = ["--cfg", "feature=\"docs\""] + +[features] +docs = ["async-io"] + [dependencies] pin-project-lite = "0.2.0" -async-std = "1.8" +async-channel = "1.6.1" +futures-core = "0.3.17" +tokio = { version = "1.12.0", features = ["time"], optional = true } +async-io = { version = "1.6.0", optional = true } + +[dev-dependencies] +async-std = "1.10.0" +tokio = { version = "1.12.0", features = ["rt", "macros"] } diff --git a/src/deadline.rs b/src/deadline.rs new file mode 100644 index 0000000..df80381 --- /dev/null +++ b/src/deadline.rs @@ -0,0 +1,45 @@ +use core::fmt; +use std::{error::Error, future::Future, io}; + +/// An error returned when a future times out. +#[derive(Clone, Copy, Eq, PartialEq, PartialOrd, Ord)] +pub struct TimedOutError { + _private: (), +} + +impl fmt::Debug for TimedOutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TimeoutError").finish() + } +} + +impl TimedOutError { + pub(crate) fn new() -> Self { + Self { _private: () } + } +} + +impl Error for TimedOutError {} + +impl Into for TimedOutError { + fn into(self) -> io::Error { + io::Error::new(io::ErrorKind::TimedOut, "Future has timed out") + } +} + +impl fmt::Display for TimedOutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "Future has timed out".fmt(f) + } +} + +/// Conversion into a deadline. +/// +/// A deadline is a future which resolves after a certain period or event. +pub trait IntoDeadline { + /// Which kind of future are we turning this into? + type Deadline: Future; + + /// Creates a deadline from a value. + fn into_deadline(self) -> Self::Deadline; +} diff --git a/src/future.rs b/src/future.rs new file mode 100644 index 0000000..ef4729e --- /dev/null +++ b/src/future.rs @@ -0,0 +1,56 @@ +//! Extension methods and types for the `Future` trait. + +use crate::{deadline::TimedOutError, IntoDeadline}; +use core::future::Future; +use core::pin::Pin; + +use pin_project_lite::pin_project; +use std::task::{Context, Poll}; + +/// Extend the `Future` trait with the `until` method. +pub trait FutureExt: Future { + /// Run a future until it resolves, or until a deadline is hit. + fn until(self, target: T) -> Stop + where + Self: Sized, + T: IntoDeadline, + { + Stop { + deadline: target.into_deadline(), + future: self, + } + } +} + +pin_project! { + /// Run a future until it resolves, or until a deadline is hit. + /// + /// This method is returned by [`FutureExt::deadline`]. + #[must_use = "Futures do nothing unless polled or .awaited"] + #[derive(Debug)] + pub struct Stop { + #[pin] + future: F, + #[pin] + deadline: D, + } +} + +impl Future for Stop +where + F: Future, + D: Future, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + if let Poll::Ready(()) = this.deadline.poll(cx) { + return Poll::Ready(Err(TimedOutError::new())); + } + match this.future.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(it) => Poll::Ready(Ok(it)), + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 48d1476..b125fdc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,15 +5,6 @@ //! Experimental. The library works as is, breaking changes will bump major //! version, but there are no guarantees of long-term support. //! -//! Additionally, this library uses unstable cargo feature feature of `async-std` and, for -//! this reason, should be used like this: -//! -//! ```toml -//! [dependencies.stop-token] -//! version = "0.1.0" -//! features = [ "unstable" ] -//! ``` -//! //! # Motivation //! //! Rust futures come with a build-in cancellation mechanism: dropping a future @@ -47,13 +38,14 @@ //! //! ``` //! use async_std::prelude::*; +//! use stop_token::prelude::*; //! use stop_token::StopToken; //! //! struct Event; //! -//! async fn do_work(work: impl Stream + Unpin, stop_token: StopToken) { -//! let mut work = stop_token.stop_stream(work); -//! while let Some(event) = work.next().await { +//! async fn do_work(work: impl Stream + Unpin, stop: StopToken) { +//! let mut work = work.until(stop); +//! while let Some(Ok(event)) = work.next().await { //! process_event(event).await //! } //! } @@ -62,145 +54,31 @@ //! } //! ``` //! +//! # Features +//! +//! The `time` submodule is empty when no features are enabled. To implement [`Deadline`] +//! for `Instant` and `Duration` you can enable one of the following features: +//! +//! - `async-io`: for use with the `async-std` or `smol` runtimes. +//! - `tokio`: for use with the `tokio` runtime. +//! //! # Lineage //! //! The cancellation system is a subset of `C#` [`CancellationToken / CancellationTokenSource`](https://docs.microsoft.com/en-us/dotnet/standard/threading/cancellation-in-managed-threads). -//! The `StopToken / StopTokenSource` terminology is borrowed from C++ paper P0660: https://wg21.link/p0660. - -use std::pin::Pin; -use std::task::{Context, Poll}; - -use async_std::prelude::*; +//! The `StopToken / StopTokenSource` terminology is borrowed from [C++ paper P0660](https://wg21.link/p0660). -use async_std::channel::{self, Receiver, Sender}; -use pin_project_lite::pin_project; - -enum Never {} - -/// `StopSource` produces `StopToken` and cancels all of its tokens on drop. -/// -/// # Example: -/// -/// ```ignore -/// let stop_source = StopSource::new(); -/// let stop_token = stop_source.stop_token(); -/// schedule_some_work(stop_token); -/// drop(stop_source); // At this point, scheduled work notices that it is canceled. -/// ``` -#[derive(Debug)] -pub struct StopSource { - /// Solely for `Drop`. - _chan: Sender, - stop_token: StopToken, -} - -/// `StopToken` is a future which completes when the associated `StopSource` is dropped. -#[derive(Debug, Clone)] -pub struct StopToken { - chan: Receiver, -} - -impl Default for StopSource { - fn default() -> StopSource { - let (sender, receiver) = channel::bounded::(1); - - StopSource { - _chan: sender, - stop_token: StopToken { chan: receiver }, - } - } -} +pub mod future; +pub mod stream; +pub mod time; -impl StopSource { - /// Creates a new `StopSource`. - pub fn new() -> StopSource { - StopSource::default() - } - - /// Produces a new `StopToken`, associated with this source. - /// - /// Once the source is destroyed, `StopToken` future completes. - pub fn stop_token(&self) -> StopToken { - self.stop_token.clone() - } -} - -impl Future for StopToken { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - let chan = Pin::new(&mut self.chan); - match Stream::poll_next(chan, cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Some(never)) => match never {}, - Poll::Ready(None) => Poll::Ready(()), - } - } -} - -impl StopToken { - /// Applies the token to the `stream`, such that the resulting stream - /// produces no more items once the token becomes cancelled. - pub fn stop_stream(&self, stream: S) -> StopStream { - StopStream { - stop_token: self.clone(), - stream, - } - } - - /// Applies the token to the `future`, such that the resulting future - /// completes with `None` if the token is cancelled. - pub fn stop_future(&self, future: F) -> StopFuture { - StopFuture { - stop_token: self.clone(), - future, - } - } -} - -pin_project! { - #[derive(Debug)] - pub struct StopStream { - #[pin] - stop_token: StopToken, - #[pin] - stream: S, - } -} - -impl Stream for StopStream { - type Item = S::Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - if let Poll::Ready(()) = this.stop_token.poll(cx) { - return Poll::Ready(None); - } - this.stream.poll_next(cx) - } -} - -pin_project! { - #[derive(Debug)] - pub struct StopFuture { - #[pin] - stop_token: StopToken, - #[pin] - future: F, - } -} +mod deadline; +mod stop_source; -impl Future for StopFuture { - type Output = Option; +pub use deadline::{IntoDeadline, TimedOutError}; +pub use stop_source::{StopSource, StopToken}; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - if let Poll::Ready(()) = this.stop_token.poll(cx) { - return Poll::Ready(None); - } - match this.future.poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(it) => Poll::Ready(Some(it)), - } - } +/// A prelude for `stop-token`. +pub mod prelude { + pub use crate::future::FutureExt as _; + pub use crate::stream::StreamExt as _; } diff --git a/src/stop_source.rs b/src/stop_source.rs new file mode 100644 index 0000000..4476c4e --- /dev/null +++ b/src/stop_source.rs @@ -0,0 +1,77 @@ +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use async_channel::{bounded, Receiver, Sender}; +use futures_core::stream::Stream; + +enum Never {} + +/// `StopSource` produces `StopToken` and cancels all of its tokens on drop. +/// +/// # Example: +/// +/// ```ignore +/// let stop_source = StopSource::new(); +/// let stop_token = stop_source.stop_token(); +/// schedule_some_work(stop_token); +/// drop(stop_source); // At this point, scheduled work notices that it is canceled. +/// ``` +#[derive(Debug)] +pub struct StopSource { + /// Solely for `Drop`. + _chan: Sender, + stop_token: StopToken, +} + +/// `StopToken` is a future which completes when the associated `StopSource` is dropped. +#[derive(Debug, Clone)] +pub struct StopToken { + chan: Receiver, +} + +impl Default for StopSource { + fn default() -> StopSource { + let (sender, receiver) = bounded::(1); + + StopSource { + _chan: sender, + stop_token: StopToken { chan: receiver }, + } + } +} + +impl StopSource { + /// Creates a new `StopSource`. + pub fn new() -> StopSource { + StopSource::default() + } + + /// Produces a new `StopToken`, associated with this source. + /// + /// Once the source is destroyed, `StopToken` future completes. + pub fn stop_token(&self) -> StopToken { + self.stop_token.clone() + } +} + +impl super::IntoDeadline for StopToken { + type Deadline = Self; + + fn into_deadline(self) -> Self::Deadline { + self + } +} + +impl Future for StopToken { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let chan = Pin::new(&mut self.chan); + match Stream::poll_next(chan, cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Some(never)) => match never {}, + Poll::Ready(None) => Poll::Ready(()), + } + } +} diff --git a/src/stream.rs b/src/stream.rs new file mode 100644 index 0000000..66d4c27 --- /dev/null +++ b/src/stream.rs @@ -0,0 +1,56 @@ +//! Extension methods and types for the `Stream` trait. + +use crate::{deadline::TimedOutError, IntoDeadline}; +use core::future::Future; +use core::pin::Pin; + +use futures_core::Stream; +use pin_project_lite::pin_project; +use std::task::{Context, Poll}; + +pub trait StreamExt: Stream { + /// Applies the token to the `stream`, such that the resulting stream + /// produces no more items once the token becomes cancelled. + fn until(self, target: T) -> StopStream + where + Self: Sized, + T: IntoDeadline, + { + StopStream { + stream: self, + deadline: target.into_deadline(), + } + } +} + +impl StreamExt for S {} + +pin_project! { + /// Run a future until it resolves, or until a deadline is hit. + /// + /// This method is returned by [`FutureExt::deadline`]. + #[must_use = "Futures do nothing unless polled or .awaited"] + #[derive(Debug)] + pub struct StopStream { + #[pin] + stream: S, + #[pin] + deadline: D, + } +} + +impl Stream for StopStream +where + S: Stream, + D: Future, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + if let Poll::Ready(()) = this.deadline.poll(cx) { + return Poll::Ready(Some(Err(TimedOutError::new()))); + } + this.stream.poll_next(cx).map(|el| el.map(|el| Ok(el))) + } +} diff --git a/src/time.rs b/src/time.rs new file mode 100644 index 0000000..17a3ba0 --- /dev/null +++ b/src/time.rs @@ -0,0 +1,139 @@ +//! Create deadlines from `Duration` and `Instant` types. +//! +//! # Features +//! +//! This module is empty when no features are enabled. To implement deadlines +//! for `Instant` and `Duration` you can enable one of the following features: +//! +//! - `async-io`: use this when using the `async-std` or `smol` runtimes. +//! - `tokio`: use this when using the `tokio` runtime. +//! +//! # Examples +//! +//! ``` +//! use std::time::Instant; +//! use async_std::prelude::*; +//! use stop_token::prelude::*; +//! use stop_token::StopToken; +//! +//! struct Event; +//! +//! async fn do_work(work: impl Stream + Unpin, until: Instant) { +//! let mut work = work.until(until); +//! while let Some(Ok(event)) = work.next().await { +//! process_event(event).await +//! } +//! } +//! +//! async fn process_event(_event: Event) { +//! } +//! ``` + +#[cfg(feature = "async-io")] +pub use asyncio::*; + +#[cfg(any(feature = "async-io", feature = "docs"))] +mod asyncio { + use async_io::Timer; + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + + use crate::IntoDeadline; + + use pin_project_lite::pin_project; + + pin_project! { + /// A future that times out after a duration of time. + #[must_use = "Futures do nothing unless polled or .awaited"] + #[derive(Debug)] + pub struct Deadline { + #[pin] + delay: Timer, + } + } + + impl Future for Deadline { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match this.delay.poll(cx) { + Poll::Ready(_) => Poll::Ready(()), + Poll::Pending => Poll::Pending, + } + } + } + + impl IntoDeadline for std::time::Duration { + type Deadline = Deadline; + + fn into_deadline(self) -> Self::Deadline { + Deadline { + delay: Timer::after(self), + } + } + } + + impl IntoDeadline for std::time::Instant { + type Deadline = Deadline; + + fn into_deadline(self) -> Self::Deadline { + Deadline { + delay: Timer::at(self), + } + } + } +} + +#[cfg(feature = "tokio")] +pub use tokiooo::*; + +#[cfg(any(feature = "tokio", feature = "docs"))] +mod tokiooo { + use std::future::{pending, Future, Pending}; + use std::pin::Pin; + use std::task::{Context, Poll}; + use tokio::time::{timeout, timeout_at, Instant as TokioInstant, Timeout}; + + use crate::IntoDeadline; + + /// A future that times out after a duration of time. + #[must_use = "Futures do nothing unless polled or .awaited"] + #[derive(Debug)] + pub struct Deadline { + delay: Pin>>>, + } + + impl Future for Deadline { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::new(&mut self.delay).poll(cx) { + Poll::Ready(_) => Poll::Ready(()), + Poll::Pending => Poll::Pending, + } + } + } + + impl IntoDeadline for std::time::Duration { + type Deadline = Deadline; + + fn into_deadline(self) -> Self::Deadline { + Deadline { + delay: Box::pin(timeout(self, pending())), + } + } + } + + impl IntoDeadline for std::time::Instant { + type Deadline = Deadline; + + fn into_deadline(self) -> Self::Deadline { + let instant = TokioInstant::from(self); + Deadline { + delay: Box::pin(timeout_at(instant, pending())), + } + } + } +} diff --git a/tests/tests.rs b/tests/tests.rs index 2fceda8..f302230 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -1,8 +1,9 @@ use std::time::Duration; use async_std::prelude::*; +use stop_token::prelude::*; -use async_std::channel; +use async_channel::bounded; use async_std::task; use stop_token::StopSource; @@ -10,15 +11,15 @@ use stop_token::StopSource; #[test] fn smoke() { task::block_on(async { - let (sender, receiver) = channel::bounded::(10); + let (sender, receiver) = bounded::(10); let stop_source = StopSource::new(); let task = task::spawn({ let stop_token = stop_source.stop_token(); let receiver = receiver.clone(); async move { let mut xs = Vec::new(); - let mut stream = stop_token.stop_stream(receiver); - while let Some(x) = stream.next().await { + let mut stream = receiver.until(stop_token); + while let Some(Ok(x)) = stream.next().await { xs.push(x) } xs @@ -38,3 +39,59 @@ fn smoke() { assert_eq!(task.await, vec![1, 2, 3]); }) } + +#[cfg(feature = "async-io")] +#[test] +fn async_io_time() { + task::block_on(async { + let (sender, receiver) = bounded::(10); + let task = task::spawn({ + let receiver = receiver.clone(); + async move { + let mut xs = Vec::new(); + let mut stream = receiver.until(Duration::from_millis(200)); + while let Some(Ok(x)) = stream.next().await { + xs.push(x) + } + xs + } + }); + sender.send(1).await.unwrap(); + sender.send(2).await.unwrap(); + sender.send(3).await.unwrap(); + + task::sleep(Duration::from_millis(250)).await; + + sender.send(4).await.unwrap(); + sender.send(5).await.unwrap(); + sender.send(6).await.unwrap(); + assert_eq!(task.await, vec![1, 2, 3]); + }) +} + +#[cfg(feature = "tokio")] +#[tokio::test] +async fn tokio_time() { + let (sender, receiver) = bounded::(10); + let task = tokio::task::spawn({ + let receiver = receiver.clone(); + async move { + let mut xs = Vec::new(); + let mut stream = receiver.until(Duration::from_millis(200)); + while let Some(Ok(x)) = stream.next().await { + xs.push(x) + } + xs + } + }); + sender.send(1).await.unwrap(); + sender.send(2).await.unwrap(); + sender.send(3).await.unwrap(); + + task::sleep(Duration::from_millis(250)).await; + + sender.send(4).await.unwrap(); + sender.send(5).await.unwrap(); + sender.send(6).await.unwrap(); + assert_eq!(task.await.unwrap(), vec![1, 2, 3]); +}