From 701c476258d515adfcd3f7266fb501eaf9f8e5ce Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Fri, 31 May 2024 00:42:07 -0700 Subject: [PATCH 1/6] Added TickedAsyncLocalExecutor implementation --- Cargo.toml | 10 ++++++ src/lib.rs | 99 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+) create mode 100644 Cargo.toml create mode 100644 src/lib.rs diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..1bf97de --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "ticked_async_executor" +version = "0.1.0" +edition = "2021" + +[dependencies] +async-task = "4.7" + +[dev-dependencies] +tokio = { version = "1", features = ["full"] } diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..470f528 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,99 @@ +use std::{ + future::Future, + sync::{ + atomic::{AtomicUsize, Ordering}, + mpsc, Arc, + }, +}; + +use async_task::{Runnable, Task}; + +pub struct TickedAsyncLocalExecutor { + channel: (mpsc::Sender, mpsc::Receiver), + num_woken_tasks: Arc, +} + +impl Default for TickedAsyncLocalExecutor { + fn default() -> Self { + Self::new() + } +} + +impl TickedAsyncLocalExecutor { + pub fn new() -> Self { + Self { + channel: mpsc::channel(), + num_woken_tasks: Arc::new(AtomicUsize::new(0)), + } + } + + pub fn spawn_local(&self, future: impl Future + 'static) -> Task + where + T: 'static, + { + let sender = self.channel.0.clone(); + let num_woken_tasks = self.num_woken_tasks.clone(); + let schedule = move |runnable| { + sender.send(runnable).unwrap_or(()); + num_woken_tasks.fetch_add(1, Ordering::Relaxed); + }; + let (runnable, task) = async_task::spawn_local(future, schedule); + runnable.schedule(); + task + } + + /// Run the woken tasks once + /// + /// NOTE: Will not run tasks that are woken/scheduled immediately after `Runnable::run` + pub fn tick(&self) { + let num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed); + self.channel + .1 + .try_iter() + .take(num_woken_tasks) + .for_each(|runnable| { + runnable.run(); + }); + self.num_woken_tasks + .fetch_sub(num_woken_tasks, Ordering::Relaxed); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_multiple_tasks() { + let executor = TickedAsyncLocalExecutor::new(); + executor + .spawn_local(async move { + println!("A: Start"); + tokio::task::yield_now().await; + println!("A: End"); + }) + .detach(); + + executor + .spawn_local(async move { + println!("B: Start"); + tokio::task::yield_now().await; + println!("B: End"); + }) + .detach(); + + executor + .spawn_local(async move { + println!("C: Start"); + tokio::task::yield_now().await; + println!("C: End"); + }) + .detach(); + + // A, B, C: Start + executor.tick(); + + // A, B, C: End + executor.tick(); + } +} From 1614bfcce8587647a0c3b5801c1ca543e2acc722 Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Fri, 31 May 2024 18:35:07 -0700 Subject: [PATCH 2/6] Added droppable future for Ticked Async Executor - Gives visibility into number of spawned/dropped tasks --- Cargo.toml | 1 + src/droppable_future.rs | 51 +++++++++++++ src/lib.rs | 102 ++------------------------ src/ticked_async_executor.rs | 134 +++++++++++++++++++++++++++++++++++ 4 files changed, 190 insertions(+), 98 deletions(-) create mode 100644 src/droppable_future.rs create mode 100644 src/ticked_async_executor.rs diff --git a/Cargo.toml b/Cargo.toml index 1bf97de..f3fb95e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] async-task = "4.7" +pin-project = "1" [dev-dependencies] tokio = { version = "1", features = ["full"] } diff --git a/src/droppable_future.rs b/src/droppable_future.rs new file mode 100644 index 0000000..0ab6d57 --- /dev/null +++ b/src/droppable_future.rs @@ -0,0 +1,51 @@ +use std::{future::Future, pin::Pin}; + +use pin_project::{pin_project, pinned_drop}; + +#[pin_project(PinnedDrop)] +pub struct DroppableFuture +where + F: Future, + D: Fn(), +{ + #[pin] + future: F, + on_drop: D, +} + +impl DroppableFuture +where + F: Future, + D: Fn(), +{ + pub fn new(future: F, on_drop: D) -> Self { + Self { future, on_drop } + } +} + +impl Future for DroppableFuture +where + F: Future, + D: Fn(), +{ + type Output = F::Output; + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let this = self.project(); + this.future.poll(cx) + } +} + +#[pinned_drop] +impl PinnedDrop for DroppableFuture +where + F: Future, + D: Fn(), +{ + fn drop(self: Pin<&mut Self>) { + (self.on_drop)(); + } +} diff --git a/src/lib.rs b/src/lib.rs index 470f528..c4006c5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,99 +1,5 @@ -use std::{ - future::Future, - sync::{ - atomic::{AtomicUsize, Ordering}, - mpsc, Arc, - }, -}; +mod droppable_future; +use droppable_future::*; -use async_task::{Runnable, Task}; - -pub struct TickedAsyncLocalExecutor { - channel: (mpsc::Sender, mpsc::Receiver), - num_woken_tasks: Arc, -} - -impl Default for TickedAsyncLocalExecutor { - fn default() -> Self { - Self::new() - } -} - -impl TickedAsyncLocalExecutor { - pub fn new() -> Self { - Self { - channel: mpsc::channel(), - num_woken_tasks: Arc::new(AtomicUsize::new(0)), - } - } - - pub fn spawn_local(&self, future: impl Future + 'static) -> Task - where - T: 'static, - { - let sender = self.channel.0.clone(); - let num_woken_tasks = self.num_woken_tasks.clone(); - let schedule = move |runnable| { - sender.send(runnable).unwrap_or(()); - num_woken_tasks.fetch_add(1, Ordering::Relaxed); - }; - let (runnable, task) = async_task::spawn_local(future, schedule); - runnable.schedule(); - task - } - - /// Run the woken tasks once - /// - /// NOTE: Will not run tasks that are woken/scheduled immediately after `Runnable::run` - pub fn tick(&self) { - let num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed); - self.channel - .1 - .try_iter() - .take(num_woken_tasks) - .for_each(|runnable| { - runnable.run(); - }); - self.num_woken_tasks - .fetch_sub(num_woken_tasks, Ordering::Relaxed); - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_multiple_tasks() { - let executor = TickedAsyncLocalExecutor::new(); - executor - .spawn_local(async move { - println!("A: Start"); - tokio::task::yield_now().await; - println!("A: End"); - }) - .detach(); - - executor - .spawn_local(async move { - println!("B: Start"); - tokio::task::yield_now().await; - println!("B: End"); - }) - .detach(); - - executor - .spawn_local(async move { - println!("C: Start"); - tokio::task::yield_now().await; - println!("C: End"); - }) - .detach(); - - // A, B, C: Start - executor.tick(); - - // A, B, C: End - executor.tick(); - } -} +mod ticked_async_executor; +pub use ticked_async_executor::*; diff --git a/src/ticked_async_executor.rs b/src/ticked_async_executor.rs new file mode 100644 index 0000000..b1d14a4 --- /dev/null +++ b/src/ticked_async_executor.rs @@ -0,0 +1,134 @@ +use std::{ + future::Future, + sync::{ + atomic::{AtomicUsize, Ordering}, + mpsc, Arc, + }, +}; + +use async_task::{Runnable, Task}; + +use crate::DroppableFuture; + +pub struct TickedAsyncExecutor { + channel: (mpsc::Sender, mpsc::Receiver), + num_woken_tasks: Arc, + num_spawned_tasks: Arc, +} + +impl Default for TickedAsyncExecutor { + fn default() -> Self { + Self::new() + } +} + +impl TickedAsyncExecutor { + pub fn new() -> Self { + Self { + channel: mpsc::channel(), + num_woken_tasks: Arc::new(AtomicUsize::new(0)), + num_spawned_tasks: Arc::new(AtomicUsize::new(0)), + } + } + + pub fn spawn(&self, future: impl Future + Send + 'static) -> Task + where + T: Send + 'static, + { + let future = self.droppable_future(future); + let schedule = self.runnable_schedule_cb(); + let (runnable, task) = async_task::spawn(future, schedule); + runnable.schedule(); + task + } + + pub fn spawn_local(&self, future: impl Future + 'static) -> Task + where + T: 'static, + { + let future = self.droppable_future(future); + let schedule = self.runnable_schedule_cb(); + let (runnable, task) = async_task::spawn_local(future, schedule); + runnable.schedule(); + task + } + + /// Run the woken tasks once + /// + /// NOTE: Will not run tasks that are woken/scheduled immediately after `Runnable::run` + pub fn tick(&self) { + let num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed); + self.channel + .1 + .try_iter() + .take(num_woken_tasks) + .for_each(|runnable| { + runnable.run(); + }); + self.num_woken_tasks + .fetch_sub(num_woken_tasks, Ordering::Relaxed); + } + + fn droppable_future(&self, future: F) -> DroppableFuture + where + F: Future, + { + self.num_spawned_tasks.fetch_add(1, Ordering::Relaxed); + let num_spawned_tasks = self.num_spawned_tasks.clone(); + DroppableFuture::new(future, move || { + num_spawned_tasks.fetch_sub(1, Ordering::Relaxed); + }) + } + + fn runnable_schedule_cb(&self) -> impl Fn(Runnable) { + let sender = self.channel.0.clone(); + let num_woken_tasks = self.num_woken_tasks.clone(); + move |runnable| { + sender.send(runnable).unwrap_or(()); + num_woken_tasks.fetch_add(1, Ordering::Relaxed); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_multiple_tasks() { + let executor = TickedAsyncExecutor::new(); + executor + .spawn_local(async move { + println!("A: Start"); + tokio::task::yield_now().await; + println!("A: End"); + }) + .detach(); + + executor + .spawn_local(async move { + println!("B: Start"); + tokio::task::yield_now().await; + println!("B: End"); + }) + .detach(); + + executor + .spawn_local(async move { + println!("C: Start"); + tokio::task::yield_now().await; + println!("C: End"); + }) + .detach(); + + // A, B, C: Start + executor.tick(); + + // A, B, C: End + executor.tick(); + } + + // TODO, Test Task cancellation + // TODO, Test FallibleTasks + // TODO, Test Edge cases +} From e36318922127f7ec25b0a564fc7e2afd9f2068f9 Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Fri, 31 May 2024 19:20:53 -0700 Subject: [PATCH 3/6] Updated unit tests --- src/ticked_async_executor.rs | 53 ++++++++++++++++++++++++++++-------- 1 file changed, 42 insertions(+), 11 deletions(-) diff --git a/src/ticked_async_executor.rs b/src/ticked_async_executor.rs index b1d14a4..d3fa500 100644 --- a/src/ticked_async_executor.rs +++ b/src/ticked_async_executor.rs @@ -22,6 +22,8 @@ impl Default for TickedAsyncExecutor { } } +// TODO, Observer: Task spawn/wake/drop events +// TODO, Task Identifier String impl TickedAsyncExecutor { pub fn new() -> Self { Self { @@ -53,6 +55,10 @@ impl TickedAsyncExecutor { task } + pub fn num_tasks(&self) -> usize { + self.num_spawned_tasks.load(Ordering::Relaxed) + } + /// Run the woken tasks once /// /// NOTE: Will not run tasks that are woken/scheduled immediately after `Runnable::run` @@ -113,22 +119,47 @@ mod tests { }) .detach(); - executor - .spawn_local(async move { - println!("C: Start"); - tokio::task::yield_now().await; - println!("C: End"); - }) - .detach(); - // A, B, C: Start executor.tick(); + assert_eq!(executor.num_tasks(), 2); // A, B, C: End executor.tick(); + assert_eq!(executor.num_tasks(), 0); } - // TODO, Test Task cancellation - // TODO, Test FallibleTasks - // TODO, Test Edge cases + #[test] + fn test_task_cancellation() { + let executor = TickedAsyncExecutor::new(); + let task1 = executor.spawn_local(async move { + loop { + println!("A: Start"); + tokio::task::yield_now().await; + println!("A: End"); + } + }); + + let task2 = executor.spawn_local(async move { + loop { + println!("B: Start"); + tokio::task::yield_now().await; + println!("B: End"); + } + }); + assert_eq!(executor.num_tasks(), 2); + executor.tick(); + + executor + .spawn_local(async move { + task1.cancel().await; + task2.cancel().await; + }) + .detach(); + assert_eq!(executor.num_tasks(), 3); + + // Since we have cancelled the tasks above, the loops should eventually end + while executor.num_tasks() != 0 { + executor.tick(); + } + } } From ce2b20bf6e423964fbee9b7640c048a08da86f8f Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Fri, 31 May 2024 19:25:06 -0700 Subject: [PATCH 4/6] Added CI/CD --- .github/workflows/rust.yml | 47 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 .github/workflows/rust.yml diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml new file mode 100644 index 0000000..1ac22ab --- /dev/null +++ b/.github/workflows/rust.yml @@ -0,0 +1,47 @@ +name: Rust CI/CD + +on: + push: + branches: ["main"] + pull_request: + branches: ["main"] + +env: + CARGO_TERM_COLOR: always + +jobs: + build: + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, windows-latest, macos-latest] + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v3 + + - name: Install + if: ${{ matrix.os == 'ubuntu-latest' }} + run: | + cargo install cargo-tarpaulin + + - name: Test + run: | + cargo clippy + cargo test + + - name: Build + run: | + cargo build + cargo build --release + + - name: Generate Coverage Report + if: ${{ matrix.os == 'ubuntu-latest' }} + run: | + cargo tarpaulin --engine llvm --out xml --output-dir target + + - name: Upload coverage reports to Codecov + if: ${{ matrix.os == 'ubuntu-latest' }} + uses: codecov/codecov-action@v4.0.1 + with: + token: ${{ secrets.CODECOV_TOKEN }} + files: target/cobertura.xml From a9db78b38634b6cc9758478b140426d298a4feb1 Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Fri, 31 May 2024 19:25:20 -0700 Subject: [PATCH 5/6] Added docs --- README.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index f3273db..c969645 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,7 @@ -# ticked-async-executor +# Ticked Async Executor + Rust based Async Executor which executes woken tasks only when it is ticked + +# Limitation + +- Does not work with the tokio runtime and async constructs that use the tokio runtime internally From f7ca99d7116d6c6ce9af22abbabc93f0c6cc3922 Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Fri, 31 May 2024 19:30:19 -0700 Subject: [PATCH 6/6] Minor update to CI script --- .github/workflows/rust.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 1ac22ab..761916b 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -17,7 +17,7 @@ jobs: os: [ubuntu-latest, windows-latest, macos-latest] runs-on: ${{ matrix.os }} steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Install if: ${{ matrix.os == 'ubuntu-latest' }}