Skip to content

Commit af2e039

Browse files
committed
Enabled workstealing in the scheduler. Previously we had one global work queue shared by each scheduler. Now there is a separate work queue for each scheduler, and work is "stolen" from other schedulers' queues when the local queue is exhausted.
1 parent a0080f4 commit af2e039

File tree

9 files changed

+328
-63
lines changed

9 files changed

+328
-63
lines changed

src/libstd/rt/comm.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -225,9 +225,10 @@ impl<T> Select for PortOne<T> {
225225
fn optimistic_check(&mut self) -> bool {
226226
// The optimistic check is never necessary for correctness. For testing
227227
// purposes, making it randomly return false simulates a racing sender.
228-
use rand::{Rand, rng};
229-
let mut rng = rng();
230-
let actually_check = Rand::rand(&mut rng);
228+
use rand::{Rand};
229+
let actually_check = do Local::borrow::<Scheduler, bool> |sched| {
230+
Rand::rand(&mut sched.rng)
231+
};
231232
if actually_check {
232233
unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
233234
} else {

src/libstd/rt/mod.rs

+22-10
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,7 @@ Several modules in `core` are clients of `rt`:
6363
use cell::Cell;
6464
use clone::Clone;
6565
use container::Container;
66-
use iter::Times;
67-
use iterator::{Iterator, IteratorUtil};
66+
use iterator::{Iterator, IteratorUtil, range};
6867
use option::{Some, None};
6968
use ptr::RawPtr;
7069
use rt::local::Local;
@@ -247,24 +246,32 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
247246

248247
let main = Cell::new(main);
249248

250-
// The shared list of sleeping schedulers. Schedulers wake each other
251-
// occassionally to do new work.
249+
// The shared list of sleeping schedulers.
252250
let sleepers = SleeperList::new();
253-
// The shared work queue. Temporary until work stealing is implemented.
254-
let work_queue = WorkQueue::new();
251+
252+
// Create a work queue for each scheduler, ntimes. Create an extra
253+
// for the main thread if that flag is set. We won't steal from it.
254+
let mut work_queues = ~[];
255+
for _ in range(0u, nscheds) {
256+
let work_queue: WorkQueue<~Task> = WorkQueue::new();
257+
work_queues.push(work_queue);
258+
}
255259

256260
// The schedulers.
257261
let mut scheds = ~[];
258262
// Handles to the schedulers. When the main task ends these will be
259263
// sent the Shutdown message to terminate the schedulers.
260264
let mut handles = ~[];
261265

262-
do nscheds.times {
266+
for i in range(0u, nscheds) {
263267
rtdebug!("inserting a regular scheduler");
264268

265269
// Every scheduler is driven by an I/O event loop.
266270
let loop_ = ~UvEventLoop::new();
267-
let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone());
271+
let mut sched = ~Scheduler::new(loop_,
272+
work_queues[i].clone(),
273+
work_queues.clone(),
274+
sleepers.clone());
268275
let handle = sched.make_handle();
269276

270277
scheds.push(sched);
@@ -280,9 +287,14 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
280287
let friend_handle = friend_sched.make_handle();
281288
scheds.push(friend_sched);
282289

290+
// This scheduler needs a queue that isn't part of the stealee
291+
// set.
292+
let work_queue = WorkQueue::new();
293+
283294
let main_loop = ~UvEventLoop::new();
284295
let mut main_sched = ~Scheduler::new_special(main_loop,
285-
work_queue.clone(),
296+
work_queue,
297+
work_queues.clone(),
286298
sleepers.clone(),
287299
false,
288300
Some(friend_handle));
@@ -371,7 +383,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
371383
let mut main_task = ~Task::new_root_homed(&mut main_sched.stack_pool, None,
372384
home, main.take());
373385
main_task.death.on_exit = Some(on_exit.take());
374-
rtdebug!("boostrapping main_task");
386+
rtdebug!("bootstrapping main_task");
375387

376388
main_sched.bootstrap(main_task);
377389
}

src/libstd/rt/sched.rs

+114-45
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use option::{Option, Some, None};
1313
use cast::{transmute, transmute_mut_region, transmute_mut_unsafe};
1414
use clone::Clone;
1515
use unstable::raw;
16-
1716
use super::sleeper_list::SleeperList;
1817
use super::work_queue::WorkQueue;
1918
use super::stack::{StackPool};
@@ -28,6 +27,9 @@ use rt::rtio::RemoteCallback;
2827
use rt::metrics::SchedMetrics;
2928
use borrow::{to_uint};
3029
use cell::Cell;
30+
use rand::{XorShiftRng, RngUtil};
31+
use iterator::{range};
32+
use vec::{OwnedVector};
3133

3234
/// The Scheduler is responsible for coordinating execution of Coroutines
3335
/// on a single thread. When the scheduler is running it is owned by
@@ -37,9 +39,11 @@ use cell::Cell;
3739
/// XXX: This creates too many callbacks to run_sched_once, resulting
3840
/// in too much allocation and too many events.
3941
pub struct Scheduler {
40-
/// A queue of available work. Under a work-stealing policy there
41-
/// is one per Scheduler.
42-
work_queue: WorkQueue<~Task>,
42+
/// There are N work queues, one per scheduler.
43+
priv work_queue: WorkQueue<~Task>,
44+
/// Work queues for the other schedulers. These are created by
45+
/// cloning the core work queues.
46+
work_queues: ~[WorkQueue<~Task>],
4347
/// The queue of incoming messages from other schedulers.
4448
/// These are enqueued by SchedHandles after which a remote callback
4549
/// is triggered to handle the message.
@@ -70,7 +74,10 @@ pub struct Scheduler {
7074
run_anything: bool,
7175
/// If the scheduler shouldn't run some tasks, a friend to send
7276
/// them to.
73-
friend_handle: Option<SchedHandle>
77+
friend_handle: Option<SchedHandle>,
78+
/// A fast XorShift rng for scheduler use
79+
rng: XorShiftRng
80+
7481
}
7582

7683
pub struct SchedHandle {
@@ -97,17 +104,21 @@ impl Scheduler {
97104

98105
pub fn new(event_loop: ~EventLoopObject,
99106
work_queue: WorkQueue<~Task>,
107+
work_queues: ~[WorkQueue<~Task>],
100108
sleeper_list: SleeperList)
101109
-> Scheduler {
102110

103-
Scheduler::new_special(event_loop, work_queue, sleeper_list, true, None)
111+
Scheduler::new_special(event_loop, work_queue,
112+
work_queues,
113+
sleeper_list, true, None)
104114

105115
}
106116

107117
// When you create a scheduler it isn't yet "in" a task, so the
108118
// task field is None.
109119
pub fn new_special(event_loop: ~EventLoopObject,
110120
work_queue: WorkQueue<~Task>,
121+
work_queues: ~[WorkQueue<~Task>],
111122
sleeper_list: SleeperList,
112123
run_anything: bool,
113124
friend: Option<SchedHandle>)
@@ -120,12 +131,14 @@ impl Scheduler {
120131
no_sleep: false,
121132
event_loop: event_loop,
122133
work_queue: work_queue,
134+
work_queues: work_queues,
123135
stack_pool: StackPool::new(),
124136
sched_task: None,
125137
cleanup_job: None,
126138
metrics: SchedMetrics::new(),
127139
run_anything: run_anything,
128-
friend_handle: friend
140+
friend_handle: friend,
141+
rng: XorShiftRng::new()
129142
}
130143
}
131144

@@ -248,7 +261,7 @@ impl Scheduler {
248261

249262
// Second activity is to try resuming a task from the queue.
250263

251-
let result = sched.resume_task_from_queue();
264+
let result = sched.do_work();
252265
let mut sched = match result {
253266
Some(sched) => {
254267
// Failed to dequeue a task, so we return.
@@ -415,47 +428,98 @@ impl Scheduler {
415428
}
416429
}
417430

418-
// Resume a task from the queue - but also take into account that
419-
// it might not belong here.
431+
// Workstealing: In this iteration of the runtime each scheduler
432+
// thread has a distinct work queue. When no work is available
433+
// locally, make a few attempts to steal work from the queues of
434+
// other scheduler threads. If a few steals fail we end up in the
435+
// old "no work" path which is fine.
436+
437+
// First step in the process is to find a task. This function does
438+
// that by first checking the local queue, and if there is no work
439+
// there, trying to steal from the remote work queues.
440+
fn find_work(&mut self) -> Option<~Task> {
441+
rtdebug!("scheduler looking for work");
442+
match self.work_queue.pop() {
443+
Some(task) => {
444+
rtdebug!("found a task locally");
445+
return Some(task)
446+
}
447+
None => {
448+
// Our naive stealing, try kinda hard.
449+
rtdebug!("scheduler trying to steal");
450+
let _len = self.work_queues.len();
451+
return self.try_steals(2);
452+
}
453+
}
454+
}
455+
456+
// With no backoff try stealing n times from the queues the
457+
// scheduler knows about. This naive implementation can steal from
458+
// our own queue or from other special schedulers.
459+
fn try_steals(&mut self, n: uint) -> Option<~Task> {
460+
for _ in range(0, n) {
461+
let index = self.rng.gen_uint_range(0, self.work_queues.len());
462+
let work_queues = &mut self.work_queues;
463+
match work_queues[index].steal() {
464+
Some(task) => {
465+
rtdebug!("found task by stealing"); return Some(task)
466+
}
467+
None => ()
468+
}
469+
};
470+
rtdebug!("giving up on stealing");
471+
return None;
472+
}
420473

421-
// If we perform a scheduler action we give away the scheduler ~
422-
// pointer, if it is still available we return it.
474+
// Given a task, execute it correctly.
475+
fn process_task(~self, task: ~Task) -> Option<~Scheduler> {
476+
let mut this = self;
477+
let mut task = task;
423478

424-
fn resume_task_from_queue(~self) -> Option<~Scheduler> {
479+
rtdebug!("processing a task");
425480

481+
let home = task.take_unwrap_home();
482+
match home {
483+
Sched(home_handle) => {
484+
if home_handle.sched_id != this.sched_id() {
485+
rtdebug!("sending task home");
486+
task.give_home(Sched(home_handle));
487+
Scheduler::send_task_home(task);
488+
return Some(this);
489+
} else {
490+
rtdebug!("running task here");
491+
task.give_home(Sched(home_handle));
492+
this.resume_task_immediately(task);
493+
return None;
494+
}
495+
}
496+
AnySched if this.run_anything => {
497+
rtdebug!("running anysched task here");
498+
task.give_home(AnySched);
499+
this.resume_task_immediately(task);
500+
return None;
501+
}
502+
AnySched => {
503+
rtdebug!("sending task to friend");
504+
task.give_home(AnySched);
505+
this.send_to_friend(task);
506+
return Some(this);
507+
}
508+
}
509+
}
510+
511+
// Bundle the helpers together.
512+
fn do_work(~self) -> Option<~Scheduler> {
426513
let mut this = self;
427514

428-
match this.work_queue.pop() {
515+
rtdebug!("scheduler calling do work");
516+
match this.find_work() {
429517
Some(task) => {
430-
let mut task = task;
431-
let home = task.take_unwrap_home();
432-
match home {
433-
Sched(home_handle) => {
434-
if home_handle.sched_id != this.sched_id() {
435-
task.give_home(Sched(home_handle));
436-
Scheduler::send_task_home(task);
437-
return Some(this);
438-
} else {
439-
this.event_loop.callback(Scheduler::run_sched_once);
440-
task.give_home(Sched(home_handle));
441-
this.resume_task_immediately(task);
442-
return None;
443-
}
444-
}
445-
AnySched if this.run_anything => {
446-
this.event_loop.callback(Scheduler::run_sched_once);
447-
task.give_home(AnySched);
448-
this.resume_task_immediately(task);
449-
return None;
450-
}
451-
AnySched => {
452-
task.give_home(AnySched);
453-
this.send_to_friend(task);
454-
return Some(this);
455-
}
456-
}
518+
rtdebug!("found some work! processing the task");
519+
return this.process_task(task);
457520
}
458521
None => {
522+
rtdebug!("no work was found, returning the scheduler struct");
459523
return Some(this);
460524
}
461525
}
@@ -711,7 +775,6 @@ impl Scheduler {
711775
GiveTask(task, f) => f.to_fn()(self, task)
712776
}
713777
}
714-
715778
}
716779

717780
// The cases for the below function.
@@ -745,6 +808,8 @@ impl ClosureConverter for UnsafeTaskReceiver {
745808

746809
#[cfg(test)]
747810
mod test {
811+
extern mod extra;
812+
748813
use prelude::*;
749814
use rt::test::*;
750815
use unstable::run_in_bare_thread;
@@ -862,12 +927,15 @@ mod test {
862927
do run_in_bare_thread {
863928

864929
let sleepers = SleeperList::new();
865-
let work_queue = WorkQueue::new();
930+
let normal_queue = WorkQueue::new();
931+
let special_queue = WorkQueue::new();
932+
let queues = ~[normal_queue.clone(), special_queue.clone()];
866933

867934
// Our normal scheduler
868935
let mut normal_sched = ~Scheduler::new(
869936
~UvEventLoop::new(),
870-
work_queue.clone(),
937+
normal_queue,
938+
queues.clone(),
871939
sleepers.clone());
872940

873941
let normal_handle = Cell::new(normal_sched.make_handle());
@@ -877,7 +945,8 @@ mod test {
877945
// Our special scheduler
878946
let mut special_sched = ~Scheduler::new_special(
879947
~UvEventLoop::new(),
880-
work_queue.clone(),
948+
special_queue.clone(),
949+
queues.clone(),
881950
sleepers.clone(),
882951
false,
883952
Some(friend_handle));

src/libstd/rt/select.rs

+2
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ mod test {
182182
fn select_stream() {
183183
use util;
184184
use comm::GenericChan;
185+
use iter::Times;
185186

186187
// Sends 10 buffered packets, and uses select to retrieve them all.
187188
// Puts the port in a different spot in the vector each time.
@@ -265,6 +266,7 @@ mod test {
265266

266267
fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
267268
use rt::test::spawntask_random;
269+
use iter::Times;
268270

269271
do run_in_newsched_task {
270272
// A bit of stress, since ordinarily this is just smoke and mirrors.

0 commit comments

Comments
 (0)