Skip to content

Commit 45c76da

Browse files
authored
TickedAsyncExecutor::tick with delta parameter (#5)
- `TickedTimer` feature where timer only runs when `TickedAsyncExecutor::tick` is invoked - Exposes `tokio::sync::watch::Receiver` to subscribe to tick event(s)
1 parent f22d942 commit 45c76da

File tree

6 files changed

+154
-12
lines changed

6 files changed

+154
-12
lines changed

.github/workflows/rust.yml

+7
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,10 @@ jobs:
4545
with:
4646
token: ${{ secrets.CODECOV_TOKEN }}
4747
files: target/cobertura.xml
48+
49+
- name: Miri
50+
run: |
51+
rustup toolchain install nightly --component miri
52+
rustup override set nightly
53+
cargo miri setup
54+
cargo miri test

Cargo.toml

+5
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,10 @@ edition = "2021"
77
async-task = "4.7"
88
pin-project = "1"
99

10+
# For timer only
11+
# TODO, Add this under a feature gate
12+
# TODO, Only tokio::sync::watch channel is used (find individual dependency)
13+
tokio = { version = "1.0", default-features = false, features = ["sync"] }
14+
1015
[dev-dependencies]
1116
tokio = { version = "1", features = ["full"] }

src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,6 @@ pub use task_identifier::*;
66

77
mod ticked_async_executor;
88
pub use ticked_async_executor::*;
9+
10+
mod ticked_timer;
11+
pub use ticked_timer::*;

src/ticked_async_executor.rs

+112-8
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ use std::{
66
},
77
};
88

9-
use crate::{DroppableFuture, TaskIdentifier};
9+
use crate::{DroppableFuture, TaskIdentifier, TickedTimer};
1010

1111
#[derive(Debug)]
1212
pub enum TaskState {
1313
Spawn(TaskIdentifier),
1414
Wake(TaskIdentifier),
15-
Tick(TaskIdentifier),
15+
Tick(TaskIdentifier, f64),
1616
Drop(TaskIdentifier),
1717
}
1818

@@ -28,6 +28,8 @@ pub struct TickedAsyncExecutor<O> {
2828
// Broadcast recv channel should be notified when there are new messages in the queue
2929
// Broadcast channel must also be able to remove older/stale messages (like a RingBuffer)
3030
observer: O,
31+
32+
tick_event: tokio::sync::watch::Sender<f64>,
3133
}
3234

3335
impl Default for TickedAsyncExecutor<fn(TaskState)> {
@@ -46,6 +48,7 @@ where
4648
num_woken_tasks: Arc::new(AtomicUsize::new(0)),
4749
num_spawned_tasks: Arc::new(AtomicUsize::new(0)),
4850
observer,
51+
tick_event: tokio::sync::watch::channel(1.0).0,
4952
}
5053
}
5154

@@ -87,23 +90,43 @@ where
8790

8891
/// Run the woken tasks once
8992
///
93+
/// `delta` is used for timing based operations
94+
/// - `TickedTimer` uses this delta value to tick till completion
95+
///
96+
/// `maybe_limit` is used to limit the number of woken tasks run per tick
97+
/// - None would imply that there is no limit (all woken tasks would run)
98+
/// - Some(limit) would imply that [0..limit] woken tasks would run,
99+
/// even if more tasks are woken.
100+
///
90101
/// Tick is !Sync i.e cannot be invoked from multiple threads
91102
///
92103
/// NOTE: Will not run tasks that are woken/scheduled immediately after `Runnable::run`
93-
pub fn tick(&self) {
104+
pub fn tick(&self, delta: f64) {
105+
let _r = self.tick_event.send(delta);
106+
107+
// Clamp woken tasks to limit
94108
let num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed);
95109
self.channel
96110
.1
97111
.try_iter()
98112
.take(num_woken_tasks)
99113
.for_each(|(identifier, runnable)| {
100-
(self.observer)(TaskState::Tick(identifier));
114+
(self.observer)(TaskState::Tick(identifier, delta));
101115
runnable.run();
102116
});
103117
self.num_woken_tasks
104118
.fetch_sub(num_woken_tasks, Ordering::Relaxed);
105119
}
106120

121+
pub fn create_timer(&self) -> TickedTimer {
122+
let tick_recv = self.tick_event.subscribe();
123+
TickedTimer { tick_recv }
124+
}
125+
126+
pub fn tick_channel(&self) -> tokio::sync::watch::Receiver<f64> {
127+
self.tick_event.subscribe()
128+
}
129+
107130
fn droppable_future<F>(
108131
&self,
109132
identifier: TaskIdentifier,
@@ -141,6 +164,9 @@ where
141164
#[cfg(test)]
142165
mod tests {
143166
use super::*;
167+
use std::time::{Duration, Instant};
168+
169+
const DELTA: f64 = 1000.0 / 60.0;
144170

145171
#[test]
146172
fn test_multiple_tasks() {
@@ -157,10 +183,10 @@ mod tests {
157183
})
158184
.detach();
159185

160-
executor.tick();
186+
executor.tick(DELTA);
161187
assert_eq!(executor.num_tasks(), 2);
162188

163-
executor.tick();
189+
executor.tick(DELTA);
164190
assert_eq!(executor.num_tasks(), 0);
165191
}
166192

@@ -179,7 +205,7 @@ mod tests {
179205
}
180206
});
181207
assert_eq!(executor.num_tasks(), 2);
182-
executor.tick();
208+
executor.tick(DELTA);
183209

