Skip to content

TickedAsyncExecutor::tick with delta parameter #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,10 @@ jobs:
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: target/cobertura.xml

- name: Miri
run: |
rustup toolchain install nightly --component miri
rustup override set nightly
cargo miri setup
cargo miri test
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,10 @@ edition = "2021"
async-task = "4.7"
pin-project = "1"

# For timer only
# TODO, Add this under a feature gate
# TODO, Only tokio::sync::watch channel is used (find individual dependency)
tokio = { version = "1.0", default-features = false, features = ["sync"] }

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ pub use task_identifier::*;

mod ticked_async_executor;
pub use ticked_async_executor::*;

mod ticked_timer;
pub use ticked_timer::*;
120 changes: 112 additions & 8 deletions src/ticked_async_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use std::{
},
};

use crate::{DroppableFuture, TaskIdentifier};
use crate::{DroppableFuture, TaskIdentifier, TickedTimer};

#[derive(Debug)]
pub enum TaskState {
Spawn(TaskIdentifier),
Wake(TaskIdentifier),
Tick(TaskIdentifier),
Tick(TaskIdentifier, f64),
Drop(TaskIdentifier),
}

Expand All @@ -28,6 +28,8 @@ pub struct TickedAsyncExecutor<O> {
// Broadcast recv channel should be notified when there are new messages in the queue
// Broadcast channel must also be able to remove older/stale messages (like a RingBuffer)
observer: O,

tick_event: tokio::sync::watch::Sender<f64>,
}

impl Default for TickedAsyncExecutor<fn(TaskState)> {
Expand All @@ -46,6 +48,7 @@ where
num_woken_tasks: Arc::new(AtomicUsize::new(0)),
num_spawned_tasks: Arc::new(AtomicUsize::new(0)),
observer,
tick_event: tokio::sync::watch::channel(1.0).0,
}
}

Expand Down Expand Up @@ -87,23 +90,43 @@ where

/// Run the woken tasks once
///
/// `delta` is used for timing based operations
/// - `TickedTimer` uses this delta value to tick till completion
///
/// `maybe_limit` is used to limit the number of woken tasks run per tick
/// - None would imply that there is no limit (all woken tasks would run)
/// - Some(limit) would imply that [0..limit] woken tasks would run,
/// even if more tasks are woken.
///
/// Tick is !Sync i.e cannot be invoked from multiple threads
///
/// NOTE: Will not run tasks that are woken/scheduled immediately after `Runnable::run`
pub fn tick(&self) {
pub fn tick(&self, delta: f64) {
let _r = self.tick_event.send(delta);

// Clamp woken tasks to limit
let num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed);
self.channel
.1
.try_iter()
.take(num_woken_tasks)
.for_each(|(identifier, runnable)| {
(self.observer)(TaskState::Tick(identifier));
(self.observer)(TaskState::Tick(identifier, delta));
runnable.run();
});
self.num_woken_tasks
.fetch_sub(num_woken_tasks, Ordering::Relaxed);
}

pub fn create_timer(&self) -> TickedTimer {
let tick_recv = self.tick_event.subscribe();
TickedTimer { tick_recv }
}

pub fn tick_channel(&self) -> tokio::sync::watch::Receiver<f64> {
self.tick_event.subscribe()
}

fn droppable_future<F>(
&self,
identifier: TaskIdentifier,
Expand Down Expand Up @@ -141,6 +164,9 @@ where
#[cfg(test)]
mod tests {
use super::*;
use std::time::{Duration, Instant};

const DELTA: f64 = 1000.0 / 60.0;

#[test]
fn test_multiple_tasks() {
Expand All @@ -157,10 +183,10 @@ mod tests {
})
.detach();

executor.tick();
executor.tick(DELTA);
assert_eq!(executor.num_tasks(), 2);

executor.tick();
executor.tick(DELTA);
assert_eq!(executor.num_tasks(), 0);
}

Expand All @@ -179,7 +205,7 @@ mod tests {
}
});
assert_eq!(executor.num_tasks(), 2);
executor.tick();
executor.tick(DELTA);

