Skip to content

Tick with limit #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@

Rust based Async Executor which executes woken tasks only when it is ticked

# Example

```rust
let executor = TickedAsyncExecutor::default();

executor.spawn_local("MyIdentifier", async move {}).detach();

// Make sure to tick your executor to run the tasks
executor.tick(DELTA, LIMIT);
```

# Limitation

- Does not work with the tokio runtime and async constructs that use the tokio runtime internally
58 changes: 48 additions & 10 deletions src/ticked_async_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,23 @@
/// `delta` is used for timing based operations
/// - `TickedTimer` uses this delta value to tick till completion
///
/// `maybe_limit` is used to limit the number of woken tasks run per tick
/// `limit` is used to limit the number of woken tasks run per tick
/// - None would imply that there is no limit (all woken tasks would run)
/// - Some(limit) would imply that [0..limit] woken tasks would run,
/// even if more tasks are woken.
///
/// Tick is !Sync i.e cannot be invoked from multiple threads
///
/// NOTE: Will not run tasks that are woken/scheduled immediately after `Runnable::run`
pub fn tick(&self, delta: f64) {
pub fn tick(&self, delta: f64, limit: Option<usize>) {
let _r = self.tick_event.send(delta);

// Clamp woken tasks to limit
let num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed);
let mut num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed);
if let Some(limit) = limit {
// Woken tasks should not exceed the allowed limit
num_woken_tasks = num_woken_tasks.min(limit);

Check warning on line 95 in src/ticked_async_executor.rs

View check run for this annotation

Codecov / codecov/patch

src/ticked_async_executor.rs#L95

Added line #L95 was not covered by tests
}

self.channel
.1
.try_iter()
Expand Down Expand Up @@ -153,6 +157,20 @@

const DELTA: f64 = 1000.0 / 60.0;

#[test]
fn test_one_task() {
const DELTA: f64 = 1.0 / 60.0;
const LIMIT: Option<usize> = None;

let executor = TickedAsyncExecutor::default();

executor.spawn_local("MyIdentifier", async move {}).detach();

// Make sure to tick your executor to run the tasks
executor.tick(DELTA, LIMIT);
assert_eq!(executor.num_tasks(), 0);
}

#[test]
fn test_multiple_tasks() {
let executor = TickedAsyncExecutor::default();
Expand All @@ -168,10 +186,10 @@
})
.detach();

executor.tick(DELTA);
executor.tick(DELTA, None);
assert_eq!(executor.num_tasks(), 2);

executor.tick(DELTA);
executor.tick(DELTA, None);
assert_eq!(executor.num_tasks(), 0);
}

Expand All @@ -190,7 +208,7 @@
}
});
assert_eq!(executor.num_tasks(), 2);
executor.tick(DELTA);
executor.tick(DELTA, None);

executor
.spawn_local("CancelTasks", async move {
Expand All @@ -203,7 +221,7 @@

// Since we have cancelled the tasks above, the loops should eventually end
while executor.num_tasks() != 0 {
executor.tick(DELTA);
executor.tick(DELTA, None);
}
}

Expand All @@ -224,7 +242,7 @@
let mut instances = vec![];
while executor.num_tasks() != 0 {
let current = Instant::now();
executor.tick(DELTA);
executor.tick(DELTA, None);
instances.push(current.elapsed());
std::thread::sleep(Duration::from_millis(16));
}
Expand Down Expand Up @@ -276,8 +294,28 @@
})
.detach();

executor.tick(DELTA);
executor.tick(DELTA, None);
assert_eq!(executor.num_tasks(), 4);
drop(executor);
}

#[test]
fn test_limit() {
let executor = TickedAsyncExecutor::default();
for i in 0..10 {
executor
.spawn_local(format!("{i}"), async move {
println!("Finish {i}");
})
.detach();
}

for i in 0..10 {
let woken_tasks = executor.num_woken_tasks.load(Ordering::Relaxed);
assert_eq!(woken_tasks, 10 - i);
executor.tick(0.1, Some(1));
}

assert_eq!(executor.num_tasks(), 0);
}
}
8 changes: 4 additions & 4 deletions tests/tokio_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ fn test_tokio_join() {
tx1.try_send(10).unwrap();
tx3.try_send(10).unwrap();
for _ in 0..10 {
executor.tick(DELTA);
executor.tick(DELTA, None);
}
tx2.try_send(20).unwrap();
tx4.try_send(20).unwrap();

while executor.num_tasks() != 0 {
executor.tick(DELTA);
executor.tick(DELTA, None);
}
}

Expand Down Expand Up @@ -70,12 +70,12 @@ fn test_tokio_select() {
.detach();

for _ in 0..10 {
executor.tick(DELTA);
executor.tick(DELTA, None);
}

tx1.try_send(10).unwrap();
tx3.try_send(10).unwrap();
while executor.num_tasks() != 0 {
executor.tick(DELTA);
executor.tick(DELTA, None);
}
}