diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 761916b..2ad197f 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -45,3 +45,10 @@ jobs: with: token: ${{ secrets.CODECOV_TOKEN }} files: target/cobertura.xml + + - name: Miri + run: | + rustup toolchain install nightly --component miri + rustup override set nightly + cargo miri setup + cargo miri test diff --git a/Cargo.toml b/Cargo.toml index f3fb95e..6919c0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,5 +7,10 @@ edition = "2021" async-task = "4.7" pin-project = "1" +# For timer only +# TODO, Add this under a feature gate +# TODO, Only tokio::sync::watch channel is used (find individual dependency) +tokio = { version = "1.0", default-features = false, features = ["sync"] } + [dev-dependencies] tokio = { version = "1", features = ["full"] } diff --git a/src/lib.rs b/src/lib.rs index 1e5a011..cd977a3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,3 +6,6 @@ pub use task_identifier::*; mod ticked_async_executor; pub use ticked_async_executor::*; + +mod ticked_timer; +pub use ticked_timer::*; diff --git a/src/ticked_async_executor.rs b/src/ticked_async_executor.rs index 70c3037..6ce42e9 100644 --- a/src/ticked_async_executor.rs +++ b/src/ticked_async_executor.rs @@ -6,13 +6,13 @@ use std::{ }, }; -use crate::{DroppableFuture, TaskIdentifier}; +use crate::{DroppableFuture, TaskIdentifier, TickedTimer}; #[derive(Debug)] pub enum TaskState { Spawn(TaskIdentifier), Wake(TaskIdentifier), - Tick(TaskIdentifier), + Tick(TaskIdentifier, f64), Drop(TaskIdentifier), } @@ -28,6 +28,8 @@ pub struct TickedAsyncExecutor { // Broadcast recv channel should be notified when there are new messages in the queue // Broadcast channel must also be able to remove older/stale messages (like a RingBuffer) observer: O, + + tick_event: tokio::sync::watch::Sender, } impl Default for TickedAsyncExecutor { @@ -46,6 +48,7 @@ where num_woken_tasks: Arc::new(AtomicUsize::new(0)), num_spawned_tasks: Arc::new(AtomicUsize::new(0)), observer, + tick_event: tokio::sync::watch::channel(1.0).0, } } @@ -87,23 +90,43 @@ where /// Run the woken tasks once /// + /// `delta` is used for timing based operations + /// - `TickedTimer` uses this delta value to tick till completion + /// + /// `maybe_limit` is used to limit the number of woken tasks run per tick + /// - None would imply that there is no limit (all woken tasks would run) + /// - Some(limit) would imply that [0..limit] woken tasks would run, + /// even if more tasks are woken. + /// /// Tick is !Sync i.e cannot be invoked from multiple threads /// /// NOTE: Will not run tasks that are woken/scheduled immediately after `Runnable::run` - pub fn tick(&self) { + pub fn tick(&self, delta: f64) { + let _r = self.tick_event.send(delta); + + // Clamp woken tasks to limit let num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed); self.channel .1 .try_iter() .take(num_woken_tasks) .for_each(|(identifier, runnable)| { - (self.observer)(TaskState::Tick(identifier)); + (self.observer)(TaskState::Tick(identifier, delta)); runnable.run(); }); self.num_woken_tasks .fetch_sub(num_woken_tasks, Ordering::Relaxed); } + pub fn create_timer(&self) -> TickedTimer { + let tick_recv = self.tick_event.subscribe(); + TickedTimer { tick_recv } + } + + pub fn tick_channel(&self) -> tokio::sync::watch::Receiver { + self.tick_event.subscribe() + } + fn droppable_future( &self, identifier: TaskIdentifier, @@ -141,6 +164,9 @@ where #[cfg(test)] mod tests { use super::*; + use std::time::{Duration, Instant}; + + const DELTA: f64 = 1000.0 / 60.0; #[test] fn test_multiple_tasks() { @@ -157,10 +183,10 @@ mod tests { }) .detach(); - executor.tick(); + executor.tick(DELTA); assert_eq!(executor.num_tasks(), 2); - executor.tick(); + executor.tick(DELTA); assert_eq!(executor.num_tasks(), 0); } @@ -179,7 +205,7 @@ mod tests { } }); assert_eq!(executor.num_tasks(), 2); - executor.tick(); + executor.tick(DELTA); executor .spawn_local("CancelTasks", async move { @@ -192,7 +218,85 @@ mod tests { // Since we have cancelled the tasks above, the loops should eventually end while executor.num_tasks() != 0 { - executor.tick(); + executor.tick(DELTA); } } + + #[test] + fn test_ticked_timer() { + let executor = TickedAsyncExecutor::default(); + + for _ in 0..10 { + let timer: TickedTimer = executor.create_timer(); + executor + .spawn("ThreadedTimer", async move { + timer.sleep_for(256.0).await; + }) + .detach(); + } + + for _ in 0..10 { + let timer = executor.create_timer(); + executor + .spawn_local("LocalTimer", async move { + timer.sleep_for(256.0).await; + }) + .detach(); + } + + let now = Instant::now(); + let mut instances = vec![]; + while executor.num_tasks() != 0 { + let current = Instant::now(); + executor.tick(DELTA); + instances.push(current.elapsed()); + std::thread::sleep(Duration::from_millis(16)); + } + let elapsed = now.elapsed(); + println!("Elapsed: {:?}", elapsed); + println!("Total: {:?}", instances); + + // Test Timer cancellation + let timer = executor.create_timer(); + executor + .spawn("ThreadedFuture", async move { + timer.sleep_for(1000.0).await; + }) + .detach(); + + let timer = executor.create_timer(); + executor + .spawn_local("LocalFuture", async move { + timer.sleep_for(1000.0).await; + }) + .detach(); + + let mut tick_event = executor.tick_channel(); + executor + .spawn("ThreadedTickFuture", async move { + loop { + let _r = tick_event.changed().await; + if _r.is_err() { + break; + } + } + }) + .detach(); + + let mut tick_event = executor.tick_channel(); + executor + .spawn_local("LocalTickFuture", async move { + loop { + let _r = tick_event.changed().await; + if _r.is_err() { + break; + } + } + }) + .detach(); + + executor.tick(DELTA); + assert_eq!(executor.num_tasks(), 4); + drop(executor); + } } diff --git a/src/ticked_timer.rs b/src/ticked_timer.rs new file mode 100644 index 0000000..25172ea --- /dev/null +++ b/src/ticked_timer.rs @@ -0,0 +1,21 @@ +pub struct TickedTimer { + pub tick_recv: tokio::sync::watch::Receiver, +} + +impl TickedTimer { + pub async fn sleep_for(mut self, mut duration_in_ms: f64) { + loop { + let _r = self.tick_recv.changed().await; + if _r.is_err() { + // This means that the executor supplying the delta channel has shutdown + // We must stop waiting gracefully + break; + } + let current_dt = *self.tick_recv.borrow_and_update(); + duration_in_ms -= current_dt; + if duration_in_ms <= 0.0 { + break; + } + } + } +} diff --git a/tests/tokio_tests.rs b/tests/tokio_tests.rs index 6b1db77..6e5ee0d 100644 --- a/tests/tokio_tests.rs +++ b/tests/tokio_tests.rs @@ -1,5 +1,7 @@ use ticked_async_executor::TickedAsyncExecutor; +const DELTA: f64 = 1000.0 / 60.0; + #[test] fn test_tokio_join() { let executor = TickedAsyncExecutor::default(); @@ -27,13 +29,13 @@ fn test_tokio_join() { tx1.try_send(10).unwrap(); tx3.try_send(10).unwrap(); for _ in 0..10 { - executor.tick(); + executor.tick(DELTA); } tx2.try_send(20).unwrap(); tx4.try_send(20).unwrap(); while executor.num_tasks() != 0 { - executor.tick(); + executor.tick(DELTA); } } @@ -68,12 +70,12 @@ fn test_tokio_select() { .detach(); for _ in 0..10 { - executor.tick(); + executor.tick(DELTA); } tx1.try_send(10).unwrap(); tx3.try_send(10).unwrap(); while executor.num_tasks() != 0 { - executor.tick(); + executor.tick(DELTA); } }