Skip to content

Basic Ticked Async Executor #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
name: Rust CI/CD

on:
push:
branches: ["main"]
pull_request:
branches: ["main"]

env:
CARGO_TERM_COLOR: always

jobs:
build:
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4

- name: Install
if: ${{ matrix.os == 'ubuntu-latest' }}
run: |
cargo install cargo-tarpaulin

- name: Test
run: |
cargo clippy
cargo test

- name: Build
run: |
cargo build
cargo build --release

- name: Generate Coverage Report
if: ${{ matrix.os == 'ubuntu-latest' }}
run: |
cargo tarpaulin --engine llvm --out xml --output-dir target

- name: Upload coverage reports to Codecov
if: ${{ matrix.os == 'ubuntu-latest' }}
uses: codecov/[email protected]
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: target/cobertura.xml
11 changes: 11 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "ticked_async_executor"
version = "0.1.0"
edition = "2021"

[dependencies]
async-task = "4.7"
pin-project = "1"

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
# ticked-async-executor
# Ticked Async Executor

Rust based Async Executor which executes woken tasks only when it is ticked

# Limitation

- Does not work with the tokio runtime and async constructs that use the tokio runtime internally
51 changes: 51 additions & 0 deletions src/droppable_future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use std::{future::Future, pin::Pin};

use pin_project::{pin_project, pinned_drop};

#[pin_project(PinnedDrop)]
pub struct DroppableFuture<F, D>
where
F: Future,
D: Fn(),
{
#[pin]
future: F,
on_drop: D,
}

impl<F, D> DroppableFuture<F, D>
where
F: Future,
D: Fn(),
{
pub fn new(future: F, on_drop: D) -> Self {
Self { future, on_drop }
}
}

impl<F, D> Future for DroppableFuture<F, D>
where
F: Future,
D: Fn(),
{
type Output = F::Output;

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.project();
this.future.poll(cx)
}
}

#[pinned_drop]
impl<F, D> PinnedDrop for DroppableFuture<F, D>
where
F: Future,
D: Fn(),
{
fn drop(self: Pin<&mut Self>) {
(self.on_drop)();
}
}
5 changes: 5 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod droppable_future;
use droppable_future::*;

mod ticked_async_executor;
pub use ticked_async_executor::*;
165 changes: 165 additions & 0 deletions src/ticked_async_executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
use std::{
future::Future,
sync::{
atomic::{AtomicUsize, Ordering},
mpsc, Arc,
},
};

use async_task::{Runnable, Task};

use crate::DroppableFuture;

pub struct TickedAsyncExecutor {
channel: (mpsc::Sender<Runnable>, mpsc::Receiver<Runnable>),
num_woken_tasks: Arc<AtomicUsize>,
num_spawned_tasks: Arc<AtomicUsize>,
}

impl Default for TickedAsyncExecutor {
fn default() -> Self {
Self::new()

Check warning on line 21 in src/ticked_async_executor.rs

View check run for this annotation

Codecov / codecov/patch

src/ticked_async_executor.rs#L20-L21

Added lines #L20 - L21 were not covered by tests
}
}

// TODO, Observer: Task spawn/wake/drop events
// TODO, Task Identifier String
impl TickedAsyncExecutor {
pub fn new() -> Self {
Self {
channel: mpsc::channel(),
num_woken_tasks: Arc::new(AtomicUsize::new(0)),
num_spawned_tasks: Arc::new(AtomicUsize::new(0)),
}
}

pub fn spawn<T>(&self, future: impl Future<Output = T> + Send + 'static) -> Task<T>

Check warning on line 36 in src/ticked_async_executor.rs

View check run for this annotation

Codecov / codecov/patch

src/ticked_async_executor.rs#L36

Added line #L36 was not covered by tests
where
T: Send + 'static,
{
let future = self.droppable_future(future);
let schedule = self.runnable_schedule_cb();
let (runnable, task) = async_task::spawn(future, schedule);
runnable.schedule();
task

Check warning on line 44 in src/ticked_async_executor.rs

View check run for this annotation

Codecov / codecov/patch

src/ticked_async_executor.rs#L40-L44

Added lines #L40 - L44 were not covered by tests
}

pub fn spawn_local<T>(&self, future: impl Future<Output = T> + 'static) -> Task<T>
where
T: 'static,
{
let future = self.droppable_future(future);
let schedule = self.runnable_schedule_cb();
let (runnable, task) = async_task::spawn_local(future, schedule);
runnable.schedule();
task
}

pub fn num_tasks(&self) -> usize {
self.num_spawned_tasks.load(Ordering::Relaxed)
}

/// Run the woken tasks once
///
/// NOTE: Will not run tasks that are woken/scheduled immediately after `Runnable::run`
pub fn tick(&self) {
let num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed);
self.channel
.1
.try_iter()
.take(num_woken_tasks)
.for_each(|runnable| {
runnable.run();
});
self.num_woken_tasks
.fetch_sub(num_woken_tasks, Ordering::Relaxed);
}

fn droppable_future<F>(&self, future: F) -> DroppableFuture<F, impl Fn()>
where
F: Future,
{
self.num_spawned_tasks.fetch_add(1, Ordering::Relaxed);
let num_spawned_tasks = self.num_spawned_tasks.clone();
DroppableFuture::new(future, move || {
num_spawned_tasks.fetch_sub(1, Ordering::Relaxed);
})
}

fn runnable_schedule_cb(&self) -> impl Fn(Runnable) {
let sender = self.channel.0.clone();
let num_woken_tasks = self.num_woken_tasks.clone();
move |runnable| {
sender.send(runnable).unwrap_or(());
num_woken_tasks.fetch_add(1, Ordering::Relaxed);
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_multiple_tasks() {
let executor = TickedAsyncExecutor::new();
executor
.spawn_local(async move {
println!("A: Start");
tokio::task::yield_now().await;
println!("A: End");
})
.detach();

executor
.spawn_local(async move {
println!("B: Start");
tokio::task::yield_now().await;
println!("B: End");
})
.detach();

// A, B, C: Start
executor.tick();
assert_eq!(executor.num_tasks(), 2);

// A, B, C: End
executor.tick();
assert_eq!(executor.num_tasks(), 0);
}

#[test]
fn test_task_cancellation() {
let executor = TickedAsyncExecutor::new();
let task1 = executor.spawn_local(async move {
loop {
println!("A: Start");
tokio::task::yield_now().await;
println!("A: End");
}
});

let task2 = executor.spawn_local(async move {
loop {
println!("B: Start");
tokio::task::yield_now().await;
println!("B: End");
}
});
assert_eq!(executor.num_tasks(), 2);
executor.tick();

executor
.spawn_local(async move {
task1.cancel().await;
task2.cancel().await;
})
.detach();
assert_eq!(executor.num_tasks(), 3);

// Since we have cancelled the tasks above, the loops should eventually end
while executor.num_tasks() != 0 {
executor.tick();
}
}
}