diff --git a/Cargo.toml b/Cargo.toml index 6919c0b..923b932 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,16 +1,18 @@ [package] name = "ticked_async_executor" -version = "0.1.0" +version = "0.2.0" +authors = ["coder137"] edition = "2021" +description = "Local executor that runs woken async tasks when it is ticked" +license = "Apache-2.0" +repository = "https://github.com/coder137/ticked-async-executor" +categories = ["asynchronous", "concurrency", "game-development", "simulation"] +readme = "README.md" [dependencies] async-task = "4.7" pin-project = "1" - -# For timer only -# TODO, Add this under a feature gate -# TODO, Only tokio::sync::watch channel is used (find individual dependency) -tokio = { version = "1.0", default-features = false, features = ["sync"] } +tokio = { version = "1", default-features = false, features = ["sync"] } [dev-dependencies] tokio = { version = "1", features = ["full"] } diff --git a/README.md b/README.md index 919abcb..b433167 100644 --- a/README.md +++ b/README.md @@ -1,18 +1,71 @@ # Ticked Async Executor -Rust based Async Executor which executes woken tasks only when it is ticked +Async Local Executor which executes woken tasks only when it is ticked -# Example +# Usage + +## Default Local Executor ```rust +use ticked_async_executor::*; + +const DELTA: f64 = 1000.0 / 60.0; + let executor = TickedAsyncExecutor::default(); executor.spawn_local("MyIdentifier", async move {}).detach(); -// Make sure to tick your executor to run the tasks -executor.tick(DELTA, LIMIT); +// Tick your executor to run the tasks +assert_eq!(executor.num_tasks(), 1); +executor.tick(DELTA, None); +assert_eq!(executor.num_tasks(), 0); +``` + +## Split Local Executor + +```rust +use ticked_async_executor::*; + +const DELTA: f64 = 1000.0 / 60.0; + +let (spawner, ticker) = SplitTickedAsyncExecutor::default(); + +spawner.spawn_local("MyIdentifier", async move {}).detach(); + +// Tick your ticker to run the tasks +assert_eq!(spawner.num_tasks(), 1); +ticker.tick(DELTA, None); +assert_eq!(spawner.num_tasks(), 0); ``` -# Limitation +## Limit the number of woken tasks run per tick + +```rust +use ticked_async_executor::*; + +const DELTA: f64 = 1000.0 / 60.0; + +let executor = TickedAsyncExecutor::default(); + +executor.spawn_local("MyIdentifier1", async move {}).detach(); +executor.spawn_local("MyIdentifier2", async move {}).detach(); + +// Runs upto 1 woken tasks per tick +assert_eq!(executor.num_tasks(), 2); +executor.tick(DELTA, Some(1)); +assert_eq!(executor.num_tasks(), 1); +executor.tick(DELTA, Some(1)); +assert_eq!(executor.num_tasks(), 0); +``` + +# Caveats + +- Uses the `smol` ecosystem +- Ensure that tasks are spawned on the same thread as the one that initializes the executor + +# Roadmap -- Does not work with the tokio runtime and async constructs that use the tokio runtime internally +- [x] TickedAsyncExecutor +- [x] SplitTickedAsyncExecutor + - Similar to the channel API, but spawner and ticker cannot be moved to different threads +- [ ] Tracing diff --git a/src/lib.rs b/src/lib.rs index 5295ab3..2aeddef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,5 @@ +#![doc = include_str!("../README.md")] + mod droppable_future; use droppable_future::*; diff --git a/src/split_ticked_async_executor.rs b/src/split_ticked_async_executor.rs index 002b3c7..e0fda13 100644 --- a/src/split_ticked_async_executor.rs +++ b/src/split_ticked_async_executor.rs @@ -19,31 +19,40 @@ pub enum TaskState { pub type Task = async_task::Task; type Payload = (TaskIdentifier, async_task::Runnable); -pub fn new_split_ticked_async_executor( - observer: O, -) -> (TickedAsyncExecutorSpawner, TickedAsyncExecutorTicker) -where - O: Fn(TaskState) + Clone + Send + Sync + 'static, -{ - let (tx_channel, rx_channel) = mpsc::channel(); - let num_woken_tasks = Arc::new(AtomicUsize::new(0)); - let num_spawned_tasks = Arc::new(AtomicUsize::new(0)); - let (tx_tick_event, rx_tick_event) = tokio::sync::watch::channel(1.0); - let spawner = TickedAsyncExecutorSpawner { - tx_channel, - num_woken_tasks: num_woken_tasks.clone(), - num_spawned_tasks: num_spawned_tasks.clone(), - observer: observer.clone(), - rx_tick_event, - }; - let ticker = TickedAsyncExecutorTicker { - rx_channel, - num_woken_tasks, - num_spawned_tasks, - observer, - tx_tick_event, - }; - (spawner, ticker) +pub struct SplitTickedAsyncExecutor; + +impl SplitTickedAsyncExecutor { + pub fn default() -> ( + TickedAsyncExecutorSpawner, + TickedAsyncExecutorTicker, + ) { + Self::new(|_state| {}) + } + + pub fn new(observer: O) -> (TickedAsyncExecutorSpawner, TickedAsyncExecutorTicker) + where + O: Fn(TaskState) + Clone + Send + Sync + 'static, + { + let (tx_channel, rx_channel) = mpsc::channel(); + let num_woken_tasks = Arc::new(AtomicUsize::new(0)); + let num_spawned_tasks = Arc::new(AtomicUsize::new(0)); + let (tx_tick_event, rx_tick_event) = tokio::sync::watch::channel(1.0); + let spawner = TickedAsyncExecutorSpawner { + tx_channel, + num_woken_tasks: num_woken_tasks.clone(), + num_spawned_tasks: num_spawned_tasks.clone(), + observer: observer.clone(), + rx_tick_event, + }; + let ticker = TickedAsyncExecutorTicker { + rx_channel, + num_woken_tasks, + num_spawned_tasks, + observer, + tx_tick_event, + }; + (spawner, ticker) + } } pub struct TickedAsyncExecutorSpawner { diff --git a/src/ticked_async_executor.rs b/src/ticked_async_executor.rs index 525ab10..a014246 100644 --- a/src/ticked_async_executor.rs +++ b/src/ticked_async_executor.rs @@ -1,7 +1,7 @@ use std::future::Future; use crate::{ - new_split_ticked_async_executor, Task, TaskIdentifier, TaskState, TickedAsyncExecutorSpawner, + SplitTickedAsyncExecutor, Task, TaskIdentifier, TaskState, TickedAsyncExecutorSpawner, TickedAsyncExecutorTicker, TickedTimer, }; @@ -21,7 +21,7 @@ where O: Fn(TaskState) + Clone + Send + Sync + 'static, { pub fn new(observer: O) -> Self { - let (spawner, ticker) = new_split_ticked_async_executor(observer); + let (spawner, ticker) = SplitTickedAsyncExecutor::new(observer); Self { spawner, ticker } }