Skip to content

std::rt: Implement task yielding. Fix a starvation problem #9507

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 26, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/libstd/rt/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,17 @@ impl<T> ChanOne<T> {
rtassert!(!rt::in_sched_context());
}

// In order to prevent starvation of other tasks in situations
// where a task sends repeatedly without ever receiving, we
// occassionally yield instead of doing a send immediately.
// Only doing this if we're doing a rescheduling send,
// otherwise the caller is expecting not to context switch.
if do_resched {
// XXX: This TLS hit should be combined with other uses of the scheduler below
let sched: ~Scheduler = Local::take();
sched.maybe_yield();
}

let mut this = self;
let mut recvr_active = true;
let packet = this.packet();
Expand Down
157 changes: 129 additions & 28 deletions src/libstd/rt/sched.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use rt::local::Local;
use rt::rtio::{RemoteCallback, PausibleIdleCallback};
use borrow::{to_uint};
use cell::Cell;
use rand::{XorShiftRng, Rng};
use rand::{XorShiftRng, Rng, Rand};
use iter::range;
use vec::{OwnedVector};

Expand Down Expand Up @@ -78,7 +78,14 @@ pub struct Scheduler {
/// A fast XorShift rng for scheduler use
rng: XorShiftRng,
/// A toggleable idle callback
idle_callback: Option<~PausibleIdleCallback>
idle_callback: Option<~PausibleIdleCallback>,
/// A countdown that starts at a random value and is decremented
/// every time a yield check is performed. When it hits 0 a task
/// will yield.
yield_check_count: uint,
/// A flag to tell the scheduler loop it needs to do some stealing
/// in order to introduce randomness as part of a yield
steal_for_yield: bool
}

/// An indication of how hard to work on a given operation, the difference
Expand All @@ -89,6 +96,13 @@ enum EffortLevel {
GiveItYourBest
}

static MAX_YIELD_CHECKS: uint = 200;

fn reset_yield_check(rng: &mut XorShiftRng) -> uint {
let r: uint = Rand::rand(rng);
r % MAX_YIELD_CHECKS + 1
}

