Skip to content

Commit 5a6fb1c

Browse files
authored
Basic Ticked Async Executor (#1)
1 parent 7e849e4 commit 5a6fb1c

File tree

6 files changed

+285
-1
lines changed

6 files changed

+285
-1
lines changed

.github/workflows/rust.yml

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
name: Rust CI/CD
2+
3+
on:
4+
push:
5+
branches: ["main"]
6+
pull_request:
7+
branches: ["main"]
8+
9+
env:
10+
CARGO_TERM_COLOR: always
11+
12+
jobs:
13+
build:
14+
strategy:
15+
fail-fast: false
16+
matrix:
17+
os: [ubuntu-latest, windows-latest, macos-latest]
18+
runs-on: ${{ matrix.os }}
19+
steps:
20+
- uses: actions/checkout@v4
21+
22+
- name: Install
23+
if: ${{ matrix.os == 'ubuntu-latest' }}
24+
run: |
25+
cargo install cargo-tarpaulin
26+
27+
- name: Test
28+
run: |
29+
cargo clippy
30+
cargo test
31+
32+
- name: Build
33+
run: |
34+
cargo build
35+
cargo build --release
36+
37+
- name: Generate Coverage Report
38+
if: ${{ matrix.os == 'ubuntu-latest' }}
39+
run: |
40+
cargo tarpaulin --engine llvm --out xml --output-dir target
41+
42+
- name: Upload coverage reports to Codecov
43+
if: ${{ matrix.os == 'ubuntu-latest' }}
44+
uses: codecov/[email protected]
45+
with:
46+
token: ${{ secrets.CODECOV_TOKEN }}
47+
files: target/cobertura.xml

Cargo.toml

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
[package]
2+
name = "ticked_async_executor"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
async-task = "4.7"
8+
pin-project = "1"
9+
10+
[dev-dependencies]
11+
tokio = { version = "1", features = ["full"] }

README.md

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,7 @@
1-
# ticked-async-executor
1+
# Ticked Async Executor
2+
23
Rust based Async Executor which executes woken tasks only when it is ticked
4+
5+
# Limitation
6+
7+
- Does not work with the tokio runtime and async constructs that use the tokio runtime internally

src/droppable_future.rs

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
use std::{future::Future, pin::Pin};
2+
3+
use pin_project::{pin_project, pinned_drop};
4+
5+
#[pin_project(PinnedDrop)]
6+
pub struct DroppableFuture<F, D>
7+
where
8+
F: Future,
9+
D: Fn(),
10+
{
11+
#[pin]
12+
future: F,
13+
on_drop: D,
14+
}
15+
16+
impl<F, D> DroppableFuture<F, D>
17+
where
18+
F: Future,
19+
D: Fn(),
20+
{
21+
pub fn new(future: F, on_drop: D) -> Self {
22+
Self { future, on_drop }
23+
}
24+
}
25+
26+
impl<F, D> Future for DroppableFuture<F, D>
27+
where
28+
F: Future,
29+
D: Fn(),
30+
{
31+
type Output = F::Output;
32+
33+
fn poll(
34+
self: std::pin::Pin<&mut Self>,
35+
cx: &mut std::task::Context<'_>,
36+
) -> std::task::Poll<Self::Output> {
37+
let this = self.project();
38+
this.future.poll(cx)
39+
}
40+
}
41+
42+
#[pinned_drop]
43+
impl<F, D> PinnedDrop for DroppableFuture<F, D>
44+
where
45+
F: Future,
46+
D: Fn(),
47+
{
48+
fn drop(self: Pin<&mut Self>) {
49+
(self.on_drop)();
50+
}
51+
}

src/lib.rs

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
mod droppable_future;
2+
use droppable_future::*;
3+
4+
mod ticked_async_executor;
5+
pub use ticked_async_executor::*;

src/ticked_async_executor.rs

+165
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
use std::{
2+
future::Future,
3+
sync::{
4+
atomic::{AtomicUsize, Ordering},
5+
mpsc, Arc,
6+
},
7+
};
8+
9+
use async_task::{Runnable, Task};
10+
11+
use crate::DroppableFuture;
12+
13+
pub struct TickedAsyncExecutor {
14+
channel: (mpsc::Sender<Runnable>, mpsc::Receiver<Runnable>),
15+
num_woken_tasks: Arc<AtomicUsize>,
16+
num_spawned_tasks: Arc<AtomicUsize>,
17+
}
18+
19+
impl Default for TickedAsyncExecutor {
20+
fn default() -> Self {
21+
Self::new()
22+
}
23+
}
24+
25+
// TODO, Observer: Task spawn/wake/drop events
26+
// TODO, Task Identifier String
27+
impl TickedAsyncExecutor {
28+
pub fn new() -> Self {
29+
Self {
30+
channel: mpsc::channel(),
31+
num_woken_tasks: Arc::new(AtomicUsize::new(0)),
32+
num_spawned_tasks: Arc::new(AtomicUsize::new(0)),
33+
}
34+
}
35+
36+
pub fn spawn<T>(&self, future: impl Future<Output = T> + Send + 'static) -> Task<T>
37+
where
38+
T: Send + 'static,
39+
{
40+
let future = self.droppable_future(future);
41+
let schedule = self.runnable_schedule_cb();
42+
let (runnable, task) = async_task::spawn(future, schedule);
43+
runnable.schedule();
44+
task
45+
}
46+
47+
pub fn spawn_local<T>(&self, future: impl Future<Output = T> + 'static) -> Task<T>
48+
where
49+
T: 'static,
50+
{
51+
let future = self.droppable_future(future);
52+
let schedule = self.runnable_schedule_cb();
53+
let (runnable, task) = async_task::spawn_local(future, schedule);
54+
runnable.schedule();
55+
task
56+
}
57+
58+
pub fn num_tasks(&self) -> usize {
59+
self.num_spawned_tasks.load(Ordering::Relaxed)
60+
}
61+
62+
/// Run the woken tasks once
63+
///
64+
/// NOTE: Will not run tasks that are woken/scheduled immediately after `Runnable::run`
65+
pub fn tick(&self) {
66+
let num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed);
67+
self.channel
68+
.1
69+
.try_iter()
70+
.take(num_woken_tasks)
71+
.for_each(|runnable| {
72+
runnable.run();
73+
});
74+
self.num_woken_tasks
75+
.fetch_sub(num_woken_tasks, Ordering::Relaxed);
76+
}
77+
78+
fn droppable_future<F>(&self, future: F) -> DroppableFuture<F, impl Fn()>
79+
where
80+
F: Future,
81+
{
82+
self.num_spawned_tasks.fetch_add(1, Ordering::Relaxed);
83+
let num_spawned_tasks = self.num_spawned_tasks.clone();
84+
DroppableFuture::new(future, move || {
85+
num_spawned_tasks.fetch_sub(1, Ordering::Relaxed);
86+
})
87+
}
88+
89+
fn runnable_schedule_cb(&self) -> impl Fn(Runnable) {
90+
let sender = self.channel.0.clone();
91+
let num_woken_tasks = self.num_woken_tasks.clone();
92+
move |runnable| {
93+
sender.send(runnable).unwrap_or(());
94+
num_woken_tasks.fetch_add(1, Ordering::Relaxed);
95+
}
96+
}
97+
}
98+
99+
#[cfg(test)]
100+
mod tests {
101+
use super::*;
102+
103+
#[test]
104+
fn test_multiple_tasks() {
105+
let executor = TickedAsyncExecutor::new();
106+
executor
107+
.spawn_local(async move {
108+
println!("A: Start");
109+
tokio::task::yield_now().await;
110+
println!("A: End");
111+
})
112+
.detach();
113+
114+
executor
115+
.spawn_local(async move {
116+
println!("B: Start");
117+
tokio::task::yield_now().await;
118+
println!("B: End");
119+
})
120+
.detach();
121+
122+
// A, B, C: Start
123+
executor.tick();
124+
assert_eq!(executor.num_tasks(), 2);
125+
126+
// A, B, C: End
127+
executor.tick();
128+
assert_eq!(executor.num_tasks(), 0);
129+
}
130+
131+
#[test]
132+
fn test_task_cancellation() {
133+
let executor = TickedAsyncExecutor::new();
134+
let task1 = executor.spawn_local(async move {
135+
loop {
136+
println!("A: Start");
137+
tokio::task::yield_now().await;
138+
println!("A: End");
139+
}
140+
});
141+
142+
let task2 = executor.spawn_local(async move {
143+
loop {
144+
println!("B: Start");
145+
tokio::task::yield_now().await;
146+
println!("B: End");
147+
}
148+
});
149+
assert_eq!(executor.num_tasks(), 2);
150+
executor.tick();
151+
152+
executor
153+
.spawn_local(async move {
154+
task1.cancel().await;
155+
task2.cancel().await;
156+
})
157+
.detach();
158+
assert_eq!(executor.num_tasks(), 3);
159+
160+
// Since we have cancelled the tasks above, the loops should eventually end
161+
while executor.num_tasks() != 0 {
162+
executor.tick();
163+
}
164+
}
165+
}

0 commit comments

Comments
 (0)