Skip to content

FusedFuture, FusedStream, and select! macro changes #1259

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions futures-core/src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,19 @@ pub use core::future::Future;
mod future_obj;
pub use self::future_obj::{FutureObj, LocalFutureObj, UnsafeFutureObj};

/// A `Future` or `TryFuture` which tracks whether or not the underlying future
/// should no longer be polled.
///
/// `is_terminated` will return `true` if a future should no longer be polled.
/// Usually, this state occurs after `poll` (or `try_poll`) returned
/// `Poll::Ready`. However, `is_terminated` may also return `true` if a future
/// has become inactive and can no longer make progress and should be ignored
/// or dropped rather than being `poll`ed again.
pub trait FusedFuture {
/// Returns `true` if the underlying future should no longer be polled.
fn is_terminated(&self) -> bool;
}

/// A convenience for futures that return `Result` values that includes
/// a variety of adapters tailored to such futures.
pub trait TryFuture {
Expand Down
11 changes: 3 additions & 8 deletions futures-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,6 @@

#![doc(html_root_url = "https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.7/futures_core")]

#[doc(hidden)] pub use crate::future::Future;
#[doc(hidden)] pub use crate::future::TryFuture;

#[doc(hidden)] pub use crate::stream::Stream;
#[doc(hidden)] pub use crate::stream::TryStream;

#[doc(hidden)] pub use crate::task::Poll;

macro_rules! if_std {
($($i:item)*) => ($(
#[cfg(feature = "std")]
Expand All @@ -26,7 +18,10 @@ macro_rules! if_std {
}

pub mod future;
#[doc(hidden)] pub use self::future::{Future, FusedFuture, TryFuture};

pub mod stream;
#[doc(hidden)] pub use self::stream::{Stream, FusedStream, TryStream};

pub mod task;
#[doc(hidden)] pub use self::task::Poll;
13 changes: 13 additions & 0 deletions futures-core/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,19 @@ impl<A, B> Stream for Either<A, B>
}
}

/// A `Stream` or `TryStream` which tracks whether or not the underlying stream
/// should no longer be polled.
///
/// `is_terminated` will return `true` if a future should no longer be polled.
/// Usually, this state occurs after `poll_next` (or `try_poll_next`) returned
/// `Poll::Ready(None)`. However, `is_terminated` may also return `true` if a
/// stream has become inactive and can no longer make progress and should be
/// ignored or dropped rather than being polled again.
pub trait FusedStream {
/// Returns `true` if the stream should no longer be polled.
fn is_terminated(&self) -> bool;
}

/// A convenience for streams that return `Result` values that includes
/// a variety of adapters tailored to such futures.
pub trait TryStream {
Expand Down
7 changes: 7 additions & 0 deletions futures-util/src/async_await/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
use core::marker::Unpin;
use futures_core::future::Future;

#[doc(hidden)]
pub use futures_core::future::FusedFuture;

#[macro_use]
mod poll;
pub use self::poll::*;
Expand All @@ -25,3 +28,7 @@ mod select;
#[doc(hidden)]
#[inline(always)]
pub fn assert_unpin<T: Future + Unpin>(_: &T) {}

#[doc(hidden)]
#[inline(always)]
pub fn assert_fused_future<T: Future + FusedFuture>(_: &T) {}
103 changes: 83 additions & 20 deletions futures-util/src/async_await/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
/// that finishes first.
///
/// `select!` can select over futures with different output types, but each
/// branch has to have the same return type. Inside each branch, the respective
/// future's output is available via a variable with the same name as the future.
/// branch has to have the same return type.
///
/// This macro is only usable inside of async functions, closures, and blocks.
///
Expand All @@ -14,58 +13,122 @@
/// ```
/// #![feature(pin, async_await, await_macro, futures_api)]
/// # futures::executor::block_on(async {
/// use futures::{select, future};
/// use futures::future::{self, FutureExt};
/// use futures::select;
/// let mut a = future::ready(4);
/// let mut b: future::Empty<()> = future::empty();
/// let mut b = future::empty::<()>();
///
/// let res = select! {
/// a => a + 1,
/// b => 0,
/// future(a as a) => a + 1,
/// future(b as _) => 0,
/// };
/// assert_eq!(res, 5);
/// # });
/// ```
///
/// In addition to `future(...)` matchers, `select!` accepts an `all complete`
/// branch which can be used to match on the case where all the futures passed
/// to select have already completed.
///
/// ```
/// #![feature(pin, async_await, await_macro, futures_api)]
/// # futures::executor::block_on(async {
/// use futures::future::{self, FutureExt};
/// use futures::select;
/// let mut a = future::ready(4);
/// let mut b = future::ready(6);
/// let mut total = 0;
///
/// loop {
/// select! {
/// future(a as a) => total += a,
/// future(b as b) => total += b,
/// all complete => break,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find all complete a little confusing, it makes me think of a branch where all are currently complete and ready to handle, not that they all had completed and been consumed in the past. I wonder if there’s a similarly succinct way to say this that implies that better.

Copy link
Member Author

@cramertj cramertj Oct 15, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I puzzled on this a bit, but I think we're going to want to change this in the future anyways as we add cases for next(...) etc., so I don't think we need to make a final decision now. If you have ideas, please say so!

/// };
/// }
/// assert_eq!(total, 10);
/// # });
/// ```
#[macro_export]
macro_rules! select {
() => {
compile_error!("The `select!` macro requires at least one branch")
};
($(
$name:ident => $body:expr,
)*) => { {
(
$(
future($future_name:ident as $bound_pat:pat) => $body:expr,
)*
all complete => $complete:expr $(,)*
) => { {
// Require all arguments to be `Unpin` so that we don't have to pin them,
// allowing uncompleted futures to be reused by the caller after the
// `select!` resolves.
//
// Additionally, require all arguments to implement `FusedFuture` so that
// we can ensure that futures aren't polled after completion by use
// in successive `select!` statements.
$(
$crate::async_await::assert_unpin(&$name);
$crate::async_await::assert_unpin(&$future_name);
$crate::async_await::assert_fused_future(&$future_name);
)*

#[allow(bad_style)]
enum __PrivResult<$($name,)*> {
enum __PrivResult<$($future_name,)*> {
$(
$name($name),
$future_name($future_name),
)*
__Complete,
}

let __priv_res = await!($crate::future::poll_fn(|lw| {
let mut __any_polled = false;

$(
match $crate::core_reexport::future::Future::poll(
$crate::core_reexport::pin::Pin::new(&mut $name), lw)
{
$crate::core_reexport::task::Poll::Ready(x) =>
return $crate::core_reexport::task::Poll::Ready(__PrivResult::$name(x)),
$crate::core_reexport::task::Poll::Pending => {},
if !$crate::async_await::FusedFuture::is_terminated(& $future_name) {
__any_polled = true;
match $crate::core_reexport::future::Future::poll(
$crate::core_reexport::pin::Pin::new(&mut $future_name), lw)
{
$crate::core_reexport::task::Poll::Ready(x) =>
return $crate::core_reexport::task::Poll::Ready(
__PrivResult::$future_name(x)
),
$crate::core_reexport::task::Poll::Pending => {},
}
}
)*

if !__any_polled {
return $crate::core_reexport::task::Poll::Ready(
__PrivResult::__Complete);
}

$crate::core_reexport::task::Poll::Pending
}));
match __priv_res {
$(
__PrivResult::$name($name) => {
let _ = $name; // Suppress "unused" warning for binding name
__PrivResult::$future_name($bound_pat) => {
$body
}
)*
__PrivResult::__Complete => {
$complete
}
}
} };

(
$(
future($future_name:ident as $bound_pat:pat) => $body:expr $(,)*
)*
) => {
$crate::select! {
$(
future($future_name as $bound_pat) => $body,
)*
all complete =>
panic!("all futures in select! were completed, \
but no `all complete =>` handler was provided"),
}
};
}
6 changes: 6 additions & 0 deletions futures-util/src/future/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ pub(crate) enum Chain<Fut1, Fut2, Data> {
Empty,
}

impl<Fut1, Fut2, Data> Chain<Fut1, Fut2, Data> {
pub(crate)fn is_terminated(&self) -> bool {
if let Chain::Empty = *self { true } else { false }
}
}

impl<Fut1, Fut2, Data> Chain<Fut1, Fut2, Data>
where Fut1: Future,
Fut2: Future,
Expand Down
6 changes: 5 additions & 1 deletion futures-util/src/future/empty.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use core::marker;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::future::{Future, FusedFuture};
use futures_core::task::{LocalWaker, Poll};

/// A future which is never resolved.
Expand All @@ -12,6 +12,10 @@ pub struct Empty<T> {
_data: marker::PhantomData<T>,
}

impl<T> FusedFuture for Empty<T> {
fn is_terminated(&self) -> bool { false }
}

/// Creates a future which never resolves, representing a computation that never
/// finishes.
///
Expand Down
9 changes: 8 additions & 1 deletion futures-util/src/future/flatten.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::chain::Chain;
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{LocalWaker, Poll};
use pin_utils::unsafe_pinned;

Expand Down Expand Up @@ -42,6 +42,13 @@ impl<Fut> fmt::Debug for Flatten<Fut>
}
}

impl<Fut> FusedFuture for Flatten<Fut>
where Fut: Future,
Fut::Output: Future,
{
fn is_terminated(&self) -> bool { self.state.is_terminated() }
}

impl<Fut> Future for Flatten<Fut>
where Fut: Future,
Fut::Output: Future,
Expand Down
14 changes: 13 additions & 1 deletion futures-util/src/future/flatten_stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{LocalWaker, Poll};

/// Future for the `flatten_stream` combinator, flattening a
Expand Down Expand Up @@ -40,6 +40,18 @@ enum State<Fut: Future> {
Stream(Fut::Output),
}

impl<Fut> FusedStream for FlattenStream<Fut>
where Fut: Future,
Fut::Output: Stream + FusedStream,
{
fn is_terminated(&self) -> bool {
match &self.state {
State::Future(_) => false,
State::Stream(stream) => stream.is_terminated(),
}
}
}

impl<Fut> Stream for FlattenStream<Fut>
where Fut: Future,
Fut::Output: Stream,
Expand Down
18 changes: 13 additions & 5 deletions futures-util/src/future/fuse.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::future::{Future, FusedFuture};
use futures_core::task::{LocalWaker, Poll};
use pin_utils::unsafe_pinned;

/// A future which "fuses" a future once it's been resolved.
/// A future which "fuses" a future once it has been resolved.
/// This wrapper can be used to turn any `Future` into a `FusedFuture`.
///
/// Normally futures can behave unpredictable once they're used after a future
/// has been resolved, but `Fuse` is always defined to return `Async::Pending`
/// from `poll` after it has resolved successfully or returned an error.
/// Normally, `poll`ing a future after it has completed (returned `Poll::Ready`
/// from a call to `poll`) can cause arbitrary behavior (panics, deadlock).
/// `Fuse` is always defined to return `Poll::Pending` from `poll` after it has
/// resolved.
///
/// This is created by the `Future::fuse` method.
#[derive(Debug)]
Expand All @@ -26,6 +28,12 @@ impl<Fut: Future> Fuse<Fut> {
}
}

impl<Fut: Future> FusedFuture for Fuse<Fut> {
fn is_terminated(&self) -> bool {
self.future.is_none()
}
}

impl<Fut: Future> Future for Fuse<Fut> {
type Output = Fut::Output;

Expand Down
6 changes: 5 additions & 1 deletion futures-util/src/future/inspect.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use core::marker::Unpin;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{LocalWaker, Poll};
use pin_utils::{unsafe_pinned, unsafe_unpinned};

Expand Down Expand Up @@ -28,6 +28,10 @@ impl<Fut: Future, F: FnOnce(&Fut::Output)> Inspect<Fut, F> {

impl<Fut: Future + Unpin, F> Unpin for Inspect<Fut, F> {}

impl<Fut: Future + FusedFuture, F> FusedFuture for Inspect<Fut, F> {
fn is_terminated(&self) -> bool { self.future.is_terminated() }
}

impl<Fut, F> Future for Inspect<Fut, F>
where Fut: Future,
F: FnOnce(&Fut::Output),
Expand Down
6 changes: 5 additions & 1 deletion futures-util/src/future/lazy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use core::marker::Unpin;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{LocalWaker, Poll};

/// A future which, when polled, invokes a closure and yields its result.
Expand Down Expand Up @@ -41,6 +41,10 @@ pub fn lazy<F, R>(f: F) -> Lazy<F>
Lazy { f: Some(f) }
}

impl<F> FusedFuture for Lazy<F> {
fn is_terminated(&self) -> bool { self.f.is_none() }
}

impl<R, F> Future for Lazy<F>
where F: FnOnce(&LocalWaker) -> R,
{
Expand Down
Loading