From f342e3fed5c4b8b59acde8b8409c2cad0636b3d2 Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Thu, 12 Sep 2024 00:20:05 -0700 Subject: [PATCH 1/2] Add limit to tick --- src/ticked_async_executor.rs | 58 +++++++++++++++++++++++++++++------- tests/tokio_tests.rs | 8 ++--- 2 files changed, 52 insertions(+), 14 deletions(-) diff --git a/src/ticked_async_executor.rs b/src/ticked_async_executor.rs index 7704242..9af2875 100644 --- a/src/ticked_async_executor.rs +++ b/src/ticked_async_executor.rs @@ -78,7 +78,7 @@ where /// `delta` is used for timing based operations /// - `TickedTimer` uses this delta value to tick till completion /// - /// `maybe_limit` is used to limit the number of woken tasks run per tick + /// `limit` is used to limit the number of woken tasks run per tick /// - None would imply that there is no limit (all woken tasks would run) /// - Some(limit) would imply that [0..limit] woken tasks would run, /// even if more tasks are woken. @@ -86,11 +86,15 @@ where /// Tick is !Sync i.e cannot be invoked from multiple threads /// /// NOTE: Will not run tasks that are woken/scheduled immediately after `Runnable::run` - pub fn tick(&self, delta: f64) { + pub fn tick(&self, delta: f64, limit: Option) { let _r = self.tick_event.send(delta); - // Clamp woken tasks to limit - let num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed); + let mut num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed); + if let Some(limit) = limit { + // Woken tasks should not exceed the allowed limit + num_woken_tasks = num_woken_tasks.min(limit); + } + self.channel .1 .try_iter() @@ -153,6 +157,20 @@ mod tests { const DELTA: f64 = 1000.0 / 60.0; + #[test] + fn test_one_task() { + const DELTA: f64 = 1.0 / 60.0; + const LIMIT: Option = None; + + let executor = TickedAsyncExecutor::default(); + + executor.spawn_local("MyIdentifier", async move {}).detach(); + + // Make sure to tick your executor to run the tasks + executor.tick(DELTA, LIMIT); + assert_eq!(executor.num_tasks(), 0); + } + #[test] fn test_multiple_tasks() { let executor = TickedAsyncExecutor::default(); @@ -168,10 +186,10 @@ mod tests { }) .detach(); - executor.tick(DELTA); + executor.tick(DELTA, None); assert_eq!(executor.num_tasks(), 2); - executor.tick(DELTA); + executor.tick(DELTA, None); assert_eq!(executor.num_tasks(), 0); } @@ -190,7 +208,7 @@ mod tests { } }); assert_eq!(executor.num_tasks(), 2); - executor.tick(DELTA); + executor.tick(DELTA, None); executor .spawn_local("CancelTasks", async move { @@ -203,7 +221,7 @@ mod tests { // Since we have cancelled the tasks above, the loops should eventually end while executor.num_tasks() != 0 { - executor.tick(DELTA); + executor.tick(DELTA, None); } } @@ -224,7 +242,7 @@ mod tests { let mut instances = vec![]; while executor.num_tasks() != 0 { let current = Instant::now(); - executor.tick(DELTA); + executor.tick(DELTA, None); instances.push(current.elapsed()); std::thread::sleep(Duration::from_millis(16)); } @@ -276,8 +294,28 @@ mod tests { }) .detach(); - executor.tick(DELTA); + executor.tick(DELTA, None); assert_eq!(executor.num_tasks(), 4); drop(executor); } + + #[test] + fn test_limit() { + let executor = TickedAsyncExecutor::default(); + for i in 0..10 { + executor + .spawn_local(format!("{i}"), async move { + println!("Finish {i}"); + }) + .detach(); + } + + for i in 0..10 { + let woken_tasks = executor.num_woken_tasks.load(Ordering::Relaxed); + assert_eq!(woken_tasks, 10 - i); + executor.tick(0.1, Some(1)); + } + + assert_eq!(executor.num_tasks(), 0); + } } diff --git a/tests/tokio_tests.rs b/tests/tokio_tests.rs index 6dcdb47..1b2be1b 100644 --- a/tests/tokio_tests.rs +++ b/tests/tokio_tests.rs @@ -29,13 +29,13 @@ fn test_tokio_join() { tx1.try_send(10).unwrap(); tx3.try_send(10).unwrap(); for _ in 0..10 { - executor.tick(DELTA); + executor.tick(DELTA, None); } tx2.try_send(20).unwrap(); tx4.try_send(20).unwrap(); while executor.num_tasks() != 0 { - executor.tick(DELTA); + executor.tick(DELTA, None); } } @@ -70,12 +70,12 @@ fn test_tokio_select() { .detach(); for _ in 0..10 { - executor.tick(DELTA); + executor.tick(DELTA, None); } tx1.try_send(10).unwrap(); tx3.try_send(10).unwrap(); while executor.num_tasks() != 0 { - executor.tick(DELTA); + executor.tick(DELTA, None); } } From 6fa666deff1d36566bc9b871f37363ce6f005f29 Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Thu, 12 Sep 2024 00:20:24 -0700 Subject: [PATCH 2/2] Update README.md Added example --- README.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/README.md b/README.md index c969645..919abcb 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,17 @@ Rust based Async Executor which executes woken tasks only when it is ticked +# Example + +```rust +let executor = TickedAsyncExecutor::default(); + +executor.spawn_local("MyIdentifier", async move {}).detach(); + +// Make sure to tick your executor to run the tasks +executor.tick(DELTA, LIMIT); +``` + # Limitation - Does not work with the tokio runtime and async constructs that use the tokio runtime internally