Skip to content

Commit f1c1f92

Browse files
committed
auto merge of #8221 : brson/rust/single-threaded, r=graydon
This is the last major runtime feature needed for the transition to the new scheduler.
2 parents fc57182 + c3fa411 commit f1c1f92

File tree

3 files changed

+107
-16
lines changed

3 files changed

+107
-16
lines changed

src/libstd/rt/sched.rs

+9-3
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,14 @@ use cell::Cell;
3939
pub struct Scheduler {
4040
/// A queue of available work. Under a work-stealing policy there
4141
/// is one per Scheduler.
42-
priv work_queue: WorkQueue<~Task>,
42+
work_queue: WorkQueue<~Task>,
4343
/// The queue of incoming messages from other schedulers.
4444
/// These are enqueued by SchedHandles after which a remote callback
4545
/// is triggered to handle the message.
4646
priv message_queue: MessageQueue<SchedMessage>,
4747
/// A shared list of sleeping schedulers. We'll use this to wake
4848
/// up schedulers when pushing work onto the work queue.
49-
priv sleeper_list: SleeperList,
49+
sleeper_list: SleeperList,
5050
/// Indicates that we have previously pushed a handle onto the
5151
/// SleeperList but have not yet received the Wake message.
5252
/// Being `true` does not necessarily mean that the scheduler is
@@ -158,6 +158,9 @@ impl Scheduler {
158158
// scheduler. Grab it out of TLS - performing the scheduler
159159
// action will have given it away.
160160
let sched = Local::take::<Scheduler>();
161+
162+
rtdebug!("starting scheduler %u", sched.sched_id());
163+
161164
sched.run();
162165

163166
// Now that we are done with the scheduler, clean up the
@@ -166,6 +169,9 @@ impl Scheduler {
166169
// task.run() on the scheduler task we never get through all
167170
// the cleanup code it runs.
168171
let mut stask = Local::take::<Task>();
172+
173+
rtdebug!("stopping scheduler %u", stask.sched.get_ref().sched_id());
174+
169175
stask.destroyed = true;
170176
}
171177

@@ -484,7 +490,7 @@ impl Scheduler {
484490
return None;
485491
} else if !homed && !this.run_anything {
486492
// the task isn't homed, but it can't be run here
487-
this.enqueue_task(task);
493+
this.send_to_friend(task);
488494
return Some(this);
489495
} else {
490496
// task isn't home, so don't run it here, send it home

src/libstd/task/mod.rs

+18-5
Original file line numberDiff line numberDiff line change
@@ -971,16 +971,29 @@ fn test_try_fail() {
971971
}
972972
}
973973

974+
#[cfg(test)]
975+
fn get_sched_id() -> int {
976+
if context() == OldTaskContext {
977+
unsafe {
978+
rt::rust_get_sched_id() as int
979+
}
980+
} else {
981+
do Local::borrow::<::rt::sched::Scheduler, int> |sched| {
982+
sched.sched_id() as int
983+
}
984+
}
985+
}
986+
974987
#[test]
975988
fn test_spawn_sched() {
976989
let (po, ch) = stream::<()>();
977990
let ch = SharedChan::new(ch);
978991

979992
fn f(i: int, ch: SharedChan<()>) {
980-
let parent_sched_id = unsafe { rt::rust_get_sched_id() };
993+
let parent_sched_id = get_sched_id();
981994

982995
do spawn_sched(SingleThreaded) {
983-
let child_sched_id = unsafe { rt::rust_get_sched_id() };
996+
let child_sched_id = get_sched_id();
984997
assert!(parent_sched_id != child_sched_id);
985998

986999
if (i == 0) {
@@ -1000,15 +1013,15 @@ fn test_spawn_sched_childs_on_default_sched() {
10001013
let (po, ch) = stream();
10011014

10021015
// Assuming tests run on the default scheduler
1003-
let default_id = unsafe { rt::rust_get_sched_id() };
1016+
let default_id = get_sched_id();
10041017

10051018
let ch = Cell::new(ch);
10061019
do spawn_sched(SingleThreaded) {
1007-
let parent_sched_id = unsafe { rt::rust_get_sched_id() };
1020+
let parent_sched_id = get_sched_id();
10081021
let ch = Cell::new(ch.take());
10091022
do spawn {
10101023
let ch = ch.take();
1011-
let child_sched_id = unsafe { rt::rust_get_sched_id() };
1024+
let child_sched_id = get_sched_id();
10121025
assert!(parent_sched_id != child_sched_id);
10131026
assert_eq!(child_sched_id, default_id);
10141027
ch.send(());

src/libstd/task/spawn.rs

+80-8
Original file line numberDiff line numberDiff line change
@@ -78,13 +78,13 @@ use cast::transmute;
7878
use cast;
7979
use cell::Cell;
8080
use container::MutableMap;
81-
use comm::{Chan, GenericChan};
81+
use comm::{Chan, GenericChan, oneshot};
8282
use hashmap::{HashSet, HashSetConsumeIterator};
8383
use local_data;
8484
use task::local_data_priv::{local_get, local_set, OldHandle};
8585
use task::rt::rust_task;
8686
use task::rt;
87-
use task::{Failure};
87+
use task::{Failure, SingleThreaded};
8888
use task::{Success, TaskOpts, TaskResult};
8989
use task::unkillable;
9090
use to_bytes::IterBytes;
@@ -93,9 +93,11 @@ use util;
9393
use unstable::sync::Exclusive;
9494
use rt::{OldTaskContext, TaskContext, SchedulerContext, GlobalContext, context};
9595
use rt::local::Local;
96-
use rt::task::Task;
96+
use rt::task::{Task, Sched};
9797
use rt::kill::KillHandle;
9898
use rt::sched::Scheduler;
99+
use rt::uv::uvio::UvEventLoop;
100+
use rt::thread::Thread;
99101

100102
#[cfg(test)] use task::default_task_opts;
101103
#[cfg(test)] use comm;
@@ -694,11 +696,81 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) {
694696
}
695697
};
696698

697-
let mut task = if opts.watched {
698-
Task::build_child(child_wrapper)
699-
} else {
700-
// An unwatched task is a new root in the exit-code propagation tree
701-
Task::build_root(child_wrapper)
699+
let mut task = unsafe {
700+
if opts.sched.mode != SingleThreaded {
701+
if opts.watched {
702+
Task::build_child(child_wrapper)
703+
} else {
704+
Task::build_root(child_wrapper)
705+
}
706+
} else {
707+
// Creating a 1:1 task:thread ...
708+
let sched = Local::unsafe_borrow::<Scheduler>();
709+
let sched_handle = (*sched).make_handle();
710+
711+
// Create a new scheduler to hold the new task
712+
let new_loop = ~UvEventLoop::new();
713+
let mut new_sched = ~Scheduler::new_special(new_loop,
714+
(*sched).work_queue.clone(),
715+
(*sched).sleeper_list.clone(),
716+
false,
717+
Some(sched_handle));
718+
let mut new_sched_handle = new_sched.make_handle();
719+
720+
// Allow the scheduler to exit when the pinned task exits
721+
new_sched_handle.send(Shutdown);
722+
723+
// Pin the new task to the new scheduler
724+
let new_task = if opts.watched {
725+
Task::build_homed_child(child_wrapper, Sched(new_sched_handle))
726+
} else {
727+
Task::build_homed_root(child_wrapper, Sched(new_sched_handle))
728+
};
729+
730+
// Create a task that will later be used to join with the new scheduler
731+
// thread when it is ready to terminate
732+
let (thread_port, thread_chan) = oneshot();
733+
let thread_port_cell = Cell::new(thread_port);
734+
let join_task = do Task::build_child() {
735+
rtdebug!("running join task");
736+
let thread_port = thread_port_cell.take();
737+
let thread: Thread = thread_port.recv();
738+
thread.join();
739+
};
740+
741+
// Put the scheduler into another thread
742+
let new_sched_cell = Cell::new(new_sched);
743+
let orig_sched_handle_cell = Cell::new((*sched).make_handle());
744+
let join_task_cell = Cell::new(join_task);
745+
746+
let thread = do Thread::start {
747+
let mut new_sched = new_sched_cell.take();
748+
let mut orig_sched_handle = orig_sched_handle_cell.take();
749+
let join_task = join_task_cell.take();
750+
751+
let bootstrap_task = ~do Task::new_root(&mut new_sched.stack_pool) || {
752+
rtdebug!("boostraping a 1:1 scheduler");
753+
};
754+
new_sched.bootstrap(bootstrap_task);
755+
756+
rtdebug!("enqueing join_task");
757+
// Now tell the original scheduler to join with this thread
758+
// by scheduling a thread-joining task on the original scheduler
759+
orig_sched_handle.send(TaskFromFriend(join_task));
760+
761+
// NB: We can't simply send a message from here to another task
762+
// because this code isn't running in a task and message passing doesn't
763+
// work outside of tasks. Hence we're sending a scheduler message
764+
// to execute a new task directly to a scheduler.
765+
};
766+
767+
// Give the thread handle to the join task
768+
thread_chan.send(thread);
769+
770+
// When this task is enqueued on the current scheduler it will then get
771+
// forwarded to the scheduler to which it is pinned
772+
new_task
773+
}
702774
};
703775

704776
if opts.notify_chan.is_some() {

0 commit comments

Comments
 (0)