executor
.spawn_local("CancelTasks", async move {
Expand All @@ -192,7 +218,85 @@ mod tests {

// Since we have cancelled the tasks above, the loops should eventually end
while executor.num_tasks() != 0 {
executor.tick();
executor.tick(DELTA);
}
}

#[test]
fn test_ticked_timer() {
let executor = TickedAsyncExecutor::default();

for _ in 0..10 {
let timer: TickedTimer = executor.create_timer();
executor
.spawn("ThreadedTimer", async move {
timer.sleep_for(256.0).await;
})
.detach();
}

for _ in 0..10 {
let timer = executor.create_timer();
executor
.spawn_local("LocalTimer", async move {
timer.sleep_for(256.0).await;
})
.detach();
}

let now = Instant::now();
let mut instances = vec![];
while executor.num_tasks() != 0 {
let current = Instant::now();
executor.tick(DELTA);
instances.push(current.elapsed());
std::thread::sleep(Duration::from_millis(16));
}
let elapsed = now.elapsed();
println!("Elapsed: {:?}", elapsed);
println!("Total: {:?}", instances);

// Test Timer cancellation
let timer = executor.create_timer();
executor
.spawn("ThreadedFuture", async move {
timer.sleep_for(1000.0).await;
})
.detach();

let timer = executor.create_timer();
executor
.spawn_local("LocalFuture", async move {
timer.sleep_for(1000.0).await;
})
.detach();

let mut tick_event = executor.tick_channel();
executor
.spawn("ThreadedTickFuture", async move {
loop {
let _r = tick_event.changed().await;
if _r.is_err() {
break;
}
}
})
.detach();

let mut tick_event = executor.tick_channel();
executor
.spawn_local("LocalTickFuture", async move {
loop {
let _r = tick_event.changed().await;
if _r.is_err() {
break;
}
}
})
.detach();

executor.tick(DELTA);
assert_eq!(executor.num_tasks(), 4);
drop(executor);
}
}
21 changes: 21 additions & 0 deletions src/ticked_timer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
pub struct TickedTimer {
pub tick_recv: tokio::sync::watch::Receiver<f64>,
}

impl TickedTimer {
pub async fn sleep_for(mut self, mut duration_in_ms: f64) {
loop {
let _r = self.tick_recv.changed().await;
if _r.is_err() {
// This means that the executor supplying the delta channel has shutdown
// We must stop waiting gracefully
break;

Check warning on line 12 in src/ticked_timer.rs

View check run for this annotation

Codecov / codecov/patch

src/ticked_timer.rs#L12

Added line #L12 was not covered by tests
}
let current_dt = *self.tick_recv.borrow_and_update();
duration_in_ms -= current_dt;
if duration_in_ms <= 0.0 {
break;
}
}
}
}
10 changes: 6 additions & 4 deletions tests/tokio_tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use ticked_async_executor::TickedAsyncExecutor;

const DELTA: f64 = 1000.0 / 60.0;

#[test]
fn test_tokio_join() {
let executor = TickedAsyncExecutor::default();
Expand Down Expand Up @@ -27,13 +29,13 @@ fn test_tokio_join() {
tx1.try_send(10).unwrap();
tx3.try_send(10).unwrap();
for _ in 0..10 {
executor.tick();
executor.tick(DELTA);
}
tx2.try_send(20).unwrap();
tx4.try_send(20).unwrap();

while executor.num_tasks() != 0 {
executor.tick();
executor.tick(DELTA);
}
}

Expand Down Expand Up @@ -68,12 +70,12 @@ fn test_tokio_select() {
.detach();

for _ in 0..10 {
executor.tick();
executor.tick(DELTA);
}

tx1.try_send(10).unwrap();
tx3.try_send(10).unwrap();
while executor.num_tasks() != 0 {
executor.tick();
executor.tick(DELTA);
}
}