From 73f0376bfdd8ea2e88a2712729dd8c1d558f2189 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E5=90=AF=E8=88=AA?= Date: Wed, 22 Mar 2023 09:57:53 +0800 Subject: [PATCH 1/7] Add spawn*2 --- src/lib.rs | 4 +- src/raw.rs | 20 +-- src/runnable.rs | 363 +++++++++++++++++++++++++++++++++++++++++++++++- src/task.rs | 4 +- 4 files changed, 375 insertions(+), 16 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 19eb77d..3d2f427 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -92,8 +92,8 @@ mod state; mod task; mod utils; -pub use crate::runnable::{spawn, spawn_unchecked, Builder, Runnable}; +pub use crate::runnable::{spawn, spawn2, spawn_unchecked, spawn_unchecked2, Builder, Runnable}; pub use crate::task::{FallibleTask, Task}; #[cfg(feature = "std")] -pub use crate::runnable::spawn_local; +pub use crate::runnable::{spawn_local, spawn_local2}; diff --git a/src/raw.rs b/src/raw.rs index 4bba757..3b9289d 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -22,7 +22,7 @@ pub(crate) type Panic = core::convert::Infallible; /// The vtable for a task. pub(crate) struct TaskVTable { /// Schedules the task. - pub(crate) schedule: unsafe fn(*const ()), + pub(crate) schedule: unsafe fn(*const (), bool), /// Drops the future inside the task. pub(crate) drop_future: unsafe fn(*const ()), @@ -129,7 +129,7 @@ impl RawTask { impl RawTask where F: Future, - S: Fn(Runnable), + S: Fn(Runnable, bool), { const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( Self::clone_waker, @@ -279,7 +279,7 @@ where // time to schedule it. if state & RUNNING == 0 { // Schedule the task. - Self::schedule(ptr); + Self::schedule(ptr, false); } else { // Drop the waker. Self::drop_waker(ptr); @@ -348,7 +348,7 @@ where ptr: NonNull::new_unchecked(ptr as *mut ()), _marker: PhantomData, }; - (*raw.schedule)(task); + (*raw.schedule)(task, false); } break; @@ -396,7 +396,7 @@ where (*raw.header) .state .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release); - Self::schedule(ptr); + Self::schedule(ptr, true); } else { // Otherwise, destroy the task right away. Self::destroy(ptr); @@ -426,7 +426,7 @@ where /// /// This function doesn't modify the state of the task. It only passes the task reference to /// its schedule function. - unsafe fn schedule(ptr: *const ()) { + unsafe fn schedule(ptr: *const (), woken_while_running: bool) { let raw = Self::from_ptr(ptr); // If the schedule function has captured variables, create a temporary waker that prevents @@ -440,7 +440,7 @@ where ptr: NonNull::new_unchecked(ptr as *mut ()), _marker: PhantomData, }; - (*raw.schedule)(task); + (*raw.schedule)(task, woken_while_running); } /// Drops the future inside a task. @@ -662,7 +662,7 @@ where } else if state & SCHEDULED != 0 { // The thread that woke the task up didn't reschedule it because // it was running so now it's our responsibility to do so. - Self::schedule(ptr); + Self::schedule(ptr, true); return true; } else { // Drop the task reference. @@ -682,12 +682,12 @@ where struct Guard(RawTask) where F: Future, - S: Fn(Runnable); + S: Fn(Runnable, bool); impl Drop for Guard where F: Future, - S: Fn(Runnable), + S: Fn(Runnable, bool), { fn drop(&mut self) { let raw = self.0; diff --git a/src/runnable.rs b/src/runnable.rs index e371176..42c6617 100644 --- a/src/runnable.rs +++ b/src/runnable.rs @@ -231,6 +231,65 @@ impl Builder { unsafe { self.spawn_unchecked(future, schedule) } } + /// Creates a new task, with an additional argument in the schedule function. + /// + /// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its + /// output. + /// + /// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`] + /// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run + /// again. + /// + /// When the task is woken, its [`Runnable`] is passed to the `schedule` function. + /// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it + /// should push it into a task queue so that it can be processed later. + /// + /// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider + /// using [`spawn_local2()`] or [`spawn_unchecked2()`] instead. + /// + /// # Arguments + /// + /// - `woken_while_running` - e.g. the second argument in the schedule function, set true when the + /// task is woken up while running. + /// + /// # Examples + /// + /// ``` + /// use async_task::Builder; + /// use std::sync::{Arc, Mutex}; + /// + /// // The future inside the task. + /// let future = async { + /// println!("Hello, world!"); + /// }; + /// + /// // A function that schedules the task when it gets woken up while runninng. + /// let (s, r) = flume::unbounded(); + /// // Otherwise, it will be placed into this slot. + /// let lifo_slot = Arc::new(Mutex::new(None)); + /// let schedule = move |runnable, woken_while_running| { + /// if woken_while_running { + /// s.send(runnable).unwrap() + /// } else { + /// let last = lifo_slot.lock().unwrap().replace(runnable); + /// if let Some(last) = last { + /// s.send(last).unwrap() + /// } + /// } + /// }; + /// // Create a task with the future and the schedule function. + /// let (runnable, task) = Builder::new().spawn2(|()| future, schedule); + /// ``` + pub fn spawn2(self, future: F, schedule: S) -> (Runnable, Task) + where + F: FnOnce(&M) -> Fut, + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, + S: Fn(Runnable, bool) + Send + Sync + 'static, + { + unsafe { self.spawn_unchecked2(future, schedule) } + } + /// Creates a new thread-local task. /// /// This function is same as [`spawn()`], except it does not require [`Send`] on `future`. If the @@ -274,6 +333,69 @@ impl Builder { Fut: Future + 'static, Fut::Output: 'static, S: Fn(Runnable) + Send + Sync + 'static, + { + self.spawn_local2(future, move |runnable, _| schedule(runnable)) + } + + /// Creates a new thread-local task, with an additional argument in the schedule function.. + /// + /// This function is same as [`spawn()`], except it does not require [`Send`] on `future`. If the + /// [`Runnable`] is used or dropped on another thread, a panic will occur. + /// + /// This function is only available when the `std` feature for this crate is enabled. + /// + /// # Arguments + /// + /// - `woken_while_running` - e.g. the second argument in the schedule function, set true when the + /// task is woken up while running. + /// + /// # Examples + /// + /// ``` + /// use async_task::{Builder, Runnable}; + /// use flume::{Receiver, Sender}; + /// use std::{rc::Rc, cell::RefCell}; + /// + /// thread_local! { + /// // A queue that holds scheduled tasks. + /// static QUEUE: (Sender, Receiver) = flume::unbounded(); + /// + /// // A slot intended to be fetched first when picking tasks to run. + /// static LIFO_SLOT: RefCell> = RefCell::new(None); + /// } + /// + /// // Make a non-Send future. + /// let msg: Rc = "Hello, world!".into(); + /// let future = async move { + /// println!("{}", msg); + /// }; + /// + /// // A function that schedules the task when it gets woken up. + /// let s = QUEUE.with(|(s, _)| s.clone()); + /// let schedule = move |runnable, woken_while_running| { + /// if woken_while_running { + /// s.send(runnable).unwrap() + /// } else { + /// let last = LIFO_SLOT.with(|slot| slot.borrow_mut().replace(runnable)); + /// if let Some(last) = last { + /// s.send(last).unwrap() + /// } + /// } + /// }; + /// // Create a task with the future and the schedule function. + /// let (runnable, task) = Builder::new().spawn_local2(move |()| future, schedule); + /// ``` + #[cfg(feature = "std")] + pub fn spawn_local2( + self, + future: F, + schedule: S, + ) -> (Runnable, Task) + where + F: FnOnce(&M) -> Fut, + Fut: Future + 'static, + Fut::Output: 'static, + S: Fn(Runnable, bool) + Send + Sync + 'static, { use std::mem::ManuallyDrop; use std::pin::Pin; @@ -328,7 +450,7 @@ impl Builder { } }; - unsafe { self.spawn_unchecked(future, schedule) } + unsafe { self.spawn_unchecked2(future, schedule) } } /// Creates a new task without [`Send`], [`Sync`], and `'static` bounds. @@ -372,6 +494,70 @@ impl Builder { Fut: Future + 'a, S: Fn(Runnable), M: 'a, + { + // Allocate large futures on the heap. + self.spawn_unchecked2(future, move |task, _| schedule(task)) + } + + /// Creates a new task without [`Send`], [`Sync`], and `'static` bounds, with an additional argument + /// in the schedule function. + /// + /// This function is same as [`spawn()`], except it does not require [`Send`], [`Sync`], and + /// `'static` on `future` and `schedule`. + /// + /// # Arguments + /// + /// - `woken_while_running` - e.g. the second argument in the schedule function, set true when the + /// task is woken up while running. + /// + /// # Safety + /// + /// - If `future`'s output is not [`Send`], its [`Runnable`] must be used and dropped on the original + /// thread. + /// - If `future`'s output is not `'static`, borrowed variables must outlive its [`Runnable`]. + /// - If `schedule` is not [`Send`] and [`Sync`], the task's [`Waker`] must be used and dropped on + /// the original thread. + /// - If `schedule` is not `'static`, borrowed variables must outlive the task's [`Waker`]. + /// + /// # Examples + /// + /// ``` + /// use async_task::Builder; + /// use std::sync::{Arc, Mutex}; + /// + /// // The future inside the task. + /// let future = async { + /// println!("Hello, world!"); + /// }; + /// + /// // If the task gets woken up while running, it will be sent into this channel. + /// let (s, r) = flume::unbounded(); + /// // Otherwise, it will be placed into this slot. + /// let lifo_slot = Arc::new(Mutex::new(None)); + /// let schedule = move |runnable, woken_while_running| { + /// if woken_while_running { + /// s.send(runnable).unwrap() + /// } else { + /// let last = lifo_slot.lock().unwrap().replace(runnable); + /// if let Some(last) = last { + /// s.send(last).unwrap() + /// } + /// } + /// }; + /// + /// // Create a task with the future and the schedule function. + /// let (runnable, task) = unsafe { Builder::new().spawn_unchecked2(move |()| future, schedule) }; + /// ``` + pub unsafe fn spawn_unchecked2<'a, F, Fut, S>( + self, + future: F, + schedule: S, + ) -> (Runnable, Task) + where + F: FnOnce(&'a M) -> Fut, + Fut: Future + 'a, + S: Fn(Runnable, bool), + M: 'a, { // Allocate large futures on the heap. let ptr = if mem::size_of::() >= 2048 { @@ -437,6 +623,64 @@ where unsafe { spawn_unchecked(future, schedule) } } +/// Creates a new task, with an additional argument in the schedule function. +/// +/// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its +/// output. +/// +/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`] +/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run +/// again. +/// +/// When the task is woken, its [`Runnable`] is passed to the `schedule` function. +/// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it +/// should push it into a task queue so that it can be processed later. +/// +/// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider +/// using [`spawn_local2()`] or [`spawn_unchecked2()`] instead. +/// +/// # Arguments +/// +/// - `woken_while_running` - e.g. the second argument in the schedule function, set true when the +/// task is woken up while running. +/// +/// # Examples +/// +/// ``` +/// use async_task::Builder; +/// use std::sync::{Arc, Mutex}; +/// +/// // The future inside the task. +/// let future = async { +/// println!("Hello, world!"); +/// }; +/// +/// // A function that schedules the task when it gets woken up while runninng. +/// let (s, r) = flume::unbounded(); +/// // Otherwise, it will be placed into this slot. +/// let lifo_slot = Arc::new(Mutex::new(None)); +/// let schedule = move |runnable, woken_while_running| { +/// if woken_while_running { +/// s.send(runnable).unwrap() +/// } else { +/// let last = lifo_slot.lock().unwrap().replace(runnable); +/// if let Some(last) = last { +/// s.send(last).unwrap() +/// } +/// } +/// }; +/// // Create a task with the future and the schedule function. +/// let (runnable, task) = Builder::new().spawn2(|()| future, schedule); +/// ``` +pub fn spawn2(future: F, schedule: S) -> (Runnable, Task) +where + F: Future + Send + 'static, + F::Output: Send + 'static, + S: Fn(Runnable, bool) + Send + Sync + 'static, +{ + unsafe { spawn_unchecked2(future, schedule) } +} + /// Creates a new thread-local task. /// /// This function is same as [`spawn()`], except it does not require [`Send`] on `future`. If the @@ -479,6 +723,64 @@ where Builder::new().spawn_local(move |()| future, schedule) } +/// Creates a new thread-local task, with an additional argument in the schedule function.. +/// +/// This function is same as [`spawn()`], except it does not require [`Send`] on `future`. If the +/// [`Runnable`] is used or dropped on another thread, a panic will occur. +/// +/// This function is only available when the `std` feature for this crate is enabled. +/// +/// # Arguments +/// +/// - `woken_while_running` - e.g. the second argument in the schedule function, set true when the +/// task is woken up while running. +/// +/// # Examples +/// +/// ``` +/// use async_task::{Builder, Runnable}; +/// use flume::{Receiver, Sender}; +/// use std::{rc::Rc, cell::RefCell}; +/// +/// thread_local! { +/// // A queue that holds scheduled tasks. +/// static QUEUE: (Sender, Receiver) = flume::unbounded(); +/// +/// // A slot intended to be fetched first when picking tasks to run. +/// static LIFO_SLOT: RefCell> = RefCell::new(None); +/// } +/// +/// // Make a non-Send future. +/// let msg: Rc = "Hello, world!".into(); +/// let future = async move { +/// println!("{}", msg); +/// }; +/// +/// // A function that schedules the task when it gets woken up. +/// let s = QUEUE.with(|(s, _)| s.clone()); +/// let schedule = move |runnable, woken_while_running| { +/// if woken_while_running { +/// s.send(runnable).unwrap() +/// } else { +/// let last = LIFO_SLOT.with(|slot| slot.borrow_mut().replace(runnable)); +/// if let Some(last) = last { +/// s.send(last).unwrap() +/// } +/// } +/// }; +/// // Create a task with the future and the schedule function. +/// let (runnable, task) = Builder::new().spawn_local2(move |()| future, schedule); +/// ``` +#[cfg(feature = "std")] +pub fn spawn_local2(future: F, schedule: S) -> (Runnable, Task) +where + F: Future + 'static, + F::Output: 'static, + S: Fn(Runnable, bool) + Send + Sync + 'static, +{ + Builder::new().spawn_local2(move |()| future, schedule) +} + /// Creates a new task without [`Send`], [`Sync`], and `'static` bounds. /// /// This function is same as [`spawn()`], except it does not require [`Send`], [`Sync`], and @@ -516,6 +818,63 @@ where Builder::new().spawn_unchecked(move |()| future, schedule) } +/// Creates a new task without [`Send`], [`Sync`], and `'static` bounds, with an additional argument +/// in the schedule function. +/// +/// This function is same as [`spawn()`], except it does not require [`Send`], [`Sync`], and +/// `'static` on `future` and `schedule`. +/// +/// # Arguments +/// +/// - `woken_while_running` - e.g. the second argument in the schedule function, set true when the +/// task is woken while running. +/// +/// # Safety +/// +/// - If `future`'s output is not [`Send`], its [`Runnable`] must be used and dropped on the original +/// thread. +/// - If `future`'s output is not `'static`, borrowed variables must outlive its [`Runnable`]. +/// - If `schedule` is not [`Send`] and [`Sync`], the task's [`Waker`] must be used and dropped on +/// the original thread. +/// - If `schedule` is not `'static`, borrowed variables must outlive the task's [`Waker`]. +/// +/// # Examples +/// +/// ``` +/// use async_task::Builder; +/// use std::sync::{Arc, Mutex}; +/// +/// // The future inside the task. +/// let future = async { +/// println!("Hello, world!"); +/// }; +/// +/// // If the task gets woken up while running, it will be sent into this channel. +/// let (s, r) = flume::unbounded(); +/// // Otherwise, it will be placed into this slot. +/// let lifo_slot = Arc::new(Mutex::new(None)); +/// let schedule = move |runnable, woken_while_running| { +/// if woken_while_running { +/// s.send(runnable).unwrap() +/// } else { +/// let last = lifo_slot.lock().unwrap().replace(runnable); +/// if let Some(last) = last { +/// s.send(last).unwrap() +/// } +/// } +/// }; +/// +/// // Create a task with the future and the schedule function. +/// let (runnable, task) = unsafe { Builder::new().spawn_unchecked2(move |()| future, schedule) }; +/// ``` +pub unsafe fn spawn_unchecked2(future: F, schedule: S) -> (Runnable, Task) +where + F: Future, + S: Fn(Runnable, bool), +{ + Builder::new().spawn_unchecked2(move |()| future, schedule) +} + /// A handle to a runnable task. /// /// Every spawned task has a single [`Runnable`] handle, which only exists when the task is @@ -604,7 +963,7 @@ impl Runnable { mem::forget(self); unsafe { - ((*header).vtable.schedule)(ptr); + ((*header).vtable.schedule)(ptr, false); } } diff --git a/src/task.rs b/src/task.rs index 5bf8b46..2f86989 100644 --- a/src/task.rs +++ b/src/task.rs @@ -210,7 +210,7 @@ impl Task { // If the task is not scheduled nor running, schedule it one more time so // that its future gets dropped by the executor. if state & (SCHEDULED | RUNNING) == 0 { - ((*header).vtable.schedule)(ptr); + ((*header).vtable.schedule)(ptr, false); } // Notify the awaiter that the task has been closed. @@ -289,7 +289,7 @@ impl Task { // schedule dropping its future or destroy it. if state & !(REFERENCE - 1) == 0 { if state & CLOSED == 0 { - ((*header).vtable.schedule)(ptr); + ((*header).vtable.schedule)(ptr, false); } else { ((*header).vtable.destroy)(ptr); } From dc0b37d74fca0a9e646c76b4024f689e611c822e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E5=90=AF=E8=88=AA?= Date: Wed, 22 Mar 2023 10:09:45 +0800 Subject: [PATCH 2/7] Fix some document --- src/runnable.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/runnable.rs b/src/runnable.rs index 42c6617..336dbcf 100644 --- a/src/runnable.rs +++ b/src/runnable.rs @@ -249,7 +249,7 @@ impl Builder { /// /// # Arguments /// - /// - `woken_while_running` - e.g. the second argument in the schedule function, set true when the + /// - `woken_while_running` - the second argument in the schedule function, set true when the /// task is woken up while running. /// /// # Examples @@ -346,7 +346,7 @@ impl Builder { /// /// # Arguments /// - /// - `woken_while_running` - e.g. the second argument in the schedule function, set true when the + /// - `woken_while_running` - the second argument in the schedule function, set true when the /// task is woken up while running. /// /// # Examples @@ -507,7 +507,7 @@ impl Builder { /// /// # Arguments /// - /// - `woken_while_running` - e.g. the second argument in the schedule function, set true when the + /// - `woken_while_running` - the second argument in the schedule function, set true when the /// task is woken up while running. /// /// # Safety @@ -641,7 +641,7 @@ where /// /// # Arguments /// -/// - `woken_while_running` - e.g. the second argument in the schedule function, set true when the +/// - `woken_while_running` - the second argument in the schedule function, set true when the /// task is woken up while running. /// /// # Examples @@ -732,7 +732,7 @@ where /// /// # Arguments /// -/// - `woken_while_running` - e.g. the second argument in the schedule function, set true when the +/// - `woken_while_running` - the second argument in the schedule function, set true when the /// task is woken up while running. /// /// # Examples @@ -826,7 +826,7 @@ where /// /// # Arguments /// -/// - `woken_while_running` - e.g. the second argument in the schedule function, set true when the +/// - `woken_while_running` - the second argument in the schedule function, set true when the /// task is woken while running. /// /// # Safety From a9dc768aefd71c25d7a70ad01e03cce14465a936 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E5=90=AF=E8=88=AA?= Date: Wed, 22 Mar 2023 10:46:37 +0800 Subject: [PATCH 3/7] Remove old comment --- src/runnable.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/runnable.rs b/src/runnable.rs index 336dbcf..6b3bf60 100644 --- a/src/runnable.rs +++ b/src/runnable.rs @@ -495,7 +495,6 @@ impl Builder { S: Fn(Runnable), M: 'a, { - // Allocate large futures on the heap. self.spawn_unchecked2(future, move |task, _| schedule(task)) } From b4110a2c4c840beb01312c4a93d5b280c7de0469 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E5=90=AF=E8=88=AA?= Date: Thu, 23 Mar 2023 10:56:35 +0800 Subject: [PATCH 4/7] Replace spawn*2 with a more generalized trait --- src/lib.rs | 6 +- src/raw.rs | 41 +++- src/runnable.rs | 496 +++++++++++++----------------------------------- src/task.rs | 15 +- 4 files changed, 178 insertions(+), 380 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3d2f427..6c0c272 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -92,8 +92,10 @@ mod state; mod task; mod utils; -pub use crate::runnable::{spawn, spawn2, spawn_unchecked, spawn_unchecked2, Builder, Runnable}; +pub use crate::runnable::{ + spawn, spawn_unchecked, Builder, Runnable, Schedule, ScheduleInfo, WithInfo, +}; pub use crate::task::{FallibleTask, Task}; #[cfg(feature = "std")] -pub use crate::runnable::{spawn_local, spawn_local2}; +pub use crate::runnable::spawn_local; diff --git a/src/raw.rs b/src/raw.rs index 3b9289d..b1ac161 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -9,6 +9,7 @@ use core::sync::atomic::{AtomicUsize, Ordering}; use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; use crate::header::Header; +use crate::runnable::{Schedule, ScheduleInfo}; use crate::state::*; use crate::utils::{abort, abort_on_panic, max, Layout}; use crate::Runnable; @@ -22,7 +23,7 @@ pub(crate) type Panic = core::convert::Infallible; /// The vtable for a task. pub(crate) struct TaskVTable { /// Schedules the task. - pub(crate) schedule: unsafe fn(*const (), bool), + pub(crate) schedule: unsafe fn(*const (), ScheduleInfo), /// Drops the future inside the task. pub(crate) drop_future: unsafe fn(*const ()), @@ -129,7 +130,7 @@ impl RawTask { impl RawTask where F: Future, - S: Fn(Runnable, bool), + S: Schedule, { const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( Self::clone_waker, @@ -279,7 +280,12 @@ where // time to schedule it. if state & RUNNING == 0 { // Schedule the task. - Self::schedule(ptr, false); + Self::schedule( + ptr, + ScheduleInfo { + woken_while_running: false, + }, + ); } else { // Drop the waker. Self::drop_waker(ptr); @@ -348,7 +354,12 @@ where ptr: NonNull::new_unchecked(ptr as *mut ()), _marker: PhantomData, }; - (*raw.schedule)(task, false); + (*raw.schedule).schedule( + task, + ScheduleInfo { + woken_while_running: false, + }, + ); } break; @@ -396,7 +407,12 @@ where (*raw.header) .state .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release); - Self::schedule(ptr, true); + Self::schedule( + ptr, + ScheduleInfo { + woken_while_running: false, + }, + ); } else { // Otherwise, destroy the task right away. Self::destroy(ptr); @@ -426,7 +442,7 @@ where /// /// This function doesn't modify the state of the task. It only passes the task reference to /// its schedule function. - unsafe fn schedule(ptr: *const (), woken_while_running: bool) { + unsafe fn schedule(ptr: *const (), info: ScheduleInfo) { let raw = Self::from_ptr(ptr); // If the schedule function has captured variables, create a temporary waker that prevents @@ -440,7 +456,7 @@ where ptr: NonNull::new_unchecked(ptr as *mut ()), _marker: PhantomData, }; - (*raw.schedule)(task, woken_while_running); + (*raw.schedule).schedule(task, info); } /// Drops the future inside a task. @@ -662,7 +678,12 @@ where } else if state & SCHEDULED != 0 { // The thread that woke the task up didn't reschedule it because // it was running so now it's our responsibility to do so. - Self::schedule(ptr, true); + Self::schedule( + ptr, + ScheduleInfo { + woken_while_running: true, + }, + ); return true; } else { // Drop the task reference. @@ -682,12 +703,12 @@ where struct Guard(RawTask) where F: Future, - S: Fn(Runnable, bool); + S: Schedule; impl Drop for Guard where F: Future, - S: Fn(Runnable, bool), + S: Schedule, { fn drop(&mut self) { let raw = self.0; diff --git a/src/runnable.rs b/src/runnable.rs index 6b3bf60..1c126f9 100644 --- a/src/runnable.rs +++ b/src/runnable.rs @@ -13,6 +13,15 @@ use crate::raw::RawTask; use crate::state::*; use crate::Task; +mod sealed { + use super::*; + pub trait Sealed {} + + impl Sealed for F where F: Fn(Runnable) {} + + impl Sealed for WithInfo where F: Fn(Runnable, ScheduleInfo) {} +} + /// A builder that creates a new task. #[derive(Debug)] pub struct Builder { @@ -30,6 +39,114 @@ impl Default for Builder { } } +/// Extra scheduling information that can be acquired by the scheduling +/// function. +/// +/// Note: The data source of this struct is directly from the actual +/// implementation of the crate itself, different from [`Runnable`]'s metadata, +/// which is managed by the caller. +/// +/// # Examples +/// +/// ``` +/// use async_task::{Runnable, ScheduleInfo, WithInfo}; +/// use std::sync::{Arc, Mutex}; +/// +/// // If the task gets woken up while running, it will be sent into this channel. +/// let (s, r) = flume::unbounded(); +/// // Otherwise, it will be placed into this slot. +/// let lifo_slot = Arc::new(Mutex::new(None)); +/// let schedule = move |runnable: Runnable, info: ScheduleInfo| { +/// if info.woken_while_running { +/// s.send(runnable).unwrap() +/// } else { +/// let last = lifo_slot.lock().unwrap().replace(runnable); +/// if let Some(last) = last { +/// s.send(last).unwrap() +/// } +/// } +/// }; +/// +/// // Create the actual scheduler to be spawned with some future. +/// let scheduler = WithInfo(schedule); +#[derive(Debug, Copy, Clone)] +#[non_exhaustive] +pub struct ScheduleInfo { + /// Indicates whether the task gets woken up while running. + /// + /// It is set to true usually because the task has yielded itself to the + /// scheduler. + pub woken_while_running: bool, +} + +/// The trait for scheduling functions. +pub trait Schedule: sealed::Sealed { + /// The actual scheduling procedure. + fn schedule(&self, runnable: Runnable, info: ScheduleInfo); +} + +impl Schedule for F +where + F: Fn(Runnable), +{ + fn schedule(&self, runnable: Runnable, _: ScheduleInfo) { + self(runnable) + } +} + +/// Pass a scheduling function with more scheduling information - a.k.a. +/// [`ScheduleInfo`]. +/// +/// Sometimes, it's useful to pass the runnable's state directly to the +/// scheduling function, such as whether it's woken up while running. The +/// scheduler can thus use the information to determine its scheduling +/// strategy. +/// +/// Note: The data source of [`ScheduleInfo`] is directly from the actual +/// implementation of the crate itself, different from [`Runnable`]'s metadata, +/// which is managed by the caller. +/// +/// # Examples +/// +/// ``` +/// use async_task::{ScheduleInfo, WithInfo}; +/// use std::sync::{Arc, Mutex}; +/// +/// // The future inside the task. +/// let future = async { +/// println!("Hello, world!"); +/// }; +/// +/// // If the task gets woken up while running, it will be sent into this channel. +/// let (s, r) = flume::unbounded(); +/// // Otherwise, it will be placed into this slot. +/// let lifo_slot = Arc::new(Mutex::new(None)); +/// let schedule = move |runnable, info: ScheduleInfo| { +/// if info.woken_while_running { +/// s.send(runnable).unwrap() +/// } else { +/// let last = lifo_slot.lock().unwrap().replace(runnable); +/// if let Some(last) = last { +/// s.send(last).unwrap() +/// } +/// } +/// }; +/// +/// // Create a task with the future and the schedule function. +/// let (runnable, task) = async_task::spawn(future, WithInfo(schedule)); +/// ``` +#[derive(Debug)] +pub struct WithInfo(pub F); + +impl Schedule for WithInfo +where + F: Fn(Runnable, ScheduleInfo), +{ + fn schedule(&self, runnable: Runnable, info: ScheduleInfo) { + (self.0)(runnable, info) + } +} + impl Builder<()> { /// Creates a new task builder. /// @@ -226,70 +343,11 @@ impl Builder { F: FnOnce(&M) -> Fut, Fut: Future + Send + 'static, Fut::Output: Send + 'static, - S: Fn(Runnable) + Send + Sync + 'static, + S: Schedule + Send + Sync + 'static, { unsafe { self.spawn_unchecked(future, schedule) } } - /// Creates a new task, with an additional argument in the schedule function. - /// - /// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its - /// output. - /// - /// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`] - /// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run - /// again. - /// - /// When the task is woken, its [`Runnable`] is passed to the `schedule` function. - /// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it - /// should push it into a task queue so that it can be processed later. - /// - /// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider - /// using [`spawn_local2()`] or [`spawn_unchecked2()`] instead. - /// - /// # Arguments - /// - /// - `woken_while_running` - the second argument in the schedule function, set true when the - /// task is woken up while running. - /// - /// # Examples - /// - /// ``` - /// use async_task::Builder; - /// use std::sync::{Arc, Mutex}; - /// - /// // The future inside the task. - /// let future = async { - /// println!("Hello, world!"); - /// }; - /// - /// // A function that schedules the task when it gets woken up while runninng. - /// let (s, r) = flume::unbounded(); - /// // Otherwise, it will be placed into this slot. - /// let lifo_slot = Arc::new(Mutex::new(None)); - /// let schedule = move |runnable, woken_while_running| { - /// if woken_while_running { - /// s.send(runnable).unwrap() - /// } else { - /// let last = lifo_slot.lock().unwrap().replace(runnable); - /// if let Some(last) = last { - /// s.send(last).unwrap() - /// } - /// } - /// }; - /// // Create a task with the future and the schedule function. - /// let (runnable, task) = Builder::new().spawn2(|()| future, schedule); - /// ``` - pub fn spawn2(self, future: F, schedule: S) -> (Runnable, Task) - where - F: FnOnce(&M) -> Fut, - Fut: Future + Send + 'static, - Fut::Output: Send + 'static, - S: Fn(Runnable, bool) + Send + Sync + 'static, - { - unsafe { self.spawn_unchecked2(future, schedule) } - } - /// Creates a new thread-local task. /// /// This function is same as [`spawn()`], except it does not require [`Send`] on `future`. If the @@ -332,70 +390,7 @@ impl Builder { F: FnOnce(&M) -> Fut, Fut: Future + 'static, Fut::Output: 'static, - S: Fn(Runnable) + Send + Sync + 'static, - { - self.spawn_local2(future, move |runnable, _| schedule(runnable)) - } - - /// Creates a new thread-local task, with an additional argument in the schedule function.. - /// - /// This function is same as [`spawn()`], except it does not require [`Send`] on `future`. If the - /// [`Runnable`] is used or dropped on another thread, a panic will occur. - /// - /// This function is only available when the `std` feature for this crate is enabled. - /// - /// # Arguments - /// - /// - `woken_while_running` - the second argument in the schedule function, set true when the - /// task is woken up while running. - /// - /// # Examples - /// - /// ``` - /// use async_task::{Builder, Runnable}; - /// use flume::{Receiver, Sender}; - /// use std::{rc::Rc, cell::RefCell}; - /// - /// thread_local! { - /// // A queue that holds scheduled tasks. - /// static QUEUE: (Sender, Receiver) = flume::unbounded(); - /// - /// // A slot intended to be fetched first when picking tasks to run. - /// static LIFO_SLOT: RefCell> = RefCell::new(None); - /// } - /// - /// // Make a non-Send future. - /// let msg: Rc = "Hello, world!".into(); - /// let future = async move { - /// println!("{}", msg); - /// }; - /// - /// // A function that schedules the task when it gets woken up. - /// let s = QUEUE.with(|(s, _)| s.clone()); - /// let schedule = move |runnable, woken_while_running| { - /// if woken_while_running { - /// s.send(runnable).unwrap() - /// } else { - /// let last = LIFO_SLOT.with(|slot| slot.borrow_mut().replace(runnable)); - /// if let Some(last) = last { - /// s.send(last).unwrap() - /// } - /// } - /// }; - /// // Create a task with the future and the schedule function. - /// let (runnable, task) = Builder::new().spawn_local2(move |()| future, schedule); - /// ``` - #[cfg(feature = "std")] - pub fn spawn_local2( - self, - future: F, - schedule: S, - ) -> (Runnable, Task) - where - F: FnOnce(&M) -> Fut, - Fut: Future + 'static, - Fut::Output: 'static, - S: Fn(Runnable, bool) + Send + Sync + 'static, + S: Schedule + Send + Sync + 'static, { use std::mem::ManuallyDrop; use std::pin::Pin; @@ -450,7 +445,7 @@ impl Builder { } }; - unsafe { self.spawn_unchecked2(future, schedule) } + unsafe { self.spawn_unchecked(future, schedule) } } /// Creates a new task without [`Send`], [`Sync`], and `'static` bounds. @@ -492,70 +487,7 @@ impl Builder { where F: FnOnce(&'a M) -> Fut, Fut: Future + 'a, - S: Fn(Runnable), - M: 'a, - { - self.spawn_unchecked2(future, move |task, _| schedule(task)) - } - - /// Creates a new task without [`Send`], [`Sync`], and `'static` bounds, with an additional argument - /// in the schedule function. - /// - /// This function is same as [`spawn()`], except it does not require [`Send`], [`Sync`], and - /// `'static` on `future` and `schedule`. - /// - /// # Arguments - /// - /// - `woken_while_running` - the second argument in the schedule function, set true when the - /// task is woken up while running. - /// - /// # Safety - /// - /// - If `future`'s output is not [`Send`], its [`Runnable`] must be used and dropped on the original - /// thread. - /// - If `future`'s output is not `'static`, borrowed variables must outlive its [`Runnable`]. - /// - If `schedule` is not [`Send`] and [`Sync`], the task's [`Waker`] must be used and dropped on - /// the original thread. - /// - If `schedule` is not `'static`, borrowed variables must outlive the task's [`Waker`]. - /// - /// # Examples - /// - /// ``` - /// use async_task::Builder; - /// use std::sync::{Arc, Mutex}; - /// - /// // The future inside the task. - /// let future = async { - /// println!("Hello, world!"); - /// }; - /// - /// // If the task gets woken up while running, it will be sent into this channel. - /// let (s, r) = flume::unbounded(); - /// // Otherwise, it will be placed into this slot. - /// let lifo_slot = Arc::new(Mutex::new(None)); - /// let schedule = move |runnable, woken_while_running| { - /// if woken_while_running { - /// s.send(runnable).unwrap() - /// } else { - /// let last = lifo_slot.lock().unwrap().replace(runnable); - /// if let Some(last) = last { - /// s.send(last).unwrap() - /// } - /// } - /// }; - /// - /// // Create a task with the future and the schedule function. - /// let (runnable, task) = unsafe { Builder::new().spawn_unchecked2(move |()| future, schedule) }; - /// ``` - pub unsafe fn spawn_unchecked2<'a, F, Fut, S>( - self, - future: F, - schedule: S, - ) -> (Runnable, Task) - where - F: FnOnce(&'a M) -> Fut, - Fut: Future + 'a, - S: Fn(Runnable, bool), + S: Schedule, M: 'a, { // Allocate large futures on the heap. @@ -617,69 +549,11 @@ pub fn spawn(future: F, schedule: S) -> (Runnable, Task) where F: Future + Send + 'static, F::Output: Send + 'static, - S: Fn(Runnable) + Send + Sync + 'static, + S: Schedule + Send + Sync + 'static, { unsafe { spawn_unchecked(future, schedule) } } -/// Creates a new task, with an additional argument in the schedule function. -/// -/// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its -/// output. -/// -/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`] -/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run -/// again. -/// -/// When the task is woken, its [`Runnable`] is passed to the `schedule` function. -/// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it -/// should push it into a task queue so that it can be processed later. -/// -/// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider -/// using [`spawn_local2()`] or [`spawn_unchecked2()`] instead. -/// -/// # Arguments -/// -/// - `woken_while_running` - the second argument in the schedule function, set true when the -/// task is woken up while running. -/// -/// # Examples -/// -/// ``` -/// use async_task::Builder; -/// use std::sync::{Arc, Mutex}; -/// -/// // The future inside the task. -/// let future = async { -/// println!("Hello, world!"); -/// }; -/// -/// // A function that schedules the task when it gets woken up while runninng. -/// let (s, r) = flume::unbounded(); -/// // Otherwise, it will be placed into this slot. -/// let lifo_slot = Arc::new(Mutex::new(None)); -/// let schedule = move |runnable, woken_while_running| { -/// if woken_while_running { -/// s.send(runnable).unwrap() -/// } else { -/// let last = lifo_slot.lock().unwrap().replace(runnable); -/// if let Some(last) = last { -/// s.send(last).unwrap() -/// } -/// } -/// }; -/// // Create a task with the future and the schedule function. -/// let (runnable, task) = Builder::new().spawn2(|()| future, schedule); -/// ``` -pub fn spawn2(future: F, schedule: S) -> (Runnable, Task) -where - F: Future + Send + 'static, - F::Output: Send + 'static, - S: Fn(Runnable, bool) + Send + Sync + 'static, -{ - unsafe { spawn_unchecked2(future, schedule) } -} - /// Creates a new thread-local task. /// /// This function is same as [`spawn()`], except it does not require [`Send`] on `future`. If the @@ -717,69 +591,11 @@ pub fn spawn_local(future: F, schedule: S) -> (Runnable, Task) where F: Future + 'static, F::Output: 'static, - S: Fn(Runnable) + Send + Sync + 'static, + S: Schedule + Send + Sync + 'static, { Builder::new().spawn_local(move |()| future, schedule) } -/// Creates a new thread-local task, with an additional argument in the schedule function.. -/// -/// This function is same as [`spawn()`], except it does not require [`Send`] on `future`. If the -/// [`Runnable`] is used or dropped on another thread, a panic will occur. -/// -/// This function is only available when the `std` feature for this crate is enabled. -/// -/// # Arguments -/// -/// - `woken_while_running` - the second argument in the schedule function, set true when the -/// task is woken up while running. -/// -/// # Examples -/// -/// ``` -/// use async_task::{Builder, Runnable}; -/// use flume::{Receiver, Sender}; -/// use std::{rc::Rc, cell::RefCell}; -/// -/// thread_local! { -/// // A queue that holds scheduled tasks. -/// static QUEUE: (Sender, Receiver) = flume::unbounded(); -/// -/// // A slot intended to be fetched first when picking tasks to run. -/// static LIFO_SLOT: RefCell> = RefCell::new(None); -/// } -/// -/// // Make a non-Send future. -/// let msg: Rc = "Hello, world!".into(); -/// let future = async move { -/// println!("{}", msg); -/// }; -/// -/// // A function that schedules the task when it gets woken up. -/// let s = QUEUE.with(|(s, _)| s.clone()); -/// let schedule = move |runnable, woken_while_running| { -/// if woken_while_running { -/// s.send(runnable).unwrap() -/// } else { -/// let last = LIFO_SLOT.with(|slot| slot.borrow_mut().replace(runnable)); -/// if let Some(last) = last { -/// s.send(last).unwrap() -/// } -/// } -/// }; -/// // Create a task with the future and the schedule function. -/// let (runnable, task) = Builder::new().spawn_local2(move |()| future, schedule); -/// ``` -#[cfg(feature = "std")] -pub fn spawn_local2(future: F, schedule: S) -> (Runnable, Task) -where - F: Future + 'static, - F::Output: 'static, - S: Fn(Runnable, bool) + Send + Sync + 'static, -{ - Builder::new().spawn_local2(move |()| future, schedule) -} - /// Creates a new task without [`Send`], [`Sync`], and `'static` bounds. /// /// This function is same as [`spawn()`], except it does not require [`Send`], [`Sync`], and @@ -812,68 +628,11 @@ where pub unsafe fn spawn_unchecked(future: F, schedule: S) -> (Runnable, Task) where F: Future, - S: Fn(Runnable), + S: Schedule, { Builder::new().spawn_unchecked(move |()| future, schedule) } -/// Creates a new task without [`Send`], [`Sync`], and `'static` bounds, with an additional argument -/// in the schedule function. -/// -/// This function is same as [`spawn()`], except it does not require [`Send`], [`Sync`], and -/// `'static` on `future` and `schedule`. -/// -/// # Arguments -/// -/// - `woken_while_running` - the second argument in the schedule function, set true when the -/// task is woken while running. -/// -/// # Safety -/// -/// - If `future`'s output is not [`Send`], its [`Runnable`] must be used and dropped on the original -/// thread. -/// - If `future`'s output is not `'static`, borrowed variables must outlive its [`Runnable`]. -/// - If `schedule` is not [`Send`] and [`Sync`], the task's [`Waker`] must be used and dropped on -/// the original thread. -/// - If `schedule` is not `'static`, borrowed variables must outlive the task's [`Waker`]. -/// -/// # Examples -/// -/// ``` -/// use async_task::Builder; -/// use std::sync::{Arc, Mutex}; -/// -/// // The future inside the task. -/// let future = async { -/// println!("Hello, world!"); -/// }; -/// -/// // If the task gets woken up while running, it will be sent into this channel. -/// let (s, r) = flume::unbounded(); -/// // Otherwise, it will be placed into this slot. -/// let lifo_slot = Arc::new(Mutex::new(None)); -/// let schedule = move |runnable, woken_while_running| { -/// if woken_while_running { -/// s.send(runnable).unwrap() -/// } else { -/// let last = lifo_slot.lock().unwrap().replace(runnable); -/// if let Some(last) = last { -/// s.send(last).unwrap() -/// } -/// } -/// }; -/// -/// // Create a task with the future and the schedule function. -/// let (runnable, task) = unsafe { Builder::new().spawn_unchecked2(move |()| future, schedule) }; -/// ``` -pub unsafe fn spawn_unchecked2(future: F, schedule: S) -> (Runnable, Task) -where - F: Future, - S: Fn(Runnable, bool), -{ - Builder::new().spawn_unchecked2(move |()| future, schedule) -} - /// A handle to a runnable task. /// /// Every spawned task has a single [`Runnable`] handle, which only exists when the task is @@ -962,7 +721,12 @@ impl Runnable { mem::forget(self); unsafe { - ((*header).vtable.schedule)(ptr, false); + ((*header).vtable.schedule)( + ptr, + ScheduleInfo { + woken_while_running: false, + }, + ); } } diff --git a/src/task.rs b/src/task.rs index 2f86989..d1cd0d8 100644 --- a/src/task.rs +++ b/src/task.rs @@ -9,6 +9,7 @@ use core::task::{Context, Poll}; use crate::header::Header; use crate::raw::Panic; +use crate::runnable::ScheduleInfo; use crate::state::*; /// A spawned task. @@ -210,7 +211,12 @@ impl Task { // If the task is not scheduled nor running, schedule it one more time so // that its future gets dropped by the executor. if state & (SCHEDULED | RUNNING) == 0 { - ((*header).vtable.schedule)(ptr, false); + ((*header).vtable.schedule)( + ptr, + ScheduleInfo { + woken_while_running: false, + }, + ); } // Notify the awaiter that the task has been closed. @@ -289,7 +295,12 @@ impl Task { // schedule dropping its future or destroy it. if state & !(REFERENCE - 1) == 0 { if state & CLOSED == 0 { - ((*header).vtable.schedule)(ptr, false); + ((*header).vtable.schedule)( + ptr, + ScheduleInfo { + woken_while_running: false, + }, + ); } else { ((*header).vtable.destroy)(ptr); } From dcef1d6ba1843671e4cbc969dfd402dfefad22f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E5=90=AF=E8=88=AA?= Date: Thu, 23 Mar 2023 18:58:00 +0800 Subject: [PATCH 5/7] Improve some code --- src/raw.rs | 28 ++++------------------------ src/runnable.rs | 34 ++++++++++++++++++++++------------ src/task.rs | 14 ++------------ 3 files changed, 28 insertions(+), 48 deletions(-) diff --git a/src/raw.rs b/src/raw.rs index b1ac161..97134fd 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -280,12 +280,7 @@ where // time to schedule it. if state & RUNNING == 0 { // Schedule the task. - Self::schedule( - ptr, - ScheduleInfo { - woken_while_running: false, - }, - ); + Self::schedule(ptr, ScheduleInfo::new(false)); } else { // Drop the waker. Self::drop_waker(ptr); @@ -354,12 +349,7 @@ where ptr: NonNull::new_unchecked(ptr as *mut ()), _marker: PhantomData, }; - (*raw.schedule).schedule( - task, - ScheduleInfo { - woken_while_running: false, - }, - ); + (*raw.schedule).schedule(task, ScheduleInfo::new(false)); } break; @@ -407,12 +397,7 @@ where (*raw.header) .state .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release); - Self::schedule( - ptr, - ScheduleInfo { - woken_while_running: false, - }, - ); + Self::schedule(ptr, ScheduleInfo::new(false)); } else { // Otherwise, destroy the task right away. Self::destroy(ptr); @@ -678,12 +663,7 @@ where } else if state & SCHEDULED != 0 { // The thread that woke the task up didn't reschedule it because // it was running so now it's our responsibility to do so. - Self::schedule( - ptr, - ScheduleInfo { - woken_while_running: true, - }, - ); + Self::schedule(ptr, ScheduleInfo::new(true)); return true; } else { // Drop the task reference. diff --git a/src/runnable.rs b/src/runnable.rs index 1c126f9..57cd34b 100644 --- a/src/runnable.rs +++ b/src/runnable.rs @@ -39,12 +39,11 @@ impl Default for Builder { } } -/// Extra scheduling information that can be acquired by the scheduling -/// function. +/// Extra scheduling information that can be passed to the scheduling function. /// -/// Note: The data source of this struct is directly from the actual -/// implementation of the crate itself, different from [`Runnable`]'s metadata, -/// which is managed by the caller. +/// The data source of this struct is directly from the actual implementation +/// of the crate itself, different from [`Runnable`]'s metadata, which is +/// managed by the caller. /// /// # Examples /// @@ -52,6 +51,11 @@ impl Default for Builder { /// use async_task::{Runnable, ScheduleInfo, WithInfo}; /// use std::sync::{Arc, Mutex}; /// +/// // The future inside the task. +/// let future = async { +/// println!("Hello, world!"); +/// }; +/// /// // If the task gets woken up while running, it will be sent into this channel. /// let (s, r) = flume::unbounded(); /// // Otherwise, it will be placed into this slot. @@ -69,6 +73,9 @@ impl Default for Builder { /// /// // Create the actual scheduler to be spawned with some future. /// let scheduler = WithInfo(schedule); +/// // Create a task with the future and the scheduler. +/// let (runnable, task) = async_task::spawn(future, scheduler); +/// ``` #[derive(Debug, Copy, Clone)] #[non_exhaustive] pub struct ScheduleInfo { @@ -79,6 +86,14 @@ pub struct ScheduleInfo { pub woken_while_running: bool, } +impl ScheduleInfo { + pub(crate) fn new(woken_while_running: bool) -> Self { + ScheduleInfo { + woken_while_running, + } + } +} + /// The trait for scheduling functions. pub trait Schedule: sealed::Sealed { /// The actual scheduling procedure. @@ -102,7 +117,7 @@ where /// scheduler can thus use the information to determine its scheduling /// strategy. /// -/// Note: The data source of [`ScheduleInfo`] is directly from the actual +/// The data source of [`ScheduleInfo`] is directly from the actual /// implementation of the crate itself, different from [`Runnable`]'s metadata, /// which is managed by the caller. /// @@ -721,12 +736,7 @@ impl Runnable { mem::forget(self); unsafe { - ((*header).vtable.schedule)( - ptr, - ScheduleInfo { - woken_while_running: false, - }, - ); + ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false)); } } diff --git a/src/task.rs b/src/task.rs index d1cd0d8..8f52549 100644 --- a/src/task.rs +++ b/src/task.rs @@ -211,12 +211,7 @@ impl Task { // If the task is not scheduled nor running, schedule it one more time so // that its future gets dropped by the executor. if state & (SCHEDULED | RUNNING) == 0 { - ((*header).vtable.schedule)( - ptr, - ScheduleInfo { - woken_while_running: false, - }, - ); + ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false)); } // Notify the awaiter that the task has been closed. @@ -295,12 +290,7 @@ impl Task { // schedule dropping its future or destroy it. if state & !(REFERENCE - 1) == 0 { if state & CLOSED == 0 { - ((*header).vtable.schedule)( - ptr, - ScheduleInfo { - woken_while_running: false, - }, - ); + ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false)); } else { ((*header).vtable.destroy)(ptr); } From 928d30392634cdcc8b8ffe336da067918bbfa9a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E5=90=AF=E8=88=AA?= Date: Thu, 23 Mar 2023 19:01:49 +0800 Subject: [PATCH 6/7] Improve some code --- src/runnable.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/runnable.rs b/src/runnable.rs index 57cd34b..43b945f 100644 --- a/src/runnable.rs +++ b/src/runnable.rs @@ -42,7 +42,7 @@ impl Default for Builder { /// Extra scheduling information that can be passed to the scheduling function. /// /// The data source of this struct is directly from the actual implementation -/// of the crate itself, different from [`Runnable`]'s metadata, which is +/// of the crate itself, different from [`Runnable`]'s metadata, which is /// managed by the caller. /// /// # Examples From 3ba42f8e1d86bafcebdb19b242e369235546388f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E5=90=AF=E8=88=AA?= Date: Fri, 24 Mar 2023 01:03:18 +0800 Subject: [PATCH 7/7] Add `impl From` for `WithInfo` --- src/runnable.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/runnable.rs b/src/runnable.rs index 43b945f..e495d53 100644 --- a/src/runnable.rs +++ b/src/runnable.rs @@ -153,6 +153,12 @@ where #[derive(Debug)] pub struct WithInfo(pub F); +impl From for WithInfo { + fn from(value: F) -> Self { + WithInfo(value) + } +} + impl Schedule for WithInfo where F: Fn(Runnable, ScheduleInfo),