Skip to content

Commit 00581f5

Browse files
authored
Add woken_while_running as another argument to the scheduling function (#42)
1 parent 9eb8196 commit 00581f5

File tree

4 files changed

+162
-20
lines changed

4 files changed

+162
-20
lines changed

src/lib.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,9 @@ mod state;
9292
mod task;
9393
mod utils;
9494

95-
pub use crate::runnable::{spawn, spawn_unchecked, Builder, Runnable};
95+
pub use crate::runnable::{
96+
spawn, spawn_unchecked, Builder, Runnable, Schedule, ScheduleInfo, WithInfo,
97+
};
9698
pub use crate::task::{FallibleTask, Task};
9799

98100
#[cfg(feature = "std")]

src/raw.rs

+11-10
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use core::sync::atomic::{AtomicUsize, Ordering};
99
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
1010

1111
use crate::header::Header;
12+
use crate::runnable::{Schedule, ScheduleInfo};
1213
use crate::state::*;
1314
use crate::utils::{abort, abort_on_panic, max, Layout};
1415
use crate::Runnable;
@@ -22,7 +23,7 @@ pub(crate) type Panic = core::convert::Infallible;
2223
/// The vtable for a task.
2324
pub(crate) struct TaskVTable {
2425
/// Schedules the task.
25-
pub(crate) schedule: unsafe fn(*const ()),
26+
pub(crate) schedule: unsafe fn(*const (), ScheduleInfo),
2627

2728
/// Drops the future inside the task.
2829
pub(crate) drop_future: unsafe fn(*const ()),
@@ -129,7 +130,7 @@ impl<F, T, S, M> RawTask<F, T, S, M> {
129130
impl<F, T, S, M> RawTask<F, T, S, M>
130131
where
131132
F: Future<Output = T>,
132-
S: Fn(Runnable<M>),
133+
S: Schedule<M>,
133134
{
134135
const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
135136
Self::clone_waker,
@@ -279,7 +280,7 @@ where
279280
// time to schedule it.
280281
if state & RUNNING == 0 {
281282
// Schedule the task.
282-
Self::schedule(ptr);
283+
Self::schedule(ptr, ScheduleInfo::new(false));
283284
} else {
284285
// Drop the waker.
285286
Self::drop_waker(ptr);
@@ -348,7 +349,7 @@ where
348349
ptr: NonNull::new_unchecked(ptr as *mut ()),
349350
_marker: PhantomData,
350351
};
351-
(*raw.schedule)(task);
352+
(*raw.schedule).schedule(task, ScheduleInfo::new(false));
352353
}
353354

354355
break;
@@ -396,7 +397,7 @@ where
396397
(*raw.header)
397398
.state
398399
.store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
399-
Self::schedule(ptr);
400+
Self::schedule(ptr, ScheduleInfo::new(false));
400401
} else {
401402
// Otherwise, destroy the task right away.
402403
Self::destroy(ptr);
@@ -426,7 +427,7 @@ where
426427
///
427428
/// This function doesn't modify the state of the task. It only passes the task reference to
428429
/// its schedule function.
429-
unsafe fn schedule(ptr: *const ()) {
430+
unsafe fn schedule(ptr: *const (), info: ScheduleInfo) {
430431
let raw = Self::from_ptr(ptr);
431432

432433
// If the schedule function has captured variables, create a temporary waker that prevents
@@ -440,7 +441,7 @@ where
440441
ptr: NonNull::new_unchecked(ptr as *mut ()),
441442
_marker: PhantomData,
442443
};
443-
(*raw.schedule)(task);
444+
(*raw.schedule).schedule(task, info);
444445
}
445446

446447
/// Drops the future inside a task.
@@ -662,7 +663,7 @@ where
662663
} else if state & SCHEDULED != 0 {
663664
// The thread that woke the task up didn't reschedule it because
664665
// it was running so now it's our responsibility to do so.
665-
Self::schedule(ptr);
666+
Self::schedule(ptr, ScheduleInfo::new(true));
666667
return true;
667668
} else {
668669
// Drop the task reference.
@@ -682,12 +683,12 @@ where
682683
struct Guard<F, T, S, M>(RawTask<F, T, S, M>)
683684
where
684685
F: Future<Output = T>,
685-
S: Fn(Runnable<M>);
686+
S: Schedule<M>;
686687

687688
impl<F, T, S, M> Drop for Guard<F, T, S, M>
688689
where
689690
F: Future<Output = T>,
690-
S: Fn(Runnable<M>),
691+
S: Schedule<M>,
691692
{
692693
fn drop(&mut self) {
693694
let raw = self.0;

src/runnable.rs

+145-7
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,15 @@ use crate::raw::RawTask;
1313
use crate::state::*;
1414
use crate::Task;
1515

16+
mod sealed {
17+
use super::*;
18+
pub trait Sealed<M> {}
19+
20+
impl<M, F> Sealed<M> for F where F: Fn(Runnable<M>) {}
21+
22+
impl<M, F> Sealed<M> for WithInfo<F> where F: Fn(Runnable<M>, ScheduleInfo) {}
23+
}
24+
1625
/// A builder that creates a new task.
1726
#[derive(Debug)]
1827
pub struct Builder<M> {
@@ -30,6 +39,135 @@ impl<M: Default> Default for Builder<M> {
3039
}
3140
}
3241

42+
/// Extra scheduling information that can be passed to the scheduling function.
43+
///
44+
/// The data source of this struct is directly from the actual implementation
45+
/// of the crate itself, different from [`Runnable`]'s metadata, which is
46+
/// managed by the caller.
47+
///
48+
/// # Examples
49+
///
50+
/// ```
51+
/// use async_task::{Runnable, ScheduleInfo, WithInfo};
52+
/// use std::sync::{Arc, Mutex};
53+
///
54+
/// // The future inside the task.
55+
/// let future = async {
56+
/// println!("Hello, world!");
57+
/// };
58+
///
59+
/// // If the task gets woken up while running, it will be sent into this channel.
60+
/// let (s, r) = flume::unbounded();
61+
/// // Otherwise, it will be placed into this slot.
62+
/// let lifo_slot = Arc::new(Mutex::new(None));
63+
/// let schedule = move |runnable: Runnable, info: ScheduleInfo| {
64+
/// if info.woken_while_running {
65+
/// s.send(runnable).unwrap()
66+
/// } else {
67+
/// let last = lifo_slot.lock().unwrap().replace(runnable);
68+
/// if let Some(last) = last {
69+
/// s.send(last).unwrap()
70+
/// }
71+
/// }
72+
/// };
73+
///
74+
/// // Create the actual scheduler to be spawned with some future.
75+
/// let scheduler = WithInfo(schedule);
76+
/// // Create a task with the future and the scheduler.
77+
/// let (runnable, task) = async_task::spawn(future, scheduler);
78+
/// ```
79+
#[derive(Debug, Copy, Clone)]
80+
#[non_exhaustive]
81+
pub struct ScheduleInfo {
82+
/// Indicates whether the task gets woken up while running.
83+
///
84+
/// It is set to true usually because the task has yielded itself to the
85+
/// scheduler.
86+
pub woken_while_running: bool,
87+
}
88+
89+
impl ScheduleInfo {
90+
pub(crate) fn new(woken_while_running: bool) -> Self {
91+
ScheduleInfo {
92+
woken_while_running,
93+
}
94+
}
95+
}
96+
97+
/// The trait for scheduling functions.
98+
pub trait Schedule<M = ()>: sealed::Sealed<M> {
99+
/// The actual scheduling procedure.
100+
fn schedule(&self, runnable: Runnable<M>, info: ScheduleInfo);
101+
}
102+
103+
impl<M, F> Schedule<M> for F
104+
where
105+
F: Fn(Runnable<M>),
106+
{
107+
fn schedule(&self, runnable: Runnable<M>, _: ScheduleInfo) {
108+
self(runnable)
109+
}
110+
}
111+
112+
/// Pass a scheduling function with more scheduling information - a.k.a.
113+
/// [`ScheduleInfo`].
114+
///
115+
/// Sometimes, it's useful to pass the runnable's state directly to the
116+
/// scheduling function, such as whether it's woken up while running. The
117+
/// scheduler can thus use the information to determine its scheduling
118+
/// strategy.
119+
///
120+
/// The data source of [`ScheduleInfo`] is directly from the actual
121+
/// implementation of the crate itself, different from [`Runnable`]'s metadata,
122+
/// which is managed by the caller.
123+
///
124+
/// # Examples
125+
///
126+
/// ```
127+
/// use async_task::{ScheduleInfo, WithInfo};
128+
/// use std::sync::{Arc, Mutex};
129+
///
130+
/// // The future inside the task.
131+
/// let future = async {
132+
/// println!("Hello, world!");
133+
/// };
134+
///
135+
/// // If the task gets woken up while running, it will be sent into this channel.
136+
/// let (s, r) = flume::unbounded();
137+
/// // Otherwise, it will be placed into this slot.
138+
/// let lifo_slot = Arc::new(Mutex::new(None));
139+
/// let schedule = move |runnable, info: ScheduleInfo| {
140+
/// if info.woken_while_running {
141+
/// s.send(runnable).unwrap()
142+
/// } else {
143+
/// let last = lifo_slot.lock().unwrap().replace(runnable);
144+
/// if let Some(last) = last {
145+
/// s.send(last).unwrap()
146+
/// }
147+
/// }
148+
/// };
149+
///
150+
/// // Create a task with the future and the schedule function.
151+
/// let (runnable, task) = async_task::spawn(future, WithInfo(schedule));
152+
/// ```
153+
#[derive(Debug)]
154+
pub struct WithInfo<F>(pub F);
155+
156+
impl<F> From<F> for WithInfo<F> {
157+
fn from(value: F) -> Self {
158+
WithInfo(value)
159+
}
160+
}
161+
162+
impl<M, F> Schedule<M> for WithInfo<F>
163+
where
164+
F: Fn(Runnable<M>, ScheduleInfo),
165+
{
166+
fn schedule(&self, runnable: Runnable<M>, info: ScheduleInfo) {
167+
(self.0)(runnable, info)
168+
}
169+
}
170+
33171
impl Builder<()> {
34172
/// Creates a new task builder.
35173
///
@@ -226,7 +364,7 @@ impl<M> Builder<M> {
226364
F: FnOnce(&M) -> Fut,
227365
Fut: Future + Send + 'static,
228366
Fut::Output: Send + 'static,
229-
S: Fn(Runnable<M>) + Send + Sync + 'static,
367+
S: Schedule<M> + Send + Sync + 'static,
230368
{
231369
unsafe { self.spawn_unchecked(future, schedule) }
232370
}
@@ -273,7 +411,7 @@ impl<M> Builder<M> {
273411
F: FnOnce(&M) -> Fut,
274412
Fut: Future + 'static,
275413
Fut::Output: 'static,
276-
S: Fn(Runnable<M>) + Send + Sync + 'static,
414+
S: Schedule<M> + Send + Sync + 'static,
277415
{
278416
use std::mem::ManuallyDrop;
279417
use std::pin::Pin;
@@ -370,7 +508,7 @@ impl<M> Builder<M> {
370508
where
371509
F: FnOnce(&'a M) -> Fut,
372510
Fut: Future + 'a,
373-
S: Fn(Runnable<M>),
511+
S: Schedule<M>,
374512
M: 'a,
375513
{
376514
// Allocate large futures on the heap.
@@ -432,7 +570,7 @@ pub fn spawn<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
432570
where
433571
F: Future + Send + 'static,
434572
F::Output: Send + 'static,
435-
S: Fn(Runnable) + Send + Sync + 'static,
573+
S: Schedule + Send + Sync + 'static,
436574
{
437575
unsafe { spawn_unchecked(future, schedule) }
438576
}
@@ -474,7 +612,7 @@ pub fn spawn_local<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
474612
where
475613
F: Future + 'static,
476614
F::Output: 'static,
477-
S: Fn(Runnable) + Send + Sync + 'static,
615+
S: Schedule + Send + Sync + 'static,
478616
{
479617
Builder::new().spawn_local(move |()| future, schedule)
480618
}
@@ -511,7 +649,7 @@ where
511649
pub unsafe fn spawn_unchecked<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
512650
where
513651
F: Future,
514-
S: Fn(Runnable),
652+
S: Schedule,
515653
{
516654
Builder::new().spawn_unchecked(move |()| future, schedule)
517655
}
@@ -604,7 +742,7 @@ impl<M> Runnable<M> {
604742
mem::forget(self);
605743

606744
unsafe {
607-
((*header).vtable.schedule)(ptr);
745+
((*header).vtable.schedule)(ptr, ScheduleInfo::new(false));
608746
}
609747
}
610748

src/task.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use core::task::{Context, Poll};
99

1010
use crate::header::Header;
1111
use crate::raw::Panic;
12+
use crate::runnable::ScheduleInfo;
1213
use crate::state::*;
1314

1415
/// A spawned task.
@@ -210,7 +211,7 @@ impl<T, M> Task<T, M> {
210211
// If the task is not scheduled nor running, schedule it one more time so
211212
// that its future gets dropped by the executor.
212213
if state & (SCHEDULED | RUNNING) == 0 {
213-
((*header).vtable.schedule)(ptr);
214+
((*header).vtable.schedule)(ptr, ScheduleInfo::new(false));
214215
}
215216

216217
// Notify the awaiter that the task has been closed.
@@ -289,7 +290,7 @@ impl<T, M> Task<T, M> {
289290
// schedule dropping its future or destroy it.
290291
if state & !(REFERENCE - 1) == 0 {
291292
if state & CLOSED == 0 {
292-
((*header).vtable.schedule)(ptr);
293+
((*header).vtable.schedule)(ptr, ScheduleInfo::new(false));
293294
} else {
294295
((*header).vtable.destroy)(ptr);
295296
}

0 commit comments

Comments
 (0)