Skip to content

Commit 3706c1e

Browse files
committed
Add non-panicking poll option for Task
1 parent f875ce8 commit 3706c1e

File tree

3 files changed

+154
-16
lines changed

3 files changed

+154
-16
lines changed

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,7 @@ flaky_test = "0.1"
2525
flume = { version = "0.10", default-features = false }
2626
once_cell = "1"
2727
smol = "1"
28+
29+
# rewrite dependencies to use the this version of async-task when running tests
30+
[patch.crates-io]
31+
async-task = { path = "." }

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ mod task;
8282
mod utils;
8383

8484
pub use crate::runnable::{spawn, spawn_unchecked, Runnable};
85-
pub use crate::task::Task;
85+
pub use crate::task::{FallibleTask, Task};
8686

8787
#[cfg(feature = "std")]
8888
pub use crate::runnable::spawn_local;

src/task.rs

Lines changed: 149 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -124,18 +124,56 @@ impl<T> Task<T> {
124124
pub async fn cancel(self) -> Option<T> {
125125
let mut this = self;
126126
this.set_canceled();
127+
this.fallible().await
128+
}
127129

128-
struct Fut<T>(Task<T>);
129-
130-
impl<T> Future for Fut<T> {
131-
type Output = Option<T>;
132-
133-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
134-
self.0.poll_task(cx)
135-
}
136-
}
137-
138-
Fut(this).await
130+
/// Converts this task into a [`FallibleTask`].
131+
///
132+
/// Like [`Task`], a fallible task will poll the task's output until it is
133+
/// completed or cancelled due to its [`Runnable`][`super::Runnable`] being
134+
/// dropped without being run. Resolves to the task's output when completed,
135+
/// or [`None`] if it didn't complete.
136+
///
137+
/// # Examples
138+
///
139+
/// ```
140+
/// use smol::{future, Executor};
141+
/// use std::thread;
142+
///
143+
/// let ex = Executor::new();
144+
///
145+
/// // Spawn a future onto the executor.
146+
/// let task = ex.spawn(async {
147+
/// println!("Hello from a task!");
148+
/// 1 + 2
149+
/// })
150+
/// .fallible();
151+
///
152+
/// // Run an executor thread.
153+
/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
154+
///
155+
/// // Wait for the task's output.
156+
/// assert_eq!(future::block_on(task), Some(3));
157+
/// ```
158+
///
159+
/// ```
160+
/// use smol::future;
161+
///
162+
/// // Schedule function which drops the runnable without running it.
163+
/// let schedule = move |runnable| drop(runnable);
164+
///
165+
/// // Create a task with the future and the schedule function.
166+
/// let (runnable, task) = async_task::spawn(async {
167+
/// println!("Hello from a task!");
168+
/// 1 + 2
169+
/// }, schedule);
170+
/// runnable.schedule();
171+
///
172+
/// // Wait for the task's output.
173+
/// assert_eq!(future::block_on(task.fallible()), None);
174+
/// ```
175+
pub fn fallible(self) -> FallibleTask<T> {
176+
FallibleTask { task: self }
139177
}
140178

141179
/// Puts the task in canceled state.
@@ -351,6 +389,12 @@ impl<T> Task<T> {
351389
}
352390
}
353391
}
392+
393+
fn header(&self) -> &Header {
394+
let ptr = self.ptr.as_ptr();
395+
let header = ptr as *const Header;
396+
unsafe { &*header }
397+
}
354398
}
355399

356400
impl<T> Drop for Task<T> {
@@ -373,11 +417,101 @@ impl<T> Future for Task<T> {
373417

374418
impl<T> fmt::Debug for Task<T> {
375419
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
376-
let ptr = self.ptr.as_ptr();
377-
let header = ptr as *const Header;
378-
379420
f.debug_struct("Task")
380-
.field("header", unsafe { &(*header) })
421+
.field("header", self.header())
422+
.finish()
423+
}
424+
}
425+
426+
/// A spawned task with a fallible response.
427+
///
428+
/// This type behaves like [`Task`], however it produces an `Option<T>` when
429+
/// polled and will return `None` if the executor dropped its
430+
/// [`Runnable`][`super::Runnable`] without being run.
431+
///
432+
/// This can be useful to avoid the panic produced when polling the `Task`
433+
/// future if the executor dropped its `Runnable`.
434+
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
435+
pub struct FallibleTask<T> {
436+
task: Task<T>,
437+
}
438+
439+
impl<T> FallibleTask<T> {
440+
/// Detaches the task to let it keep running in the background.
441+
///
442+
/// # Examples
443+
///
444+
/// ```
445+
/// use smol::{Executor, Timer};
446+
/// use std::time::Duration;
447+
///
448+
/// let ex = Executor::new();
449+
///
450+
/// // Spawn a deamon future.
451+
/// ex.spawn(async {
452+
/// loop {
453+
/// println!("I'm a daemon task looping forever.");
454+
/// Timer::after(Duration::from_secs(1)).await;
455+
/// }
456+
/// })
457+
/// .fallible()
458+
/// .detach();
459+
/// ```
460+
pub fn detach(self) {
461+
self.task.detach()
462+
}
463+
464+
/// Cancels the task and waits for it to stop running.
465+
///
466+
/// Returns the task's output if it was completed just before it got canceled, or [`None`] if
467+
/// it didn't complete.
468+
///
469+
/// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
470+
/// canceling because it also waits for the task to stop running.
471+
///
472+
/// # Examples
473+
///
474+
/// ```
475+
/// use smol::{future, Executor, Timer};
476+
/// use std::thread;
477+
/// use std::time::Duration;
478+
///
479+
/// let ex = Executor::new();
480+
///
481+
/// // Spawn a deamon future.
482+
/// let task = ex.spawn(async {
483+
/// loop {
484+
/// println!("Even though I'm in an infinite loop, you can still cancel me!");
485+
/// Timer::after(Duration::from_secs(1)).await;
486+
/// }
487+
/// })
488+
/// .fallible();
489+
///
490+
/// // Run an executor thread.
491+
/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
492+
///
493+
/// future::block_on(async {
494+
/// Timer::after(Duration::from_secs(3)).await;
495+
/// task.cancel().await;
496+
/// });
497+
/// ```
498+
pub async fn cancel(self) -> Option<T> {
499+
self.task.cancel().await
500+
}
501+
}
502+
503+
impl<T> Future for FallibleTask<T> {
504+
type Output = Option<T>;
505+
506+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
507+
self.task.poll_task(cx)
508+
}
509+
}
510+
511+
impl<T> fmt::Debug for FallibleTask<T> {
512+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
513+
f.debug_struct("FallibleTask")
514+
.field("header", self.task.header())
381515
.finish()
382516
}
383517
}

0 commit comments

Comments
 (0)