Skip to content

Commit c486634

Browse files
committed
auto merge of #9507 : brson/rust/sched, r=alexcrichton
This also includes a fix for yielding from single-threaded schedulers where the scheduler would stop working before its work queue was empty. Fixes the deadlocks that this patch had previously.
2 parents 6f991a2 + d209717 commit c486634

File tree

3 files changed

+141
-32
lines changed

3 files changed

+141
-32
lines changed

src/libstd/rt/comm.rs

+11
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,17 @@ impl<T> ChanOne<T> {
118118
rtassert!(!rt::in_sched_context());
119119
}
120120

121+
// In order to prevent starvation of other tasks in situations
122+
// where a task sends repeatedly without ever receiving, we
123+
// occasionally yield instead of doing a send immediately.
124+
// We only do this if we're doing a rescheduling send;
125+
// otherwise the caller is expecting not to context switch.
126+
if do_resched {
127+
// XXX: This TLS hit should be combined with other uses of the scheduler below
128+
let sched: ~Scheduler = Local::take();
129+
sched.maybe_yield();
130+
}
131+
121132
let mut this = self;
122133
let mut recvr_active = true;
123134
let packet = this.packet();

src/libstd/rt/sched.rs

+129-28
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use rt::local::Local;
2626
use rt::rtio::{RemoteCallback, PausibleIdleCallback};
2727
use borrow::{to_uint};
2828
use cell::Cell;
29-
use rand::{XorShiftRng, Rng};
29+
use rand::{XorShiftRng, Rng, Rand};
3030
use iter::range;
3131
use vec::{OwnedVector};
3232

@@ -78,7 +78,14 @@ pub struct Scheduler {
7878
/// A fast XorShift rng for scheduler use
7979
rng: XorShiftRng,
8080
/// A toggleable idle callback
81-
idle_callback: Option<~PausibleIdleCallback>
81+
idle_callback: Option<~PausibleIdleCallback>,
82+
/// A countdown that starts at a random value and is decremented
83+
/// every time a yield check is performed. When it hits 0 a task
84+
/// will yield.
85+
yield_check_count: uint,
86+
/// A flag to tell the scheduler loop it needs to do some stealing
87+
/// in order to introduce randomness as part of a yield
88+
steal_for_yield: bool
8289
}
8390

8491
/// An indication of how hard to work on a given operation, the difference
@@ -89,6 +96,13 @@ enum EffortLevel {
8996
GiveItYourBest
9097
}
9198

