Skip to content

Workstealing for the scheduler #8356

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/libstd/rt/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,10 @@ impl<T> Select for PortOne<T> {
fn optimistic_check(&mut self) -> bool {
// The optimistic check is never necessary for correctness. For testing
// purposes, making it randomly return false simulates a racing sender.
use rand::{Rand, rng};
let mut rng = rng();
let actually_check = Rand::rand(&mut rng);
use rand::{Rand};
let actually_check = do Local::borrow::<Scheduler, bool> |sched| {
Rand::rand(&mut sched.rng)
};
if actually_check {
unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
} else {
Expand Down
32 changes: 22 additions & 10 deletions src/libstd/rt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ Several modules in `core` are clients of `rt`:
use cell::Cell;
use clone::Clone;
use container::Container;
use iter::Times;
use iterator::{Iterator, IteratorUtil};
use iterator::{Iterator, IteratorUtil, range};
use option::{Some, None};
use ptr::RawPtr;
use rt::local::Local;
Expand Down Expand Up @@ -247,24 +246,32 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {

let main = Cell::new(main);

// The shared list of sleeping schedulers. Schedulers wake each other
// occassionally to do new work.
// The shared list of sleeping schedulers.
let sleepers = SleeperList::new();
// The shared work queue. Temporary until work stealing is implemented.
let work_queue = WorkQueue::new();

// Create a work queue for each scheduler, ntimes. Create an extra
// for the main thread if that flag is set. We won't steal from it.
let mut work_queues = ~[];
for _ in range(0u, nscheds) {
let work_queue: WorkQueue<~Task> = WorkQueue::new();
work_queues.push(work_queue);
}

// The schedulers.
let mut scheds = ~[];
// Handles to the schedulers. When the main task ends these will be
// sent the Shutdown message to terminate the schedulers.
let mut handles = ~[];

do nscheds.times {
for i in range(0u, nscheds) {
rtdebug!("inserting a regular scheduler");

// Every scheduler is driven by an I/O event loop.
let loop_ = ~UvEventLoop::new();
let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone());
let mut sched = ~Scheduler::new(loop_,
work_queues[i].clone(),
work_queues.clone(),
sleepers.clone());
let handle = sched.make_handle();

scheds.push(sched);
Expand All @@ -280,9 +287,14 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
let friend_handle = friend_sched.make_handle();
scheds.push(friend_sched);

// This scheduler needs a queue that isn't part of the stealee
// set.
let work_queue = WorkQueue::new();

let main_loop = ~UvEventLoop::new();
let mut main_sched = ~Scheduler::new_special(main_loop,
work_queue.clone(),
work_queue,
work_queues.clone(),
sleepers.clone(),
false,
Some(friend_handle));
Expand Down Expand Up @@ -371,7 +383,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
let mut main_task = ~Task::new_root_homed(&mut main_sched.stack_pool, None,
home, main.take());
main_task.death.on_exit = Some(on_exit.take());
rtdebug!("boostrapping main_task");
rtdebug!("bootstrapping main_task");

main_sched.bootstrap(main_task);
}
Expand Down
159 changes: 114 additions & 45 deletions src/libstd/rt/sched.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use option::{Option, Some, None};
use cast::{transmute, transmute_mut_region, transmute_mut_unsafe};
use clone::Clone;
use unstable::raw;

use super::sleeper_list::SleeperList;
use super::work_queue::WorkQueue;
use super::stack::{StackPool};
Expand All @@ -28,6 +27,9 @@ use rt::rtio::RemoteCallback;
use rt::metrics::SchedMetrics;
use borrow::{to_uint};
use cell::Cell;
use rand::{XorShiftRng, RngUtil};
use iterator::{range};
use vec::{OwnedVector};

/// The Scheduler is responsible for coordinating execution of Coroutines
/// on a single thread. When the scheduler is running it is owned by
Expand All @@ -37,9 +39,11 @@ use cell::Cell;
/// XXX: This creates too many callbacks to run_sched_once, resulting
/// in too much allocation and too many events.
pub struct Scheduler {
/// A queue of available work. Under a work-stealing policy there
/// is one per Scheduler.
work_queue: WorkQueue<~Task>,
/// There are N work queues, one per scheduler.
priv work_queue: WorkQueue<~Task>,
/// Work queues for the other schedulers. These are created by
/// cloning the core work queues.
work_queues: ~[WorkQueue<~Task>],
/// The queue of incoming messages from other schedulers.
/// These are enqueued by SchedHandles after which a remote callback
/// is triggered to handle the message.
Expand Down Expand Up @@ -70,7 +74,10 @@ pub struct Scheduler {
run_anything: bool,
/// If the scheduler shouldn't run some tasks, a friend to send
/// them to.
friend_handle: Option<SchedHandle>
friend_handle: Option<SchedHandle>,
/// A fast XorShift rng for scheduler use
rng: XorShiftRng

}

pub struct SchedHandle {
Expand All @@ -97,17 +104,21 @@ impl Scheduler {

pub fn new(event_loop: ~EventLoopObject,
work_queue: WorkQueue<~Task>,
work_queues: ~[WorkQueue<~Task>],
sleeper_list: SleeperList)
-> Scheduler {

Scheduler::new_special(event_loop, work_queue, sleeper_list, true, None)
Scheduler::new_special(event_loop, work_queue,
work_queues,
sleeper_list, true, None)

}

// When you create a scheduler it isn't yet "in" a task, so the
// task field is None.
pub fn new_special(event_loop: ~EventLoopObject,
work_queue: WorkQueue<~Task>,
work_queues: ~[WorkQueue<~Task>],
sleeper_list: SleeperList,
run_anything: bool,
friend: Option<SchedHandle>)
Expand All @@ -120,12 +131,14 @@ impl Scheduler {
no_sleep: false,
event_loop: event_loop,
work_queue: work_queue,
work_queues: work_queues,
stack_pool: StackPool::new(),
sched_task: None,
cleanup_job: None,
metrics: SchedMetrics::new(),
run_anything: run_anything,
friend_handle: friend
friend_handle: friend,
rng: XorShiftRng::new()
}
}

Expand Down Expand Up @@ -248,7 +261,7 @@ impl Scheduler {

// Second activity is to try resuming a task from the queue.

let result = sched.resume_task_from_queue();
let result = sched.do_work();
let mut sched = match result {
Some(sched) => {
// Failed to dequeue a task, so we return.
Expand Down Expand Up @@ -415,47 +428,98 @@ impl Scheduler {
}
}

// Resume a task from the queue - but also take into account that
// it might not belong here.
// Workstealing: In this iteration of the runtime each scheduler
// thread has a distinct work queue. When no work is available
// locally, make a few attempts to steal work from the queues of
// other scheduler threads. If a few steals fail we end up in the
// old "no work" path which is fine.

// First step in the process is to find a task. This function does
// that by first checking the local queue, and if there is no work
// there, trying to steal from the remote work queues.
fn find_work(&mut self) -> Option<~Task> {
rtdebug!("scheduler looking for work");
match self.work_queue.pop() {
Some(task) => {
rtdebug!("found a task locally");
return Some(task)
}
None => {
// Our naive stealing, try kinda hard.
rtdebug!("scheduler trying to steal");
let _len = self.work_queues.len();
return self.try_steals(2);
}
}
}

// With no backoff try stealing n times from the queues the
// scheduler knows about. This naive implementation can steal from
// our own queue or from other special schedulers.
fn try_steals(&mut self, n: uint) -> Option<~Task> {
for _ in range(0, n) {
let index = self.rng.gen_uint_range(0, self.work_queues.len());
let work_queues = &mut self.work_queues;
match work_queues[index].steal() {
Some(task) => {
rtdebug!("found task by stealing"); return Some(task)
}
None => ()
}
};
rtdebug!("giving up on stealing");
return None;
}

// If we perform a scheduler action we give away the scheduler ~
// pointer, if it is still available we return it.
// Given a task, execute it correctly.
fn process_task(~self, task: ~Task) -> Option<~Scheduler> {
let mut this = self;
let mut task = task;

fn resume_task_from_queue(~self) -> Option<~Scheduler> {
rtdebug!("processing a task");

let home = task.take_unwrap_home();
match home {
Sched(home_handle) => {
if home_handle.sched_id != this.sched_id() {
rtdebug!("sending task home");
task.give_home(Sched(home_handle));
Scheduler::send_task_home(task);
return Some(this);
} else {
rtdebug!("running task here");
task.give_home(Sched(home_handle));
this.resume_task_immediately(task);
return None;
}
}
AnySched if this.run_anything => {
rtdebug!("running anysched task here");
task.give_home(AnySched);
this.resume_task_immediately(task);
return None;
}
AnySched => {
rtdebug!("sending task to friend");
task.give_home(AnySched);
this.send_to_friend(task);
return Some(this);
}
}
}

// Bundle the helpers together.
fn do_work(~self) -> Option<~Scheduler> {
let mut this = self;

match this.work_queue.pop() {
rtdebug!("scheduler calling do work");
match this.find_work() {
Some(task) => {
let mut task = task;
let home = task.take_unwrap_home();
match home {
Sched(home_handle) => {
if home_handle.sched_id != this.sched_id() {
task.give_home(Sched(home_handle));
Scheduler::send_task_home(task);
return Some(this);
} else {
this.event_loop.callback(Scheduler::run_sched_once);
task.give_home(Sched(home_handle));
this.resume_task_immediately(task);
return None;
}
}
AnySched if this.run_anything => {
this.event_loop.callback(Scheduler::run_sched_once);
task.give_home(AnySched);
this.resume_task_immediately(task);
return None;
}
AnySched => {
task.give_home(AnySched);
this.send_to_friend(task);
return Some(this);
}
}
rtdebug!("found some work! processing the task");
return this.process_task(task);
}
None => {
rtdebug!("no work was found, returning the scheduler struct");
return Some(this);
}
}
Expand Down Expand Up @@ -711,7 +775,6 @@ impl Scheduler {
GiveTask(task, f) => f.to_fn()(self, task)
}
}

}

// The cases for the below function.
Expand Down Expand Up @@ -745,6 +808,8 @@ impl ClosureConverter for UnsafeTaskReceiver {

#[cfg(test)]
mod test {
extern mod extra;

use prelude::*;
use rt::test::*;
use unstable::run_in_bare_thread;
Expand Down Expand Up @@ -862,12 +927,15 @@ mod test {
do run_in_bare_thread {

let sleepers = SleeperList::new();
let work_queue = WorkQueue::new();
let normal_queue = WorkQueue::new();
let special_queue = WorkQueue::new();
let queues = ~[normal_queue.clone(), special_queue.clone()];

// Our normal scheduler
let mut normal_sched = ~Scheduler::new(
~UvEventLoop::new(),
work_queue.clone(),
normal_queue,
queues.clone(),
sleepers.clone());

let normal_handle = Cell::new(normal_sched.make_handle());
Expand All @@ -877,7 +945,8 @@ mod test {
// Our special scheduler
let mut special_sched = ~Scheduler::new_special(
~UvEventLoop::new(),
work_queue.clone(),
special_queue.clone(),
queues.clone(),
sleepers.clone(),
false,
Some(friend_handle));
Expand Down
2 changes: 2 additions & 0 deletions src/libstd/rt/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ mod test {
fn select_stream() {
use util;
use comm::GenericChan;
use iter::Times;

// Sends 10 buffered packets, and uses select to retrieve them all.
// Puts the port in a different spot in the vector each time.
Expand Down Expand Up @@ -265,6 +266,7 @@ mod test {

fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
use rt::test::spawntask_random;
use iter::Times;

do run_in_newsched_task {
// A bit of stress, since ordinarily this is just smoke and mirrors.
Expand Down
Loading