Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ enum SendErrorKind {

/// The error type returned from [`try_next`](Receiver::try_next).
pub struct TryRecvError {
_inner: (),
_priv: (),
}

impl fmt::Display for SendError {
Expand Down Expand Up @@ -834,7 +834,7 @@ impl<T> Receiver<T> {
Poll::Ready(msg) => {
Ok(msg)
},
Poll::Pending => Err(TryRecvError { _inner: () }),
Poll::Pending => Err(TryRecvError { _priv: () }),
}
}

Expand Down
16 changes: 12 additions & 4 deletions futures-channel/src/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ impl<T> Inner<T> {
return Ok(Some(data));
}
}
Err(Canceled)
Err(Canceled { _priv: () })
} else {
Ok(None)
}
Expand Down Expand Up @@ -290,7 +290,7 @@ impl<T> Inner<T> {
return Poll::Ready(Ok(data));
}
}
Poll::Ready(Err(Canceled))
Poll::Ready(Err(Canceled { _priv: () }))
} else {
Poll::Pending
}
Expand Down Expand Up @@ -377,8 +377,16 @@ impl<T> Drop for Sender<T> {

/// Error returned from a [`Receiver`](Receiver) when the corresponding
/// [`Sender`](Sender) is dropped.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually like this one as-is because it allows you to easily write it as a pattern-- e.g.

match recv.await {
    Ok(x) => ...,
    Err(Canceled) => ...
}

I can't really imagine what else we'd want to add to it in the future.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, these are in util/channel, so we can change them if we need them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, I tend to write it like:

match rx.await {
    Err(_canceled) => (),
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@seanmonstar That doesn't actually assert that cancelled is what happened there, though-- you could be silencing a real error by accident.

pub struct Canceled;
#[derive(Clone, PartialEq, Eq)]
pub struct Canceled {
_priv: (),
}

impl fmt::Debug for Canceled {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Canceled").finish()
}
}

impl fmt::Display for Canceled {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down
4 changes: 2 additions & 2 deletions futures-core/src/task/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub trait LocalSpawn {

/// An error that occurred during spawning.
pub struct SpawnError {
_hidden: (),
_priv: (),
}

impl fmt::Debug for SpawnError {
Expand All @@ -78,7 +78,7 @@ impl std::error::Error for SpawnError {}
impl SpawnError {
/// Spawning failed because the executor has been shut down.
pub fn shutdown() -> Self {
Self { _hidden: () }
Self { _priv: () }
}

/// Check whether spawning failed to the executor being shut down.
Expand Down
24 changes: 19 additions & 5 deletions futures-executor/src/enter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,29 @@ thread_local!(static ENTERED: Cell<bool> = Cell::new(false));
///
/// For more details, see [`enter` documentation](enter()).
pub struct Enter {
_a: ()
_priv: (),
}

/// An error returned by `enter` if an execution scope has already been
/// entered.
#[derive(Debug)]
pub struct EnterError {
_a: (),
_priv: (),
}

impl fmt::Debug for EnterError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("EnterError").finish()
}
}

impl fmt::Display for EnterError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "an execution scope has already been entered")
}
}

impl std::error::Error for EnterError {}

/// Marks the current thread as being within the dynamic extent of an
/// executor.
///
Expand All @@ -42,11 +55,11 @@ pub struct EnterError {
pub fn enter() -> Result<Enter, EnterError> {
ENTERED.with(|c| {
if c.get() {
Err(EnterError { _a: () })
Err(EnterError { _priv: () })
} else {
c.set(true);

Ok(Enter { _a: () })
Ok(Enter { _priv: () })
}
})
}
Expand All @@ -65,3 +78,4 @@ impl Drop for Enter {
});
}
}

