From 339580ddb59b47895e1fab8ce523631a4d32df4b Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Mon, 5 Aug 2024 18:14:04 -0700 Subject: [PATCH 1/8] Remove spawn API --- src/ticked_async_executor.rs | 43 ++++++++++-------------------------- tests/tokio_tests.rs | 8 +++---- 2 files changed, 16 insertions(+), 35 deletions(-) diff --git a/src/ticked_async_executor.rs b/src/ticked_async_executor.rs index 6ce42e9..dbb6388 100644 --- a/src/ticked_async_executor.rs +++ b/src/ticked_async_executor.rs @@ -22,6 +22,7 @@ type Payload = (TaskIdentifier, async_task::Runnable); pub struct TickedAsyncExecutor { channel: (mpsc::Sender, mpsc::Receiver), num_woken_tasks: Arc, + num_spawned_tasks: Arc, // TODO, Or we need a Single Producer - Multi Consumer channel i.e Broadcast channel @@ -52,22 +53,6 @@ where } } - pub fn spawn( - &self, - identifier: impl Into, - future: impl Future + Send + 'static, - ) -> Task - where - T: Send + 'static, - { - let identifier = identifier.into(); - let future = self.droppable_future(identifier.clone(), future); - let schedule = self.runnable_schedule_cb(identifier); - let (runnable, task) = async_task::spawn(future, schedule); - runnable.schedule(); - task - } - pub fn spawn_local( &self, identifier: impl Into, @@ -172,7 +157,7 @@ mod tests { fn test_multiple_tasks() { let executor = TickedAsyncExecutor::default(); executor - .spawn("A", async move { + .spawn_local("A", async move { tokio::task::yield_now().await; }) .detach(); @@ -226,16 +211,7 @@ mod tests { fn test_ticked_timer() { let executor = TickedAsyncExecutor::default(); - for _ in 0..10 { - let timer: TickedTimer = executor.create_timer(); - executor - .spawn("ThreadedTimer", async move { - timer.sleep_for(256.0).await; - }) - .detach(); - } - - for _ in 0..10 { + for _ in 0..10000 { let timer = executor.create_timer(); executor .spawn_local("LocalTimer", async move { @@ -255,25 +231,30 @@ mod tests { let elapsed = now.elapsed(); println!("Elapsed: {:?}", elapsed); println!("Total: {:?}", instances); + println!( + "Min: {:?}, Max: {:?}", + instances.iter().min(), + instances.iter().max() + ); // Test Timer cancellation let timer = executor.create_timer(); executor - .spawn("ThreadedFuture", async move { + .spawn_local("LocalFuture1", async move { timer.sleep_for(1000.0).await; }) .detach(); let timer = executor.create_timer(); executor - .spawn_local("LocalFuture", async move { + .spawn_local("LocalFuture2", async move { timer.sleep_for(1000.0).await; }) .detach(); let mut tick_event = executor.tick_channel(); executor - .spawn("ThreadedTickFuture", async move { + .spawn_local("LocalTickFuture1", async move { loop { let _r = tick_event.changed().await; if _r.is_err() { @@ -285,7 +266,7 @@ mod tests { let mut tick_event = executor.tick_channel(); executor - .spawn_local("LocalTickFuture", async move { + .spawn_local("LocalTickFuture2", async move { loop { let _r = tick_event.changed().await; if _r.is_err() { diff --git a/tests/tokio_tests.rs b/tests/tokio_tests.rs index 6e5ee0d..6dcdb47 100644 --- a/tests/tokio_tests.rs +++ b/tests/tokio_tests.rs @@ -9,7 +9,7 @@ fn test_tokio_join() { let (tx1, mut rx1) = tokio::sync::mpsc::channel::(1); let (tx2, mut rx2) = tokio::sync::mpsc::channel::(1); executor - .spawn("ThreadedFuture", async move { + .spawn_local("LocalFuture1", async move { let (a, b) = tokio::join!(rx1.recv(), rx2.recv()); assert_eq!(a.unwrap(), 10); assert_eq!(b.unwrap(), 20); @@ -19,7 +19,7 @@ fn test_tokio_join() { let (tx3, mut rx3) = tokio::sync::mpsc::channel::(1); let (tx4, mut rx4) = tokio::sync::mpsc::channel::(1); executor - .spawn("LocalFuture", async move { + .spawn_local("LocalFuture2", async move { let (a, b) = tokio::join!(rx3.recv(), rx4.recv()); assert_eq!(a.unwrap(), 10); assert_eq!(b.unwrap(), 20); @@ -46,7 +46,7 @@ fn test_tokio_select() { let (tx1, mut rx1) = tokio::sync::mpsc::channel::(1); let (_tx2, mut rx2) = tokio::sync::mpsc::channel::(1); executor - .spawn("ThreadedFuture", async move { + .spawn_local("LocalFuture1", async move { tokio::select! { data = rx1.recv() => { assert_eq!(data.unwrap(), 10); @@ -59,7 +59,7 @@ fn test_tokio_select() { let (tx3, mut rx3) = tokio::sync::mpsc::channel::(1); let (_tx4, mut rx4) = tokio::sync::mpsc::channel::(1); executor - .spawn("LocalFuture", async move { + .spawn_local("LocalFuture2", async move { tokio::select! { data = rx3.recv() => { assert_eq!(data.unwrap(), 10); From 2e5a29f00db52239a726d846da03f9e66d6818fa Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Mon, 5 Aug 2024 18:47:02 -0700 Subject: [PATCH 2/8] Update ticked_async_executor.rs --- src/ticked_async_executor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ticked_async_executor.rs b/src/ticked_async_executor.rs index dbb6388..7704242 100644 --- a/src/ticked_async_executor.rs +++ b/src/ticked_async_executor.rs @@ -211,7 +211,7 @@ mod tests { fn test_ticked_timer() { let executor = TickedAsyncExecutor::default(); - for _ in 0..10000 { + for _ in 0..10 { let timer = executor.create_timer(); executor .spawn_local("LocalTimer", async move { From 41e923a1ed5e2c015ff10fc6cf9e0331c06d8c8e Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Mon, 5 Aug 2024 18:58:42 -0700 Subject: [PATCH 3/8] Update Cargo.toml --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 6919c0b..3f56afa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ pin-project = "1" # For timer only # TODO, Add this under a feature gate # TODO, Only tokio::sync::watch channel is used (find individual dependency) -tokio = { version = "1.0", default-features = false, features = ["sync"] } +tokio = { version = "1.38.1", default-features = false, features = ["sync"] } [dev-dependencies] tokio = { version = "1", features = ["full"] } From 8960bae43a57228cd79d4ff6e850f24bb820d692 Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Mon, 5 Aug 2024 19:03:43 -0700 Subject: [PATCH 4/8] Update Cargo.toml --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 3f56afa..93e6217 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ pin-project = "1" # For timer only # TODO, Add this under a feature gate # TODO, Only tokio::sync::watch channel is used (find individual dependency) -tokio = { version = "1.38.1", default-features = false, features = ["sync"] } +tokio = { version = "=1.38.1", default-features = false, features = ["sync"] } [dev-dependencies] tokio = { version = "1", features = ["full"] } From 44d5a326884388bf230285d183415c97fb2e7304 Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Mon, 5 Aug 2024 19:09:51 -0700 Subject: [PATCH 5/8] Update Cargo.toml --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 93e6217..c8aa471 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,4 +13,4 @@ pin-project = "1" tokio = { version = "=1.38.1", default-features = false, features = ["sync"] } [dev-dependencies] -tokio = { version = "1", features = ["full"] } +tokio = { version = "=1.38.1", features = ["full"] } From 005edae53e5f8c6edc199cc87c7c91f6b79a2748 Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Mon, 5 Aug 2024 19:13:43 -0700 Subject: [PATCH 6/8] Revert "Remove spawn API" This reverts commit 339580ddb59b47895e1fab8ce523631a4d32df4b. --- src/ticked_async_executor.rs | 41 ++++++++++++++++++++++++++---------- tests/tokio_tests.rs | 8 +++---- 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/src/ticked_async_executor.rs b/src/ticked_async_executor.rs index 7704242..6ce42e9 100644 --- a/src/ticked_async_executor.rs +++ b/src/ticked_async_executor.rs @@ -22,7 +22,6 @@ type Payload = (TaskIdentifier, async_task::Runnable); pub struct TickedAsyncExecutor { channel: (mpsc::Sender, mpsc::Receiver), num_woken_tasks: Arc, - num_spawned_tasks: Arc, // TODO, Or we need a Single Producer - Multi Consumer channel i.e Broadcast channel @@ -53,6 +52,22 @@ where } } + pub fn spawn( + &self, + identifier: impl Into, + future: impl Future + Send + 'static, + ) -> Task + where + T: Send + 'static, + { + let identifier = identifier.into(); + let future = self.droppable_future(identifier.clone(), future); + let schedule = self.runnable_schedule_cb(identifier); + let (runnable, task) = async_task::spawn(future, schedule); + runnable.schedule(); + task + } + pub fn spawn_local( &self, identifier: impl Into, @@ -157,7 +172,7 @@ mod tests { fn test_multiple_tasks() { let executor = TickedAsyncExecutor::default(); executor - .spawn_local("A", async move { + .spawn("A", async move { tokio::task::yield_now().await; }) .detach(); @@ -211,6 +226,15 @@ mod tests { fn test_ticked_timer() { let executor = TickedAsyncExecutor::default(); + for _ in 0..10 { + let timer: TickedTimer = executor.create_timer(); + executor + .spawn("ThreadedTimer", async move { + timer.sleep_for(256.0).await; + }) + .detach(); + } + for _ in 0..10 { let timer = executor.create_timer(); executor @@ -231,30 +255,25 @@ mod tests { let elapsed = now.elapsed(); println!("Elapsed: {:?}", elapsed); println!("Total: {:?}", instances); - println!( - "Min: {:?}, Max: {:?}", - instances.iter().min(), - instances.iter().max() - ); // Test Timer cancellation let timer = executor.create_timer(); executor - .spawn_local("LocalFuture1", async move { + .spawn("ThreadedFuture", async move { timer.sleep_for(1000.0).await; }) .detach(); let timer = executor.create_timer(); executor - .spawn_local("LocalFuture2", async move { + .spawn_local("LocalFuture", async move { timer.sleep_for(1000.0).await; }) .detach(); let mut tick_event = executor.tick_channel(); executor - .spawn_local("LocalTickFuture1", async move { + .spawn("ThreadedTickFuture", async move { loop { let _r = tick_event.changed().await; if _r.is_err() { @@ -266,7 +285,7 @@ mod tests { let mut tick_event = executor.tick_channel(); executor - .spawn_local("LocalTickFuture2", async move { + .spawn_local("LocalTickFuture", async move { loop { let _r = tick_event.changed().await; if _r.is_err() { diff --git a/tests/tokio_tests.rs b/tests/tokio_tests.rs index 6dcdb47..6e5ee0d 100644 --- a/tests/tokio_tests.rs +++ b/tests/tokio_tests.rs @@ -9,7 +9,7 @@ fn test_tokio_join() { let (tx1, mut rx1) = tokio::sync::mpsc::channel::(1); let (tx2, mut rx2) = tokio::sync::mpsc::channel::(1); executor - .spawn_local("LocalFuture1", async move { + .spawn("ThreadedFuture", async move { let (a, b) = tokio::join!(rx1.recv(), rx2.recv()); assert_eq!(a.unwrap(), 10); assert_eq!(b.unwrap(), 20); @@ -19,7 +19,7 @@ fn test_tokio_join() { let (tx3, mut rx3) = tokio::sync::mpsc::channel::(1); let (tx4, mut rx4) = tokio::sync::mpsc::channel::(1); executor - .spawn_local("LocalFuture2", async move { + .spawn("LocalFuture", async move { let (a, b) = tokio::join!(rx3.recv(), rx4.recv()); assert_eq!(a.unwrap(), 10); assert_eq!(b.unwrap(), 20); @@ -46,7 +46,7 @@ fn test_tokio_select() { let (tx1, mut rx1) = tokio::sync::mpsc::channel::(1); let (_tx2, mut rx2) = tokio::sync::mpsc::channel::(1); executor - .spawn_local("LocalFuture1", async move { + .spawn("ThreadedFuture", async move { tokio::select! { data = rx1.recv() => { assert_eq!(data.unwrap(), 10); @@ -59,7 +59,7 @@ fn test_tokio_select() { let (tx3, mut rx3) = tokio::sync::mpsc::channel::(1); let (_tx4, mut rx4) = tokio::sync::mpsc::channel::(1); executor - .spawn_local("LocalFuture2", async move { + .spawn("LocalFuture", async move { tokio::select! { data = rx3.recv() => { assert_eq!(data.unwrap(), 10); From 445ad71834a1d26982da4d9a8acb973fe44bf823 Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Mon, 5 Aug 2024 20:04:12 -0700 Subject: [PATCH 7/8] Test futures with spawn_local --- Cargo.toml | 1 + tests/futures_tests.rs | 82 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+) create mode 100644 tests/futures_tests.rs diff --git a/Cargo.toml b/Cargo.toml index c8aa471..497c695 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,3 +14,4 @@ tokio = { version = "=1.38.1", default-features = false, features = ["sync"] } [dev-dependencies] tokio = { version = "=1.38.1", features = ["full"] } +futures = "0.3" diff --git a/tests/futures_tests.rs b/tests/futures_tests.rs new file mode 100644 index 0000000..522eb10 --- /dev/null +++ b/tests/futures_tests.rs @@ -0,0 +1,82 @@ +use futures::StreamExt; +use ticked_async_executor::TickedAsyncExecutor; + +const DELTA: f64 = 1000.0 / 60.0; + +#[test] +fn test_futures_join() { + let executor = TickedAsyncExecutor::default(); + + let (mut tx1, mut rx1) = futures::channel::mpsc::channel::(1); + let (mut tx2, mut rx2) = futures::channel::mpsc::channel::(1); + executor + .spawn_local("ThreadedFuture", async move { + let (a, b) = futures::join!(rx1.next(), rx2.next()); + assert_eq!(a.unwrap(), 10); + assert_eq!(b.unwrap(), 20); + }) + .detach(); + + let (mut tx3, mut rx3) = futures::channel::mpsc::channel::(1); + let (mut tx4, mut rx4) = futures::channel::mpsc::channel::(1); + executor + .spawn_local("LocalFuture", async move { + let (a, b) = futures::join!(rx3.next(), rx4.next()); + assert_eq!(a.unwrap(), 10); + assert_eq!(b.unwrap(), 20); + }) + .detach(); + + tx1.try_send(10).unwrap(); + tx3.try_send(10).unwrap(); + for _ in 0..10 { + executor.tick(DELTA); + } + tx2.try_send(20).unwrap(); + tx4.try_send(20).unwrap(); + + while executor.num_tasks() != 0 { + executor.tick(DELTA); + } +} + +#[test] +fn test_futures_select() { + let executor = TickedAsyncExecutor::default(); + + let (mut tx1, mut rx1) = futures::channel::mpsc::channel::(1); + let (_tx2, mut rx2) = futures::channel::mpsc::channel::(1); + executor + .spawn_local("ThreadedFuture", async move { + futures::select! { + data = rx1.next() => { + assert_eq!(data.unwrap(), 10); + } + _ = rx2.next() => {} + } + }) + .detach(); + + let (mut tx3, mut rx3) = futures::channel::mpsc::channel::(1); + let (_tx4, mut rx4) = futures::channel::mpsc::channel::(1); + executor + .spawn_local("LocalFuture", async move { + futures::select! { + data = rx3.next() => { + assert_eq!(data.unwrap(), 10); + } + _ = rx4.next() => {} + } + }) + .detach(); + + for _ in 0..10 { + executor.tick(DELTA); + } + + tx1.try_send(10).unwrap(); + tx3.try_send(10).unwrap(); + while executor.num_tasks() != 0 { + executor.tick(DELTA); + } +} From 385e3835958699ea1a1c1dd2ac4d9fa0c16e8e37 Mon Sep 17 00:00:00 2001 From: Niket Naidu Date: Mon, 5 Aug 2024 20:10:32 -0700 Subject: [PATCH 8/8] Update futures_tests.rs --- tests/futures_tests.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/futures_tests.rs b/tests/futures_tests.rs index 522eb10..636352e 100644 --- a/tests/futures_tests.rs +++ b/tests/futures_tests.rs @@ -10,7 +10,7 @@ fn test_futures_join() { let (mut tx1, mut rx1) = futures::channel::mpsc::channel::(1); let (mut tx2, mut rx2) = futures::channel::mpsc::channel::(1); executor - .spawn_local("ThreadedFuture", async move { + .spawn("ThreadedFuture", async move { let (a, b) = futures::join!(rx1.next(), rx2.next()); assert_eq!(a.unwrap(), 10); assert_eq!(b.unwrap(), 20); @@ -20,7 +20,7 @@ fn test_futures_join() { let (mut tx3, mut rx3) = futures::channel::mpsc::channel::(1); let (mut tx4, mut rx4) = futures::channel::mpsc::channel::(1); executor - .spawn_local("LocalFuture", async move { + .spawn("LocalFuture", async move { let (a, b) = futures::join!(rx3.next(), rx4.next()); assert_eq!(a.unwrap(), 10); assert_eq!(b.unwrap(), 20); @@ -47,7 +47,7 @@ fn test_futures_select() { let (mut tx1, mut rx1) = futures::channel::mpsc::channel::(1); let (_tx2, mut rx2) = futures::channel::mpsc::channel::(1); executor - .spawn_local("ThreadedFuture", async move { + .spawn("ThreadedFuture", async move { futures::select! { data = rx1.next() => { assert_eq!(data.unwrap(), 10); @@ -60,7 +60,7 @@ fn test_futures_select() { let (mut tx3, mut rx3) = futures::channel::mpsc::channel::(1); let (_tx4, mut rx4) = futures::channel::mpsc::channel::(1); executor - .spawn_local("LocalFuture", async move { + .spawn("LocalFuture", async move { futures::select! { data = rx3.next() => { assert_eq!(data.unwrap(), 10);