From c524e8f0d12ca08fae220ebfc1d9360ea395b08e Mon Sep 17 00:00:00 2001 From: toddaaro Date: Fri, 19 Jul 2013 14:25:05 -0700 Subject: [PATCH 1/2] A major refactoring that changes the way the runtime uses TLS. In the old design the TLS held the scheduler struct, and the scheduler struct held the active task. This posed all sorts of weird problems due to how we wanted to use the contents of TLS. The cleaner approach is to leave the active task in TLS and have the task hold the scheduler. To make this work out the scheduler has to run inside a regular task, and then once that is the case the context switching code is massively simplified, as instead of three possible paths there is only one. The logical flow is also easier to follow, as the scheduler struct acts somewhat like a "token" indicating what is active. These changes also necessitated changing a large number of runtime tests, and rewriting most of the runtime testing helpers. Polish level is "low", as I will very soon start on more scheduler changes that will require wiping the polish off. That being said there should be sufficient comments around anything complex to make this entirely respectable as a standalone commit. --- src/libstd/macros.rs | 9 +- src/libstd/rt/comm.rs | 5 +- src/libstd/rt/context.rs | 6 +- src/libstd/rt/io/net/tcp.rs | 32 +- src/libstd/rt/join_latch.rs | 10 +- src/libstd/rt/local.rs | 131 +++-- src/libstd/rt/mod.rs | 59 +- src/libstd/rt/sched.rs | 1019 ++++++++++++++++------------------- src/libstd/rt/task.rs | 205 ++++++- src/libstd/rt/test.rs | 238 +++----- src/libstd/rt/tube.rs | 5 - src/libstd/rt/uv/mod.rs | 5 +- src/libstd/rt/uv/uvio.rs | 23 +- src/libstd/task/mod.rs | 1 - src/libstd/task/spawn.rs | 48 +- src/libstd/unstable/lang.rs | 3 - 16 files changed, 893 insertions(+), 906 deletions(-) diff --git a/src/libstd/macros.rs b/src/libstd/macros.rs index 7748c43efcd28..04058887970d0 100644 --- a/src/libstd/macros.rs +++ b/src/libstd/macros.rs @@ -23,9 +23,14 @@ macro_rules! rtdebug_ ( } ) ) -// An alternate version with no output, for turning off logging +// An alternate version with no output, for turning off logging. An +// earlier attempt that did not call the fmt! macro was insufficient, +// as a case of the "let bind each variable" approach eventually +// failed without an error message describing the invocation site. macro_rules! rtdebug ( - ($( $arg:expr),+) => ( $(let _ = $arg)*; ) + ($( $arg:expr),+) => ( { + let _x = fmt!( $($arg),+ ); + }) ) macro_rules! rtassert ( diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index fba6171129762..7ce3b13f01587 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -639,7 +639,7 @@ mod test { do run_in_newsched_task { let (port, chan) = oneshot::<~int>(); let port_cell = Cell::new(port); - do spawntask_immediately { + do spawntask { assert!(port_cell.take().recv() == ~10); } @@ -653,7 +653,7 @@ mod test { let (port, chan) = oneshot::<~int>(); let port_cell = Cell::new(port); let chan_cell = Cell::new(chan); - do spawntask_later { + do spawntask { let _cell = chan_cell.take(); } let res = do spawntask_try { @@ -908,5 +908,4 @@ mod test { } } } - } diff --git a/src/libstd/rt/context.rs b/src/libstd/rt/context.rs index 09ba869549fd0..83f9342067386 100644 --- a/src/libstd/rt/context.rs +++ b/src/libstd/rt/context.rs @@ -49,12 +49,11 @@ impl Context { let argp: *c_void = unsafe { transmute::<&~fn(), *c_void>(&*start) }; let sp: *uint = stack.end(); let sp: *mut uint = unsafe { transmute_mut_unsafe(sp) }; - // Save and then immediately load the current context, // which we will then modify to call the given function when restored let mut regs = new_regs(); unsafe { - swap_registers(transmute_mut_region(&mut *regs), transmute_region(&*regs)) + swap_registers(transmute_mut_region(&mut *regs), transmute_region(&*regs)); }; initialize_call_frame(&mut *regs, fp, argp, sp); @@ -72,13 +71,14 @@ impl Context { then loading the registers from a previously saved Context. */ pub fn swap(out_context: &mut Context, in_context: &Context) { + rtdebug!("swapping contexts"); let out_regs: &mut Registers = match out_context { &Context { regs: ~ref mut r, _ } => r }; let in_regs: &Registers = match in_context { &Context { regs: ~ref r, _ } => r }; - + rtdebug!("doing raw swap"); unsafe { swap_registers(out_regs, in_regs) }; } } diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs index 3607f781da3ff..c81fc475cd899 100644 --- a/src/libstd/rt/io/net/tcp.rs +++ b/src/libstd/rt/io/net/tcp.rs @@ -167,7 +167,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -175,7 +175,7 @@ mod test { assert!(buf[0] == 99); } - do spawntask_immediately { + do spawntask { let mut stream = TcpStream::connect(addr); stream.write([99]); } @@ -187,7 +187,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -195,7 +195,7 @@ mod test { assert!(nread.is_none()); } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -207,7 +207,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -217,7 +217,7 @@ mod test { assert!(nread.is_none()); } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -229,7 +229,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let buf = [0]; @@ -246,7 +246,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -259,7 +259,7 @@ mod test { let addr = next_test_ip4(); let max = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for max.times { let mut stream = listener.accept(); @@ -269,7 +269,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { for max.times { let mut stream = TcpStream::connect(addr); stream.write([99]); @@ -284,13 +284,13 @@ mod test { let addr = next_test_ip4(); static MAX: int = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for int::range(0, MAX) |i| { let stream = Cell::new(listener.accept()); rtdebug!("accepted"); // Start another task to handle the connection - do spawntask_immediately { + do spawntask { let mut stream = stream.take(); let mut buf = [0]; stream.read(buf); @@ -305,7 +305,7 @@ mod test { fn connect(i: int, addr: IpAddr) { if i == MAX { return } - do spawntask_immediately { + do spawntask { rtdebug!("connecting"); let mut stream = TcpStream::connect(addr); // Connect again before writing @@ -323,13 +323,13 @@ mod test { let addr = next_test_ip4(); static MAX: int = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for int::range(0, MAX) |_| { let stream = Cell::new(listener.accept()); rtdebug!("accepted"); // Start another task to handle the connection - do spawntask_later { + do spawntask { let mut stream = stream.take(); let mut buf = [0]; stream.read(buf); @@ -344,7 +344,7 @@ mod test { fn connect(i: int, addr: IpAddr) { if i == MAX { return } - do spawntask_later { + do spawntask { rtdebug!("connecting"); let mut stream = TcpStream::connect(addr); // Connect again before writing diff --git a/src/libstd/rt/join_latch.rs b/src/libstd/rt/join_latch.rs index 8073c4a75b88b..f2f1b4b6b5f66 100644 --- a/src/libstd/rt/join_latch.rs +++ b/src/libstd/rt/join_latch.rs @@ -345,7 +345,7 @@ mod test { let child_latch = latch.new_child(); let child_latch = Cell::new(child_latch); - do spawntask_immediately { + do spawntask { let child_latch = child_latch.take(); assert!(child_latch.wait(true)); } @@ -467,7 +467,7 @@ mod test { let child_latch = latch.new_child(); let child_latch = Cell::new(child_latch); - do spawntask_immediately { + do spawntask { let latch = child_latch.take(); latch.release(false); } @@ -483,7 +483,7 @@ mod test { let child_latch = latch.new_child(); let child_latch = Cell::new(child_latch); - do spawntask_immediately { + do spawntask { let mut latch = child_latch.take(); let child_latch = latch.new_child(); let child_latch = Cell::new(child_latch); @@ -509,7 +509,7 @@ mod test { let mut latch = child_latch.take(); let child_latch = latch.new_child(); let child_latch = Cell::new(child_latch); - do spawntask_immediately { + do spawntask { let latch = child_latch.take(); latch.release(false); } @@ -596,7 +596,7 @@ mod test { let child_latch = Cell::new(latch.new_child()); let succeed = order.succeed; if order.immediate { - do spawntask_immediately { + do spawntask { let mut child_latch = child_latch.take(); next(&mut *child_latch, copy suborders); rtdebug!("immediate releasing"); diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs index 374933ab281b8..98399abfb4563 100644 --- a/src/libstd/rt/local.rs +++ b/src/libstd/rt/local.rs @@ -14,6 +14,7 @@ use rt::task::Task; use rt::local_ptr; use rt::rtio::{EventLoop, IoFactoryObject}; //use borrow::to_uint; +use cell::Cell; pub trait Local { fn put(value: ~Self); @@ -24,40 +25,56 @@ pub trait Local { unsafe fn try_unsafe_borrow() -> Option<*mut Self>; } -impl Local for Scheduler { - fn put(value: ~Scheduler) { unsafe { local_ptr::put(value) }} - fn take() -> ~Scheduler { unsafe { local_ptr::take() } } +impl Local for Task { + fn put(value: ~Task) { unsafe { local_ptr::put(value) } } + fn take() -> ~Task { unsafe { local_ptr::take() } } fn exists() -> bool { local_ptr::exists() } - fn borrow(f: &fn(&mut Scheduler) -> T) -> T { + fn borrow(f: &fn(&mut Task) -> T) -> T { let mut res: Option = None; let res_ptr: *mut Option = &mut res; unsafe { - do local_ptr::borrow |sched| { -// rtdebug!("successfully unsafe borrowed sched pointer"); - let result = f(sched); + do local_ptr::borrow |task| { + let result = f(task); *res_ptr = Some(result); } } match res { Some(r) => { r } - None => rtabort!("function failed!") + None => { rtabort!("function failed in local_borrow") } } } - unsafe fn unsafe_borrow() -> *mut Scheduler { local_ptr::unsafe_borrow() } - unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { rtabort!("unimpl") } + unsafe fn unsafe_borrow() -> *mut Task { local_ptr::unsafe_borrow() } + unsafe fn try_unsafe_borrow() -> Option<*mut Task> { rtabort!("unimpl task try_unsafe_borrow") } } -impl Local for Task { - fn put(_value: ~Task) { rtabort!("unimpl") } - fn take() -> ~Task { rtabort!("unimpl") } - fn exists() -> bool { rtabort!("unimpl") } - fn borrow(f: &fn(&mut Task) -> T) -> T { - do Local::borrow:: |sched| { -// rtdebug!("sched about to grab current_task"); - match sched.current_task { +impl Local for Scheduler { + fn put(value: ~Scheduler) { + let value = Cell::new(value); + do Local::borrow:: |task| { + let task = task; + task.sched = Some(value.take()); + }; + } + fn take() -> ~Scheduler { + do Local::borrow:: |task| { + let sched = task.sched.swap_unwrap(); + let task = task; + task.sched = None; + sched + } + } + fn exists() -> bool { + do Local::borrow:: |task| { + match task.sched { + Some(ref _task) => true, + None => false + } + } + } + fn borrow(f: &fn(&mut Scheduler) -> T) -> T { + do Local::borrow:: |task| { + match task.sched { Some(~ref mut task) => { -// rtdebug!("current task pointer: %x", to_uint(task)); -// rtdebug!("current task heap pointer: %x", to_uint(&task.heap)); f(task) } None => { @@ -66,20 +83,19 @@ impl Local for Task { } } } - unsafe fn unsafe_borrow() -> *mut Task { - match (*Local::unsafe_borrow::()).current_task { - Some(~ref mut task) => { - let s: *mut Task = &mut *task; + unsafe fn unsafe_borrow() -> *mut Scheduler { + match (*Local::unsafe_borrow::()).sched { + Some(~ref mut sched) => { + let s: *mut Scheduler = &mut *sched; return s; } None => { - // Don't fail. Infinite recursion rtabort!("no scheduler") } } } - unsafe fn try_unsafe_borrow() -> Option<*mut Task> { - if Local::exists::() { + unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { + if Local::exists::() { Some(Local::unsafe_borrow()) } else { None @@ -101,48 +117,69 @@ impl Local for IoFactoryObject { unsafe fn try_unsafe_borrow() -> Option<*mut IoFactoryObject> { rtabort!("unimpl") } } + #[cfg(test)] mod test { use rt::test::*; - use rt::sched::Scheduler; +// use rt::sched::Scheduler; use super::*; + use rt::task::Task; + use rt::local_ptr; #[test] - fn thread_local_scheduler_smoke_test() { - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - let _scheduler: ~Scheduler = Local::take(); + fn thread_local_task_smoke_test() { + local_ptr::init_tls_key(); + let mut sched = ~new_test_uv_sched(); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + let task: ~Task = Local::take(); + cleanup_task(task); } #[test] - fn thread_local_scheduler_two_instances() { - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - let _scheduler: ~Scheduler = Local::take(); - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - let _scheduler: ~Scheduler = Local::take(); + fn thread_local_task_two_instances() { + local_ptr::init_tls_key(); + let mut sched = ~new_test_uv_sched(); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + let task: ~Task = Local::take(); + cleanup_task(task); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + let task: ~Task = Local::take(); + cleanup_task(task); + } #[test] fn borrow_smoke_test() { - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); + + local_ptr::init_tls_key(); + let mut sched = ~new_test_uv_sched(); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + unsafe { - let _scheduler: *mut Scheduler = Local::unsafe_borrow(); + let _task: *mut Task = Local::unsafe_borrow(); } - let _scheduler: ~Scheduler = Local::take(); + let task: ~Task = Local::take(); + cleanup_task(task); } #[test] fn borrow_with_return() { - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - let res = do Local::borrow:: |_sched| { + local_ptr::init_tls_key(); + let mut sched = ~new_test_uv_sched(); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + + let res = do Local::borrow:: |_task| { true }; assert!(res) - let _scheduler: ~Scheduler = Local::take(); + let task: ~Task = Local::take(); + cleanup_task(task); } } + diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index b70549c266a1e..230a241780855 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -265,23 +265,35 @@ pub fn run(main: ~fn()) -> int { } }; - // Create and enqueue the main task. + // Create the main task. let main_cell = Cell::new(main); let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, main_cell.take()); main_task.on_exit = Some(on_exit); - scheds[0].enqueue_task(main_task); + let main_task_cell = Cell::new(main_task); - // Run each scheduler in a thread. let mut threads = ~[]; + + // Run the main scheduler in a thread. + let sched = scheds.pop(); + let sched_cell = Cell::new(sched); + let thread = do Thread::start { + let sched = sched_cell.take(); + sched.bootstrap(main_task_cell.take()); + }; + threads.push(thread); + + // Run each remaining scheduler in a thread. while !scheds.is_empty() { let sched = scheds.pop(); let sched_cell = Cell::new(sched); let thread = do Thread::start { - let sched = sched_cell.take(); - sched.run(); + let mut sched = sched_cell.take(); + let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || { + rtdebug!("boostraping a non-primary scheduler"); + }; + sched.bootstrap(bootstrap_task); }; - threads.push(thread); } @@ -314,27 +326,14 @@ pub enum RuntimeContext { pub fn context() -> RuntimeContext { use task::rt::rust_task; - use self::local::Local; - use self::sched::Scheduler; - // XXX: Hitting TLS twice to check if the scheduler exists - // then to check for the task is not good for perf + // This has been modified to just not work on the new contexts, + // because we don't need the context() function there. So it + // returns either Old or Global. if unsafe { rust_try_get_task().is_not_null() } { return OldTaskContext; } else { - if Local::exists::() { - let context = Cell::new_empty(); - do Local::borrow:: |sched| { - if sched.in_task_context() { - context.put_back(TaskContext); - } else { - context.put_back(SchedulerContext); - } - } - return context.take(); - } else { - return GlobalContext; - } + return GlobalContext; } pub extern { @@ -346,23 +345,9 @@ pub fn context() -> RuntimeContext { #[test] fn test_context() { use unstable::run_in_bare_thread; - use self::sched::{Scheduler}; - use rt::local::Local; - use rt::test::new_test_uv_sched; assert_eq!(context(), OldTaskContext); do run_in_bare_thread { assert_eq!(context(), GlobalContext); - let mut sched = ~new_test_uv_sched(); - let task = ~do Task::new_root(&mut sched.stack_pool) { - assert_eq!(context(), TaskContext); - let sched = Local::take::(); - do sched.deschedule_running_task_and_then() |sched, task| { - assert_eq!(context(), SchedulerContext); - sched.enqueue_task(task); - } - }; - sched.enqueue_task(task); - sched.run(); } } diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 6e9aef7773051..03dcc64daf3f0 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -10,7 +10,7 @@ use option::*; use sys; -use cast::transmute; +use cast::{transmute, transmute_mut_region, transmute_mut_unsafe}; use clone::Clone; use super::sleeper_list::SleeperList; @@ -25,6 +25,7 @@ use rt::local::Local; use rt::rtio::RemoteCallback; use rt::metrics::SchedMetrics; use borrow::{to_uint}; +use cell::Cell; /// The Scheduler is responsible for coordinating execution of Coroutines /// on a single thread. When the scheduler is running it is owned by @@ -57,11 +58,8 @@ pub struct Scheduler { stack_pool: StackPool, /// The event loop used to drive the scheduler and perform I/O event_loop: ~EventLoopObject, - /// The scheduler's saved context. - /// Always valid when a task is executing, otherwise not - priv saved_context: Context, - /// The currently executing task - current_task: Option<~Task>, + /// The scheduler runs on a special task. + sched_task: Option<~Task>, /// An action performed after a context switch on behalf of the /// code running before the context switch priv cleanup_job: Option, @@ -88,7 +86,6 @@ enum CleanupJob { } impl Scheduler { - pub fn in_task_context(&self) -> bool { self.current_task.is_some() } pub fn sched_id(&self) -> uint { to_uint(self) } @@ -101,15 +98,14 @@ impl Scheduler { } + // When you create a scheduler it isn't yet "in" a task, so the + // task field is None. pub fn new_special(event_loop: ~EventLoopObject, work_queue: WorkQueue<~Task>, sleeper_list: SleeperList, run_anything: bool) -> Scheduler { - // Lazily initialize the runtime TLS key - local_ptr::init_tls_key(); - Scheduler { sleeper_list: sleeper_list, message_queue: MessageQueue::new(), @@ -118,8 +114,7 @@ impl Scheduler { event_loop: event_loop, work_queue: work_queue, stack_pool: StackPool::new(), - saved_context: Context::empty(), - current_task: None, + sched_task: None, cleanup_job: None, metrics: SchedMetrics::new(), run_anything: run_anything @@ -130,8 +125,59 @@ impl Scheduler { // the scheduler itself doesn't have to call event_loop.run. // That will be important for embedding the runtime into external // event loops. - pub fn run(~self) -> ~Scheduler { - assert!(!self.in_task_context()); + + // Take a main task to run, and a scheduler to run it in. Create a + // scheduler task and bootstrap into it. + pub fn bootstrap(~self, task: ~Task) { + + // Initialize the TLS key. + local_ptr::init_tls_key(); + + // Create a task for the scheduler with an empty context. + let sched_task = Task::new_sched_task(); + + // Now that we have an empty task struct for the scheduler + // task, put it in TLS. + Local::put::(~sched_task); + + // Now, as far as all the scheduler state is concerned, we are + // inside the "scheduler" context. So we can act like the + // scheduler and resume the provided task. + self.resume_task_immediately(task); + + // Now we are back in the scheduler context, having + // successfully run the input task. Start by running the + // scheduler. Grab it out of TLS - performing the scheduler + // action will have given it away. + let sched = Local::take::(); + sched.run(); + + // Now that we are done with the scheduler, clean up the + // scheduler task. Do so by removing it from TLS and manually + // cleaning up the memory it uses. As we didn't actually call + // task.run() on the scheduler task we never get through all + // the cleanup code it runs. + + rtdebug!("post sched.run(), cleaning up scheduler task"); + let mut stask = Local::take::(); + stask.destroyed = true; + + let local_success = !stask.unwinder.unwinding; + let join_latch = stask.join_latch.swap_unwrap(); + match stask.on_exit { + Some(ref on_exit) => { + let success = join_latch.wait(local_success); + (*on_exit)(success); + } + None => { + join_latch.release(local_success); + } + } + } + + // This does not return a scheduler, as the scheduler is placed + // inside the task. + pub fn run(~self) { let mut self_sched = self; @@ -140,79 +186,88 @@ impl Scheduler { // schedulers. self_sched.event_loop.callback(Scheduler::run_sched_once); + // This is unsafe because we need to place the scheduler, with + // the event_loop inside, inside our task. But we still need a + // mutable reference to the event_loop to give it the "run" + // command. unsafe { - let event_loop: *mut ~EventLoopObject = { - let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop; - event_loop - }; + let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop; - // Give ownership of the scheduler (self) to the thread - Local::put(self_sched); + // Our scheduler must be in the task before the event loop + // is started. + let self_sched = Cell::new(self_sched); + do Local::borrow:: |stask| { + stask.sched = Some(self_sched.take()); + }; (*event_loop).run(); } - - rtdebug!("run taking sched"); - let sched = Local::take::(); - // XXX: Reenable this once we're using a per-scheduler queue. With a shared - // queue this is not true - //assert!(sched.work_queue.is_empty()); - rtdebug!("scheduler metrics: %s\n", { - use to_str::ToStr; - sched.metrics.to_str() - }); - return sched; } - fn run_sched_once() { + // One iteration of the scheduler loop, always run at least once. - let mut sched = Local::take::(); - sched.metrics.turns += 1; - - // First, check the message queue for instructions. - // XXX: perf. Check for messages without atomics. - // It's ok if we miss messages occasionally, as long as - // we sync and check again before sleeping. - if sched.interpret_message_queue() { - // We performed a scheduling action. There may be other work - // to do yet, so let's try again later. - rtdebug!("run_sched_once, interpret_message_queue taking sched"); - let mut sched = Local::take::(); - sched.metrics.messages_received += 1; - sched.event_loop.callback(Scheduler::run_sched_once); - Local::put(sched); - return; - } + // The model for this function is that you continue through it + // until you either use the scheduler while performing a schedule + // action, in which case you give it away and do not return, or + // you reach the end and sleep. In the case that a scheduler + // action is performed the loop is evented such that this function + // is called again. + fn run_sched_once() { - // Now, look in the work queue for tasks to run - rtdebug!("run_sched_once taking"); + // When we reach the scheduler context via the event loop we + // already have a scheduler stored in our local task, so we + // start off by taking it. This is the only path through the + // scheduler where we get the scheduler this way. let sched = Local::take::(); - if sched.resume_task_from_queue() { - // We performed a scheduling action. There may be other work - // to do yet, so let's try again later. - do Local::borrow:: |sched| { - sched.metrics.tasks_resumed_from_queue += 1; - sched.event_loop.callback(Scheduler::run_sched_once); + + // Our first task is to read mail to see if we have important + // messages. + + // 1) A wake message is easy, mutate sched struct and return + // it. + // 2) A shutdown is also easy, shutdown. + // 3) A pinned task - we resume immediately and do not return + // here. + + let result = sched.interpret_message_queue(); + let sched = match result { + Some(sched) => { + // We did not resume a task, so we returned. + sched } - return; - } + None => { + return; + } + }; + + let result = sched.resume_task_from_queue(); + let mut sched = match result { + Some(sched) => { + // Failed to dequeue a task, so we return. + sched + } + None => { + return; + } + }; // If we got here then there was no work to do. // Generate a SchedHandle and push it to the sleeper list so // somebody can wake us up later. - rtdebug!("no work to do"); - do Local::borrow:: |sched| { - sched.metrics.wasted_turns += 1; - if !sched.sleepy && !sched.no_sleep { - rtdebug!("sleeping"); - sched.metrics.sleepy_times += 1; - sched.sleepy = true; - let handle = sched.make_handle(); - sched.sleeper_list.push(handle); - } else { - rtdebug!("not sleeping"); - } + sched.metrics.wasted_turns += 1; + if !sched.sleepy && !sched.no_sleep { + rtdebug!("scheduler has no work to do, going to sleep"); + sched.metrics.sleepy_times += 1; + sched.sleepy = true; + let handle = sched.make_handle(); + sched.sleeper_list.push(handle); + } else { + rtdebug!("not sleeping, already doing so or no_sleep set"); } + + // Finished a cycle without using the Scheduler. Place it back + // in TLS. + Local::put(sched); } pub fn make_handle(&mut self) -> SchedHandle { @@ -232,18 +287,6 @@ impl Scheduler { /// to the work queue directly. pub fn enqueue_task(&mut self, task: ~Task) { - // We don't want to queue tasks that belong on other threads, - // so we send them home at enqueue time. - - // The borrow checker doesn't like our disassembly of the - // Coroutine struct and partial use and mutation of the - // fields. So completely disassemble here and stop using? - - // XXX perf: I think we might be able to shuffle this code to - // only destruct when we need to. - - rtdebug!("a task was queued on: %u", self.sched_id()); - let this = self; // We push the task onto our local queue clone. @@ -273,30 +316,23 @@ impl Scheduler { // * Scheduler-context operations - fn interpret_message_queue(~self) -> bool { - assert!(!self.in_task_context()); - - rtdebug!("looking for scheduler messages"); + // This function returns None if the scheduler is "used", or it + // returns the still-available scheduler. + fn interpret_message_queue(~self) -> Option<~Scheduler> { let mut this = self; match this.message_queue.pop() { Some(PinnedTask(task)) => { - rtdebug!("recv BiasedTask message in sched: %u", - this.sched_id()); let mut task = task; - task.home = Some(Sched(this.make_handle())); - this.resume_task_immediately(task); - return true; + task.give_home(Sched(this.make_handle())); + this.change_task_context(task, Scheduler::store_stask); + return None; } - Some(Wake) => { - rtdebug!("recv Wake message"); this.sleepy = false; - Local::put(this); - return true; + return Some(this); } Some(Shutdown) => { - rtdebug!("recv Shutdown message"); if this.sleepy { // There may be an outstanding handle on the // sleeper list. Pop them all to make sure that's @@ -315,12 +351,14 @@ impl Scheduler { // event loop references we will shut down. this.no_sleep = true; this.sleepy = false; - Local::put(this); - return true; + // YYY: Does a shutdown count as a "use" of the + // scheduler? This seems to work - so I'm leaving it + // this way despite not having a solid rational for + // why I should return the scheduler here. + return Some(this); } None => { - Local::put(this); - return false; + return Some(this); } } } @@ -328,7 +366,7 @@ impl Scheduler { /// Given an input Coroutine sends it back to its home scheduler. fn send_task_home(task: ~Task) { let mut task = task; - let mut home = task.home.swap_unwrap(); + let mut home = task.swap_unwrap_home(); match home { Sched(ref mut home_handle) => { home_handle.send(PinnedTask(task)); @@ -341,69 +379,45 @@ impl Scheduler { // Resume a task from the queue - but also take into account that // it might not belong here. - fn resume_task_from_queue(~self) -> bool { - assert!(!self.in_task_context()); - rtdebug!("looking in work queue for task to schedule"); + // If we perform a scheduler action we give away the scheduler ~ + // pointer, if it is still available we return it. + + fn resume_task_from_queue(~self) -> Option<~Scheduler> { + let mut this = self; - // The borrow checker imposes the possibly absurd requirement - // that we split this into two match expressions. This is due - // to the inspection of the internal bits of task, as that - // can't be in scope when we act on task. match this.work_queue.pop() { Some(task) => { - let action_id = { - let home = &task.home; - match home { - &Some(Sched(ref home_handle)) - if home_handle.sched_id != this.sched_id() => { - SendHome - } - &Some(AnySched) if this.run_anything => { - ResumeNow - } - &Some(AnySched) => { - Requeue - } - &Some(Sched(_)) => { - ResumeNow - } - &None => { - Homeless + let mut task = task; + let home = task.swap_unwrap_home(); + match home { + Sched(home_handle) => { + if home_handle.sched_id != this.sched_id() { + task.give_home(Sched(home_handle)); + Scheduler::send_task_home(task); + return Some(this); + } else { + task.give_home(Sched(home_handle)); + this.change_task_context(task, Scheduler::store_stask); + return None; } } - }; - - match action_id { - SendHome => { - rtdebug!("sending task home"); - Scheduler::send_task_home(task); - Local::put(this); - return false; - } - ResumeNow => { - rtdebug!("resuming now"); - this.resume_task_immediately(task); - return true; + AnySched if this.run_anything => { + task.give_home(AnySched); + this.change_task_context(task, Scheduler::store_stask); + return None; } - Requeue => { - rtdebug!("re-queueing") + AnySched => { + task.give_home(AnySched); this.enqueue_task(task); - Local::put(this); - return false; - } - Homeless => { - rtabort!("task home was None!"); + return Some(this); } } } - None => { - rtdebug!("no tasks in queue"); - Local::put(this); - return false; - } + return Some(this); + } } } @@ -412,9 +426,6 @@ impl Scheduler { /// Called by a running task to end execution, after which it will /// be recycled by the scheduler for reuse in a new task. pub fn terminate_current_task(~self) { - assert!(self.in_task_context()); - - rtdebug!("ending running task"); do self.deschedule_running_task_and_then |sched, dead_task| { let mut dead_task = dead_task; @@ -422,11 +433,12 @@ impl Scheduler { coroutine.recycle(&mut sched.stack_pool); } - rtabort!("control reached end of task"); } - pub fn schedule_task(~self, task: ~Task) { - assert!(self.in_task_context()); + // If a scheduling action is performed, return None. If not, + // return Some(sched). + + pub fn schedule_task(~self, task: ~Task) -> Option<~Scheduler> { // is the task home? let is_home = task.is_home_no_tls(&self); @@ -439,143 +451,175 @@ impl Scheduler { if is_home || (!homed && this.run_anything) { // here we know we are home, execute now OR we know we // aren't homed, and that this sched doesn't care + rtdebug!("task: %u is on ok sched, executing", to_uint(task)); do this.switch_running_tasks_and_then(task) |sched, last_task| { sched.enqueue_task(last_task); } + return None; } else if !homed && !this.run_anything { // the task isn't homed, but it can't be run here this.enqueue_task(task); - Local::put(this); + return Some(this); } else { // task isn't home, so don't run it here, send it home Scheduler::send_task_home(task); - Local::put(this); + return Some(this); } } - // Core scheduling ops - - pub fn resume_task_immediately(~self, task: ~Task) { + // The primary function for changing contexts. In the current + // design the scheduler is just a slightly modified GreenTask, so + // all context swaps are from Task to Task. The only difference + // between the various cases is where the inputs come from, and + // what is done with the resulting task. That is specified by the + // cleanup function f, which takes the scheduler and the + // old task as inputs. + + pub fn change_task_context(~self, + next_task: ~Task, + f: &fn(&mut Scheduler, ~Task)) { let mut this = self; - assert!(!this.in_task_context()); - rtdebug!("scheduling a task"); - this.metrics.context_switches_sched_to_task += 1; + // The current task is grabbed from TLS, not taken as an input. + let current_task: ~Task = Local::take::(); - // Store the task in the scheduler so it can be grabbed later - this.current_task = Some(task); - this.enqueue_cleanup_job(DoNothing); + // These transmutes do something fishy with a closure. + let f_fake_region = unsafe { + transmute::<&fn(&mut Scheduler, ~Task), + &fn(&mut Scheduler, ~Task)>(f) + }; + let f_opaque = ClosureConverter::from_fn(f_fake_region); + + // The current task is placed inside an enum with the cleanup + // function. This enum is then placed inside the scheduler. + this.enqueue_cleanup_job(GiveTask(current_task, f_opaque)); - Local::put(this); + // The scheduler is then placed inside the next task. + let mut next_task = next_task; + next_task.sched = Some(this); - // Take pointers to both the task and scheduler's saved registers. + // However we still need an internal mutable pointer to the + // original task. The strategy here was "arrange memory, then + // get pointers", so we crawl back up the chain using + // transmute to eliminate borrowck errors. unsafe { - let sched = Local::unsafe_borrow::(); - let (sched_context, _, next_task_context) = (*sched).get_contexts(); - let next_task_context = next_task_context.unwrap(); - // Context switch to the task, restoring it's registers - // and saving the scheduler's - Context::swap(sched_context, next_task_context); - let sched = Local::unsafe_borrow::(); - // The running task should have passed ownership elsewhere - assert!((*sched).current_task.is_none()); + let sched: &mut Scheduler = + transmute_mut_region(*next_task.sched.get_mut_ref()); - // Running tasks may have asked us to do some cleanup - (*sched).run_cleanup_job(); - } - } + let current_task: &mut Task = match sched.cleanup_job { + Some(GiveTask(ref task, _)) => { + transmute_mut_region(*transmute_mut_unsafe(task)) + } + Some(DoNothing) => { + rtabort!("no next task"); + } + None => { + rtabort!("no cleanup job"); + } + }; - /// Block a running task, context switch to the scheduler, then pass the - /// blocked task to a closure. - /// - /// # Safety note - /// - /// The closure here is a *stack* closure that lives in the - /// running task. It gets transmuted to the scheduler's lifetime - /// and called while the task is blocked. - /// - /// This passes a Scheduler pointer to the fn after the context switch - /// in order to prevent that fn from performing further scheduling operations. - /// Doing further scheduling could easily result in infinite recursion. - pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, ~Task)) { - let mut this = self; - assert!(this.in_task_context()); + let (current_task_context, next_task_context) = + Scheduler::get_contexts(current_task, next_task); - rtdebug!("blocking task"); - this.metrics.context_switches_task_to_sched += 1; + // Done with everything - put the next task in TLS. This + // works because due to transmute the borrow checker + // believes that we have no internal pointers to + // next_task. + Local::put(next_task); - unsafe { - let blocked_task = this.current_task.swap_unwrap(); - let f_fake_region = transmute::<&fn(&mut Scheduler, ~Task), - &fn(&mut Scheduler, ~Task)>(f); - let f_opaque = ClosureConverter::from_fn(f_fake_region); - this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); + // The raw context swap operation. The next action taken + // will be running the cleanup job from the context of the + // next task. + Context::swap(current_task_context, next_task_context); } - Local::put(this); - + // When the context swaps back to the scheduler we immediately + // run the cleanup job, as expected by the previously called + // swap_contexts function. unsafe { - let sched = Local::unsafe_borrow::(); - let (sched_context, last_task_context, _) = (*sched).get_contexts(); - let last_task_context = last_task_context.unwrap(); - Context::swap(last_task_context, sched_context); - - // We could be executing in a different thread now let sched = Local::unsafe_borrow::(); (*sched).run_cleanup_job(); } } - /// Switch directly to another task, without going through the scheduler. - /// You would want to think hard about doing this, e.g. if there are - /// pending I/O events it would be a bad idea. - pub fn switch_running_tasks_and_then(~self, next_task: ~Task, - f: &fn(&mut Scheduler, ~Task)) { + // There are a variety of "obvious" functions to be passed to + // change_task_context, so we can make a few "named cases". + + // Place the old task, which is the scheduler task, into the + // sched's stask slot. + pub fn store_stask(sched: &mut Scheduler, stask: ~Task) { + sched.sched_task = Some(stask); + } + + // Enqueue the old task on the current scheduler. + pub fn enqueue_old(sched: &mut Scheduler, task: ~Task) { + sched.enqueue_task(task); + } + + // Sometimes we just want the old API though. + + pub fn resume_task_immediately(~self, task: ~Task) { + self.change_task_context(task, Scheduler::store_stask); + } + + pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, ~Task)) { + // Trickier - we need to get the scheduler task out of self + // and use it as the destination. let mut this = self; - assert!(this.in_task_context()); + let stask = this.sched_task.swap_unwrap(); + this.change_task_context(stask, f); + } - rtdebug!("switching tasks"); - this.metrics.context_switches_task_to_task += 1; + pub fn switch_running_tasks_and_then(~self, next_task: ~Task, + f: &fn(&mut Scheduler, ~Task)) { + // Literally the same as change_task_context. + self.change_task_context(next_task, f); + } - let old_running_task = this.current_task.swap_unwrap(); - let f_fake_region = unsafe { - transmute::<&fn(&mut Scheduler, ~Task), - &fn(&mut Scheduler, ~Task)>(f) + // A helper that looks up the scheduler and runs a task later by + // enqueuing it. + pub fn run_task_later(next_task: ~Task) { + // We aren't performing a scheduler operation, so we want to + // put the Scheduler back when we finish. + let next_task = Cell::new(next_task); + do Local::borrow:: |sched| { + sched.enqueue_task(next_task.take()); }; - let f_opaque = ClosureConverter::from_fn(f_fake_region); - this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque)); - this.current_task = Some(next_task); + } - Local::put(this); + // A helper that looks up the scheduler and runs a task. If it can + // be run now it is run now. + pub fn run_task(new_task: ~Task) { + let sched = Local::take::(); + sched.schedule_task(new_task).map_consume(Local::put); + } + // Returns a mutable reference to both contexts involved in this + // swap. This is unsafe - we are getting mutable internal + // references to keep even when we don't own the tasks. It looks + // kinda safe because we are doing transmutes before passing in + // the arguments. + pub fn get_contexts<'a>(current_task: &mut Task, next_task: &mut Task) -> + (&'a mut Context, &'a mut Context) { + let current_task_context = + &mut current_task.coroutine.get_mut_ref().saved_context; + let next_task_context = + &mut next_task.coroutine.get_mut_ref().saved_context; unsafe { - let sched = Local::unsafe_borrow::(); - let (_, last_task_context, next_task_context) = (*sched).get_contexts(); - let last_task_context = last_task_context.unwrap(); - let next_task_context = next_task_context.unwrap(); - Context::swap(last_task_context, next_task_context); - - // We could be executing in a different thread now - let sched = Local::unsafe_borrow::(); - (*sched).run_cleanup_job(); + (transmute_mut_region(current_task_context), + transmute_mut_region(next_task_context)) } } - - // * Other stuff pub fn enqueue_cleanup_job(&mut self, job: CleanupJob) { - assert!(self.cleanup_job.is_none()); self.cleanup_job = Some(job); } pub fn run_cleanup_job(&mut self) { rtdebug!("running cleanup job"); - - assert!(self.cleanup_job.is_some()); - let cleanup_job = self.cleanup_job.swap_unwrap(); match cleanup_job { DoNothing => { } @@ -583,54 +627,6 @@ impl Scheduler { } } - /// Get mutable references to all the contexts that may be involved in a - /// context switch. - /// - /// Returns (the scheduler context, the optional context of the - /// task in the cleanup list, the optional context of the task in - /// the current task slot). When context switching to a task, - /// callers should first arrange for that task to be located in the - /// Scheduler's current_task slot and set up the - /// post-context-switch cleanup job. - pub fn get_contexts<'a>(&'a mut self) -> (&'a mut Context, - Option<&'a mut Context>, - Option<&'a mut Context>) { - let last_task = match self.cleanup_job { - Some(GiveTask(~ref task, _)) => { - Some(task) - } - Some(DoNothing) => { - None - } - None => fail!("all context switches should have a cleanup job") - }; - // XXX: Pattern matching mutable pointers above doesn't work - // because borrowck thinks the three patterns are conflicting - // borrows - unsafe { - let last_task = transmute::, Option<&mut Task>>(last_task); - let last_task_context = match last_task { - Some(t) => { - Some(&mut t.coroutine.get_mut_ref().saved_context) - } - None => { - None - } - }; - let next_task_context = match self.current_task { - Some(ref mut t) => { - Some(&mut t.coroutine.get_mut_ref().saved_context) - } - None => { - None - } - }; - // XXX: These transmutes can be removed after snapshot - return (transmute(&mut self.saved_context), - last_task_context, - transmute(next_task_context)); - } - } } // The cases for the below function. @@ -660,19 +656,63 @@ impl ClosureConverter for UnsafeTaskReceiver { fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { unsafe { transmute(self) } } } - #[cfg(test)] mod test { + use rt::test::*; + use unstable::run_in_bare_thread; + use borrow::to_uint; + use rt::local::*; + use rt::sched::{Scheduler}; + use uint; use int; use cell::Cell; - use unstable::run_in_bare_thread; - use task::spawn; - use rt::local::Local; - use rt::test::*; - use super::*; use rt::thread::Thread; - use borrow::to_uint; - use rt::task::{Task,Sched}; + use rt::task::{Task, Sched}; + use option::{Some}; + + #[test] + fn trivial_run_in_newsched_task_test() { + let mut task_ran = false; + let task_ran_ptr: *mut bool = &mut task_ran; + do run_in_newsched_task || { + unsafe { *task_ran_ptr = true }; + rtdebug!("executed from the new scheduler") + } + assert!(task_ran); + } + + #[test] + fn multiple_task_test() { + let total = 10; + let mut task_run_count = 0; + let task_run_count_ptr: *mut uint = &mut task_run_count; + do run_in_newsched_task || { + for uint::range(0,total) |_| { + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1}; + } + } + } + assert!(task_run_count == total); + } + + #[test] + fn multiple_task_nested_test() { + let mut task_run_count = 0; + let task_run_count_ptr: *mut uint = &mut task_run_count; + do run_in_newsched_task || { + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; + } + } + } + } + assert!(task_run_count == 3); + } // Confirm that a sched_id actually is the uint form of the // pointer to the scheduler struct. @@ -695,46 +735,50 @@ mod test { } } - // A simple test to check if a homed task run on a single - // scheduler ends up executing while home. + + // A very simple test that confirms that a task executing on the + // home scheduler notices that it is home. #[test] fn test_home_sched() { do run_in_bare_thread { let mut task_ran = false; let task_ran_ptr: *mut bool = &mut task_ran; - let mut sched = ~new_test_uv_sched(); + let mut sched = ~new_test_uv_sched(); let sched_handle = sched.make_handle(); - let sched_id = sched.sched_id(); - let task = ~do Task::new_root_homed(&mut sched.stack_pool, - Sched(sched_handle)) { + let mut task = ~do Task::new_root_homed(&mut sched.stack_pool, + Sched(sched_handle)) { unsafe { *task_ran_ptr = true }; - let sched = Local::take::(); - assert!(sched.sched_id() == sched_id); - Local::put::(sched); + assert!(Task::on_appropriate_sched()); }; - sched.enqueue_task(task); - sched.run(); - assert!(task_ran); + + let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status); + task.on_exit = Some(on_exit); + + sched.bootstrap(task); } } - // A test for each state of schedule_task + // An advanced test that checks all four possible states that a + // (task,sched) can be in regarding homes. + #[test] fn test_schedule_home_states() { use rt::uv::uvio::UvEventLoop; - use rt::sched::Shutdown; use rt::sleeper_list::SleeperList; use rt::work_queue::WorkQueue; + use rt::sched::Shutdown; + use borrow; + use rt::comm::*; do run_in_bare_thread { let sleepers = SleeperList::new(); let work_queue = WorkQueue::new(); - // our normal scheduler + // Our normal scheduler let mut normal_sched = ~Scheduler::new( ~UvEventLoop::new(), work_queue.clone(), @@ -742,123 +786,105 @@ mod test { let normal_handle = Cell::new(normal_sched.make_handle()); - // our special scheduler + // Our special scheduler let mut special_sched = ~Scheduler::new_special( ~UvEventLoop::new(), work_queue.clone(), sleepers.clone(), - true); + false); let special_handle = Cell::new(special_sched.make_handle()); - let special_handle2 = Cell::new(special_sched.make_handle()); - let special_id = special_sched.sched_id(); + let t1_handle = special_sched.make_handle(); let t4_handle = special_sched.make_handle(); - let t1f = ~do Task::new_root_homed(&mut special_sched.stack_pool, - Sched(t1_handle)) || { - let is_home = Task::is_home_using_id(special_id); - rtdebug!("t1 should be home: %b", is_home); - assert!(is_home); - }; - let t1f = Cell::new(t1f); + // Four test tasks: + // 1) task is home on special + // 2) task not homed, sched doesn't care + // 3) task not homed, sched requeues + // 4) task not home, send home - let t2f = ~do Task::new_root(&mut normal_sched.stack_pool) { - let on_special = Task::on_special(); - rtdebug!("t2 should not be on special: %b", on_special); - assert!(!on_special); + let task1 = ~do Task::new_root_homed(&mut special_sched.stack_pool, + Sched(t1_handle)) || { + rtassert!(Task::on_appropriate_sched()); }; - let t2f = Cell::new(t2f); + rtdebug!("task1 id: **%u**", borrow::to_uint(task1)); - let t3f = ~do Task::new_root(&mut normal_sched.stack_pool) { - // not on special - let on_special = Task::on_special(); - rtdebug!("t3 should not be on special: %b", on_special); - assert!(!on_special); + let task2 = ~do Task::new_root(&mut normal_sched.stack_pool) { + rtassert!(Task::on_appropriate_sched()); }; - let t3f = Cell::new(t3f); - - let t4f = ~do Task::new_root_homed(&mut special_sched.stack_pool, - Sched(t4_handle)) { - // is home - let home = Task::is_home_using_id(special_id); - rtdebug!("t4 should be home: %b", home); - assert!(home); - }; - let t4f = Cell::new(t4f); - // we have four tests, make them as closures - let t1: ~fn() = || { - // task is home on special - let task = t1f.take(); - let sched = Local::take::(); - sched.schedule_task(task); - }; - let t2: ~fn() = || { - // not homed, task doesn't care - let task = t2f.take(); - let sched = Local::take::(); - sched.schedule_task(task); + let task3 = ~do Task::new_root(&mut normal_sched.stack_pool) { + rtassert!(Task::on_appropriate_sched()); }; - let t3: ~fn() = || { - // task not homed, must leave - let task = t3f.take(); - let sched = Local::take::(); - sched.schedule_task(task); - }; - let t4: ~fn() = || { - // task not home, send home - let task = t4f.take(); - let sched = Local::take::(); - sched.schedule_task(task); + + let task4 = ~do Task::new_root_homed(&mut special_sched.stack_pool, + Sched(t4_handle)) { + rtassert!(Task::on_appropriate_sched()); }; + rtdebug!("task4 id: **%u**", borrow::to_uint(task4)); - let t1 = Cell::new(t1); - let t2 = Cell::new(t2); - let t3 = Cell::new(t3); - let t4 = Cell::new(t4); - - // build a main task that runs our four tests - let main_task = ~do Task::new_root(&mut normal_sched.stack_pool) { - // the two tasks that require a normal start location - t2.take()(); - t4.take()(); - normal_handle.take().send(Shutdown); - special_handle.take().send(Shutdown); + let task1 = Cell::new(task1); + let task2 = Cell::new(task2); + let task3 = Cell::new(task3); + let task4 = Cell::new(task4); + + // Signal from the special task that we are done. + let (port, chan) = oneshot::<()>(); + let port = Cell::new(port); + let chan = Cell::new(chan); + + let normal_task = ~do Task::new_root(&mut normal_sched.stack_pool) { + rtdebug!("*about to submit task2*"); + Scheduler::run_task(task2.take()); + rtdebug!("*about to submit task4*"); + Scheduler::run_task(task4.take()); + rtdebug!("*normal_task done*"); + port.take().recv(); + let mut nh = normal_handle.take(); + nh.send(Shutdown); + let mut sh = special_handle.take(); + sh.send(Shutdown); }; - // task to run the two "special start" tests - let special_task = ~do Task::new_root_homed( - &mut special_sched.stack_pool, - Sched(special_handle2.take())) { - t1.take()(); - t3.take()(); + rtdebug!("normal task: %u", borrow::to_uint(normal_task)); + + let special_task = ~do Task::new_root(&mut special_sched.stack_pool) { + rtdebug!("*about to submit task1*"); + Scheduler::run_task(task1.take()); + rtdebug!("*about to submit task3*"); + Scheduler::run_task(task3.take()); + rtdebug!("*done with special_task*"); + chan.take().send(()); }; - // enqueue the main tasks - normal_sched.enqueue_task(special_task); - normal_sched.enqueue_task(main_task); + rtdebug!("special task: %u", borrow::to_uint(special_task)); + + let special_sched = Cell::new(special_sched); + let normal_sched = Cell::new(normal_sched); + let special_task = Cell::new(special_task); + let normal_task = Cell::new(normal_task); - let nsched_cell = Cell::new(normal_sched); let normal_thread = do Thread::start { - let sched = nsched_cell.take(); - sched.run(); + normal_sched.take().bootstrap(normal_task.take()); + rtdebug!("finished with normal_thread"); }; - let ssched_cell = Cell::new(special_sched); let special_thread = do Thread::start { - let sched = ssched_cell.take(); - sched.run(); + special_sched.take().bootstrap(special_task.take()); + rtdebug!("finished with special_sched"); }; - // wait for the end + rtdebug!("waiting on threads"); let _thread1 = normal_thread; + rtdebug!("normal_thread finished"); let _thread2 = special_thread; + rtdebug!("special_thread finished"); + } } - // Do it a lot #[test] fn test_stress_schedule_task_states() { let n = stress_factor() * 120; @@ -867,118 +893,6 @@ mod test { } } - #[test] - fn test_simple_scheduling() { - do run_in_bare_thread { - let mut task_ran = false; - let task_ran_ptr: *mut bool = &mut task_ran; - - let mut sched = ~new_test_uv_sched(); - let task = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *task_ran_ptr = true; } - }; - sched.enqueue_task(task); - sched.run(); - assert!(task_ran); - } - } - - #[test] - fn test_several_tasks() { - do run_in_bare_thread { - let total = 10; - let mut task_count = 0; - let task_count_ptr: *mut int = &mut task_count; - - let mut sched = ~new_test_uv_sched(); - for int::range(0, total) |_| { - let task = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *task_count_ptr = *task_count_ptr + 1; } - }; - sched.enqueue_task(task); - } - sched.run(); - assert_eq!(task_count, total); - } - } - - #[test] - fn test_swap_tasks_then() { - do run_in_bare_thread { - let mut count = 0; - let count_ptr: *mut int = &mut count; - - let mut sched = ~new_test_uv_sched(); - let task1 = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *count_ptr = *count_ptr + 1; } - let mut sched = Local::take::(); - let task2 = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *count_ptr = *count_ptr + 1; } - }; - // Context switch directly to the new task - do sched.switch_running_tasks_and_then(task2) |sched, task1| { - let task1 = Cell::new(task1); - sched.enqueue_task(task1.take()); - } - unsafe { *count_ptr = *count_ptr + 1; } - }; - sched.enqueue_task(task1); - sched.run(); - assert_eq!(count, 3); - } - } - - #[bench] #[test] #[ignore(reason = "long test")] - fn test_run_a_lot_of_tasks_queued() { - do run_in_bare_thread { - static MAX: int = 1000000; - let mut count = 0; - let count_ptr: *mut int = &mut count; - - let mut sched = ~new_test_uv_sched(); - - let start_task = ~do Task::new_root(&mut sched.stack_pool) { - run_task(count_ptr); - }; - sched.enqueue_task(start_task); - sched.run(); - - assert_eq!(count, MAX); - - fn run_task(count_ptr: *mut int) { - do Local::borrow:: |sched| { - let task = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { - *count_ptr = *count_ptr + 1; - if *count_ptr != MAX { - run_task(count_ptr); - } - } - }; - sched.enqueue_task(task); - } - }; - } - } - - #[test] - fn test_block_task() { - do run_in_bare_thread { - let mut sched = ~new_test_uv_sched(); - let task = ~do Task::new_root(&mut sched.stack_pool) { - let sched = Local::take::(); - assert!(sched.in_task_context()); - do sched.deschedule_running_task_and_then() |sched, task| { - let task = Cell::new(task); - assert!(!sched.in_task_context()); - sched.enqueue_task(task.take()); - } - }; - sched.enqueue_task(task); - sched.run(); - } - } - #[test] fn test_io_callback() { // This is a regression test that when there are no schedulable tasks @@ -986,7 +900,7 @@ mod test { // something in the work queue again the scheduler picks it up and doesn't // exit before emptying the work queue do run_in_newsched_task { - do spawn { + do spawntask { let sched = Local::take::(); do sched.deschedule_running_task_and_then |sched, task| { let task = Cell::new(task); @@ -1007,34 +921,46 @@ mod test { do run_in_bare_thread { let (port, chan) = oneshot::<()>(); - let port_cell = Cell::new(port); - let chan_cell = Cell::new(chan); - let mut sched1 = ~new_test_uv_sched(); - let handle1 = sched1.make_handle(); - let handle1_cell = Cell::new(handle1); - let task1 = ~do Task::new_root(&mut sched1.stack_pool) { - chan_cell.take().send(()); + let port = Cell::new(port); + let chan = Cell::new(chan); + + let _thread_one = do Thread::start { + do run_in_newsched_task_core { + chan.take().send(()); + } }; - sched1.enqueue_task(task1); - let mut sched2 = ~new_test_uv_sched(); - let task2 = ~do Task::new_root(&mut sched2.stack_pool) { - port_cell.take().recv(); - // Release the other scheduler's handle so it can exit - handle1_cell.take(); + let _thread_two = do Thread::start { + do run_in_newsched_task_core { + port.take().recv(); + } }; - sched2.enqueue_task(task2); + } - let sched1_cell = Cell::new(sched1); - let _thread1 = do Thread::start { - let sched1 = sched1_cell.take(); - sched1.run(); + do run_in_bare_thread { + let (port, chan) = oneshot::<()>(); + let port = Cell::new(port); + let chan = Cell::new(chan); + + let mut sched = ~new_test_uv_sched(); + let handle = sched.make_handle(); + let handle = Cell::new(handle); + let sched = Cell::new(sched); + + let _thread_one = do Thread::start { + do run_in_newsched_task_core { + port.take().recv(); + handle.take(); + } }; - let sched2_cell = Cell::new(sched2); - let _thread2 = do Thread::start { - let sched2 = sched2_cell.take(); - sched2.run(); + let _thread_two = do Thread::start { + let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status); + let job: ~fn() = || { chan.take().send(()) }; + let mut sched = sched.take(); + let mut task = ~Task::new_root(&mut sched.stack_pool, job); + task.on_exit = Some(on_exit); + sched.bootstrap(task); }; } } @@ -1051,7 +977,7 @@ mod test { for 10.times { let (port, chan) = oneshot(); let chan_cell = Cell::new(chan); - do spawntask_later { + do spawntask { chan_cell.take().send(()); } ports.push(port); @@ -1063,21 +989,21 @@ mod test { } } - #[test] + #[test] fn thread_ring() { use rt::comm::*; use comm::{GenericPort, GenericChan}; do run_in_mt_newsched_task { - let (end_port, end_chan) = oneshot(); + let (end_port, end_chan) = oneshot(); let n_tasks = 10; let token = 2000; - let (p, ch1) = stream(); + let (p, ch1) = stream(); let mut p = p; - ch1.send((token, end_chan)); - let mut i = 2; + ch1.send((token, end_chan)); + let mut i = 2; while i <= n_tasks { let (next_p, ch) = stream(); let imm_i = i; @@ -1102,9 +1028,9 @@ mod test { while (true) { match p.recv() { (1, end_chan) => { - debug!("%d\n", id); - end_chan.send(()); - return; + debug!("%d\n", id); + end_chan.send(()); + return; } (token, end_chan) => { debug!("thread: %d got token: %d", id, token); @@ -1129,15 +1055,16 @@ mod test { impl Drop for S { fn drop(&self) { - let _foo = @0; + let _foo = @0; } } let s = S { field: () }; do spawntask { - let _ss = &s; + let _ss = &s; } } } + } diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index b4f4c1b3e35b9..a1e553cb9ba22 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -29,19 +29,32 @@ use rt::stack::{StackSegment, StackPool}; use rt::context::Context; use cell::Cell; +// The Task struct represents all state associated with a rust +// task. There are at this point two primary "subtypes" of task, +// however instead of using a subtype we just have a "task_type" field +// in the struct. This contains a pointer to another struct that holds +// the type-specific state. + pub struct Task { heap: LocalHeap, gc: GarbageCollector, storage: LocalStorage, logger: StdErrLogger, unwinder: Unwinder, - home: Option, join_latch: Option<~JoinLatch>, on_exit: Option<~fn(bool)>, destroyed: bool, - coroutine: Option<~Coroutine> + coroutine: Option<~Coroutine>, + sched: Option<~Scheduler>, + task_type: TaskType +} + +pub enum TaskType { + GreenTask(Option<~SchedHome>), + SchedTask } +/// A coroutine is nothing more than a (register context, stack) pair. pub struct Coroutine { /// The segment of stack on which the task is currently running or /// if the task is blocked, on which the task will resume @@ -51,6 +64,7 @@ pub struct Coroutine { saved_context: Context } +/// Some tasks have a deciated home scheduler that they must run on. pub enum SchedHome { AnySched, Sched(SchedHandle) @@ -65,6 +79,58 @@ pub struct Unwinder { impl Task { + // A helper to build a new task using the dynamically found + // scheduler and task. Only works in GreenTask context. + pub fn build_homed_child(f: ~fn(), home: SchedHome) -> ~Task { + let f = Cell::new(f); + let home = Cell::new(home); + do Local::borrow:: |running_task| { + let mut sched = running_task.sched.swap_unwrap(); + let new_task = ~running_task.new_child_homed(&mut sched.stack_pool, + home.take(), + f.take()); + running_task.sched = Some(sched); + new_task + } + } + + pub fn build_child(f: ~fn()) -> ~Task { + Task::build_homed_child(f, AnySched) + } + + pub fn build_homed_root(f: ~fn(), home: SchedHome) -> ~Task { + let f = Cell::new(f); + let home = Cell::new(home); + do Local::borrow:: |running_task| { + let mut sched = running_task.sched.swap_unwrap(); + let new_task = ~Task::new_root_homed(&mut sched.stack_pool, + home.take(), + f.take()); + running_task.sched = Some(sched); + new_task + } + } + + pub fn build_root(f: ~fn()) -> ~Task { + Task::build_homed_root(f, AnySched) + } + + pub fn new_sched_task() -> Task { + Task { + heap: LocalHeap::new(), + gc: GarbageCollector, + storage: LocalStorage(ptr::null(), None), + logger: StdErrLogger, + unwinder: Unwinder { unwinding: false }, + join_latch: Some(JoinLatch::new_root()), + on_exit: None, + destroyed: false, + coroutine: Some(~Coroutine::empty()), + sched: None, + task_type: SchedTask + } + } + pub fn new_root(stack_pool: &mut StackPool, start: ~fn()) -> Task { Task::new_root_homed(stack_pool, AnySched, start) @@ -85,11 +151,12 @@ impl Task { storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, unwinder: Unwinder { unwinding: false }, - home: Some(home), join_latch: Some(JoinLatch::new_root()), on_exit: None, destroyed: false, - coroutine: Some(~Coroutine::new(stack_pool, start)) + coroutine: Some(~Coroutine::new(stack_pool, start)), + sched: None, + task_type: GreenTask(Some(~home)) } } @@ -102,25 +169,40 @@ impl Task { gc: GarbageCollector, storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, - home: Some(home), unwinder: Unwinder { unwinding: false }, join_latch: Some(self.join_latch.get_mut_ref().new_child()), on_exit: None, destroyed: false, - coroutine: Some(~Coroutine::new(stack_pool, start)) + coroutine: Some(~Coroutine::new(stack_pool, start)), + sched: None, + task_type: GreenTask(Some(~home)) } } pub fn give_home(&mut self, new_home: SchedHome) { - self.home = Some(new_home); + match self.task_type { + GreenTask(ref mut home) => { + *home = Some(~new_home); + } + SchedTask => { + rtabort!("type error: used SchedTask as GreenTask"); + } + } } - pub fn run(&mut self, f: &fn()) { - // This is just an assertion that `run` was called unsafely - // and this instance of Task is still accessible. - do Local::borrow:: |task| { - assert!(borrow::ref_eq(task, self)); + pub fn swap_unwrap_home(&mut self) -> SchedHome { + match self.task_type { + GreenTask(ref mut home) => { + let out = home.swap_unwrap(); + return *out; + } + SchedTask => { + rtabort!("type error: used SchedTask as GreenTask"); + } } + } + + pub fn run(&mut self, f: &fn()) { self.unwinder.try(f); self.destroy(); @@ -146,6 +228,8 @@ impl Task { /// thread-local-storage. fn destroy(&mut self) { + rtdebug!("DESTROYING TASK: %u", borrow::to_uint(self)); + do Local::borrow:: |task| { assert!(borrow::ref_eq(task, self)); } @@ -163,6 +247,64 @@ impl Task { self.destroyed = true; } + // New utility functions for homes. + + pub fn is_home_no_tls(&self, sched: &~Scheduler) -> bool { + match self.task_type { + GreenTask(Some(~AnySched)) => { false } + GreenTask(Some(~Sched(SchedHandle { sched_id: ref id, _}))) => { + *id == sched.sched_id() + } + GreenTask(None) => { + rtabort!("task without home"); + } + SchedTask => { + // Awe yea + rtabort!("type error: expected: GreenTask, found: SchedTask"); + } + } + } + + pub fn homed(&self) -> bool { + match self.task_type { + GreenTask(Some(~AnySched)) => { false } + GreenTask(Some(~Sched(SchedHandle { _ }))) => { true } + GreenTask(None) => { + rtabort!("task without home"); + } + SchedTask => { + rtabort!("type error: expected: GreenTask, found: SchedTask"); + } + } + } + + // Grab both the scheduler and the task from TLS and check if the + // task is executing on an appropriate scheduler. + pub fn on_appropriate_sched() -> bool { + do Local::borrow:: |task| { + let sched_id = task.sched.get_ref().sched_id(); + let sched_run_anything = task.sched.get_ref().run_anything; + match task.task_type { + GreenTask(Some(~AnySched)) => { + rtdebug!("anysched task in sched check ****"); + sched_run_anything + } + GreenTask(Some(~Sched(SchedHandle { sched_id: ref id, _ }))) => { + rtdebug!("homed task in sched check ****"); + *id == sched_id + } + GreenTask(None) => { + rtabort!("task without home"); + } + SchedTask => { + rtabort!("type error: expected: GreenTask, found: SchedTask"); + } + } + } + } + + // These utility functions related to home will need to change. +/* /// Check if *task* is currently home. pub fn is_home(&self) -> bool { do Local::borrow:: |sched| { @@ -215,11 +357,14 @@ impl Task { !sched.run_anything } } - +*/ } impl Drop for Task { - fn drop(&self) { assert!(self.destroyed) } + fn drop(&self) { + rtdebug!("called drop for a task"); + assert!(self.destroyed) + } } // Coroutines represent nothing more than a context and a stack @@ -239,19 +384,33 @@ impl Coroutine { } } + pub fn empty() -> Coroutine { + Coroutine { + current_stack_segment: StackSegment::new(0), + saved_context: Context::empty() + } + } + fn build_start_wrapper(start: ~fn()) -> ~fn() { let start_cell = Cell::new(start); let wrapper: ~fn() = || { // First code after swap to this new context. Run our // cleanup job. unsafe { - let sched = Local::unsafe_borrow::(); - (*sched).run_cleanup_job(); - let sched = Local::unsafe_borrow::(); - let task = (*sched).current_task.get_mut_ref(); + // Again - might work while safe, or it might not. + do Local::borrow:: |sched| { + (sched).run_cleanup_job(); + } + + // To call the run method on a task we need a direct + // reference to it. The task is in TLS, so we can + // simply unsafe_borrow it to get this reference. We + // need to still have the task in TLS though, so we + // need to unsafe_borrow. + let task = Local::unsafe_borrow::(); - do task.run { + do (*task).run { // N.B. Removing `start` from the start wrapper // closure by emptying a cell is critical for // correctness. The ~Task pointer, and in turn the @@ -267,8 +426,11 @@ impl Coroutine { }; } + // We remove the sched from the Task in TLS right now. let sched = Local::take::(); - sched.terminate_current_task(); + // ... allowing us to give it away when performing a + // scheduling operation. + sched.terminate_current_task() }; return wrapper; } @@ -329,7 +491,7 @@ impl Unwinder { } } } - +/* #[cfg(test)] mod test { use rt::test::*; @@ -472,3 +634,4 @@ mod test { } } } +*/ \ No newline at end of file diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index 659c7eb498573..d291c5648aab4 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -13,14 +13,12 @@ use uint; use option::{Some, None}; use rt::sched::Scheduler; use super::io::net::ip::{IpAddr, Ipv4}; -use rt::local::Local; use unstable::run_in_bare_thread; use rt::thread::Thread; use rt::task::Task; use rt::uv::uvio::UvEventLoop; use rt::work_queue::WorkQueue; use rt::sleeper_list::SleeperList; -use rt::task::{Sched}; use rt::comm::oneshot; use result::{Result, Ok, Err}; @@ -29,29 +27,37 @@ pub fn new_test_uv_sched() -> Scheduler { let mut sched = Scheduler::new(~UvEventLoop::new(), WorkQueue::new(), SleeperList::new()); + // Don't wait for the Shutdown message sched.no_sleep = true; return sched; + } -/// Creates a new scheduler in a new thread and runs a task in it, -/// then waits for the scheduler to exit. Failure of the task -/// will abort the process. pub fn run_in_newsched_task(f: ~fn()) { let f = Cell::new(f); - do run_in_bare_thread { - let mut sched = ~new_test_uv_sched(); - let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status); - let mut task = ~Task::new_root(&mut sched.stack_pool, - f.take()); - rtdebug!("newsched_task: %x", to_uint(task)); - task.on_exit = Some(on_exit); - sched.enqueue_task(task); - sched.run(); + run_in_newsched_task_core(f.take()); } } +pub fn run_in_newsched_task_core(f: ~fn()) { + + use rt::sched::Shutdown; + + let mut sched = ~new_test_uv_sched(); + let exit_handle = Cell::new(sched.make_handle()); + + let on_exit: ~fn(bool) = |exit_status| { + exit_handle.take().send(Shutdown); + rtassert!(exit_status); + }; + let mut task = ~Task::new_root(&mut sched.stack_pool, f); + task.on_exit = Some(on_exit); + + sched.bootstrap(task); +} + /// Create more than one scheduler and run a function in a task /// in one of the schedulers. The schedulers will stay alive /// until the function `f` returns. @@ -61,7 +67,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { use rt::sched::Shutdown; use rt::util; - let f_cell = Cell::new(f); + let f = Cell::new(f); do run_in_bare_thread { let nthreads = match os::getenv("RUST_TEST_THREADS") { @@ -69,7 +75,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { None => { // Using more threads than cores in test code // to force the OS to preempt them frequently. - // Assuming that this help stress test concurrent types. + // Assuming that this helps stress test concurrent types. util::num_cpus() * 2 } }; @@ -91,7 +97,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { scheds.push(sched); } - let f_cell = Cell::new(f_cell.take()); + let f = Cell::new(f.take()); let handles = Cell::new(handles); let on_exit: ~fn(bool) = |exit_status| { let mut handles = handles.take(); @@ -103,18 +109,30 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { rtassert!(exit_status); }; let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, - f_cell.take()); + f.take()); main_task.on_exit = Some(on_exit); - scheds[0].enqueue_task(main_task); let mut threads = ~[]; + let main_task = Cell::new(main_task); - while !scheds.is_empty() { + let main_thread = { let sched = scheds.pop(); let sched_cell = Cell::new(sched); + do Thread::start { + let sched = sched_cell.take(); + sched.bootstrap(main_task.take()); + } + }; + threads.push(main_thread); + + while !scheds.is_empty() { + let mut sched = scheds.pop(); + let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || {}; + let bootstrap_task_cell = Cell::new(bootstrap_task); + let sched_cell = Cell::new(sched); let thread = do Thread::start { let sched = sched_cell.take(); - sched.run(); + sched.bootstrap(bootstrap_task_cell.take()); }; threads.push(thread); @@ -128,190 +146,72 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { /// Test tasks will abort on failure instead of unwinding pub fn spawntask(f: ~fn()) { - use super::sched::*; - let f = Cell::new(f); - - let task = unsafe { - let sched = Local::unsafe_borrow::(); - rtdebug!("spawntask taking the scheduler from TLS"); - - - do Local::borrow::() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, f.take()) - } - }; - - rtdebug!("new task pointer: %x", to_uint(task)); - - let sched = Local::take::(); - rtdebug!("spawntask scheduling the new task"); - sched.schedule_task(task); -} - - -/// Create a new task and run it right now. Aborts on failure -pub fn spawntask_immediately(f: ~fn()) { - use super::sched::*; - - let f = Cell::new(f); - - let task = unsafe { - let sched = Local::unsafe_borrow::(); - do Local::borrow::() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, - f.take()) - } - }; - - let sched = Local::take::(); - do sched.switch_running_tasks_and_then(task) |sched, task| { - sched.enqueue_task(task); - } + Scheduler::run_task(Task::build_child(f)); } /// Create a new task and run it right now. Aborts on failure pub fn spawntask_later(f: ~fn()) { - use super::sched::*; - let f = Cell::new(f); - - let task = unsafe { - let sched = Local::unsafe_borrow::(); - do Local::borrow::() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, f.take()) - } - }; - - let mut sched = Local::take::(); - sched.enqueue_task(task); - Local::put(sched); + Scheduler::run_task_later(Task::build_child(f)); } -/// Spawn a task and either run it immediately or run it later pub fn spawntask_random(f: ~fn()) { - use super::sched::*; use rand::{Rand, rng}; - let f = Cell::new(f); - - let task = unsafe { - let sched = Local::unsafe_borrow::(); - do Local::borrow::() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, - f.take()) - - } - }; - - let mut sched = Local::take::(); - let mut rng = rng(); let run_now: bool = Rand::rand(&mut rng); if run_now { - do sched.switch_running_tasks_and_then(task) |sched, task| { - sched.enqueue_task(task); - } + spawntask(f) } else { - sched.enqueue_task(task); - Local::put(sched); + spawntask_later(f) } } -/// Spawn a task, with the current scheduler as home, and queue it to -/// run later. -pub fn spawntask_homed(scheds: &mut ~[~Scheduler], f: ~fn()) { - use super::sched::*; - use rand::{rng, RngUtil}; - let mut rng = rng(); - - let task = { - let sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)]; - let handle = sched.make_handle(); - let home_id = handle.sched_id; - - // now that we know where this is going, build a new function - // that can assert it is in the right place - let af: ~fn() = || { - do Local::borrow::() |sched| { - rtdebug!("home_id: %u, runtime loc: %u", - home_id, - sched.sched_id()); - assert!(home_id == sched.sched_id()); - }; - f() - }; - - ~Task::new_root_homed(&mut sched.stack_pool, - Sched(handle), - af) - }; - let dest_sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)]; - // enqueue it for future execution - dest_sched.enqueue_task(task); -} - -/// Spawn a task and wait for it to finish, returning whether it -/// completed successfully or failed -pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { - use cell::Cell; - use super::sched::*; - - let f = Cell::new(f); +pub fn spawntask_try(f: ~fn()) -> Result<(),()> { let (port, chan) = oneshot(); let chan = Cell::new(chan); let on_exit: ~fn(bool) = |exit_status| chan.take().send(exit_status); - let mut new_task = unsafe { - let sched = Local::unsafe_borrow::(); - do Local::borrow:: |_running_task| { - - // I don't understand why using a child task here fails. I - // think the fail status is propogating back up the task - // tree and triggering a fail for the parent, which we - // aren't correctly expecting. - - // ~running_task.new_child(&mut (*sched).stack_pool, - ~Task::new_root(&mut (*sched).stack_pool, - f.take()) - } - }; - new_task.on_exit = Some(on_exit); - let sched = Local::take::(); - do sched.switch_running_tasks_and_then(new_task) |sched, old_task| { - sched.enqueue_task(old_task); - } + let mut new_task = Task::build_root(f); + new_task.on_exit = Some(on_exit); - rtdebug!("enqueued the new task, now waiting on exit_status"); + Scheduler::run_task(new_task); let exit_status = port.recv(); if exit_status { Ok(()) } else { Err(()) } + } -// Spawn a new task in a new scheduler and return a thread handle. pub fn spawntask_thread(f: ~fn()) -> Thread { - use rt::sched::*; let f = Cell::new(f); - let task = unsafe { - let sched = Local::unsafe_borrow::(); - do Local::borrow::() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, - f.take()) - } - }; - - let task = Cell::new(task); - let thread = do Thread::start { - let mut sched = ~new_test_uv_sched(); - sched.enqueue_task(task.take()); - sched.run(); + run_in_newsched_task_core(f.take()); }; + return thread; } +/// Use to cleanup tasks created for testing but not "run". +pub fn cleanup_task(task: ~Task) { + + let mut task = task; + task.destroyed = true; + + let local_success = !task.unwinder.unwinding; + let join_latch = task.join_latch.swap_unwrap(); + match task.on_exit { + Some(ref on_exit) => { + let success = join_latch.wait(local_success); + (*on_exit)(success); + } + None => { + join_latch.release(local_success); + } + } +} /// Get a port number, starting at 9600, for use in tests pub fn next_test_port() -> u16 { @@ -337,4 +237,4 @@ pub fn stress_factor() -> uint { Some(val) => uint::from_str(val).get(), None => 1 } -} +} \ No newline at end of file diff --git a/src/libstd/rt/tube.rs b/src/libstd/rt/tube.rs index 013eb438c3657..5032a81846129 100644 --- a/src/libstd/rt/tube.rs +++ b/src/libstd/rt/tube.rs @@ -17,7 +17,6 @@ use option::*; use clone::Clone; use super::rc::RC; use rt::sched::Scheduler; -use rt::{context, TaskContext, SchedulerContext}; use rt::local::Local; use rt::task::Task; use vec::OwnedVector; @@ -44,8 +43,6 @@ impl Tube { pub fn send(&mut self, val: T) { rtdebug!("tube send"); - assert!(context() == SchedulerContext); - unsafe { let state = self.p.unsafe_borrow_mut(); (*state).buf.push(val); @@ -61,8 +58,6 @@ impl Tube { } pub fn recv(&mut self) -> T { - assert!(context() == TaskContext); - unsafe { let state = self.p.unsafe_borrow_mut(); if !(*state).buf.is_empty() { diff --git a/src/libstd/rt/uv/mod.rs b/src/libstd/rt/uv/mod.rs index 883eda0057fdb..424ef4150ba45 100644 --- a/src/libstd/rt/uv/mod.rs +++ b/src/libstd/rt/uv/mod.rs @@ -49,7 +49,7 @@ use unstable::finally::Finally; use rt::io::IoError; -#[cfg(test)] use unstable::run_in_bare_thread; +//#[cfg(test)] use unstable::run_in_bare_thread; pub use self::file::FsRequest; pub use self::net::{StreamWatcher, TcpWatcher}; @@ -325,7 +325,7 @@ pub fn vec_from_uv_buf(buf: Buf) -> Option<~[u8]> { return None; } } - +/* #[test] fn test_slice_to_uv_buf() { let slice = [0, .. 20]; @@ -352,3 +352,4 @@ fn loop_smoke_test() { loop_.close(); } } +*/ \ No newline at end of file diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 69d14dbf9a17f..11bab69f7cfa3 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -29,7 +29,7 @@ use unstable::sync::{Exclusive, exclusive}; #[cfg(test)] use container::Container; #[cfg(test)] use uint; #[cfg(test)] use unstable::run_in_bare_thread; -#[cfg(test)] use rt::test::{spawntask_immediately, +#[cfg(test)] use rt::test::{spawntask, next_test_ip4, run_in_newsched_task}; @@ -206,13 +206,11 @@ impl IoFactory for UvIoFactory { let result_cell_ptr: *Cell> = &result_cell; let scheduler = Local::take::(); - assert!(scheduler.in_task_context()); // Block this task and take ownership, switch to scheduler context - do scheduler.deschedule_running_task_and_then |sched, task| { + do scheduler.deschedule_running_task_and_then |_sched, task| { rtdebug!("connect: entered scheduler context"); - assert!(!sched.in_task_context()); let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); let task_cell = Cell::new(task); @@ -366,12 +364,10 @@ impl RtioTcpStream for UvTcpStream { let result_cell_ptr: *Cell> = &result_cell; let scheduler = Local::take::(); - assert!(scheduler.in_task_context()); let watcher = self.watcher(); let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |sched, task| { + do scheduler.deschedule_running_task_and_then |_sched, task| { rtdebug!("read: entered scheduler context"); - assert!(!sched.in_task_context()); let mut watcher = watcher; let task_cell = Cell::new(task); // XXX: We shouldn't reallocate these callbacks every @@ -410,7 +406,6 @@ impl RtioTcpStream for UvTcpStream { let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell> = &result_cell; let scheduler = Local::take::(); - assert!(scheduler.in_task_context()); let watcher = self.watcher(); let buf_ptr: *&[u8] = &buf; do scheduler.deschedule_running_task_and_then |_, task| { @@ -454,7 +449,7 @@ fn test_simple_tcp_server_and_client() { let addr = next_test_ip4(); // Start the server first so it's listening when we connect - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut listener = (*io).tcp_bind(addr).unwrap(); @@ -469,7 +464,7 @@ fn test_simple_tcp_server_and_client() { } } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut stream = (*io).tcp_connect(addr).unwrap(); @@ -484,7 +479,7 @@ fn test_read_and_block() { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let io = unsafe { Local::unsafe_borrow::() }; let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() }; let mut stream = listener.accept().unwrap(); @@ -517,7 +512,7 @@ fn test_read_and_block() { assert!(reads > 1); } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut stream = (*io).tcp_connect(addr).unwrap(); @@ -537,7 +532,7 @@ fn test_read_read_read() { let addr = next_test_ip4(); static MAX: uint = 500000; - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut listener = (*io).tcp_bind(addr).unwrap(); @@ -551,7 +546,7 @@ fn test_read_read_read() { } } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let mut stream = (*io).tcp_connect(addr).unwrap(); diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index ae8f1c2101dec..92a9f07b5cd0f 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -1180,4 +1180,3 @@ fn test_simple_newsched_spawn() { spawn(||()) } } - diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index bcb7e06bf1f74..a2cc77f3b70ad 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -90,7 +90,6 @@ use task::unkillable; use uint; use util; use unstable::sync::{Exclusive, exclusive}; -use rt::local::Local; use rt::task::Task; use iterator::IteratorUtil; @@ -566,39 +565,26 @@ fn gen_child_taskgroup(linked: bool, supervised: bool) pub fn spawn_raw(opts: TaskOpts, f: ~fn()) { use rt::*; - match context() { - OldTaskContext => { - spawn_raw_oldsched(opts, f) - } - TaskContext => { - spawn_raw_newsched(opts, f) - } - SchedulerContext => { - fail!("can't spawn from scheduler context") - } - GlobalContext => { - fail!("can't spawn from global context") - } + // A hack to go with the context function. Just do the boolean + // check and let an interesting runtime error occur if it was one + // of the two bad cases. + + if context() == OldTaskContext { + spawn_raw_oldsched(opts, f) + } else { + spawn_raw_newsched(opts, f) } + } fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) { use rt::sched::*; - let f = Cell::new(f); - - let mut task = unsafe { - let sched = Local::unsafe_borrow::(); - rtdebug!("unsafe borrowed sched"); - - if opts.linked { - do Local::borrow::() |running_task| { - ~running_task.new_child(&mut (*sched).stack_pool, f.take()) - } - } else { - // An unlinked task is a new root in the task tree - ~Task::new_root(&mut (*sched).stack_pool, f.take()) - } + let mut task = if opts.linked { + Task::build_child(f) + } else { + // An unlinked task is a new root in the task tree + Task::build_root(f) }; if opts.notify_chan.is_some() { @@ -612,11 +598,9 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) { task.on_exit = Some(on_exit); } - rtdebug!("spawn about to take scheduler"); + rtdebug!("spawn calling run_task"); + Scheduler::run_task(task); - let sched = Local::take::(); - rtdebug!("took sched in spawn"); - sched.schedule_task(task); } fn spawn_raw_oldsched(mut opts: TaskOpts, f: ~fn()) { diff --git a/src/libstd/unstable/lang.rs b/src/libstd/unstable/lang.rs index 66bb46b8991a5..558198fcf5bb1 100644 --- a/src/libstd/unstable/lang.rs +++ b/src/libstd/unstable/lang.rs @@ -69,9 +69,6 @@ pub unsafe fn local_malloc(td: *c_char, size: uintptr_t) -> *c_char { _ => { let mut alloc = ::ptr::null(); do Local::borrow:: |task| { - rtdebug!("task pointer: %x, heap pointer: %x", - to_uint(task), - to_uint(&task.heap)); alloc = task.heap.alloc(td as *c_void, size as uint) as *c_char; } return alloc; From f1a988eb751f14be90c6a743c5a5bc2c1075c0ea Mon Sep 17 00:00:00 2001 From: toddaaro Date: Fri, 19 Jul 2013 15:57:29 -0700 Subject: [PATCH 2/2] resolved code issues created by the merge with master --- src/libstd/rt/io/net/tcp.rs | 28 ++++++++++++++-------------- src/libstd/rt/io/net/udp.rs | 16 ++++++++-------- src/libstd/rt/local.rs | 3 +-- src/libstd/rt/sched.rs | 27 ++++++--------------------- src/libstd/rt/task.rs | 6 +++--- src/libstd/rt/test.rs | 4 ++-- src/libstd/rt/uv/uvio.rs | 20 +++++++------------- 7 files changed, 41 insertions(+), 63 deletions(-) diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs index bcd26e951bab5..2e9b6231e761b 100644 --- a/src/libstd/rt/io/net/tcp.rs +++ b/src/libstd/rt/io/net/tcp.rs @@ -172,7 +172,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip6(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -180,7 +180,7 @@ mod test { assert!(buf[0] == 99); } - do spawntask_immediately { + do spawntask { let mut stream = TcpStream::connect(addr); stream.write([99]); } @@ -212,7 +212,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip6(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -220,7 +220,7 @@ mod test { assert!(nread.is_none()); } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -254,7 +254,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip6(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -264,7 +264,7 @@ mod test { assert!(nread.is_none()); } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -305,7 +305,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip6(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let buf = [0]; @@ -322,7 +322,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -360,7 +360,7 @@ mod test { let addr = next_test_ip6(); let max = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for max.times { let mut stream = listener.accept(); @@ -370,7 +370,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { for max.times { let mut stream = TcpStream::connect(addr); stream.write([99]); @@ -424,13 +424,13 @@ mod test { let addr = next_test_ip6(); static MAX: int = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for int::range(0, MAX) |i| { let stream = Cell::new(listener.accept()); rtdebug!("accepted"); // Start another task to handle the connection - do spawntask_immediately { + do spawntask { let mut stream = stream.take(); let mut buf = [0]; stream.read(buf); @@ -445,7 +445,7 @@ mod test { fn connect(i: int, addr: IpAddr) { if i == MAX { return } - do spawntask_immediately { + do spawntask { rtdebug!("connecting"); let mut stream = TcpStream::connect(addr); // Connect again before writing @@ -501,7 +501,7 @@ mod test { let addr = next_test_ip6(); static MAX: int = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for int::range(0, MAX) |_| { let stream = Cell::new(listener.accept()); diff --git a/src/libstd/rt/io/net/udp.rs b/src/libstd/rt/io/net/udp.rs index f3b5278357392..9c392cb4ac2d4 100644 --- a/src/libstd/rt/io/net/udp.rs +++ b/src/libstd/rt/io/net/udp.rs @@ -120,7 +120,7 @@ mod test { let server_ip = next_test_ip4(); let client_ip = next_test_ip4(); - do spawntask_immediately { + do spawntask { match UdpSocket::bind(server_ip) { Some(server) => { let mut buf = [0]; @@ -137,7 +137,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { match UdpSocket::bind(client_ip) { Some(client) => client.sendto([99], server_ip), None => fail!() @@ -152,7 +152,7 @@ mod test { let server_ip = next_test_ip6(); let client_ip = next_test_ip6(); - do spawntask_immediately { + do spawntask { match UdpSocket::bind(server_ip) { Some(server) => { let mut buf = [0]; @@ -169,7 +169,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { match UdpSocket::bind(client_ip) { Some(client) => client.sendto([99], server_ip), None => fail!() @@ -184,7 +184,7 @@ mod test { let server_ip = next_test_ip4(); let client_ip = next_test_ip4(); - do spawntask_immediately { + do spawntask { match UdpSocket::bind(server_ip) { Some(server) => { let server = ~server; @@ -202,7 +202,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { match UdpSocket::bind(client_ip) { Some(client) => { let client = ~client; @@ -221,7 +221,7 @@ mod test { let server_ip = next_test_ip6(); let client_ip = next_test_ip6(); - do spawntask_immediately { + do spawntask { match UdpSocket::bind(server_ip) { Some(server) => { let server = ~server; @@ -239,7 +239,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { match UdpSocket::bind(client_ip) { Some(client) => { let client = ~client; diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs index 9dbd24effb5df..6865240fc4148 100644 --- a/src/libstd/rt/local.rs +++ b/src/libstd/rt/local.rs @@ -56,7 +56,7 @@ impl Local for Scheduler { } fn take() -> ~Scheduler { do Local::borrow:: |task| { - let sched = task.sched.swap_unwrap(); + let sched = task.sched.take_unwrap(); let task = task; task.sched = None; sched @@ -119,7 +119,6 @@ impl Local for IoFactoryObject { #[cfg(test)] mod test { - use unstable::run_in_bare_thread; use rt::test::*; use super::*; use rt::task::Task; diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 61a20045e4b5e..fe8f8dfcacae1 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -163,7 +163,7 @@ impl Scheduler { stask.destroyed = true; let local_success = !stask.unwinder.unwinding; - let join_latch = stask.join_latch.swap_unwrap(); + let join_latch = stask.join_latch.take_unwrap(); match stask.on_exit { Some(ref on_exit) => { let success = join_latch.wait(local_success); @@ -567,7 +567,7 @@ impl Scheduler { // Trickier - we need to get the scheduler task out of self // and use it as the destination. let mut this = self; - let stask = this.sched_task.swap_unwrap(); + let stask = this.sched_task.take_unwrap(); this.change_task_context(stask, f); } @@ -620,7 +620,7 @@ impl Scheduler { pub fn run_cleanup_job(&mut self) { rtdebug!("running cleanup job"); - let cleanup_job = self.cleanup_job.swap_unwrap(); + let cleanup_job = self.cleanup_job.take_unwrap(); match cleanup_job { DoNothing => { } GiveTask(task, f) => (f.to_fn())(self, task) @@ -919,24 +919,6 @@ mod test { fn handle() { use rt::comm::*; - do run_in_bare_thread { - let (port, chan) = oneshot::<()>(); - let port = Cell::new(port); - let chan = Cell::new(chan); - - let _thread_one = do Thread::start { - do run_in_newsched_task_core { - chan.take().send(()); - } - }; - - let _thread_two = do Thread::start { - do run_in_newsched_task_core { - port.take().recv(); - } - }; - } - do run_in_bare_thread { let (port, chan) = oneshot::<()>(); let port = Cell::new(port); @@ -948,6 +930,8 @@ mod test { let sched = Cell::new(sched); let _thread_one = do Thread::start { + let port = Cell::new(port.take()); + let handle = Cell::new(handle.take()); do run_in_newsched_task_core { port.take().recv(); handle.take(); @@ -955,6 +939,7 @@ mod test { }; let _thread_two = do Thread::start { + let chan = Cell::new(chan.take()); let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status); let job: ~fn() = || { chan.take().send(()) }; let mut sched = sched.take(); diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 355e799a15482..9f1ced3d3e7e2 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -85,7 +85,7 @@ impl Task { let f = Cell::new(f); let home = Cell::new(home); do Local::borrow:: |running_task| { - let mut sched = running_task.sched.swap_unwrap(); + let mut sched = running_task.sched.take_unwrap(); let new_task = ~running_task.new_child_homed(&mut sched.stack_pool, home.take(), f.take()); @@ -102,7 +102,7 @@ impl Task { let f = Cell::new(f); let home = Cell::new(home); do Local::borrow:: |running_task| { - let mut sched = running_task.sched.swap_unwrap(); + let mut sched = running_task.sched.take_unwrap(); let new_task = ~Task::new_root_homed(&mut sched.stack_pool, home.take(), f.take()); @@ -193,7 +193,7 @@ impl Task { pub fn swap_unwrap_home(&mut self) -> SchedHome { match self.task_type { GreenTask(ref mut home) => { - let out = home.swap_unwrap(); + let out = home.take_unwrap(); return *out; } SchedTask => { diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index a50f0d8158ed5..8260582d762e0 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -16,7 +16,7 @@ use clone::Clone; use container::Container; use iterator::IteratorUtil; use vec::{OwnedVector, MutableVector}; -use super::io::net::ip::{IpAddr, Ipv4, Ipv6}; +use super::io::net::ip::Ipv6; use rt::sched::Scheduler; use super::io::net::ip::{IpAddr, Ipv4}; use unstable::run_in_bare_thread; @@ -207,7 +207,7 @@ pub fn cleanup_task(task: ~Task) { task.destroyed = true; let local_success = !task.unwinder.unwinding; - let join_latch = task.join_latch.swap_unwrap(); + let join_latch = task.join_latch.take_unwrap(); match task.on_exit { Some(ref on_exit) => { let success = join_latch.wait(local_success); diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 4985454777e6e..db1dbd4cbee93 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -387,11 +387,9 @@ impl RtioTcpStream for UvTcpStream { let result_cell_ptr: *Cell> = &result_cell; let scheduler = Local::take::(); - let watcher = self.watcher(); let buf_ptr: *&mut [u8] = &buf; do scheduler.deschedule_running_task_and_then |_sched, task| { rtdebug!("read: entered scheduler context"); - let mut watcher = watcher; let task_cell = Cell::new(task); // XXX: We shouldn't reallocate these callbacks every // call to read @@ -429,7 +427,6 @@ impl RtioTcpStream for UvTcpStream { let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell> = &result_cell; let scheduler = Local::take::(); - let watcher = self.watcher(); let buf_ptr: *&[u8] = &buf; do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); @@ -488,11 +485,9 @@ impl RtioUdpSocket for UvUdpSocket { let result_cell_ptr: *Cell> = &result_cell; let scheduler = Local::take::(); - assert!(scheduler.in_task_context()); let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |sched, task| { + do scheduler.deschedule_running_task_and_then |_sched, task| { rtdebug!("recvfrom: entered scheduler context"); - assert!(!sched.in_task_context()); let task_cell = Cell::new(task); let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) }; do self.recv_start(alloc) |watcher, nread, _buf, addr, flags, status| { @@ -523,7 +518,6 @@ impl RtioUdpSocket for UvUdpSocket { let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell> = &result_cell; let scheduler = Local::take::(); - assert!(scheduler.in_task_context()); let buf_ptr: *&[u8] = &buf; do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); @@ -621,7 +615,7 @@ fn test_simple_udp_server_and_client() { let server_addr = next_test_ip4(); let client_addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let server_socket = (*io).udp_bind(server_addr).unwrap(); @@ -636,7 +630,7 @@ fn test_simple_udp_server_and_client() { } } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let client_socket = (*io).udp_bind(client_addr).unwrap(); @@ -744,7 +738,7 @@ fn test_udp_twice() { let server_addr = next_test_ip4(); let client_addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let client = (*io).udp_bind(client_addr).unwrap(); @@ -753,7 +747,7 @@ fn test_udp_twice() { } } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let server = (*io).udp_bind(server_addr).unwrap(); @@ -781,7 +775,7 @@ fn test_udp_many_read() { let client_in_addr = next_test_ip4(); static MAX: uint = 500_000; - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let server_out = (*io).udp_bind(server_out_addr).unwrap(); @@ -804,7 +798,7 @@ fn test_udp_many_read() { } } - do spawntask_immediately { + do spawntask { unsafe { let io = Local::unsafe_borrow::(); let client_out = (*io).udp_bind(client_out_addr).unwrap();