impl Scheduler {

// * Initialization Functions
Expand All @@ -113,7 +127,7 @@ impl Scheduler {
friend: Option<SchedHandle>)
-> Scheduler {

Scheduler {
let mut sched = Scheduler {
sleeper_list: sleeper_list,
message_queue: MessageQueue::new(),
sleepy: false,
Expand All @@ -127,8 +141,14 @@ impl Scheduler {
run_anything: run_anything,
friend_handle: friend,
rng: XorShiftRng::new(),
idle_callback: None
}
idle_callback: None,
yield_check_count: 0,
steal_for_yield: false
};

sched.yield_check_count = reset_yield_check(&mut sched.rng);

return sched;
}

// XXX: This may eventually need to be refactored so that
Expand Down Expand Up @@ -307,8 +327,7 @@ impl Scheduler {
}
Some(TaskFromFriend(task)) => {
rtdebug!("got a task from a friend. lovely!");
this.process_task(task,
Scheduler::resume_task_immediately_cl).map_move(Local::put);
this.process_task(task, Scheduler::resume_task_immediately_cl);
return None;
}
Some(Wake) => {
Expand Down Expand Up @@ -352,8 +371,8 @@ impl Scheduler {
match this.find_work() {
Some(task) => {
rtdebug!("found some work! processing the task");
return this.process_task(task,
Scheduler::resume_task_immediately_cl);
this.process_task(task, Scheduler::resume_task_immediately_cl);
return None;
}
None => {
rtdebug!("no work was found, returning the scheduler struct");
Expand All @@ -373,14 +392,35 @@ impl Scheduler {
// there, trying to steal from the remote work queues.
fn find_work(&mut self) -> Option<~Task> {
rtdebug!("scheduler looking for work");
match self.work_queue.pop() {
Some(task) => {
rtdebug!("found a task locally");
return Some(task)
if !self.steal_for_yield {
match self.work_queue.pop() {
Some(task) => {
rtdebug!("found a task locally");
return Some(task)
}
None => {
rtdebug!("scheduler trying to steal");
return self.try_steals();
}
}
None => {
rtdebug!("scheduler trying to steal");
return self.try_steals();
} else {
// During execution of the last task, it performed a 'yield',
// so we're doing some work stealing in order to introduce some
// scheduling randomness. Otherwise we would just end up popping
// that same task again. This is pretty lame and is to work around
// the problem that work stealing is not designed for 'non-strict'
// (non-fork-join) task parallelism.
self.steal_for_yield = false;
match self.try_steals() {
Some(task) => {
rtdebug!("stole a task after yielding");
return Some(task);
}
None => {
rtdebug!("did not steal a task after yielding");
// Back to business
return self.find_work();
}
}
}
}
Expand Down Expand Up @@ -409,7 +449,7 @@ impl Scheduler {
// place.

fn process_task(~self, task: ~Task,
schedule_fn: SchedulingFn) -> Option<~Scheduler> {
schedule_fn: SchedulingFn) {
let mut this = self;
let mut task = task;

Expand All @@ -422,23 +462,23 @@ impl Scheduler {
rtdebug!("sending task home");
task.give_home(Sched(home_handle));
Scheduler::send_task_home(task);
return Some(this);
Local::put(this);
} else {
rtdebug!("running task here");
task.give_home(Sched(home_handle));
return schedule_fn(this, task);
schedule_fn(this, task);
}
}
AnySched if this.run_anything => {
rtdebug!("running anysched task here");
task.give_home(AnySched);
return schedule_fn(this, task);
schedule_fn(this, task);
}
AnySched => {
rtdebug!("sending task to friend");
task.give_home(AnySched);
this.send_to_friend(task);
return Some(this);
Local::put(this);
}
}
}
Expand Down Expand Up @@ -607,15 +647,14 @@ impl Scheduler {

// * Context Swapping Helpers - Here be ugliness!

pub fn resume_task_immediately(~self, task: ~Task) -> Option<~Scheduler> {
pub fn resume_task_immediately(~self, task: ~Task) {
do self.change_task_context(task) |sched, stask| {
sched.sched_task = Some(stask);
}
return None;
}

fn resume_task_immediately_cl(sched: ~Scheduler,
task: ~Task) -> Option<~Scheduler> {
task: ~Task) {
sched.resume_task_immediately(task)
}

Expand Down Expand Up @@ -662,11 +701,10 @@ impl Scheduler {
}
}

fn switch_task(sched: ~Scheduler, task: ~Task) -> Option<~Scheduler> {
fn switch_task(sched: ~Scheduler, task: ~Task) {
do sched.switch_running_tasks_and_then(task) |sched, last_task| {
sched.enqueue_blocked_task(last_task);
};
return None;
}

// * Task Context Helpers
Expand All @@ -686,7 +724,7 @@ impl Scheduler {

pub fn run_task(task: ~Task) {
let sched: ~Scheduler = Local::take();
sched.process_task(task, Scheduler::switch_task).map_move(Local::put);
sched.process_task(task, Scheduler::switch_task);
}

pub fn run_task_later(next_task: ~Task) {
Expand All @@ -696,6 +734,33 @@ impl Scheduler {
};
}

/// Yield control to the scheduler, executing another task. This is guaranteed
/// to introduce some amount of randomness to the scheduler. Currently the
/// randomness is a result of performing a round of work stealing (which
/// may end up stealing from the current scheduler).
pub fn yield_now(~self) {
let mut this = self;
this.yield_check_count = reset_yield_check(&mut this.rng);
// Tell the scheduler to start stealing on the next iteration
this.steal_for_yield = true;
do this.deschedule_running_task_and_then |sched, task| {
sched.enqueue_blocked_task(task);
}
}

pub fn maybe_yield(~self) {
// The number of times to do the yield check before yielding, chosen arbitrarily.
let mut this = self;
rtassert!(this.yield_check_count > 0);
this.yield_check_count -= 1;
if this.yield_check_count == 0 {
this.yield_now();
} else {
Local::put(this);
}
}


// * Utility Functions

pub fn sched_id(&self) -> uint { to_uint(self) }
Expand All @@ -718,7 +783,7 @@ impl Scheduler {

// Supporting types

type SchedulingFn = ~fn(~Scheduler, ~Task) -> Option<~Scheduler>;
type SchedulingFn = ~fn(~Scheduler, ~Task);

pub enum SchedMessage {
Wake,
Expand Down Expand Up @@ -1231,4 +1296,40 @@ mod test {
}
}
}

#[test]
fn dont_starve_2() {
use rt::comm::oneshot;

do stress_factor().times {
do run_in_newsched_task {
let (port, chan) = oneshot();
let (_port2, chan2) = stream();

// This task should not be able to starve the other task.
// The sends should eventually yield.
do spawntask {
while !port.peek() {
chan2.send(());
}
}

chan.send(());
}
}
}

// Regression test for a logic bug that would cause single-threaded schedulers
// to sleep forever after yielding and stealing another task.
#[test]
fn single_threaded_yield() {
use task::{spawn, spawn_sched, SingleThreaded, deschedule};
use num::Times;

do spawn_sched(SingleThreaded) {
do 5.times { deschedule(); }
}
do spawn { }
do spawn { }
}
}
5 changes: 1 addition & 4 deletions src/libstd/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,12 +542,9 @@ pub fn deschedule() {
use rt::local::Local;
use rt::sched::Scheduler;

// FIXME #6842: What does yield really mean in newsched?
// FIXME(#7544): Optimize this, since we know we won't block.
let sched: ~Scheduler = Local::take();
do sched.deschedule_running_task_and_then |sched, task| {
sched.enqueue_blocked_task(task);
}
sched.yield_now();
}

pub fn failing() -> bool {
Expand Down