@@ -26,7 +26,7 @@ use rt::local::Local;
2626use rt:: rtio:: { RemoteCallback , PausibleIdleCallback } ;
2727use borrow:: { to_uint} ;
2828use cell:: Cell ;
29- use rand:: { XorShiftRng , Rng } ;
29+ use rand:: { XorShiftRng , Rng , Rand } ;
3030use iter:: range;
3131use vec:: { OwnedVector } ;
3232
@@ -78,7 +78,14 @@ pub struct Scheduler {
7878 /// A fast XorShift rng for scheduler use
7979 rng : XorShiftRng ,
8080 /// A toggleable idle callback
81- idle_callback : Option < ~PausibleIdleCallback >
81+ idle_callback : Option < ~PausibleIdleCallback > ,
82+ /// A countdown that starts at a random value and is decremented
83+ /// every time a yield check is performed. When it hits 0 a task
84+ /// will yield.
85+ yield_check_count : uint ,
86+ /// A flag to tell the scheduler loop it needs to do some stealing
87+ /// in order to introduce randomness as part of a yield
88+ steal_for_yield : bool
8289}
8390
8491/// An indication of how hard to work on a given operation, the difference
@@ -89,6 +96,13 @@ enum EffortLevel {
8996 GiveItYourBest
9097}
9198
99+ static MAX_YIELD_CHECKS : uint = 200 ;
100+
101+ fn reset_yield_check ( rng : & mut XorShiftRng ) -> uint {
102+ let r: uint = Rand :: rand ( rng) ;
103+ r % MAX_YIELD_CHECKS + 1
104+ }
105+
92106impl Scheduler {
93107
94108 // * Initialization Functions
@@ -113,7 +127,7 @@ impl Scheduler {
113127 friend : Option < SchedHandle > )
114128 -> Scheduler {
115129
116- Scheduler {
130+ let mut sched = Scheduler {
117131 sleeper_list : sleeper_list,
118132 message_queue : MessageQueue :: new ( ) ,
119133 sleepy : false ,
@@ -127,8 +141,14 @@ impl Scheduler {
127141 run_anything : run_anything,
128142 friend_handle : friend,
129143 rng : XorShiftRng :: new ( ) ,
130- idle_callback : None
131- }
144+ idle_callback : None ,
145+ yield_check_count : 0 ,
146+ steal_for_yield : false
147+ } ;
148+
149+ sched. yield_check_count = reset_yield_check ( & mut sched. rng ) ;
150+
151+ return sched;
132152 }
133153
134154 // XXX: This may eventually need to be refactored so that
@@ -307,8 +327,7 @@ impl Scheduler {
307327 }
308328 Some ( TaskFromFriend ( task) ) => {
309329 rtdebug ! ( "got a task from a friend. lovely!" ) ;
310- this. process_task ( task,
311- Scheduler :: resume_task_immediately_cl) . map_move ( Local :: put) ;
330+ this. process_task ( task, Scheduler :: resume_task_immediately_cl) ;
312331 return None ;
313332 }
314333 Some ( Wake ) => {
@@ -352,8 +371,8 @@ impl Scheduler {
352371 match this. find_work ( ) {
353372 Some ( task) => {
354373 rtdebug ! ( "found some work! processing the task" ) ;
355- return this. process_task ( task,
356- Scheduler :: resume_task_immediately_cl ) ;
374+ this. process_task ( task, Scheduler :: resume_task_immediately_cl ) ;
375+ return None ;
357376 }
358377 None => {
359378 rtdebug ! ( "no work was found, returning the scheduler struct" ) ;
@@ -373,14 +392,35 @@ impl Scheduler {
373392 // there, trying to steal from the remote work queues.
374393 fn find_work ( & mut self ) -> Option < ~Task > {
375394 rtdebug ! ( "scheduler looking for work" ) ;
376- match self . work_queue . pop ( ) {
377- Some ( task) => {
378- rtdebug ! ( "found a task locally" ) ;
379- return Some ( task)
395+ if !self . steal_for_yield {
396+ match self . work_queue . pop ( ) {
397+ Some ( task) => {
398+ rtdebug ! ( "found a task locally" ) ;
399+ return Some ( task)
400+ }
401+ None => {
402+ rtdebug ! ( "scheduler trying to steal" ) ;
403+ return self . try_steals ( ) ;
404+ }
380405 }
381- None => {
382- rtdebug ! ( "scheduler trying to steal" ) ;
383- return self . try_steals ( ) ;
406+ } else {
407+ // During execution of the last task, it performed a 'yield',
408+ // so we're doing some work stealing in order to introduce some
409+ // scheduling randomness. Otherwise we would just end up popping
410+ // that same task again. This is pretty lame and is to work around
411+ // the problem that work stealing is not designed for 'non-strict'
412+ // (non-fork-join) task parallelism.
413+ self . steal_for_yield = false ;
414+ match self . try_steals ( ) {
415+ Some ( task) => {
416+ rtdebug ! ( "stole a task after yielding" ) ;
417+ return Some ( task) ;
418+ }
419+ None => {
420+ rtdebug ! ( "did not steal a task after yielding" ) ;
421+ // Back to business
422+ return self . find_work ( ) ;
423+ }
384424 }
385425 }
386426 }
@@ -409,7 +449,7 @@ impl Scheduler {
409449 // place.
410450
411451 fn process_task ( ~self , task : ~Task ,
412- schedule_fn : SchedulingFn ) -> Option < ~ Scheduler > {
452+ schedule_fn : SchedulingFn ) {
413453 let mut this = self ;
414454 let mut task = task;
415455
@@ -422,23 +462,23 @@ impl Scheduler {
422462 rtdebug ! ( "sending task home" ) ;
423463 task. give_home ( Sched ( home_handle) ) ;
424464 Scheduler :: send_task_home ( task) ;
425- return Some ( this) ;
465+ Local :: put ( this) ;
426466 } else {
427467 rtdebug ! ( "running task here" ) ;
428468 task. give_home ( Sched ( home_handle) ) ;
429- return schedule_fn ( this, task) ;
469+ schedule_fn ( this, task) ;
430470 }
431471 }
432472 AnySched if this. run_anything => {
433473 rtdebug ! ( "running anysched task here" ) ;
434474 task. give_home ( AnySched ) ;
435- return schedule_fn ( this, task) ;
475+ schedule_fn ( this, task) ;
436476 }
437477 AnySched => {
438478 rtdebug ! ( "sending task to friend" ) ;
439479 task. give_home ( AnySched ) ;
440480 this. send_to_friend ( task) ;
441- return Some ( this) ;
481+ Local :: put ( this) ;
442482 }
443483 }
444484 }
@@ -607,15 +647,14 @@ impl Scheduler {
607647
608648 // * Context Swapping Helpers - Here be ugliness!
609649
610- pub fn resume_task_immediately ( ~self , task : ~Task ) -> Option < ~ Scheduler > {
650+ pub fn resume_task_immediately ( ~self , task : ~Task ) {
611651 do self . change_task_context ( task) |sched, stask| {
612652 sched. sched_task = Some ( stask) ;
613653 }
614- return None ;
615654 }
616655
617656 fn resume_task_immediately_cl ( sched : ~Scheduler ,
618- task : ~Task ) -> Option < ~ Scheduler > {
657+ task : ~Task ) {
619658 sched. resume_task_immediately ( task)
620659 }
621660
@@ -662,11 +701,10 @@ impl Scheduler {
662701 }
663702 }
664703
665- fn switch_task ( sched : ~Scheduler , task : ~Task ) -> Option < ~ Scheduler > {
704+ fn switch_task ( sched : ~Scheduler , task : ~Task ) {
666705 do sched. switch_running_tasks_and_then ( task) |sched, last_task| {
667706 sched. enqueue_blocked_task ( last_task) ;
668707 } ;
669- return None ;
670708 }
671709
672710 // * Task Context Helpers
@@ -686,7 +724,7 @@ impl Scheduler {
686724
687725 pub fn run_task ( task : ~Task ) {
688726 let sched: ~Scheduler = Local :: take ( ) ;
689- sched. process_task ( task, Scheduler :: switch_task) . map_move ( Local :: put ) ;
727+ sched. process_task ( task, Scheduler :: switch_task) ;
690728 }
691729
692730 pub fn run_task_later ( next_task : ~Task ) {
@@ -696,6 +734,33 @@ impl Scheduler {
696734 } ;
697735 }
698736
737+ /// Yield control to the scheduler, executing another task. This is guaranteed
738+ /// to introduce some amount of randomness to the scheduler. Currently the
739+ /// randomness is a result of performing a round of work stealing (which
740+ /// may end up stealing from the current scheduler).
741+ pub fn yield_now ( ~self ) {
742+ let mut this = self ;
743+ this. yield_check_count = reset_yield_check ( & mut this. rng ) ;
744+ // Tell the scheduler to start stealing on the next iteration
745+ this. steal_for_yield = true ;
746+ do this. deschedule_running_task_and_then |sched, task| {
747+ sched. enqueue_blocked_task ( task) ;
748+ }
749+ }
750+
751+ pub fn maybe_yield ( ~self ) {
752+ // The number of times to do the yield check before yielding, chosen arbitrarily.
753+ let mut this = self ;
754+ rtassert ! ( this. yield_check_count > 0 ) ;
755+ this. yield_check_count -= 1 ;
756+ if this. yield_check_count == 0 {
757+ this. yield_now ( ) ;
758+ } else {
759+ Local :: put ( this) ;
760+ }
761+ }
762+
763+
699764 // * Utility Functions
700765
701766 pub fn sched_id ( & self ) -> uint { to_uint ( self ) }
@@ -718,7 +783,7 @@ impl Scheduler {
718783
719784// Supporting types
720785
721- type SchedulingFn = ~fn ( ~Scheduler , ~Task ) -> Option < ~ Scheduler > ;
786+ type SchedulingFn = ~fn ( ~Scheduler , ~Task ) ;
722787
723788pub enum SchedMessage {
724789 Wake ,
@@ -1231,4 +1296,40 @@ mod test {
12311296 }
12321297 }
12331298 }
1299+
1300+ #[ test]
1301+ fn dont_starve_2 ( ) {
1302+ use rt:: comm:: oneshot;
1303+
1304+ do stress_factor ( ) . times {
1305+ do run_in_newsched_task {
1306+ let ( port, chan) = oneshot ( ) ;
1307+ let ( _port2, chan2) = stream ( ) ;
1308+
1309+ // This task should not be able to starve the other task.
1310+ // The sends should eventually yield.
1311+ do spawntask {
1312+ while !port. peek ( ) {
1313+ chan2. send ( ( ) ) ;
1314+ }
1315+ }
1316+
1317+ chan. send ( ( ) ) ;
1318+ }
1319+ }
1320+ }
1321+
1322+ // Regression test for a logic bug that would cause single-threaded schedulers
1323+ // to sleep forever after yielding and stealing another task.
1324+ #[ test]
1325+ fn single_threaded_yield ( ) {
1326+ use task:: { spawn, spawn_sched, SingleThreaded , deschedule} ;
1327+ use num:: Times ;
1328+
1329+ do spawn_sched ( SingleThreaded ) {
1330+ do 5 . times { deschedule ( ) ; }
1331+ }
1332+ do spawn { }
1333+ do spawn { }
1334+ }
12341335}
0 commit comments