Skip to content

Commit 9226739

Browse files
committed
Mild cleanups in the task module
* Only use one thread local * use `match` when using `compare_exchange` * less loads during the `notify` loop
1 parent f4edc67 commit 9226739

File tree

2 files changed

+61
-42
lines changed

2 files changed

+61
-42
lines changed

src/task/mod.rs

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -43,38 +43,41 @@ use self::unpark_mutex::UnparkMutex;
4343

4444
use typemap::{SendMap, TypeMap};
4545

46-
thread_local!(static CURRENT_TASK: Cell<*const Task> = Cell::new(0 as *const _));
47-
thread_local!(static CURRENT_TASK_DATA: Cell<*const TaskData> = Cell::new(0 as *const _));
48-
49-
// TODO: make this more robust around overflow on 32-bit machines
50-
static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
46+
thread_local!(static CURRENT_TASK: Cell<(*const Task, *const TaskData)> = {
47+
Cell::new((0 as *const _, 0 as *const _))
48+
});
49+
50+
fn fresh_task_id() -> usize {
51+
// TODO: this assert is a real bummer, need to figure out how to reuse
52+
// old IDs that are no longer in use.
53+
static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
54+
let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
55+
assert!(id < usize::max_value() / 2,
56+
"too many previous tasks have been allocated");
57+
return id
58+
}
5159

5260
fn set<F, R>(task: &Task, data: &TaskData, f: F) -> R
5361
where F: FnOnce() -> R
5462
{
55-
struct Reset(*const Task, *const TaskData);
63+
struct Reset((*const Task, *const TaskData));
5664
impl Drop for Reset {
5765
fn drop(&mut self) {
5866
CURRENT_TASK.with(|c| c.set(self.0));
59-
CURRENT_TASK_DATA.with(|c| c.set(self.1));
6067
}
6168
}
6269

6370
CURRENT_TASK.with(|c| {
64-
CURRENT_TASK_DATA.with(|d| {
65-
let _reset = Reset(c.get(), d.get());
66-
c.set(task);
67-
d.set(data);
68-
f()
69-
})
71+
let _reset = Reset(c.get());
72+
c.set((task as *const _, data as *const _));
73+
f()
7074
})
7175
}
7276