184210
executor
185211
.spawn_local("CancelTasks", async move {
@@ -192,7 +218,85 @@ mod tests {
192218

193219
// Since we have cancelled the tasks above, the loops should eventually end
194220
while executor.num_tasks() != 0 {
195-
executor.tick();
221+
executor.tick(DELTA);
196222
}
197223
}
224+
225+
#[test]
226+
fn test_ticked_timer() {
227+
let executor = TickedAsyncExecutor::default();
228+
229+
for _ in 0..10 {
230+
let timer: TickedTimer = executor.create_timer();
231+
executor
232+
.spawn("ThreadedTimer", async move {
233+
timer.sleep_for(256.0).await;
234+
})
235+
.detach();
236+
}
237+
238+
for _ in 0..10 {
239+
let timer = executor.create_timer();
240+
executor
241+
.spawn_local("LocalTimer", async move {
242+
timer.sleep_for(256.0).await;
243+
})
244+
.detach();
245+
}
246+
247+
let now = Instant::now();
248+
let mut instances = vec![];
249+
while executor.num_tasks() != 0 {
250+
let current = Instant::now();
251+
executor.tick(DELTA);
252+
instances.push(current.elapsed());
253+
std::thread::sleep(Duration::from_millis(16));
254+
}
255+
let elapsed = now.elapsed();
256+
println!("Elapsed: {:?}", elapsed);
257+
println!("Total: {:?}", instances);
258+
259+
// Test Timer cancellation
260+
let timer = executor.create_timer();
261+
executor
262+
.spawn("ThreadedFuture", async move {
263+
timer.sleep_for(1000.0).await;
264+
})
265+
.detach();
266+
267+
let timer = executor.create_timer();
268+
executor
269+
.spawn_local("LocalFuture", async move {
270+
timer.sleep_for(1000.0).await;
271+
})
272+
.detach();
273+
274+
let mut tick_event = executor.tick_channel();
275+
executor
276+
.spawn("ThreadedTickFuture", async move {
277+
loop {
278+
let _r = tick_event.changed().await;
279+
if _r.is_err() {
280+
break;
281+
}
282+
}
283+
})
284+
.detach();
285+
286+
let mut tick_event = executor.tick_channel();
287+
executor
288+
.spawn_local("LocalTickFuture", async move {
289+
loop {
290+
let _r = tick_event.changed().await;
291+
if _r.is_err() {
292+
break;
293+
}
294+
}
295+
})
296+
.detach();
297+
298+
executor.tick(DELTA);
299+
assert_eq!(executor.num_tasks(), 4);
300+
drop(executor);
301+
}
198302
}

src/ticked_timer.rs

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
pub struct TickedTimer {
2+
pub tick_recv: tokio::sync::watch::Receiver<f64>,
3+
}
4+
5+
impl TickedTimer {
6+
pub async fn sleep_for(mut self, mut duration_in_ms: f64) {
7+
loop {
8+
let _r = self.tick_recv.changed().await;
9+
if _r.is_err() {
10+
// This means that the executor supplying the delta channel has shutdown
11+
// We must stop waiting gracefully
12+
break;
13+
}
14+
let current_dt = *self.tick_recv.borrow_and_update();
15+
duration_in_ms -= current_dt;
16+
if duration_in_ms <= 0.0 {
17+
break;
18+
}
19+
}
20+
}
21+
}

tests/tokio_tests.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use ticked_async_executor::TickedAsyncExecutor;
22

3+
const DELTA: f64 = 1000.0 / 60.0;
4+
35
#[test]
46
fn test_tokio_join() {
57
let executor = TickedAsyncExecutor::default();
@@ -27,13 +29,13 @@ fn test_tokio_join() {
2729
tx1.try_send(10).unwrap();
2830
tx3.try_send(10).unwrap();
2931
for _ in 0..10 {
30-
executor.tick();
32+
executor.tick(DELTA);
3133
}
3234
tx2.try_send(20).unwrap();
3335
tx4.try_send(20).unwrap();
3436

3537
while executor.num_tasks() != 0 {
36-
executor.tick();
38+
executor.tick(DELTA);
3739
}
3840
}
3941

@@ -68,12 +70,12 @@ fn test_tokio_select() {
6870
.detach();
6971

7072
for _ in 0..10 {
71-
executor.tick();
73+
executor.tick(DELTA);
7274
}
7375

7476
tx1.try_send(10).unwrap();
7577
tx3.try_send(10).unwrap();
7678
while executor.num_tasks() != 0 {
77-
executor.tick();
79+
executor.tick(DELTA);
7880
}
7981
}

0 commit comments

Comments
 (0)