99+
static MAX_YIELD_CHECKS: uint = 200;
100+
101+
fn reset_yield_check(rng: &mut XorShiftRng) -> uint {
102+
let r: uint = Rand::rand(rng);
103+
r % MAX_YIELD_CHECKS + 1
104+
}
105+
92106
impl Scheduler {
93107

94108
// * Initialization Functions
@@ -113,7 +127,7 @@ impl Scheduler {
113127
friend: Option<SchedHandle>)
114128
-> Scheduler {
115129

116-
Scheduler {
130+
let mut sched = Scheduler {
117131
sleeper_list: sleeper_list,
118132
message_queue: MessageQueue::new(),
119133
sleepy: false,
@@ -127,8 +141,14 @@ impl Scheduler {
127141
run_anything: run_anything,
128142
friend_handle: friend,
129143
rng: XorShiftRng::new(),
130-
idle_callback: None
131-
}
144+
idle_callback: None,
145+
yield_check_count: 0,
146+
steal_for_yield: false
147+
};
148+
149+
sched.yield_check_count = reset_yield_check(&mut sched.rng);
150+
151+
return sched;
132152
}
133153

134154
// XXX: This may eventually need to be refactored so that
@@ -307,8 +327,7 @@ impl Scheduler {
307327
}
308328
Some(TaskFromFriend(task)) => {
309329
rtdebug!("got a task from a friend. lovely!");
310-
this.process_task(task,
311-
Scheduler::resume_task_immediately_cl).map_move(Local::put);
330+
this.process_task(task, Scheduler::resume_task_immediately_cl);
312331
return None;
313332
}
314333
Some(Wake) => {
@@ -352,8 +371,8 @@ impl Scheduler {
352371
match this.find_work() {
353372
Some(task) => {
354373
rtdebug!("found some work! processing the task");
355-
return this.process_task(task,
356-
Scheduler::resume_task_immediately_cl);
374+
this.process_task(task, Scheduler::resume_task_immediately_cl);
375+
return None;
357376
}
358377
None => {
359378
rtdebug!("no work was found, returning the scheduler struct");
@@ -373,14 +392,35 @@ impl Scheduler {
373392
// there, trying to steal from the remote work queues.
374393
fn find_work(&mut self) -> Option<~Task> {
375394
rtdebug!("scheduler looking for work");
376-
match self.work_queue.pop() {
377-
Some(task) => {
378-
rtdebug!("found a task locally");
379-
return Some(task)
395+
if !self.steal_for_yield {
396+
match self.work_queue.pop() {
397+
Some(task) => {
398+
rtdebug!("found a task locally");
399+
return Some(task)
400+
}
401+
None => {
402+
rtdebug!("scheduler trying to steal");
403+
return self.try_steals();
404+
}
380405
}
381-
None => {
382-
rtdebug!("scheduler trying to steal");
383-
return self.try_steals();
406+
} else {
407+
// During execution of the last task, it performed a 'yield',
408+
// so we're doing some work stealing in order to introduce some
409+
// scheduling randomness. Otherwise we would just end up popping
410+
// that same task again. This is pretty lame and is to work around
411+
// the problem that work stealing is not designed for 'non-strict'
412+
// (non-fork-join) task parallelism.
413+
self.steal_for_yield = false;
414+
match self.try_steals() {
415+
Some(task) => {
416+
rtdebug!("stole a task after yielding");
417+
return Some(task);
418+
}
419+
None => {
420+
rtdebug!("did not steal a task after yielding");
421+
// Back to business
422+
return self.find_work();
423+
}
384424
}
385425
}
386426
}
@@ -409,7 +449,7 @@ impl Scheduler {
409449
// place.
410450

411451
fn process_task(~self, task: ~Task,
412-
schedule_fn: SchedulingFn) -> Option<~Scheduler> {
452+
schedule_fn: SchedulingFn) {
413453
let mut this = self;
414454
let mut task = task;
415455

@@ -422,23 +462,23 @@ impl Scheduler {
422462
rtdebug!("sending task home");
423463
task.give_home(Sched(home_handle));
424464
Scheduler::send_task_home(task);
425-
return Some(this);
465+
Local::put(this);
426466
} else {
427467
rtdebug!("running task here");
428468
task.give_home(Sched(home_handle));
429-
return schedule_fn(this, task);
469+
schedule_fn(this, task);
430470
}
431471
}
432472
AnySched if this.run_anything => {
433473
rtdebug!("running anysched task here");
434474
task.give_home(AnySched);
435-
return schedule_fn(this, task);
475+
schedule_fn(this, task);
436476
}
437477
AnySched => {
438478
rtdebug!("sending task to friend");
439479
task.give_home(AnySched);
440480
this.send_to_friend(task);
441-
return Some(this);
481+
Local::put(this);
442482
}
443483
}
444484
}
@@ -607,15 +647,14 @@ impl Scheduler {
607647

608648
// * Context Swapping Helpers - Here be ugliness!
609649

610-
pub fn resume_task_immediately(~self, task: ~Task) -> Option<~Scheduler> {
650+
pub fn resume_task_immediately(~self, task: ~Task) {
611651
do self.change_task_context(task) |sched, stask| {
612652
sched.sched_task = Some(stask);
613653
}
614-
return None;
615654
}
616655

617656
fn resume_task_immediately_cl(sched: ~Scheduler,
618-
task: ~Task) -> Option<~Scheduler> {
657+
task: ~Task) {
619658
sched.resume_task_immediately(task)
620659
}
621660

@@ -662,11 +701,10 @@ impl Scheduler {
662701
}
663702
}
664703

665-
fn switch_task(sched: ~Scheduler, task: ~Task) -> Option<~Scheduler> {
704+
fn switch_task(sched: ~Scheduler, task: ~Task) {
666705
do sched.switch_running_tasks_and_then(task) |sched, last_task| {
667706
sched.enqueue_blocked_task(last_task);
668707
};
669-
return None;
670708
}
671709

672710
// * Task Context Helpers
@@ -686,7 +724,7 @@ impl Scheduler {
686724

687725
pub fn run_task(task: ~Task) {
688726
let sched: ~Scheduler = Local::take();
689-
sched.process_task(task, Scheduler::switch_task).map_move(Local::put);
727+
sched.process_task(task, Scheduler::switch_task);
690728
}
691729

692730
pub fn run_task_later(next_task: ~Task) {
@@ -696,6 +734,33 @@ impl Scheduler {
696734
};
697735
}
698736

737+
/// Yield control to the scheduler, executing another task. This is guaranteed
738+
/// to introduce some amount of randomness to the scheduler. Currently the
739+
/// randomness is a result of performing a round of work stealing (which
740+
/// may end up stealing from the current scheduler).
741+
pub fn yield_now(~self) {
742+
let mut this = self;
743+
this.yield_check_count = reset_yield_check(&mut this.rng);
744+
// Tell the scheduler to start stealing on the next iteration
745+
this.steal_for_yield = true;
746+
do this.deschedule_running_task_and_then |sched, task| {
747+
sched.enqueue_blocked_task(task);
748+
}
749+
}
750+
751+
pub fn maybe_yield(~self) {
752+
// Count down the yield checks left before yielding; the count is reset to a random value by reset_yield_check.
753+
let mut this = self;
754+
rtassert!(this.yield_check_count > 0);
755+
this.yield_check_count -= 1;
756+
if this.yield_check_count == 0 {
757+
this.yield_now();
758+
} else {
759+
Local::put(this);
760+
}
761+
}
762+
763+
699764
// * Utility Functions
700765

701766
pub fn sched_id(&self) -> uint { to_uint(self) }
@@ -718,7 +783,7 @@ impl Scheduler {
718783

719784
// Supporting types
720785

721-
type SchedulingFn = ~fn(~Scheduler, ~Task) -> Option<~Scheduler>;
786+
type SchedulingFn = ~fn(~Scheduler, ~Task);
722787

723788
pub enum SchedMessage {
724789
Wake,
@@ -1231,4 +1296,40 @@ mod test {
12311296
}
12321297
}
12331298
}
1299+
1300+
#[test]
1301+
fn dont_starve_2() {
1302+
use rt::comm::oneshot;
1303+
1304+
do stress_factor().times {
1305+
do run_in_newsched_task {
1306+
let (port, chan) = oneshot();
1307+
let (_port2, chan2) = stream();
1308+
1309+
// This task should not be able to starve the other task.
1310+
// The sends should eventually yield.
1311+
do spawntask {
1312+
while !port.peek() {
1313+
chan2.send(());
1314+
}
1315+
}
1316+
1317+
chan.send(());
1318+
}
1319+
}
1320+
}
1321+
1322+
// Regression test for a logic bug that would cause single-threaded schedulers
1323+
// to sleep forever after yielding and stealing another task.
1324+
#[test]
1325+
fn single_threaded_yield() {
1326+
use task::{spawn, spawn_sched, SingleThreaded, deschedule};
1327+
use num::Times;
1328+
1329+
do spawn_sched(SingleThreaded) {
1330+
do 5.times { deschedule(); }
1331+
}
1332+
do spawn { }
1333+
do spawn { }
1334+
}
12341335
}

src/libstd/task/mod.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -542,12 +542,9 @@ pub fn deschedule() {
542542
use rt::local::Local;
543543
use rt::sched::Scheduler;
544544

545-
// FIXME #6842: What does yield really mean in newsched?
546545
// FIXME(#7544): Optimize this, since we know we won't block.
547546
let sched: ~Scheduler = Local::take();
548-
do sched.deschedule_running_task_and_then |sched, task| {
549-
sched.enqueue_blocked_task(task);
550-
}
547+
sched.yield_now();
551548
}
552549

553550
pub fn failing() -> bool {

0 commit comments

Comments
 (0)