7377
fn with<F: FnOnce(&Task, &TaskData) -> R, R>(f: F) -> R {
74-
let task = CURRENT_TASK.with(|c| c.get());
75-
let data = CURRENT_TASK_DATA.with(|d| d.get());
76-
assert!(task != 0 as *const _, "No `Task` is currently running");
77-
assert!(data != 0 as *const _, "No `TaskData` is available; is a `Task` runnin?");
78+
let (task, data) = CURRENT_TASK.with(|c| c.get());
79+
assert!(!task.is_null(), "no Task is currently running");
80+
assert!(!data.is_null());
7881
unsafe {
7982
f(&*task, &*data)
8083
}
@@ -204,6 +207,11 @@ pub struct Task {
204207
events: Events,
205208
}
206209

210+
fn _assert_kinds() {
211+
fn _assert_send<T: Send>() {}
212+
_assert_send::<Task>();
213+
}
214+
207215
#[derive(Clone)]
208216
/// The two kinds of execution models: `Executor`-based or local thread-based
209217
enum TaskKind {
@@ -245,7 +253,7 @@ impl ThreadTask {
245253
/// current thread.
246254
pub fn new() -> ThreadTask {
247255
ThreadTask {
248-
id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
256+
id: fresh_task_id(),
249257
task_data: RefCell::new(TypeMap::custom()),
250258
}
251259
}
@@ -356,9 +364,9 @@ impl Task {
356364
///
357365
/// Does not actually begin task execution; use the `unpark` method to do
358366
/// so.
359-
pub fn new(exec: Arc<Executor>, future: BoxFuture<(), ()>) -> Task {
367+
pub fn new() -> Task {
360368
Task {
361-
id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
369+
id: fresh_task_id(),
362370
kind: TaskKind::Executor {
363371
mutex: Arc::new(UnparkMutex::new(MutexInner {
364372
future: future,
@@ -417,15 +425,15 @@ impl Events {
417425
}
418426

419427
fn trigger(&self) {
420-
for event in &self.set {
428+
for event in self.set.iter() {
421429
event.set.insert(event.item)
422430
}
423431
}
424432

425433
fn with_event(&self, event: UnparkEvent) -> Events {
426434
let mut set = self.set.clone();
427435
set.push(event);
428-
Events{ set: set }
436+
Events { set: set }
429437
}
430438
}
431439

src/task/unpark_mutex.rs

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -57,28 +57,36 @@ impl<D> UnparkMutex<D> {
5757
/// that polling is not necessary (because the task is finished or the
5858
/// polling has been delegated).
5959
pub fn notify(&self) -> Result<D, ()> {
60+
let mut status = self.status.load(SeqCst);
6061
loop {
61-
match self.status.load(SeqCst) {
62+
match status {
6263
// The task is idle, so try to run it immediately.
6364
WAITING => {
64-
if self.status.compare_exchange(WAITING, POLLING, SeqCst, SeqCst).is_ok() {
65-
let data = unsafe {
66-
// SAFETY: we've ensured mutual exclusion via the status
67-
// protocol; we are the only thread that has transitioned to the
68-
// POLLING state, and we won't transition back to QUEUED until
69-
// the lock is "released" by this thread. See the protocol
70-
// diagram above.
71-
(*self.inner.get()).take().unwrap()
72-
};
73-
return Ok(data);
65+
match self.status.compare_exchange(WAITING, POLLING,
66+
SeqCst, SeqCst) {
67+
Ok(_) => {
68+
let data = unsafe {
69+
// SAFETY: we've ensured mutual exclusion via
70+
// the status protocol; we are the only thread
71+
// that has transitioned to the POLLING state,
72+
// and we won't transition back to QUEUED until
73+
// the lock is "released" by this thread. See
74+
// the protocol diagram above.
75+
(*self.inner.get()).take().unwrap()
76+
};
77+
return Ok(data);
78+
}
79+
Err(cur) => status = cur,
7480
}
7581
}
7682

77-
// The task is being polled, so we need to record that it should be *repolled*
78-
// when complete.
83+
// The task is being polled, so we need to record that it should
84+
// be *repolled* when complete.
7985
POLLING => {
80-
if self.status.compare_exchange(POLLING, REPOLL, SeqCst, SeqCst).is_ok() {
81-
return Err(());
86+
match self.status.compare_exchange(POLLING, REPOLL,
87+
SeqCst, SeqCst) {
88+
Ok(_) => return Err(()),
89+
Err(cur) => status = cur,
8290
}
8391
}
8492

@@ -109,14 +117,17 @@ impl<D> UnparkMutex<D> {
109117
pub unsafe fn wait(&self, data: D) -> Result<(), D> {
110118
*self.inner.get() = Some(data);
111119

112-
if self.status.compare_exchange(POLLING, WAITING, SeqCst, SeqCst).is_ok() {
120+
match self.status.compare_exchange(POLLING, WAITING, SeqCst, SeqCst) {
113121
// no unparks came in while we were running
114-
Ok(())
115-
} else {
122+
Ok(_) => Ok(()),
123+
116124
// guaranteed to be in REPOLL state; just clobber the
117125
// state and run again.
118-
self.status.store(POLLING, SeqCst);
119-
Err((*self.inner.get()).take().unwrap())
126+
Err(status) => {
127+
assert_eq!(status, REPOLL);
128+
self.status.store(POLLING, SeqCst);
129+
Err((*self.inner.get()).take().unwrap())
130+
}
120131
}
121132
}
122133

0 commit comments

Comments
 (0)