25 changes: 17 additions & 8 deletions futures-util/src/future/abortable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::task::AtomicWaker;
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;
use core::fmt;
use core::pin::Pin;
use core::sync::atomic::{AtomicBool, Ordering};
use alloc::sync::Arc;
Expand Down Expand Up @@ -30,12 +31,12 @@ impl<Fut> Abortable<Fut> where Fut: Future {
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::future::{Abortable, AbortHandle, Aborted};
/// use futures::future::{Abortable, AbortHandle};
///
/// let (abort_handle, abort_registration) = AbortHandle::new_pair();
/// let future = Abortable::new(async { 2 }, abort_registration);
/// abort_handle.abort();
/// assert_eq!(future.await, Err(Aborted));
/// assert!(future.await.is_err());
/// # });
/// ```
pub fn new(future: Fut, reg: AbortRegistration) -> Self {
Expand Down Expand Up @@ -70,12 +71,12 @@ impl AbortHandle {
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::future::{Abortable, AbortHandle, Aborted};
/// use futures::future::{Abortable, AbortHandle};
///
/// let (abort_handle, abort_registration) = AbortHandle::new_pair();
/// let future = Abortable::new(async { 2 }, abort_registration);
/// abort_handle.abort();
/// assert_eq!(future.await, Err(Aborted));
/// assert!(future.await.is_err());
/// # });
/// ```
pub fn new_pair() -> (Self, AbortRegistration) {
Expand Down Expand Up @@ -121,16 +122,24 @@ pub fn abortable<Fut>(future: Fut) -> (Abortable<Fut>, AbortHandle)
}

/// Indicator that the `Abortable` future was aborted.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as with Canceled-- this is nice for matching on, and I can't forsee wanting to add anything here.

pub struct Aborted;
#[derive(Clone, Eq, PartialEq)]
pub struct Aborted {
_priv: (),
}

impl fmt::Debug for Aborted {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("Aborted").finish()
}
}

impl<Fut> Future for Abortable<Fut> where Fut: Future {
type Output = Result<Fut::Output, Aborted>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Check if the future has been aborted
if self.inner.cancel.load(Ordering::Relaxed) {
return Poll::Ready(Err(Aborted))
return Poll::Ready(Err(Aborted { _priv: () }))
}

// attempt to complete the future
Expand All @@ -146,7 +155,7 @@ impl<Fut> Future for Abortable<Fut> where Fut: Future {
// Checking with `Relaxed` is sufficient because `register` introduces an
// `AcqRel` barrier.
if self.inner.cancel.load(Ordering::Relaxed) {
return Poll::Ready(Err(Aborted))
return Poll::Ready(Err(Aborted { _priv: () }))
}

Poll::Pending
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/try_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ pub trait TryStreamExt: TryStream {
///
/// // The final result is an error because the second future
/// // resulted in an error.
/// assert_eq!(Err(oneshot::Canceled), fut.await);
/// assert!(fut.await.is_err());
/// # })
/// ```
#[cfg_attr(
Expand Down
9 changes: 6 additions & 3 deletions futures/tests/abortable.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures::channel::oneshot;
use futures::executor::block_on;
use futures::future::{abortable, Aborted, FutureExt};
use futures::future::{abortable, FutureExt};
use futures::task::{Context, Poll};
use futures_test::task::new_count_waker;

Expand All @@ -10,7 +10,7 @@ fn abortable_works() {
let (abortable_rx, abort_handle) = abortable(a_rx);

abort_handle.abort();
assert_eq!(Err(Aborted), block_on(abortable_rx));
assert!(block_on(abortable_rx).is_err());
}

#[test]
Expand All @@ -25,7 +25,10 @@ fn abortable_awakens() {
assert_eq!(counter, 0);
abort_handle.abort();
assert_eq!(counter, 1);
assert_eq!(Poll::Ready(Err(Aborted)), abortable_rx.poll_unpin(&mut cx));
match abortable_rx.poll_unpin(&mut cx) {
Poll::Ready(Err(_)) => {}
_ => unreachable!()
}
}

#[test]
Expand Down
6 changes: 3 additions & 3 deletions futures/tests/futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ fn iter_mut_cancel() {
assert!(b_tx.is_canceled());
assert!(c_tx.is_canceled());

assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
assert!(iter.next().unwrap().is_err());
assert!(iter.next().unwrap().is_err());
assert!(iter.next().unwrap().is_err());
assert_eq!(iter.next(), None);
}

Expand Down
4 changes: 2 additions & 2 deletions futures/tests/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ fn oneshot_drop_tx1() {
drop(tx1);
rx1.map(move |result| tx2.send(result).unwrap()).run_in_background();

assert_eq!(Err(oneshot::Canceled), rx2.recv().unwrap());
assert!(rx2.recv().unwrap().is_err());
}

#[test]
Expand All @@ -55,7 +55,7 @@ fn oneshot_drop_tx2() {
rx1.map(move |result| tx2.send(result).unwrap()).run_in_background();
t.join().unwrap();

assert_eq!(Err(oneshot::Canceled), rx2.recv().unwrap());
assert!(rx2.recv().unwrap().is_err());
}

#[test]
Expand Down
10 changes: 8 additions & 2 deletions futures/tests/ready_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,19 @@ fn resolving_errors() {

drop(tx2);

assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the bad thing about this PR is that we cannot do this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, actually, this is not just a problem for this PR. I also encountered this in #1785.

match queue.poll_next_unpin(cx) {
Poll::Ready(Some(Err(_))) => {}
_ => unreachable!(),
}
assert!(!queue.poll_next_unpin(cx).is_ready());

drop(tx1);
tx3.send("world2").unwrap();

assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx));
match queue.poll_next_unpin(cx) {
Poll::Ready(Some(Err(_))) => {}
_ => unreachable!(),
}
assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx));
assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
}));
Expand Down