diff --git a/src/libstd/macros.rs b/src/libstd/macros.rs index 7748c43efcd28..04058887970d0 100644 --- a/src/libstd/macros.rs +++ b/src/libstd/macros.rs @@ -23,9 +23,14 @@ macro_rules! rtdebug_ ( } ) ) -// An alternate version with no output, for turning off logging +// An alternate version with no output, for turning off logging. An +// earlier attempt that did not call the fmt! macro was insufficient, +// as a case of the "let bind each variable" approach eventually +// failed without an error message describing the invocation site. macro_rules! rtdebug ( - ($( $arg:expr),+) => ( $(let _ = $arg)*; ) + ($( $arg:expr),+) => ( { + let _x = fmt!( $($arg),+ ); + }) ) macro_rules! rtassert ( diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index fba6171129762..7ce3b13f01587 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -639,7 +639,7 @@ mod test { do run_in_newsched_task { let (port, chan) = oneshot::<~int>(); let port_cell = Cell::new(port); - do spawntask_immediately { + do spawntask { assert!(port_cell.take().recv() == ~10); } @@ -653,7 +653,7 @@ mod test { let (port, chan) = oneshot::<~int>(); let port_cell = Cell::new(port); let chan_cell = Cell::new(chan); - do spawntask_later { + do spawntask { let _cell = chan_cell.take(); } let res = do spawntask_try { @@ -908,5 +908,4 @@ mod test { } } } - } diff --git a/src/libstd/rt/context.rs b/src/libstd/rt/context.rs index 09ba869549fd0..83f9342067386 100644 --- a/src/libstd/rt/context.rs +++ b/src/libstd/rt/context.rs @@ -49,12 +49,11 @@ impl Context { let argp: *c_void = unsafe { transmute::<&~fn(), *c_void>(&*start) }; let sp: *uint = stack.end(); let sp: *mut uint = unsafe { transmute_mut_unsafe(sp) }; - // Save and then immediately load the current context, // which we will then modify to call the given function when restored let mut regs = new_regs(); unsafe { - swap_registers(transmute_mut_region(&mut *regs), transmute_region(&*regs)) + swap_registers(transmute_mut_region(&mut *regs), transmute_region(&*regs)); }; initialize_call_frame(&mut *regs, fp, argp, sp); @@ -72,13 +71,14 @@ impl Context { then loading the registers from a previously saved Context. 
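A stand-alone sketch of the idea behind the macros.rs hunk above (quiet_log is a hypothetical name, not part of the patch): a disabled logging macro should still expand to a fmt! call so its arguments are type-checked and evaluated exactly as in the enabled rtdebug_!, rather than being discarded through per-argument let bindings.

    macro_rules! quiet_log (
        ($( $arg:expr),+) => ( {
            // Route the arguments through fmt! and drop the result, so
            // the disabled macro stays in lock-step with the enabled one.
            let _x = fmt!( $($arg),+ );
        })
    )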
*/ pub fn swap(out_context: &mut Context, in_context: &Context) { + rtdebug!("swapping contexts"); let out_regs: &mut Registers = match out_context { &Context { regs: ~ref mut r, _ } => r }; let in_regs: &Registers = match in_context { &Context { regs: ~ref r, _ } => r }; - + rtdebug!("doing raw swap"); unsafe { swap_registers(out_regs, in_regs) }; } } diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs index 2425c909bf3d8..2e9b6231e761b 100644 --- a/src/libstd/rt/io/net/tcp.rs +++ b/src/libstd/rt/io/net/tcp.rs @@ -152,7 +152,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -160,7 +160,7 @@ mod test { assert!(buf[0] == 99); } - do spawntask_immediately { + do spawntask { let mut stream = TcpStream::connect(addr); stream.write([99]); } @@ -172,7 +172,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip6(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -180,7 +180,7 @@ mod test { assert!(buf[0] == 99); } - do spawntask_immediately { + do spawntask { let mut stream = TcpStream::connect(addr); stream.write([99]); } @@ -192,7 +192,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -200,7 +200,7 @@ mod test { assert!(nread.is_none()); } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -212,7 +212,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip6(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -220,7 +220,7 @@ mod test { assert!(nread.is_none()); } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -232,7 +232,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -242,7 +242,7 @@ mod test { assert!(nread.is_none()); } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -254,7 +254,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip6(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; @@ -264,7 +264,7 @@ mod test { assert!(nread.is_none()); } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -276,7 +276,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip4(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let buf = [0]; @@ -293,7 +293,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { let _stream = TcpStream::connect(addr); // Close } @@ -305,7 +305,7 @@ mod test { do run_in_newsched_task { let addr = next_test_ip6(); - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let buf = [0]; @@ -322,7 +322,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { let _stream = 
TcpStream::connect(addr); // Close } @@ -335,7 +335,7 @@ mod test { let addr = next_test_ip4(); let max = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for max.times { let mut stream = listener.accept(); @@ -345,7 +345,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { for max.times { let mut stream = TcpStream::connect(addr); stream.write([99]); @@ -360,7 +360,7 @@ mod test { let addr = next_test_ip6(); let max = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for max.times { let mut stream = listener.accept(); @@ -370,7 +370,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { for max.times { let mut stream = TcpStream::connect(addr); stream.write([99]); @@ -385,13 +385,13 @@ mod test { let addr = next_test_ip4(); static MAX: int = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for int::range(0, MAX) |i| { let stream = Cell::new(listener.accept()); rtdebug!("accepted"); // Start another task to handle the connection - do spawntask_immediately { + do spawntask { let mut stream = stream.take(); let mut buf = [0]; stream.read(buf); @@ -406,7 +406,7 @@ mod test { fn connect(i: int, addr: IpAddr) { if i == MAX { return } - do spawntask_immediately { + do spawntask { rtdebug!("connecting"); let mut stream = TcpStream::connect(addr); // Connect again before writing @@ -424,13 +424,13 @@ mod test { let addr = next_test_ip6(); static MAX: int = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for int::range(0, MAX) |i| { let stream = Cell::new(listener.accept()); rtdebug!("accepted"); // Start another task to handle the connection - do spawntask_immediately { + do spawntask { let mut stream = stream.take(); let mut buf = [0]; stream.read(buf); @@ -445,7 +445,7 @@ mod test { fn connect(i: int, addr: IpAddr) { if i == MAX { return } - do spawntask_immediately { + do spawntask { rtdebug!("connecting"); let mut stream = TcpStream::connect(addr); // Connect again before writing @@ -463,13 +463,13 @@ mod test { let addr = next_test_ip4(); static MAX: int = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for int::range(0, MAX) |_| { let stream = Cell::new(listener.accept()); rtdebug!("accepted"); // Start another task to handle the connection - do spawntask_later { + do spawntask { let mut stream = stream.take(); let mut buf = [0]; stream.read(buf); @@ -484,7 +484,7 @@ mod test { fn connect(i: int, addr: IpAddr) { if i == MAX { return } - do spawntask_later { + do spawntask { rtdebug!("connecting"); let mut stream = TcpStream::connect(addr); // Connect again before writing @@ -501,7 +501,7 @@ mod test { let addr = next_test_ip6(); static MAX: int = 10; - do spawntask_immediately { + do spawntask { let mut listener = TcpListener::bind(addr); for int::range(0, MAX) |_| { let stream = Cell::new(listener.accept()); diff --git a/src/libstd/rt/io/net/udp.rs b/src/libstd/rt/io/net/udp.rs index f3b5278357392..9c392cb4ac2d4 100644 --- a/src/libstd/rt/io/net/udp.rs +++ b/src/libstd/rt/io/net/udp.rs @@ -120,7 +120,7 @@ mod test { let server_ip = next_test_ip4(); let client_ip = next_test_ip4(); - do spawntask_immediately { + do spawntask { match UdpSocket::bind(server_ip) { Some(server) => { let mut buf = [0]; @@ -137,7 +137,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { match UdpSocket::bind(client_ip) { Some(client) => client.sendto([99], 
server_ip), None => fail!() @@ -152,7 +152,7 @@ mod test { let server_ip = next_test_ip6(); let client_ip = next_test_ip6(); - do spawntask_immediately { + do spawntask { match UdpSocket::bind(server_ip) { Some(server) => { let mut buf = [0]; @@ -169,7 +169,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { match UdpSocket::bind(client_ip) { Some(client) => client.sendto([99], server_ip), None => fail!() @@ -184,7 +184,7 @@ mod test { let server_ip = next_test_ip4(); let client_ip = next_test_ip4(); - do spawntask_immediately { + do spawntask { match UdpSocket::bind(server_ip) { Some(server) => { let server = ~server; @@ -202,7 +202,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { match UdpSocket::bind(client_ip) { Some(client) => { let client = ~client; @@ -221,7 +221,7 @@ mod test { let server_ip = next_test_ip6(); let client_ip = next_test_ip6(); - do spawntask_immediately { + do spawntask { match UdpSocket::bind(server_ip) { Some(server) => { let server = ~server; @@ -239,7 +239,7 @@ mod test { } } - do spawntask_immediately { + do spawntask { match UdpSocket::bind(client_ip) { Some(client) => { let client = ~client; diff --git a/src/libstd/rt/join_latch.rs b/src/libstd/rt/join_latch.rs index 924db1a21b729..c9604dd2660d2 100644 --- a/src/libstd/rt/join_latch.rs +++ b/src/libstd/rt/join_latch.rs @@ -345,7 +345,7 @@ mod test { let child_latch = latch.new_child(); let child_latch = Cell::new(child_latch); - do spawntask_immediately { + do spawntask { let child_latch = child_latch.take(); assert!(child_latch.wait(true)); } @@ -467,7 +467,7 @@ mod test { let child_latch = latch.new_child(); let child_latch = Cell::new(child_latch); - do spawntask_immediately { + do spawntask { let latch = child_latch.take(); latch.release(false); } @@ -483,7 +483,7 @@ mod test { let child_latch = latch.new_child(); let child_latch = Cell::new(child_latch); - do spawntask_immediately { + do spawntask { let mut latch = child_latch.take(); let child_latch = latch.new_child(); let child_latch = Cell::new(child_latch); @@ -509,7 +509,7 @@ mod test { let mut latch = child_latch.take(); let child_latch = latch.new_child(); let child_latch = Cell::new(child_latch); - do spawntask_immediately { + do spawntask { let latch = child_latch.take(); latch.release(false); } @@ -598,7 +598,7 @@ mod test { let child_latch = Cell::new(latch.new_child()); let succeed = order.succeed; if order.immediate { - do spawntask_immediately { + do spawntask { let mut child_latch = child_latch.take(); next(&mut *child_latch, suborders.clone()); rtdebug!("immediate releasing"); diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs index b47bbf3edf0bb..6865240fc4148 100644 --- a/src/libstd/rt/local.rs +++ b/src/libstd/rt/local.rs @@ -13,7 +13,7 @@ use rt::sched::Scheduler; use rt::task::Task; use rt::local_ptr; use rt::rtio::{EventLoop, IoFactoryObject}; -//use borrow::to_uint; +use cell::Cell; pub trait Local { fn put(value: ~Self); @@ -24,40 +24,56 @@ pub trait Local { unsafe fn try_unsafe_borrow() -> Option<*mut Self>; } -impl Local for Scheduler { - fn put(value: ~Scheduler) { unsafe { local_ptr::put(value) }} - fn take() -> ~Scheduler { unsafe { local_ptr::take() } } +impl Local for Task { + fn put(value: ~Task) { unsafe { local_ptr::put(value) } } + fn take() -> ~Task { unsafe { local_ptr::take() } } fn exists() -> bool { local_ptr::exists() } - fn borrow<T>(f: &fn(&mut Scheduler) -> T) -> T { + fn borrow<T>(f: &fn(&mut Task) -> T) -> T { let mut res: Option<T> = None; let res_ptr: *mut Option<T> = &mut res;
unsafe { - do local_ptr::borrow |sched| { -// rtdebug!("successfully unsafe borrowed sched pointer"); - let result = f(sched); + do local_ptr::borrow |task| { + let result = f(task); *res_ptr = Some(result); } } match res { Some(r) => { r } - None => rtabort!("function failed!") + None => { rtabort!("function failed in local_borrow") } } } - unsafe fn unsafe_borrow() -> *mut Scheduler { local_ptr::unsafe_borrow() } - unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { rtabort!("unimpl") } + unsafe fn unsafe_borrow() -> *mut Task { local_ptr::unsafe_borrow() } + unsafe fn try_unsafe_borrow() -> Option<*mut Task> { rtabort!("unimpl task try_unsafe_borrow") } } -impl Local for Task { - fn put(_value: ~Task) { rtabort!("unimpl") } - fn take() -> ~Task { rtabort!("unimpl") } - fn exists() -> bool { rtabort!("unimpl") } - fn borrow<T>(f: &fn(&mut Task) -> T) -> T { - do Local::borrow::<Scheduler, T> |sched| { -// rtdebug!("sched about to grab current_task"); - match sched.current_task { +impl Local for Scheduler { + fn put(value: ~Scheduler) { + let value = Cell::new(value); + do Local::borrow::<Task, ()> |task| { + let task = task; + task.sched = Some(value.take()); + }; + } + fn take() -> ~Scheduler { + do Local::borrow::<Task, ~Scheduler> |task| { + let sched = task.sched.take_unwrap(); + let task = task; + task.sched = None; + sched + } + } + fn exists() -> bool { + do Local::borrow::<Task, bool> |task| { + match task.sched { + Some(ref _task) => true, + None => false + } + } + } + fn borrow<T>(f: &fn(&mut Scheduler) -> T) -> T { + do Local::borrow::<Task, T> |task| { + match task.sched { Some(~ref mut task) => { -// rtdebug!("current task pointer: %x", to_uint(task)); -// rtdebug!("current task heap pointer: %x", to_uint(&task.heap)); f(task) } None => { @@ -66,20 +82,19 @@ impl Local for Task { } } } - unsafe fn unsafe_borrow() -> *mut Task { - match (*Local::unsafe_borrow::<Scheduler>()).current_task { - Some(~ref mut task) => { - let s: *mut Task = &mut *task; + unsafe fn unsafe_borrow() -> *mut Scheduler { + match (*Local::unsafe_borrow::<Task>()).sched { + Some(~ref mut sched) => { + let s: *mut Scheduler = &mut *sched; return s; } None => { - // Don't fail.
Infinite recursion rtabort!("no scheduler") } } } - unsafe fn try_unsafe_borrow() -> Option<*mut Task> { - if Local::exists::<Scheduler>() { + unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { + if Local::exists::<Task>() { Some(Local::unsafe_borrow()) } else { None @@ -101,57 +116,66 @@ impl Local for IoFactoryObject { unsafe fn try_unsafe_borrow() -> Option<*mut IoFactoryObject> { rtabort!("unimpl") } } + #[cfg(test)] mod test { - use unstable::run_in_bare_thread; use rt::test::*; - use rt::sched::Scheduler; use super::*; + use rt::task::Task; + use rt::local_ptr; #[test] - fn thread_local_scheduler_smoke_test() { - do run_in_bare_thread { - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - let _scheduler: ~Scheduler = Local::take(); - } + fn thread_local_task_smoke_test() { + local_ptr::init_tls_key(); + let mut sched = ~new_test_uv_sched(); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + let task: ~Task = Local::take(); + cleanup_task(task); } #[test] - fn thread_local_scheduler_two_instances() { - do run_in_bare_thread { - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - let _scheduler: ~Scheduler = Local::take(); - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - let _scheduler: ~Scheduler = Local::take(); - } + fn thread_local_task_two_instances() { + local_ptr::init_tls_key(); + let mut sched = ~new_test_uv_sched(); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + let task: ~Task = Local::take(); + cleanup_task(task); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + let task: ~Task = Local::take(); + cleanup_task(task); } #[test] fn borrow_smoke_test() { - do run_in_bare_thread { - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - unsafe { - let _scheduler: *mut Scheduler = Local::unsafe_borrow(); - } - let _scheduler: ~Scheduler = Local::take(); + local_ptr::init_tls_key(); + let mut sched = ~new_test_uv_sched(); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + + unsafe { + let _task: *mut Task = Local::unsafe_borrow(); } + let task: ~Task = Local::take(); + cleanup_task(task); } #[test] fn borrow_with_return() { - do run_in_bare_thread { - let scheduler = ~new_test_uv_sched(); - Local::put(scheduler); - let res = do Local::borrow::<Scheduler, bool> |_sched| { - true - }; - assert!(res); - let _scheduler: ~Scheduler = Local::take(); - } + local_ptr::init_tls_key(); + let mut sched = ~new_test_uv_sched(); + let task = ~Task::new_root(&mut sched.stack_pool, || {}); + Local::put(task); + + let res = do Local::borrow::<Task, bool> |_task| { + true + }; + assert!(res); + let task: ~Task = Local::take(); + cleanup_task(task); } } + diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 51f4737ef85fb..5a7424000feb0 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -273,23 +273,35 @@ pub fn run(main: ~fn()) -> int { } }; - // Create and enqueue the main task. + // Create the main task. let main_cell = Cell::new(main); let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, main_cell.take()); main_task.on_exit = Some(on_exit); - scheds[0].enqueue_task(main_task); + let main_task_cell = Cell::new(main_task); - // Run each scheduler in a thread. let mut threads = ~[]; + + // Run the main scheduler in a thread.
+ let sched = scheds.pop(); + let sched_cell = Cell::new(sched); + let thread = do Thread::start { + let sched = sched_cell.take(); + sched.bootstrap(main_task_cell.take()); + }; + threads.push(thread); + + // Run each remaining scheduler in a thread. while !scheds.is_empty() { let sched = scheds.pop(); let sched_cell = Cell::new(sched); let thread = do Thread::start { - let sched = sched_cell.take(); - sched.run(); + let mut sched = sched_cell.take(); + let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || { + rtdebug!("bootstrapping a non-primary scheduler"); + }; + sched.bootstrap(bootstrap_task); }; - threads.push(thread); } @@ -322,27 +334,14 @@ pub enum RuntimeContext { pub fn context() -> RuntimeContext { use task::rt::rust_task; - use self::local::Local; - use self::sched::Scheduler; - // XXX: Hitting TLS twice to check if the scheduler exists - // then to check for the task is not good for perf + // This has been modified to just not work on the new contexts, + // because we don't need the context() function there. So it + // returns either Old or Global. if unsafe { rust_try_get_task().is_not_null() } { return OldTaskContext; } else { - if Local::exists::<Scheduler>() { - let context = Cell::new_empty(); - do Local::borrow::<Scheduler, ()> |sched| { - if sched.in_task_context() { - context.put_back(TaskContext); - } else { - context.put_back(SchedulerContext); - } - } - return context.take(); - } else { - return GlobalContext; - } + return GlobalContext; } pub extern { @@ -354,23 +353,9 @@ pub fn context() -> RuntimeContext { #[test] fn test_context() { use unstable::run_in_bare_thread; - use self::sched::{Scheduler}; - use rt::local::Local; - use rt::test::new_test_uv_sched; assert_eq!(context(), OldTaskContext); do run_in_bare_thread { assert_eq!(context(), GlobalContext); - let mut sched = ~new_test_uv_sched(); - let task = ~do Task::new_root(&mut sched.stack_pool) { - assert_eq!(context(), TaskContext); - let sched = Local::take::<Scheduler>(); - do sched.deschedule_running_task_and_then() |sched, task| { - assert_eq!(context(), SchedulerContext); - sched.enqueue_task(task); - } - }; - sched.enqueue_task(task); - sched.run(); } } diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 4e4145ddc161f..fe8f8dfcacae1 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -10,7 +10,7 @@ use option::*; use sys; -use cast::transmute; +use cast::{transmute, transmute_mut_region, transmute_mut_unsafe}; use clone::Clone; use super::sleeper_list::SleeperList; @@ -25,6 +25,7 @@ use rt::local::Local; use rt::rtio::RemoteCallback; use rt::metrics::SchedMetrics; use borrow::{to_uint}; +use cell::Cell; /// The Scheduler is responsible for coordinating execution of Coroutines /// on a single thread. When the scheduler is running it is owned by @@ -57,11 +58,8 @@ pub struct Scheduler { stack_pool: StackPool, /// The event loop used to drive the scheduler and perform I/O event_loop: ~EventLoopObject, - /// The scheduler's saved context. - /// Always valid when a task is executing, otherwise not - priv saved_context: Context, - /// The currently executing task - current_task: Option<~Task>, + /// The scheduler runs on a special task.
+ sched_task: Option<~Task>, /// An action performed after a context switch on behalf of the /// code running before the context switch priv cleanup_job: Option<CleanupJob>, @@ -88,7 +86,6 @@ enum CleanupJob { } impl Scheduler { - pub fn in_task_context(&self) -> bool { self.current_task.is_some() } pub fn sched_id(&self) -> uint { to_uint(self) } @@ -101,15 +98,14 @@ impl Scheduler { } + // When you create a scheduler it isn't yet "in" a task, so the + // task field is None. pub fn new_special(event_loop: ~EventLoopObject, work_queue: WorkQueue<~Task>, sleeper_list: SleeperList, run_anything: bool) -> Scheduler { - // Lazily initialize the runtime TLS key - local_ptr::init_tls_key(); - Scheduler { sleeper_list: sleeper_list, message_queue: MessageQueue::new(), @@ -118,8 +114,7 @@ impl Scheduler { event_loop: event_loop, work_queue: work_queue, stack_pool: StackPool::new(), - saved_context: Context::empty(), - current_task: None, + sched_task: None, cleanup_job: None, metrics: SchedMetrics::new(), run_anything: run_anything } @@ -130,8 +125,59 @@ // the scheduler itself doesn't have to call event_loop.run. // That will be important for embedding the runtime into external // event loops. - pub fn run(~self) -> ~Scheduler { - assert!(!self.in_task_context()); + + // Take a main task to run, and a scheduler to run it in. Create a + // scheduler task and bootstrap into it. + pub fn bootstrap(~self, task: ~Task) { + + // Initialize the TLS key. + local_ptr::init_tls_key(); + + // Create a task for the scheduler with an empty context. + let sched_task = Task::new_sched_task(); + + // Now that we have an empty task struct for the scheduler + // task, put it in TLS. + Local::put::<Task>(~sched_task); + + // Now, as far as all the scheduler state is concerned, we are + // inside the "scheduler" context. So we can act like the + // scheduler and resume the provided task. + self.resume_task_immediately(task); + + // Now we are back in the scheduler context, having + // successfully run the input task. Start by running the + // scheduler. Grab it out of TLS - performing the scheduler + // action will have given it away. + let sched = Local::take::<Scheduler>(); + sched.run(); + + // Now that we are done with the scheduler, clean up the + // scheduler task. Do so by removing it from TLS and manually + // cleaning up the memory it uses. As we didn't actually call + // task.run() on the scheduler task we never get through all + // the cleanup code it runs. + + rtdebug!("post sched.run(), cleaning up scheduler task"); + let mut stask = Local::take::<Task>(); + stask.destroyed = true; + + let local_success = !stask.unwinder.unwinding; + let join_latch = stask.join_latch.take_unwrap(); + match stask.on_exit { + Some(ref on_exit) => { + let success = join_latch.wait(local_success); + (*on_exit)(success); + } + None => { + join_latch.release(local_success); + } + } + } + + // This does not return a scheduler, as the scheduler is placed + // inside the task. + pub fn run(~self) { let mut self_sched = self; @@ -140,79 +186,88 @@ // schedulers. self_sched.event_loop.callback(Scheduler::run_sched_once); + // This is unsafe because we need to place the scheduler, with + // the event_loop inside, inside our task. But we still need a + // mutable reference to the event_loop to give it the "run" + // command.
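To make the new entry point concrete, here is a minimal sketch of how a caller drives bootstrap, assuming the same new_test_uv_sched helper the tests below use; it mirrors what run() and test_home_sched do:

    let mut sched = ~new_test_uv_sched();
    let task = ~do Task::new_root(&mut sched.stack_pool) {
        rtdebug!("running on the bootstrapped scheduler");
    };
    // bootstrap consumes both the scheduler and the task; it returns
    // only after the scheduler has shut down.
    sched.bootstrap(task);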
unsafe { - let event_loop: *mut ~EventLoopObject = { - let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop; - event_loop - }; + let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop; - // Give ownership of the scheduler (self) to the thread - Local::put(self_sched); + // Our scheduler must be in the task before the event loop + // is started. + let self_sched = Cell::new(self_sched); + do Local::borrow::<Task, ()> |stask| { + stask.sched = Some(self_sched.take()); + }; (*event_loop).run(); } - - rtdebug!("run taking sched"); - let sched = Local::take::<Scheduler>(); - // XXX: Reenable this once we're using a per-scheduler queue. With a shared - // queue this is not true - //assert!(sched.work_queue.is_empty()); - rtdebug!("scheduler metrics: %s\n", { - use to_str::ToStr; - sched.metrics.to_str() - }); - return sched; } - fn run_sched_once() { + // One iteration of the scheduler loop, always run at least once. - let mut sched = Local::take::<Scheduler>(); - sched.metrics.turns += 1; - - // First, check the message queue for instructions. - // XXX: perf. Check for messages without atomics. - // It's ok if we miss messages occasionally, as long as - // we sync and check again before sleeping. - if sched.interpret_message_queue() { - // We performed a scheduling action. There may be other work - // to do yet, so let's try again later. - rtdebug!("run_sched_once, interpret_message_queue taking sched"); - let mut sched = Local::take::<Scheduler>(); - sched.metrics.messages_received += 1; - sched.event_loop.callback(Scheduler::run_sched_once); - Local::put(sched); - return; - } + // The model for this function is that you continue through it + // until you either use the scheduler while performing a schedule + // action, in which case you give it away and do not return, or + // you reach the end and sleep. In the case that a scheduler + // action is performed, the event loop is primed to call this + // function again. + fn run_sched_once() { - // Now, look in the work queue for tasks to run - rtdebug!("run_sched_once taking"); + // When we reach the scheduler context via the event loop we + // already have a scheduler stored in our local task, so we + // start off by taking it. This is the only path through the + // scheduler where we get the scheduler this way. let sched = Local::take::<Scheduler>(); - if sched.resume_task_from_queue() { - // We performed a scheduling action. There may be other work - // to do yet, so let's try again later. - do Local::borrow::<Scheduler, ()> |sched| { - sched.metrics.tasks_resumed_from_queue += 1; - sched.event_loop.callback(Scheduler::run_sched_once); + + // Our first task is to read mail to see if we have important + // messages. + + // 1) A wake message is easy, mutate sched struct and return + // it. + // 2) A shutdown is also easy, shutdown. + // 3) A pinned task - we resume immediately and do not return + // here. + + let result = sched.interpret_message_queue(); + let sched = match result { + Some(sched) => { + // We did not resume a task, so we still own the scheduler. + sched } - return; - } + None => { + return; + } + }; + + let result = sched.resume_task_from_queue(); + let mut sched = match result { + Some(sched) => { + // Failed to dequeue a task, so we keep the scheduler. + sched } + None => { + return; + } + }; // If we got here then there was no work to do. // Generate a SchedHandle and push it to the sleeper list so // somebody can wake us up later.
- rtdebug!("no work to do"); - do Local::borrow::<Scheduler, ()> |sched| { - sched.metrics.wasted_turns += 1; - if !sched.sleepy && !sched.no_sleep { - rtdebug!("sleeping"); - sched.metrics.sleepy_times += 1; - sched.sleepy = true; - let handle = sched.make_handle(); - sched.sleeper_list.push(handle); - } else { - rtdebug!("not sleeping"); - } + sched.metrics.wasted_turns += 1; + if !sched.sleepy && !sched.no_sleep { + rtdebug!("scheduler has no work to do, going to sleep"); + sched.metrics.sleepy_times += 1; + sched.sleepy = true; + let handle = sched.make_handle(); + sched.sleeper_list.push(handle); + } else { + rtdebug!("not sleeping, already doing so or no_sleep set"); } + + // Finished a cycle without using the Scheduler. Place it back + // in TLS. + Local::put(sched); } pub fn make_handle(&mut self) -> SchedHandle { @@ -232,18 +287,6 @@ impl Scheduler { /// to the work queue directly. pub fn enqueue_task(&mut self, task: ~Task) { - // We don't want to queue tasks that belong on other threads, - // so we send them home at enqueue time. - - // The borrow checker doesn't like our disassembly of the - // Coroutine struct and partial use and mutation of the - // fields. So completely disassemble here and stop using? - - // XXX perf: I think we might be able to shuffle this code to - // only destruct when we need to. - - rtdebug!("a task was queued on: %u", self.sched_id()); - let this = self; // We push the task onto our local queue clone. @@ -273,30 +316,23 @@ // * Scheduler-context operations - fn interpret_message_queue(~self) -> bool { - assert!(!self.in_task_context()); - - rtdebug!("looking for scheduler messages"); + // This function returns None if the scheduler is "used", or it + // returns the still-available scheduler. + fn interpret_message_queue(~self) -> Option<~Scheduler> { let mut this = self; match this.message_queue.pop() { Some(PinnedTask(task)) => { - rtdebug!("recv BiasedTask message in sched: %u", - this.sched_id()); let mut task = task; - task.home = Some(Sched(this.make_handle())); - this.resume_task_immediately(task); - return true; + task.give_home(Sched(this.make_handle())); + this.change_task_context(task, Scheduler::store_stask); + return None; } - Some(Wake) => { - rtdebug!("recv Wake message"); this.sleepy = false; - Local::put(this); - return true; + return Some(this); } Some(Shutdown) => { - rtdebug!("recv Shutdown message"); if this.sleepy { // There may be an outstanding handle on the // sleeper list. Pop them all to make sure that's @@ -315,12 +351,14 @@ // event loop references we will shut down. this.no_sleep = true; this.sleepy = false; - Local::put(this); - return true; + // YYY: Does a shutdown count as a "use" of the + // scheduler? This seems to work - so I'm leaving it + // this way despite not having a solid rationale for + // why I should return the scheduler here. + return Some(this); } None => { - Local::put(this); - return false; + return Some(this); } } } @@ -328,7 +366,7 @@ /// Given an input Coroutine sends it back to its home scheduler. fn send_task_home(task: ~Task) { let mut task = task; - let mut home = task.home.take_unwrap(); + let mut home = task.swap_unwrap_home(); match home { Sched(ref mut home_handle) => { home_handle.send(PinnedTask(task)); @@ -341,69 +379,45 @@ // Resume a task from the queue - but also take into account that // it might not belong here.
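The Option<~Scheduler> return convention above carries the whole control flow: None means the scheduler was given away in a context switch, Some means no scheduling action happened and the caller still owns it. A condensed sketch of a caller, mirroring run_sched_once:

    let sched = match sched.interpret_message_queue() {
        // No scheduling action was taken; we still own the scheduler
        // and can keep looking for work.
        Some(sched) => sched,
        // The scheduler was consumed and a task is now running on this
        // thread; this path must not touch it again.
        None => return
    };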
- fn resume_task_from_queue(~self) -> bool { - assert!(!self.in_task_context()); - rtdebug!("looking in work queue for task to schedule"); + // If we perform a scheduler action we give away the scheduler ~ + // pointer; if it is still available we return it. + + fn resume_task_from_queue(~self) -> Option<~Scheduler> { + let mut this = self; - // The borrow checker imposes the possibly absurd requirement - // that we split this into two match expressions. This is due - // to the inspection of the internal bits of task, as that - // can't be in scope when we act on task. match this.work_queue.pop() { Some(task) => { - let action_id = { - let home = &task.home; - match home { - &Some(Sched(ref home_handle)) - if home_handle.sched_id != this.sched_id() => { - SendHome - } - &Some(AnySched) if this.run_anything => { - ResumeNow - } - &Some(AnySched) => { - Requeue - } - &Some(Sched(_)) => { - ResumeNow - } - &None => { - Homeless + let mut task = task; + let home = task.swap_unwrap_home(); + match home { + Sched(home_handle) => { + if home_handle.sched_id != this.sched_id() { + task.give_home(Sched(home_handle)); + Scheduler::send_task_home(task); + return Some(this); + } else { + task.give_home(Sched(home_handle)); + this.change_task_context(task, Scheduler::store_stask); + return None; + } } - }; - - match action_id { - SendHome => { - rtdebug!("sending task home"); - Scheduler::send_task_home(task); - Local::put(this); - return false; + AnySched if this.run_anything => { + task.give_home(AnySched); + this.change_task_context(task, Scheduler::store_stask); + return None; } - ResumeNow => { - rtdebug!("resuming now"); - this.resume_task_immediately(task); - return true; - } - Requeue => { - rtdebug!("re-queueing") + AnySched => { + task.give_home(AnySched); this.enqueue_task(task); - Local::put(this); - return false; - } - Homeless => { - rtabort!("task home was None!"); + return Some(this); } } } - None => { - rtdebug!("no tasks in queue"); - Local::put(this); - return false; - } + return Some(this); + } } } @@ -412,9 +426,6 @@ impl Scheduler { /// Called by a running task to end execution, after which it will /// be recycled by the scheduler for reuse in a new task. pub fn terminate_current_task(~self) { - assert!(self.in_task_context()); - - rtdebug!("ending running task"); do self.deschedule_running_task_and_then |sched, dead_task| { let mut dead_task = dead_task; let coroutine = dead_task.coroutine.take_unwrap(); coroutine.recycle(&mut sched.stack_pool); } - rtabort!("control reached end of task"); } - pub fn schedule_task(~self, task: ~Task) { - assert!(self.in_task_context()); + // If a scheduling action is performed, return None. If not, + // return Some(sched). + + pub fn schedule_task(~self, task: ~Task) -> Option<~Scheduler> { // is the task home?
let is_home = task.is_home_no_tls(&self); @@ -439,143 +451,175 @@ if is_home || (!homed && this.run_anything) { // here we know we are home, execute now OR we know we // aren't homed, and that this sched doesn't care + rtdebug!("task: %u is on ok sched, executing", to_uint(task)); do this.switch_running_tasks_and_then(task) |sched, last_task| { sched.enqueue_task(last_task); } + return None; } else if !homed && !this.run_anything { // the task isn't homed, but it can't be run here this.enqueue_task(task); - Local::put(this); + return Some(this); } else { // task isn't home, so don't run it here, send it home Scheduler::send_task_home(task); - Local::put(this); + return Some(this); } } - // Core scheduling ops - - pub fn resume_task_immediately(~self, task: ~Task) { + // The primary function for changing contexts. In the current + // design the scheduler is just a slightly modified GreenTask, so + // all context swaps are from Task to Task. The only difference + // between the various cases is where the inputs come from, and + // what is done with the resulting task. That is specified by the + // cleanup function f, which takes the scheduler and the + // old task as inputs. + + pub fn change_task_context(~self, + next_task: ~Task, + f: &fn(&mut Scheduler, ~Task)) { let mut this = self; - assert!(!this.in_task_context()); - rtdebug!("scheduling a task"); - this.metrics.context_switches_sched_to_task += 1; + // The current task is grabbed from TLS, not taken as an input. + let current_task: ~Task = Local::take::<Task>(); + + // These transmutes do something fishy with a closure. + let f_fake_region = unsafe { + transmute::<&fn(&mut Scheduler, ~Task), + &fn(&mut Scheduler, ~Task)>(f) + }; + let f_opaque = ClosureConverter::from_fn(f_fake_region); - // Store the task in the scheduler so it can be grabbed later - this.current_task = Some(task); - this.enqueue_cleanup_job(DoNothing); + // The current task is placed inside an enum with the cleanup + // function. This enum is then placed inside the scheduler. + this.enqueue_cleanup_job(GiveTask(current_task, f_opaque)); - Local::put(this); + // The scheduler is then placed inside the next task. + let mut next_task = next_task; + next_task.sched = Some(this); - // Take pointers to both the task and scheduler's saved registers. + // However we still need an internal mutable pointer to the + // original task. The strategy here was "arrange memory, then + // get pointers", so we crawl back up the chain using + // transmute to eliminate borrowck errors. unsafe { - let sched = Local::unsafe_borrow::<Scheduler>(); - let (sched_context, _, next_task_context) = (*sched).get_contexts(); - let next_task_context = next_task_context.unwrap(); - // Context switch to the task, restoring it's registers - // and saving the scheduler's - Context::swap(sched_context, next_task_context); - let sched = Local::unsafe_borrow::<Scheduler>(); - // The running task should have passed ownership elsewhere + let sched: &mut Scheduler = + transmute_mut_region(*next_task.sched.get_mut_ref()); - assert!((*sched).current_task.is_none()); + let current_task: &mut Task = match sched.cleanup_job { + Some(GiveTask(ref task, _)) => { + transmute_mut_region(*transmute_mut_unsafe(task)) + } + Some(DoNothing) => { + rtabort!("no next task"); + } + None => { + rtabort!("no cleanup job"); + } + }; - // Running tasks may have asked us to do some cleanup - (*sched).run_cleanup_job(); - } - } - /// Block a running task, context switch to the scheduler, then pass the - /// blocked task to a closure.
- /// - /// # Safety note - /// - /// The closure here is a *stack* closure that lives in the - /// running task. It gets transmuted to the scheduler's lifetime - /// and called while the task is blocked. - /// - /// This passes a Scheduler pointer to the fn after the context switch - /// in order to prevent that fn from performing further scheduling operations. - /// Doing further scheduling could easily result in infinite recursion. - pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, ~Task)) { - let mut this = self; - assert!(this.in_task_context()); + let (current_task_context, next_task_context) = + Scheduler::get_contexts(current_task, next_task); - rtdebug!("blocking task"); - this.metrics.context_switches_task_to_sched += 1; + // Done with everything - put the next task in TLS. This + // works because due to transmute the borrow checker + // believes that we have no internal pointers to + // next_task. + Local::put(next_task); - unsafe { - let blocked_task = this.current_task.take_unwrap(); - let f_fake_region = transmute::<&fn(&mut Scheduler, ~Task), - &fn(&mut Scheduler, ~Task)>(f); - let f_opaque = ClosureConverter::from_fn(f_fake_region); - this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); + // The raw context swap operation. The next action taken + // will be running the cleanup job from the context of the + // next task. + Context::swap(current_task_context, next_task_context); } - Local::put(this); - + // When the context swaps back to the scheduler we immediately + // run the cleanup job, as expected by the previously called + // swap_contexts function. unsafe { - let sched = Local::unsafe_borrow::<Scheduler>(); - let (sched_context, last_task_context, _) = (*sched).get_contexts(); - let last_task_context = last_task_context.unwrap(); - Context::swap(last_task_context, sched_context); - - // We could be executing in a different thread now let sched = Local::unsafe_borrow::<Scheduler>(); (*sched).run_cleanup_job(); } } - /// Switch directly to another task, without going through the scheduler. - /// You would want to think hard about doing this, e.g. if there are - /// pending I/O events it would be a bad idea. - pub fn switch_running_tasks_and_then(~self, next_task: ~Task, - f: &fn(&mut Scheduler, ~Task)) { + // There are a variety of "obvious" functions to be passed to + // change_task_context, so we can make a few "named cases". + + // Place the old task, which is the scheduler task, into the + // sched's stask slot. + pub fn store_stask(sched: &mut Scheduler, stask: ~Task) { + sched.sched_task = Some(stask); + } + + // Enqueue the old task on the current scheduler. + pub fn enqueue_old(sched: &mut Scheduler, task: ~Task) { + sched.enqueue_task(task); + } + + // Sometimes we just want the old API though. + + pub fn resume_task_immediately(~self, task: ~Task) { + self.change_task_context(task, Scheduler::store_stask); + } + + pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, ~Task)) { + // Trickier - we need to get the scheduler task out of self + // and use it as the destination. let mut this = self; - assert!(this.in_task_context()); + let stask = this.sched_task.take_unwrap(); + this.change_task_context(stask, f); + } - rtdebug!("switching tasks"); - this.metrics.context_switches_task_to_task += 1; + pub fn switch_running_tasks_and_then(~self, next_task: ~Task, - f: &fn(&mut Scheduler, ~Task)) { + f: &fn(&mut Scheduler, ~Task)) { + // Literally the same as change_task_context.
+ self.change_task_context(next_task, f); + } - let old_running_task = this.current_task.take_unwrap(); - let f_fake_region = unsafe { - transmute::<&fn(&mut Scheduler, ~Task), - &fn(&mut Scheduler, ~Task)>(f) + // A helper that looks up the scheduler and runs a task later by + // enqueuing it. + pub fn run_task_later(next_task: ~Task) { + // We aren't performing a scheduler operation, so we want to + // put the Scheduler back when we finish. + let next_task = Cell::new(next_task); + do Local::borrow::<Scheduler, ()> |sched| { + sched.enqueue_task(next_task.take()); }; - let f_opaque = ClosureConverter::from_fn(f_fake_region); - this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque)); - this.current_task = Some(next_task); + } - Local::put(this); + // A helper that looks up the scheduler and runs a task. If it can + // be run now it is run now. + pub fn run_task(new_task: ~Task) { + let sched = Local::take::<Scheduler>(); + sched.schedule_task(new_task).map_consume(Local::put); + } + // Returns a mutable reference to both contexts involved in this + // swap. This is unsafe - we are getting mutable internal + // references to keep even when we don't own the tasks. It looks + // kinda safe because we are doing transmutes before passing in + // the arguments. + pub fn get_contexts<'a>(current_task: &mut Task, next_task: &mut Task) -> + (&'a mut Context, &'a mut Context) { + let current_task_context = + &mut current_task.coroutine.get_mut_ref().saved_context; + let next_task_context = + &mut next_task.coroutine.get_mut_ref().saved_context; unsafe { - let sched = Local::unsafe_borrow::<Scheduler>(); - let (_, last_task_context, next_task_context) = (*sched).get_contexts(); - let last_task_context = last_task_context.unwrap(); - let next_task_context = next_task_context.unwrap(); - Context::swap(last_task_context, next_task_context); - - // We could be executing in a different thread now - let sched = Local::unsafe_borrow::<Scheduler>(); - (*sched).run_cleanup_job(); + (transmute_mut_region(current_task_context), + transmute_mut_region(next_task_context)) } } - - // * Other stuff pub fn enqueue_cleanup_job(&mut self, job: CleanupJob) { - assert!(self.cleanup_job.is_none()); self.cleanup_job = Some(job); } pub fn run_cleanup_job(&mut self) { rtdebug!("running cleanup job"); - - assert!(self.cleanup_job.is_some()); - let cleanup_job = self.cleanup_job.take_unwrap(); match cleanup_job { DoNothing => { } @@ -583,54 +627,6 @@ impl Scheduler { } } - /// Get mutable references to all the contexts that may be involved in a - /// context switch. - /// - /// Returns (the scheduler context, the optional context of the - /// task in the cleanup list, the optional context of the task in - /// the current task slot). When context switching to a task, - /// callers should first arrange for that task to be located in the - /// Scheduler's current_task slot and set up the - /// post-context-switch cleanup job.
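Taken together, change_task_context plus the named cleanup functions reduce every switch in this patch to one shape. A sketch of the three idioms (each call consumes the ~Scheduler, so they are alternatives, not a sequence):

    // Scheduler task to green task: park the scheduler task.
    sched.change_task_context(task, Scheduler::store_stask);

    // Green task to green task: requeue whoever was running.
    sched.change_task_context(next_task, Scheduler::enqueue_old);

    // Green task back to the scheduler task, with a custom cleanup
    // action run on behalf of the descheduled task.
    do sched.deschedule_running_task_and_then |sched, old_task| {
        sched.enqueue_task(old_task);
    }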
- pub fn get_contexts<'a>(&'a mut self) -> (&'a mut Context, - Option<&'a mut Context>, - Option<&'a mut Context>) { - let last_task = match self.cleanup_job { - Some(GiveTask(~ref task, _)) => { - Some(task) - } - Some(DoNothing) => { - None - } - None => fail!("all context switches should have a cleanup job") - }; - // XXX: Pattern matching mutable pointers above doesn't work - // because borrowck thinks the three patterns are conflicting - // borrows - unsafe { - let last_task = transmute::<Option<&Task>, Option<&mut Task>>(last_task); - let last_task_context = match last_task { - Some(t) => { - Some(&mut t.coroutine.get_mut_ref().saved_context) - } - None => { - None - } - }; - let next_task_context = match self.current_task { - Some(ref mut t) => { - Some(&mut t.coroutine.get_mut_ref().saved_context) - } - None => { - None - } - }; - // XXX: These transmutes can be removed after snapshot - return (transmute(&mut self.saved_context), - last_task_context, - transmute(next_task_context)); - } - } } // The cases for the below function. @@ -660,19 +656,63 @@ impl ClosureConverter for UnsafeTaskReceiver { fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { unsafe { transmute(self) } } } - #[cfg(test)] mod test { + use rt::test::*; + use unstable::run_in_bare_thread; + use borrow::to_uint; + use rt::local::*; + use rt::sched::{Scheduler}; + use uint; use int; use cell::Cell; - use unstable::run_in_bare_thread; - use task::spawn; - use rt::local::Local; - use rt::test::*; - use super::*; use rt::thread::Thread; - use borrow::to_uint; - use rt::task::{Task,Sched}; + use rt::task::{Task, Sched}; + use option::{Some}; + + #[test] + fn trivial_run_in_newsched_task_test() { + let mut task_ran = false; + let task_ran_ptr: *mut bool = &mut task_ran; + do run_in_newsched_task || { + unsafe { *task_ran_ptr = true }; + rtdebug!("executed from the new scheduler") + } + assert!(task_ran); + } + + #[test] + fn multiple_task_test() { + let total = 10; + let mut task_run_count = 0; + let task_run_count_ptr: *mut uint = &mut task_run_count; + do run_in_newsched_task || { + for uint::range(0,total) |_| { + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1}; + } + } + } + assert!(task_run_count == total); + } + + #[test] + fn multiple_task_nested_test() { + let mut task_run_count = 0; + let task_run_count_ptr: *mut uint = &mut task_run_count; + do run_in_newsched_task || { + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; + do spawntask || { + unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; + } + } + } + } + assert!(task_run_count == 3); + } // Confirm that a sched_id actually is the uint form of the // pointer to the scheduler struct. @@ -695,46 +735,50 @@ mod test { } } - // A simple test to check if a homed task run on a single - // scheduler ends up executing while home. + + // A very simple test that confirms that a task executing on the + // home scheduler notices that it is home.
#[test] fn test_home_sched() { do run_in_bare_thread { let mut task_ran = false; let task_ran_ptr: *mut bool = &mut task_ran; - let mut sched = ~new_test_uv_sched(); + let mut sched = ~new_test_uv_sched(); let sched_handle = sched.make_handle(); - let sched_id = sched.sched_id(); - let task = ~do Task::new_root_homed(&mut sched.stack_pool, - Sched(sched_handle)) { + let mut task = ~do Task::new_root_homed(&mut sched.stack_pool, + Sched(sched_handle)) { unsafe { *task_ran_ptr = true }; - let sched = Local::take::<Scheduler>(); - assert!(sched.sched_id() == sched_id); - Local::put::<Scheduler>(sched); + assert!(Task::on_appropriate_sched()); }; - sched.enqueue_task(task); - sched.run(); - assert!(task_ran); + + let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status); + task.on_exit = Some(on_exit); + + sched.bootstrap(task); } } - // A test for each state of schedule_task + // An advanced test that checks all four possible states that a + // (task,sched) can be in regarding homes. + #[test] fn test_schedule_home_states() { use rt::uv::uvio::UvEventLoop; - use rt::sched::Shutdown; use rt::sleeper_list::SleeperList; use rt::work_queue::WorkQueue; + use rt::sched::Shutdown; + use borrow; + use rt::comm::*; do run_in_bare_thread { let sleepers = SleeperList::new(); let work_queue = WorkQueue::new(); - // our normal scheduler + // Our normal scheduler let mut normal_sched = ~Scheduler::new( ~UvEventLoop::new(), work_queue.clone(), @@ -742,123 +786,105 @@ mod test { let normal_handle = Cell::new(normal_sched.make_handle()); - // our special scheduler + // Our special scheduler let mut special_sched = ~Scheduler::new_special( ~UvEventLoop::new(), work_queue.clone(), sleepers.clone(), - true); + false); let special_handle = Cell::new(special_sched.make_handle()); - let special_handle2 = Cell::new(special_sched.make_handle()); - let special_id = special_sched.sched_id(); + let t1_handle = special_sched.make_handle(); let t4_handle = special_sched.make_handle(); - let t1f = ~do Task::new_root_homed(&mut special_sched.stack_pool, - Sched(t1_handle)) || { - let is_home = Task::is_home_using_id(special_id); - rtdebug!("t1 should be home: %b", is_home); - assert!(is_home); - }; - let t1f = Cell::new(t1f); + // Four test tasks: + // 1) task is home on special + // 2) task not homed, sched doesn't care + // 3) task not homed, sched requeues + // 4) task not home, send home - let t2f = ~do Task::new_root(&mut normal_sched.stack_pool) { - let on_special = Task::on_special(); - rtdebug!("t2 should not be on special: %b", on_special); - assert!(!on_special); + let task1 = ~do Task::new_root_homed(&mut special_sched.stack_pool, + Sched(t1_handle)) || { + rtassert!(Task::on_appropriate_sched()); }; - let t2f = Cell::new(t2f); + rtdebug!("task1 id: **%u**", borrow::to_uint(task1)); - let t3f = ~do Task::new_root(&mut normal_sched.stack_pool) { - // not on special - let on_special = Task::on_special(); - rtdebug!("t3 should not be on special: %b", on_special); - assert!(!on_special); - }; - let t3f = Cell::new(t3f); - - let t4f = ~do Task::new_root_homed(&mut special_sched.stack_pool, - Sched(t4_handle)) { - // is home - let home = Task::is_home_using_id(special_id); - rtdebug!("t4 should be home: %b", home); - assert!(home); + let task2 = ~do Task::new_root(&mut normal_sched.stack_pool) { + rtassert!(Task::on_appropriate_sched()); }; - let t4f = Cell::new(t4f); - // we have four tests, make them as closures let t1: ~fn() = || { - // task is home on special - let task = t1f.take(); - let sched = Local::take::<Scheduler>(); -
sched.schedule_task(task); - }; - let t2: ~fn() = || { - // not homed, task doesn't care - let task = t2f.take(); - let sched = Local::take::(); - sched.schedule_task(task); + let task3 = ~do Task::new_root(&mut normal_sched.stack_pool) { + rtassert!(Task::on_appropriate_sched()); }; - let t3: ~fn() = || { - // task not homed, must leave - let task = t3f.take(); - let sched = Local::take::(); - sched.schedule_task(task); - }; - let t4: ~fn() = || { - // task not home, send home - let task = t4f.take(); - let sched = Local::take::(); - sched.schedule_task(task); + + let task4 = ~do Task::new_root_homed(&mut special_sched.stack_pool, + Sched(t4_handle)) { + rtassert!(Task::on_appropriate_sched()); }; + rtdebug!("task4 id: **%u**", borrow::to_uint(task4)); + + let task1 = Cell::new(task1); + let task2 = Cell::new(task2); + let task3 = Cell::new(task3); + let task4 = Cell::new(task4); - let t1 = Cell::new(t1); - let t2 = Cell::new(t2); - let t3 = Cell::new(t3); - let t4 = Cell::new(t4); - - // build a main task that runs our four tests - let main_task = ~do Task::new_root(&mut normal_sched.stack_pool) { - // the two tasks that require a normal start location - t2.take()(); - t4.take()(); - normal_handle.take().send(Shutdown); - special_handle.take().send(Shutdown); + // Signal from the special task that we are done. + let (port, chan) = oneshot::<()>(); + let port = Cell::new(port); + let chan = Cell::new(chan); + + let normal_task = ~do Task::new_root(&mut normal_sched.stack_pool) { + rtdebug!("*about to submit task2*"); + Scheduler::run_task(task2.take()); + rtdebug!("*about to submit task4*"); + Scheduler::run_task(task4.take()); + rtdebug!("*normal_task done*"); + port.take().recv(); + let mut nh = normal_handle.take(); + nh.send(Shutdown); + let mut sh = special_handle.take(); + sh.send(Shutdown); }; - // task to run the two "special start" tests - let special_task = ~do Task::new_root_homed( - &mut special_sched.stack_pool, - Sched(special_handle2.take())) { - t1.take()(); - t3.take()(); + rtdebug!("normal task: %u", borrow::to_uint(normal_task)); + + let special_task = ~do Task::new_root(&mut special_sched.stack_pool) { + rtdebug!("*about to submit task1*"); + Scheduler::run_task(task1.take()); + rtdebug!("*about to submit task3*"); + Scheduler::run_task(task3.take()); + rtdebug!("*done with special_task*"); + chan.take().send(()); }; - // enqueue the main tasks - normal_sched.enqueue_task(special_task); - normal_sched.enqueue_task(main_task); + rtdebug!("special task: %u", borrow::to_uint(special_task)); + + let special_sched = Cell::new(special_sched); + let normal_sched = Cell::new(normal_sched); + let special_task = Cell::new(special_task); + let normal_task = Cell::new(normal_task); - let nsched_cell = Cell::new(normal_sched); let normal_thread = do Thread::start { - let sched = nsched_cell.take(); - sched.run(); + normal_sched.take().bootstrap(normal_task.take()); + rtdebug!("finished with normal_thread"); }; - let ssched_cell = Cell::new(special_sched); let special_thread = do Thread::start { - let sched = ssched_cell.take(); - sched.run(); + special_sched.take().bootstrap(special_task.take()); + rtdebug!("finished with special_sched"); }; - // wait for the end + rtdebug!("waiting on threads"); let _thread1 = normal_thread; + rtdebug!("normal_thread finished"); let _thread2 = special_thread; + rtdebug!("special_thread finished"); + } } - // Do it a lot #[test] fn test_stress_schedule_task_states() { let n = stress_factor() * 120; @@ -867,118 +893,6 @@ mod test { } } - #[test] - 
fn test_simple_scheduling() { - do run_in_bare_thread { - let mut task_ran = false; - let task_ran_ptr: *mut bool = &mut task_ran; - - let mut sched = ~new_test_uv_sched(); - let task = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *task_ran_ptr = true; } - }; - sched.enqueue_task(task); - sched.run(); - assert!(task_ran); - } - } - - #[test] - fn test_several_tasks() { - do run_in_bare_thread { - let total = 10; - let mut task_count = 0; - let task_count_ptr: *mut int = &mut task_count; - - let mut sched = ~new_test_uv_sched(); - for int::range(0, total) |_| { - let task = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *task_count_ptr = *task_count_ptr + 1; } - }; - sched.enqueue_task(task); - } - sched.run(); - assert_eq!(task_count, total); - } - } - - #[test] - fn test_swap_tasks_then() { - do run_in_bare_thread { - let mut count = 0; - let count_ptr: *mut int = &mut count; - - let mut sched = ~new_test_uv_sched(); - let task1 = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *count_ptr = *count_ptr + 1; } - let mut sched = Local::take::<Scheduler>(); - let task2 = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { *count_ptr = *count_ptr + 1; } - }; - // Context switch directly to the new task - do sched.switch_running_tasks_and_then(task2) |sched, task1| { - let task1 = Cell::new(task1); - sched.enqueue_task(task1.take()); - } - unsafe { *count_ptr = *count_ptr + 1; } - }; - sched.enqueue_task(task1); - sched.run(); - assert_eq!(count, 3); - } - } - - #[bench] #[test] #[ignore(reason = "long test")] - fn test_run_a_lot_of_tasks_queued() { - do run_in_bare_thread { - static MAX: int = 1000000; - let mut count = 0; - let count_ptr: *mut int = &mut count; - - let mut sched = ~new_test_uv_sched(); - - let start_task = ~do Task::new_root(&mut sched.stack_pool) { - run_task(count_ptr); - }; - sched.enqueue_task(start_task); - sched.run(); - - assert_eq!(count, MAX); - - fn run_task(count_ptr: *mut int) { - do Local::borrow::<Scheduler, ()> |sched| { - let task = ~do Task::new_root(&mut sched.stack_pool) { - unsafe { - *count_ptr = *count_ptr + 1; - if *count_ptr != MAX { - run_task(count_ptr); - } - } - }; - sched.enqueue_task(task); - } - }; - } - } - - #[test] - fn test_block_task() { - do run_in_bare_thread { - let mut sched = ~new_test_uv_sched(); - let task = ~do Task::new_root(&mut sched.stack_pool) { - let sched = Local::take::<Scheduler>(); - assert!(sched.in_task_context()); - do sched.deschedule_running_task_and_then() |sched, task| { - let task = Cell::new(task); - assert!(!sched.in_task_context()); - sched.enqueue_task(task.take()); - } - }; - sched.enqueue_task(task); - sched.run(); - } - } - #[test] fn test_io_callback() { // This is a regression test that when there are no schedulable tasks // in the work queue, but we are performing I/O, that once we do put // something in the work queue again the scheduler picks it up and doesn't // exit before emptying the work queue do run_in_newsched_task { - do spawn { + do spawntask { let sched = Local::take::<Scheduler>(); do sched.deschedule_running_task_and_then |sched, task| { let task = Cell::new(task); @@ -1007,34 +921,31 @@ do run_in_bare_thread { let (port, chan) = oneshot::<()>(); - let port_cell = Cell::new(port); - let chan_cell = Cell::new(chan); - let mut sched1 = ~new_test_uv_sched(); - let handle1 = sched1.make_handle(); - let handle1_cell = Cell::new(handle1); - let task1 = ~do Task::new_root(&mut sched1.stack_pool) { - chan_cell.take().send(()); - }; - sched1.enqueue_task(task1); - - let mut sched2 = ~new_test_uv_sched(); - let task2 = ~do Task::new_root(&mut
@@ -1007,34 +921,31 @@
         do run_in_bare_thread {
             let (port, chan) = oneshot::<()>();
-            let port_cell = Cell::new(port);
-            let chan_cell = Cell::new(chan);
-            let mut sched1 = ~new_test_uv_sched();
-            let handle1 = sched1.make_handle();
-            let handle1_cell = Cell::new(handle1);
-            let task1 = ~do Task::new_root(&mut sched1.stack_pool) {
-                chan_cell.take().send(());
-            };
-            sched1.enqueue_task(task1);
-
-            let mut sched2 = ~new_test_uv_sched();
-            let task2 = ~do Task::new_root(&mut sched2.stack_pool) {
-                port_cell.take().recv();
-                // Release the other scheduler's handle so it can exit
-                handle1_cell.take();
-            };
-            sched2.enqueue_task(task2);
+            let port = Cell::new(port);
+            let chan = Cell::new(chan);
-            let sched1_cell = Cell::new(sched1);
-            let _thread1 = do Thread::start {
-                let sched1 = sched1_cell.take();
-                sched1.run();
+            let mut sched = ~new_test_uv_sched();
+            let handle = sched.make_handle();
+            let handle = Cell::new(handle);
+            let sched = Cell::new(sched);
+
+            let _thread_one = do Thread::start {
+                let port = Cell::new(port.take());
+                let handle = Cell::new(handle.take());
+                do run_in_newsched_task_core {
+                    port.take().recv();
+                    handle.take();
+                }
            };
-            let sched2_cell = Cell::new(sched2);
-            let _thread2 = do Thread::start {
-                let sched2 = sched2_cell.take();
-                sched2.run();
+            let _thread_two = do Thread::start {
+                let chan = Cell::new(chan.take());
+                let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status);
+                let job: ~fn() = || { chan.take().send(()) };
+                let mut sched = sched.take();
+                let mut task = ~Task::new_root(&mut sched.stack_pool, job);
+                task.on_exit = Some(on_exit);
+                sched.bootstrap(task);
            };
         }
     }
@@ -1051,7 +962,7 @@
             for 10.times {
                 let (port, chan) = oneshot();
                 let chan_cell = Cell::new(chan);
-                do spawntask_later {
+                do spawntask {
                     chan_cell.take().send(());
                 }
                 ports.push(port);
@@ -1063,21 +974,21 @@
         }
     }

-    #[test]
+    #[test]
     fn thread_ring() {
         use rt::comm::*;
         use comm::{GenericPort, GenericChan};

         do run_in_mt_newsched_task {
-            let (end_port, end_chan) = oneshot();
+            let (end_port, end_chan) = oneshot();

             let n_tasks = 10;
             let token = 2000;

-            let (p, ch1) = stream();
+            let (p, ch1) = stream();
             let mut p = p;
-            ch1.send((token, end_chan));
-            let mut i = 2;
+            ch1.send((token, end_chan));
+            let mut i = 2;
             while i <= n_tasks {
                 let (next_p, ch) = stream();
                 let imm_i = i;
@@ -1102,9 +1013,9 @@
             while (true) {
                 match p.recv() {
                     (1, end_chan) => {
-                        debug!("%d\n", id);
-                        end_chan.send(());
-                        return;
+                        debug!("%d\n", id);
+                        end_chan.send(());
+                        return;
                     }
                     (token, end_chan) => {
                         debug!("thread: %d got token: %d", id, token);
@@ -1129,15 +1040,16 @@ impl Drop for S {
             fn drop(&self) {
-                let _foo = @0;
+                let _foo = @0;
             }
         }

         let s = S { field: () };

         do spawntask {
-            let _ss = &s;
+            let _ss = &s;
         }
     }
 }
+}
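Several of the rewritten tests above use the same completion handshake: a oneshot carries the "done" signal and Cells move each endpoint into exactly one closure. A sketch of just that skeleton, under the patch's own `oneshot`/`Cell`/`spawntask` APIs:

    // Sketch: signal completion across tasks with a oneshot.
    let (port, chan) = oneshot::<()>();
    let port = Cell::new(port);
    let chan = Cell::new(chan);
    do spawntask {
        chan.take().send(());   // the worker announces it ran
    }
    port.take().recv();         // block until the worker is done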
diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs
index 449438b920551..9f1ced3d3e7e2 100644
--- a/src/libstd/rt/task.rs
+++ b/src/libstd/rt/task.rs
@@ -29,19 +29,32 @@ use rt::stack::{StackSegment, StackPool};
 use rt::context::Context;
 use cell::Cell;

+// The Task struct represents all state associated with a rust
+// task. There are at this point two primary "subtypes" of task,
+// however instead of using a subtype we just have a "task_type" field
+// in the struct. This contains a pointer to another struct that holds
+// the type-specific state.
+
 pub struct Task {
     heap: LocalHeap,
     gc: GarbageCollector,
     storage: LocalStorage,
     logger: StdErrLogger,
     unwinder: Unwinder,
-    home: Option<SchedHome>,
     join_latch: Option<~JoinLatch>,
     on_exit: Option<~fn(bool)>,
     destroyed: bool,
-    coroutine: Option<~Coroutine>
+    coroutine: Option<~Coroutine>,
+    sched: Option<~Scheduler>,
+    task_type: TaskType
+}
+
+pub enum TaskType {
+    GreenTask(Option<~SchedHome>),
+    SchedTask
 }

+/// A coroutine is nothing more than a (register context, stack) pair.
 pub struct Coroutine {
     /// The segment of stack on which the task is currently running or
     /// if the task is blocked, on which the task will resume
@@ -51,6 +64,7 @@ pub struct Coroutine {
     saved_context: Context
 }

+/// Some tasks have a dedicated home scheduler that they must run on.
 pub enum SchedHome {
     AnySched,
     Sched(SchedHandle)
@@ -65,6 +79,58 @@ pub struct Unwinder {

 impl Task {

+    // A helper to build a new task using the dynamically found
+    // scheduler and task. Only works in GreenTask context.
+    pub fn build_homed_child(f: ~fn(), home: SchedHome) -> ~Task {
+        let f = Cell::new(f);
+        let home = Cell::new(home);
+        do Local::borrow::<Task, ~Task> |running_task| {
+            let mut sched = running_task.sched.take_unwrap();
+            let new_task = ~running_task.new_child_homed(&mut sched.stack_pool,
+                                                         home.take(),
+                                                         f.take());
+            running_task.sched = Some(sched);
+            new_task
+        }
+    }
+
+    pub fn build_child(f: ~fn()) -> ~Task {
+        Task::build_homed_child(f, AnySched)
+    }
+
+    pub fn build_homed_root(f: ~fn(), home: SchedHome) -> ~Task {
+        let f = Cell::new(f);
+        let home = Cell::new(home);
+        do Local::borrow::<Task, ~Task> |running_task| {
+            let mut sched = running_task.sched.take_unwrap();
+            let new_task = ~Task::new_root_homed(&mut sched.stack_pool,
+                                                 home.take(),
+                                                 f.take());
+            running_task.sched = Some(sched);
+            new_task
+        }
+    }
+
+    pub fn build_root(f: ~fn()) -> ~Task {
+        Task::build_homed_root(f, AnySched)
+    }
+
+    pub fn new_sched_task() -> Task {
+        Task {
+            heap: LocalHeap::new(),
+            gc: GarbageCollector,
+            storage: LocalStorage(ptr::null(), None),
+            logger: StdErrLogger,
+            unwinder: Unwinder { unwinding: false },
+            join_latch: Some(JoinLatch::new_root()),
+            on_exit: None,
+            destroyed: false,
+            coroutine: Some(~Coroutine::empty()),
+            sched: None,
+            task_type: SchedTask
+        }
+    }
+
     pub fn new_root(stack_pool: &mut StackPool,
                     start: ~fn()) -> Task {
         Task::new_root_homed(stack_pool, AnySched, start)
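The helpers above are the intended entry points: callers no longer dig the scheduler out of TLS themselves, they ask the running task to build the child. A usage sketch under this patch's APIs; the `handle` is assumed to come from some scheduler's `make_handle`:

    // Sketch: construct tasks through the new builders, then run them.
    let f: ~fn() = || rtdebug!("anywhere is fine");
    let child = Task::build_child(f);          // inherits AnySched placement
    Scheduler::run_task(child);

    let g: ~fn() = || rtassert!(Task::on_appropriate_sched());
    let homed = Task::build_homed_child(g, Sched(handle));  // pinned task
    Scheduler::run_task(homed);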
@@ -85,11 +151,12 @@ impl Task {
             storage: LocalStorage(ptr::null(), None),
             logger: StdErrLogger,
             unwinder: Unwinder { unwinding: false },
-            home: Some(home),
             join_latch: Some(JoinLatch::new_root()),
             on_exit: None,
             destroyed: false,
-            coroutine: Some(~Coroutine::new(stack_pool, start))
+            coroutine: Some(~Coroutine::new(stack_pool, start)),
+            sched: None,
+            task_type: GreenTask(Some(~home))
         }
     }

@@ -102,25 +169,40 @@ impl Task {
             gc: GarbageCollector,
             storage: LocalStorage(ptr::null(), None),
             logger: StdErrLogger,
-            home: Some(home),
             unwinder: Unwinder { unwinding: false },
             join_latch: Some(self.join_latch.get_mut_ref().new_child()),
             on_exit: None,
             destroyed: false,
-            coroutine: Some(~Coroutine::new(stack_pool, start))
+            coroutine: Some(~Coroutine::new(stack_pool, start)),
+            sched: None,
+            task_type: GreenTask(Some(~home))
         }
     }

     pub fn give_home(&mut self, new_home: SchedHome) {
-        self.home = Some(new_home);
+        match self.task_type {
+            GreenTask(ref mut home) => {
+                *home = Some(~new_home);
+            }
+            SchedTask => {
+                rtabort!("type error: used SchedTask as GreenTask");
+            }
+        }
     }

-    pub fn run(&mut self, f: &fn()) {
-        // This is just an assertion that `run` was called unsafely
-        // and this instance of Task is still accessible.
-        do Local::borrow::<Task, ()> |task| {
-            assert!(borrow::ref_eq(task, self));
+    pub fn swap_unwrap_home(&mut self) -> SchedHome {
+        match self.task_type {
+            GreenTask(ref mut home) => {
+                let out = home.take_unwrap();
+                return *out;
+            }
+            SchedTask => {
+                rtabort!("type error: used SchedTask as GreenTask");
+            }
         }
+    }
+
+    pub fn run(&mut self, f: &fn()) {

         self.unwinder.try(f);
         self.destroy();
@@ -146,6 +228,8 @@ impl Task {
     /// thread-local-storage.
     fn destroy(&mut self) {

+        rtdebug!("DESTROYING TASK: %u", borrow::to_uint(self));
+
         do Local::borrow::<Task, ()> |task| {
             assert!(borrow::ref_eq(task, self));
         }
@@ -163,6 +247,64 @@ impl Task {
         self.destroyed = true;
     }

+    // New utility functions for homes.
+
+    pub fn is_home_no_tls(&self, sched: &~Scheduler) -> bool {
+        match self.task_type {
+            GreenTask(Some(~AnySched)) => { false }
+            GreenTask(Some(~Sched(SchedHandle { sched_id: ref id, _}))) => {
+                *id == sched.sched_id()
+            }
+            GreenTask(None) => {
+                rtabort!("task without home");
+            }
+            SchedTask => {
+                // Awe yea
+                rtabort!("type error: expected: GreenTask, found: SchedTask");
+            }
+        }
+    }
+
+    pub fn homed(&self) -> bool {
+        match self.task_type {
+            GreenTask(Some(~AnySched)) => { false }
+            GreenTask(Some(~Sched(SchedHandle { _ }))) => { true }
+            GreenTask(None) => {
+                rtabort!("task without home");
+            }
+            SchedTask => {
+                rtabort!("type error: expected: GreenTask, found: SchedTask");
+            }
+        }
+    }
+
+    // Grab both the scheduler and the task from TLS and check if the
+    // task is executing on an appropriate scheduler.
+    pub fn on_appropriate_sched() -> bool {
+        do Local::borrow::<Task, bool> |task| {
+            let sched_id = task.sched.get_ref().sched_id();
+            let sched_run_anything = task.sched.get_ref().run_anything;
+            match task.task_type {
+                GreenTask(Some(~AnySched)) => {
+                    rtdebug!("anysched task in sched check ****");
+                    sched_run_anything
+                }
+                GreenTask(Some(~Sched(SchedHandle { sched_id: ref id, _ }))) => {
+                    rtdebug!("homed task in sched check ****");
+                    *id == sched_id
+                }
+                GreenTask(None) => {
+                    rtabort!("task without home");
+                }
+                SchedTask => {
+                    rtabort!("type error: expected: GreenTask, found: SchedTask");
+                }
+            }
+        }
+    }
+
+    // These utility functions related to home will need to change.
+/*
     /// Check if *task* is currently home.
     pub fn is_home(&self) -> bool {
         do Local::borrow::<Scheduler, bool> |sched| {
@@ -215,11 +357,14 @@ impl Task {
             !sched.run_anything
         }
     }
-
+*/
 }

 impl Drop for Task {
-    fn drop(&self) { assert!(self.destroyed) }
+    fn drop(&self) {
+        rtdebug!("called drop for a task");
+        assert!(self.destroyed)
+    }
 }

 // Coroutines represent nothing more than a context and a stack
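Every home query above shares one matching skeleton: all three shapes of `task_type` must be handled, and a `SchedTask` reaching a green-task path is treated as a type error worth aborting on. The shared shape, isolated as a sketch:

    // Sketch: the exhaustive match every home check performs.
    match task.task_type {
        GreenTask(Some(~AnySched)) => {
            // may run on any scheduler whose run_anything flag is set
        }
        GreenTask(Some(~Sched(SchedHandle { sched_id: ref id, _ }))) => {
            // pinned: only the scheduler whose id matches may run it
        }
        GreenTask(None) => rtabort!("task without home"),
        SchedTask => rtabort!("type error: expected: GreenTask, found: SchedTask")
    }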
@@ -239,19 +384,33 @@ impl Coroutine {
         }
     }

+    pub fn empty() -> Coroutine {
+        Coroutine {
+            current_stack_segment: StackSegment::new(0),
+            saved_context: Context::empty()
+        }
+    }
+
     fn build_start_wrapper(start: ~fn()) -> ~fn() {
         let start_cell = Cell::new(start);
         let wrapper: ~fn() = || {
             // First code after swap to this new context. Run our
             // cleanup job.
             unsafe {
-                let sched = Local::unsafe_borrow::<Scheduler>();
-                (*sched).run_cleanup_job();

-                let sched = Local::unsafe_borrow::<Scheduler>();
-                let task = (*sched).current_task.get_mut_ref();
+                // Again - might work while safe, or it might not.
+                do Local::borrow::<Scheduler, ()> |sched| {
+                    (sched).run_cleanup_job();
+                }
+
+                // To call the run method on a task we need a direct
+                // reference to it. The task is in TLS, so we can
+                // simply unsafe_borrow it to get this reference. We
+                // need to still have the task in TLS though, so we
+                // need to unsafe_borrow.
+                let task = Local::unsafe_borrow::<Task>();

-                do task.run {
+                do (*task).run {
                     // N.B. Removing `start` from the start wrapper
                     // closure by emptying a cell is critical for
                     // correctness. The ~Task pointer, and in turn the
@@ -267,8 +426,11 @@ impl Coroutine {
                 };
             }

+            // We remove the sched from the Task in TLS right now.
             let sched = Local::take::<Scheduler>();
-            sched.terminate_current_task();
+            // ... allowing us to give it away when performing a
+            // scheduling operation.
+            sched.terminate_current_task()
         };
         return wrapper;
     }
@@ -329,7 +491,7 @@ impl Unwinder {
             }
         }
     }
-
+/*
 #[cfg(test)]
 mod test {
     use rt::test::*;
@@ -470,3 +632,4 @@ mod test {
         }
     }
 }
+*/
diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs
index e5393c84a088c..8260582d762e0 100644
--- a/src/libstd/rt/test.rs
+++ b/src/libstd/rt/test.rs
@@ -16,16 +16,15 @@ use clone::Clone;
 use container::Container;
 use iterator::IteratorUtil;
 use vec::{OwnedVector, MutableVector};
-use super::io::net::ip::{IpAddr, Ipv4, Ipv6};
+use super::io::net::ip::Ipv6;
 use rt::sched::Scheduler;
-use rt::local::Local;
+use super::io::net::ip::{IpAddr, Ipv4};
 use unstable::run_in_bare_thread;
 use rt::thread::Thread;
 use rt::task::Task;
 use rt::uv::uvio::UvEventLoop;
 use rt::work_queue::WorkQueue;
 use rt::sleeper_list::SleeperList;
-use rt::task::{Sched};
 use rt::comm::oneshot;
 use result::{Result, Ok, Err};

@@ -34,29 +33,37 @@ pub fn new_test_uv_sched() -> Scheduler {
     let mut sched = Scheduler::new(~UvEventLoop::new(),
                                    WorkQueue::new(),
                                    SleeperList::new());
+    // Don't wait for the Shutdown message
     sched.no_sleep = true;
     return sched;
+
 }

-/// Creates a new scheduler in a new thread and runs a task in it,
-/// then waits for the scheduler to exit. Failure of the task
-/// will abort the process.
 pub fn run_in_newsched_task(f: ~fn()) {
     let f = Cell::new(f);
     do run_in_bare_thread {
-        let mut sched = ~new_test_uv_sched();
-        let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status);
-        let mut task = ~Task::new_root(&mut sched.stack_pool,
-                                       f.take());
-        rtdebug!("newsched_task: %x", to_uint(task));
-        task.on_exit = Some(on_exit);
-        sched.enqueue_task(task);
-        sched.run();
+        run_in_newsched_task_core(f.take());
     }
 }

+pub fn run_in_newsched_task_core(f: ~fn()) {
+
+    use rt::sched::Shutdown;
+
+    let mut sched = ~new_test_uv_sched();
+    let exit_handle = Cell::new(sched.make_handle());
+
+    let on_exit: ~fn(bool) = |exit_status| {
+        exit_handle.take().send(Shutdown);
+        rtassert!(exit_status);
+    };
+    let mut task = ~Task::new_root(&mut sched.stack_pool, f);
+    task.on_exit = Some(on_exit);
+
+    sched.bootstrap(task);
+}
+
 /// Create more than one scheduler and run a function in a task
 /// in one of the schedulers. The schedulers will stay alive
 /// until the function `f` returns.
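`run_in_newsched_task_core` codifies the shutdown handshake that the old enqueue-and-run tests did by hand: take a handle before bootstrapping, then let the task's exit callback both stop the scheduler and assert success. A usage sketch of the same shape, assuming `Shutdown` is in scope:

    // Sketch: the bootstrap-plus-shutdown-handle pattern.
    let mut sched = ~new_test_uv_sched();
    let handle = Cell::new(sched.make_handle());
    let on_exit: ~fn(bool) = |exit_status| {
        handle.take().send(Shutdown);   // wake the scheduler so it can exit
        rtassert!(exit_status);
    };
    let body: ~fn() = || rtdebug!("task body ran");
    let mut task = ~Task::new_root(&mut sched.stack_pool, body);
    task.on_exit = Some(on_exit);
    sched.bootstrap(task);              // runs until Shutdown arrives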
@@ -66,7 +73,7 @@
 pub fn run_in_mt_newsched_task(f: ~fn()) {
     use rt::sched::Shutdown;
     use rt::util;

-    let f_cell = Cell::new(f);
+    let f = Cell::new(f);

     do run_in_bare_thread {
         let nthreads = match os::getenv("RUST_RT_TEST_THREADS") {
@@ -74,7 +81,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
             None => {
                 // Using more threads than cores in test code
                 // to force the OS to preempt them frequently.
-                // Assuming that this help stress test concurrent types.
+                // Assuming that this helps stress test concurrent types.
                 util::num_cpus() * 2
             }
         };
@@ -96,7 +103,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
             scheds.push(sched);
         }

-        let f_cell = Cell::new(f_cell.take());
+        let f = Cell::new(f.take());
         let handles = Cell::new(handles);
         let on_exit: ~fn(bool) = |exit_status| {
             let mut handles = handles.take();
@@ -108,18 +115,30 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
             rtassert!(exit_status);
         };
         let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool,
-                                            f_cell.take());
+                                            f.take());
         main_task.on_exit = Some(on_exit);
-        scheds[0].enqueue_task(main_task);

         let mut threads = ~[];
+        let main_task = Cell::new(main_task);

-        while !scheds.is_empty() {
+        let main_thread = {
             let sched = scheds.pop();
             let sched_cell = Cell::new(sched);
+            do Thread::start {
+                let sched = sched_cell.take();
+                sched.bootstrap(main_task.take());
+            }
+        };
+        threads.push(main_thread);
+
+        while !scheds.is_empty() {
+            let mut sched = scheds.pop();
+            let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool) || {};
+            let bootstrap_task_cell = Cell::new(bootstrap_task);
+            let sched_cell = Cell::new(sched);
             let thread = do Thread::start {
                 let sched = sched_cell.take();
-                sched.run();
+                sched.bootstrap(bootstrap_task_cell.take());
             };

             threads.push(thread);
@@ -133,190 +152,72 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {

 /// Test tasks will abort on failure instead of unwinding
 pub fn spawntask(f: ~fn()) {
-    use super::sched::*;
-
-    let f = Cell::new(f);
-
-    let task = unsafe {
-        let sched = Local::unsafe_borrow::<Scheduler>();
-        rtdebug!("spawntask taking the scheduler from TLS");
-
-
-        do Local::borrow::<Task, ~Task>() |running_task| {
-            ~running_task.new_child(&mut (*sched).stack_pool, f.take())
-        }
-    };
-
-    rtdebug!("new task pointer: %x", to_uint(task));
-
-    let sched = Local::take::<Scheduler>();
-    rtdebug!("spawntask scheduling the new task");
-    sched.schedule_task(task);
-}
-
-
-/// Create a new task and run it right now. Aborts on failure
-pub fn spawntask_immediately(f: ~fn()) {
-    use super::sched::*;
-
-    let f = Cell::new(f);
-
-    let task = unsafe {
-        let sched = Local::unsafe_borrow::<Scheduler>();
-        do Local::borrow::<Task, ~Task>() |running_task| {
-            ~running_task.new_child(&mut (*sched).stack_pool,
-                                    f.take())
-        }
-    };
-
-    let sched = Local::take::<Scheduler>();
-    do sched.switch_running_tasks_and_then(task) |sched, task| {
-        sched.enqueue_task(task);
-    }
+    Scheduler::run_task(Task::build_child(f));
 }

 /// Create a new task and run it right now. Aborts on failure
 pub fn spawntask_later(f: ~fn()) {
-    use super::sched::*;
-
-    let f = Cell::new(f);
-
-    let task = unsafe {
-        let sched = Local::unsafe_borrow::<Scheduler>();
-        do Local::borrow::<Task, ~Task>() |running_task| {
-            ~running_task.new_child(&mut (*sched).stack_pool, f.take())
-        }
-    };
-
-    let mut sched = Local::take::<Scheduler>();
-    sched.enqueue_task(task);
-    Local::put(sched);
+    Scheduler::run_task_later(Task::build_child(f));
 }

-/// Spawn a task and either run it immediately or run it later
 pub fn spawntask_random(f: ~fn()) {
-    use super::sched::*;
     use rand::{Rand, rng};

-    let f = Cell::new(f);
-
-    let task = unsafe {
-        let sched = Local::unsafe_borrow::<Scheduler>();
-        do Local::borrow::<Task, ~Task>() |running_task| {
-            ~running_task.new_child(&mut (*sched).stack_pool,
-                                    f.take())
-
-        }
-    };
-
-    let mut sched = Local::take::<Scheduler>();
-
     let mut rng = rng();
     let run_now: bool = Rand::rand(&mut rng);

     if run_now {
-        do sched.switch_running_tasks_and_then(task) |sched, task| {
-            sched.enqueue_task(task);
-        }
+        spawntask(f)
     } else {
-        sched.enqueue_task(task);
-        Local::put(sched);
+        spawntask_later(f)
     }
 }
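With scheduling policy centralized in `Scheduler::run_task` and `run_task_later`, the test spawns reduce to one-liners and compose mechanically. A usage sketch of the distinction the two wrappers preserve:

    // Sketch: immediate versus deferred test spawns.
    let f: ~fn() = || rtdebug!("ran right away");
    spawntask(f);          // switches to the child now

    let g: ~fn() = || rtdebug!("ran from the queue");
    spawntask_later(g);    // enqueued; runs when the scheduler gets to it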
-/// Spawn a task, with the current scheduler as home, and queue it to
-/// run later.
-pub fn spawntask_homed(scheds: &mut ~[~Scheduler], f: ~fn()) {
-    use super::sched::*;
-    use rand::{rng, RngUtil};
-    let mut rng = rng();
-
-    let task = {
-        let sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)];
-        let handle = sched.make_handle();
-        let home_id = handle.sched_id;
-
-        // now that we know where this is going, build a new function
-        // that can assert it is in the right place
-        let af: ~fn() = || {
-            do Local::borrow::<Scheduler, ()> |sched| {
-                rtdebug!("home_id: %u, runtime loc: %u",
-                         home_id,
-                         sched.sched_id());
-                assert!(home_id == sched.sched_id());
-            };
-            f()
-        };
-
-        ~Task::new_root_homed(&mut sched.stack_pool,
-                              Sched(handle),
-                              af)
-    };
-    let dest_sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)];
-    // enqueue it for future execution
-    dest_sched.enqueue_task(task);
-}
-
-/// Spawn a task and wait for it to finish, returning whether it
-/// completed successfully or failed
-pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
-    use cell::Cell;
-    use super::sched::*;
-
-    let f = Cell::new(f);
+pub fn spawntask_try(f: ~fn()) -> Result<(),()> {

     let (port, chan) = oneshot();
     let chan = Cell::new(chan);
     let on_exit: ~fn(bool) = |exit_status| chan.take().send(exit_status);
-    let mut new_task = unsafe {
-        let sched = Local::unsafe_borrow::<Scheduler>();
-        do Local::borrow::<Task, ~Task> |_running_task| {
-
-            // I don't understand why using a child task here fails. I
-            // think the fail status is propogating back up the task
-            // tree and triggering a fail for the parent, which we
-            // aren't correctly expecting.
-
-            // ~running_task.new_child(&mut (*sched).stack_pool,
-            ~Task::new_root(&mut (*sched).stack_pool,
-                            f.take())
-        }
-    };
-    new_task.on_exit = Some(on_exit);
-    let sched = Local::take::<Scheduler>();
-    do sched.switch_running_tasks_and_then(new_task) |sched, old_task| {
-        sched.enqueue_task(old_task);
-    }
+    let mut new_task = Task::build_root(f);
+    new_task.on_exit = Some(on_exit);

-    rtdebug!("enqueued the new task, now waiting on exit_status");
+    Scheduler::run_task(new_task);

     let exit_status = port.recv();
     if exit_status { Ok(()) } else { Err(()) }
+
 }

-// Spawn a new task in a new scheduler and return a thread handle.
 pub fn spawntask_thread(f: ~fn()) -> Thread {
-    use rt::sched::*;

     let f = Cell::new(f);

-    let task = unsafe {
-        let sched = Local::unsafe_borrow::<Scheduler>();
-        do Local::borrow::<Task, ~Task>() |running_task| {
-            ~running_task.new_child(&mut (*sched).stack_pool,
-                                    f.take())
-        }
-    };
-
-    let task = Cell::new(task);
-
     let thread = do Thread::start {
-        let mut sched = ~new_test_uv_sched();
-        sched.enqueue_task(task.take());
-        sched.run();
+        run_in_newsched_task_core(f.take());
     };

     return thread;
 }
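`spawntask_try` now routes the exit status through `on_exit` and a oneshot, so a task's failure comes back as a value instead of aborting the test. A usage sketch:

    // Sketch: observing failure as a Result instead of an abort.
    let good: ~fn() = || assert!(1 == 1);
    match spawntask_try(good) {
        Ok(()) => rtdebug!("succeeded, as expected"),
        Err(()) => rtabort!("should not have failed")
    }

    let bad: ~fn() = || fail!("expected failure");
    match spawntask_try(bad) {
        Ok(()) => rtabort!("should have failed"),
        Err(()) => rtdebug!("failure observed as a value")
    }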
+/// Used to clean up tasks created for testing but not "run".
+pub fn cleanup_task(task: ~Task) {
+
+    let mut task = task;
+    task.destroyed = true;
+
+    let local_success = !task.unwinder.unwinding;
+    let join_latch = task.join_latch.take_unwrap();
+    match task.on_exit {
+        Some(ref on_exit) => {
+            let success = join_latch.wait(local_success);
+            (*on_exit)(success);
+        }
+        None => {
+            join_latch.release(local_success);
+        }
+    }
+}

 /// Get a port number, starting at 9600, for use in tests
 pub fn next_test_port() -> u16 {
diff --git a/src/libstd/rt/tube.rs b/src/libstd/rt/tube.rs
index f61eee8859b1a..d4e09510f0c21 100644
--- a/src/libstd/rt/tube.rs
+++ b/src/libstd/rt/tube.rs
@@ -17,7 +17,6 @@ use option::*;
 use clone::Clone;
 use super::rc::RC;
 use rt::sched::Scheduler;
-use rt::{context, TaskContext, SchedulerContext};
 use rt::local::Local;
 use rt::task::Task;
 use vec::OwnedVector;
@@ -44,8 +43,6 @@ impl<T> Tube<T> {
     pub fn send(&mut self, val: T) {
         rtdebug!("tube send");

-        assert!(context() == SchedulerContext);
-
         unsafe {
             let state = self.p.unsafe_borrow_mut();
             (*state).buf.push(val);
@@ -61,8 +58,6 @@ impl<T> Tube<T> {
     }

     pub fn recv(&mut self) -> T {
-        assert!(context() == TaskContext);
-
         unsafe {
             let state = self.p.unsafe_borrow_mut();
             if !(*state).buf.is_empty() {
diff --git a/src/libstd/rt/uv/mod.rs b/src/libstd/rt/uv/mod.rs
index 0eaf0dd3ab649..a733a3f2f9133 100644
--- a/src/libstd/rt/uv/mod.rs
+++ b/src/libstd/rt/uv/mod.rs
@@ -51,7 +51,7 @@ use rt::io::net::ip::IpAddr;

 use rt::io::IoError;

-#[cfg(test)] use unstable::run_in_bare_thread;
+//#[cfg(test)] use unstable::run_in_bare_thread;

 pub use self::file::FsRequest;
 pub use self::net::{StreamWatcher, TcpWatcher, UdpWatcher};
@@ -333,7 +333,7 @@ pub fn vec_from_uv_buf(buf: Buf) -> Option<~[u8]> {
         return None;
     }
 }
-
+/*
 #[test]
 fn test_slice_to_uv_buf() {
     let slice = [0, .. 20];
@@ -360,3 +360,4 @@ fn loop_smoke_test() {
         loop_.close();
     }
 }
+*/
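`cleanup_task` exists because a `~Task` built for a test but never run still owns a `JoinLatch` that must be settled, or the parent's success accounting goes wrong and `Task`'s destructor asserts on `destroyed`. A sketch of disposing of one, using only functions from this patch:

    // Sketch: build a task, decide not to run it, dispose of it cleanly.
    let f: ~fn() = || ();
    let task = Task::build_child(f);   // latch is linked into the tree here
    cleanup_task(task);                // settles the latch, marks destroyed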
diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs
index 5d0c64c686782..db1dbd4cbee93 100644
--- a/src/libstd/rt/uv/uvio.rs
+++ b/src/libstd/rt/uv/uvio.rs
@@ -29,7 +29,7 @@ use unstable::sync::{Exclusive, exclusive};
 #[cfg(test)] use container::Container;
 #[cfg(test)] use uint;
 #[cfg(test)] use unstable::run_in_bare_thread;
-#[cfg(test)] use rt::test::{spawntask_immediately,
+#[cfg(test)] use rt::test::{spawntask,
                             next_test_ip4,
                             run_in_newsched_task};

@@ -205,13 +205,11 @@ impl IoFactory for UvIoFactory {
         let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;

         let scheduler = Local::take::<Scheduler>();
-        assert!(scheduler.in_task_context());

         // Block this task and take ownership, switch to scheduler context
-        do scheduler.deschedule_running_task_and_then |sched, task| {
+        do scheduler.deschedule_running_task_and_then |_sched, task| {

             rtdebug!("connect: entered scheduler context");
-            assert!(!sched.in_task_context());
             let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
             let task_cell = Cell::new(task);
@@ -389,11 +387,9 @@ impl RtioTcpStream for UvTcpStream {
         let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;

         let scheduler = Local::take::<Scheduler>();
-        assert!(scheduler.in_task_context());
         let buf_ptr: *&mut [u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |sched, task| {
+        do scheduler.deschedule_running_task_and_then |_sched, task| {
             rtdebug!("read: entered scheduler context");
-            assert!(!sched.in_task_context());
             let task_cell = Cell::new(task);
             // XXX: We shouldn't reallocate these callbacks every
             // call to read
@@ -431,7 +427,6 @@ impl RtioTcpStream for UvTcpStream {
         let result_cell = Cell::new_empty();
         let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
         let scheduler = Local::take::<Scheduler>();
-        assert!(scheduler.in_task_context());
         let buf_ptr: *&[u8] = &buf;
         do scheduler.deschedule_running_task_and_then |_, task| {
             let task_cell = Cell::new(task);
@@ -490,11 +485,9 @@ impl RtioUdpSocket for UvUdpSocket {
         let result_cell_ptr: *Cell<Result<(uint, IpAddr), IoError>> = &result_cell;

         let scheduler = Local::take::<Scheduler>();
-        assert!(scheduler.in_task_context());
         let buf_ptr: *&mut [u8] = &buf;
-        do scheduler.deschedule_running_task_and_then |sched, task| {
+        do scheduler.deschedule_running_task_and_then |_sched, task| {
             rtdebug!("recvfrom: entered scheduler context");
-            assert!(!sched.in_task_context());
             let task_cell = Cell::new(task);
             let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
             do self.recv_start(alloc) |watcher, nread, _buf, addr, flags, status| {
@@ -525,7 +518,6 @@ impl RtioUdpSocket for UvUdpSocket {
         let result_cell = Cell::new_empty();
         let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
         let scheduler = Local::take::<Scheduler>();
-        assert!(scheduler.in_task_context());
         let buf_ptr: *&[u8] = &buf;
         do scheduler.deschedule_running_task_and_then |_, task| {
             let task_cell = Cell::new(task);
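All the I/O methods touched above share one blocking shape: park the task, let the uv callback fill a `Cell` with the `Result`, and read it back out once rescheduled. A skeleton of that shape with the callback wiring elided; the closing two lines follow the idiom these functions use after the deschedule returns:

    // Sketch: block on a uv operation via a result cell.
    let result_cell = Cell::new_empty();
    let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
    let scheduler = Local::take::<Scheduler>();
    do scheduler.deschedule_running_task_and_then |_sched, task| {
        let task_cell = Cell::new(task);
        // ... start the uv request here; its completion callback writes
        // the Result through result_cell_ptr and reschedules
        // task_cell.take() on the scheduler.
    }
    // Back in task context: the callback has run and filled the cell.
    assert!(!result_cell.is_empty());
    return result_cell.take();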
@@ -592,7 +584,7 @@ fn test_simple_tcp_server_and_client() {
         let addr = next_test_ip4();

         // Start the server first so it's listening when we connect
-        do spawntask_immediately {
+        do spawntask {
             unsafe {
                 let io = Local::unsafe_borrow::<IoFactoryObject>();
                 let mut listener = (*io).tcp_bind(addr).unwrap();
@@ -607,7 +599,7 @@ fn test_simple_tcp_server_and_client() {
             }
         }

-        do spawntask_immediately {
+        do spawntask {
             unsafe {
                 let io = Local::unsafe_borrow::<IoFactoryObject>();
                 let stream = (*io).tcp_connect(addr).unwrap();
@@ -623,7 +615,7 @@ fn test_simple_udp_server_and_client() {
         let server_addr = next_test_ip4();
         let client_addr = next_test_ip4();

-        do spawntask_immediately {
+        do spawntask {
             unsafe {
                 let io = Local::unsafe_borrow::<IoFactoryObject>();
                 let server_socket = (*io).udp_bind(server_addr).unwrap();
@@ -638,7 +630,7 @@ fn test_simple_udp_server_and_client() {
             }
         }

-        do spawntask_immediately {
+        do spawntask {
             unsafe {
                 let io = Local::unsafe_borrow::<IoFactoryObject>();
                 let client_socket = (*io).udp_bind(client_addr).unwrap();
@@ -653,7 +645,7 @@ fn test_read_and_block() {
     do run_in_newsched_task {
         let addr = next_test_ip4();

-        do spawntask_immediately {
+        do spawntask {
            let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
            let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
            let stream = listener.accept().unwrap();
@@ -686,7 +678,7 @@ fn test_read_and_block() {
             assert!(reads > 1);
         }

-        do spawntask_immediately {
+        do spawntask {
             unsafe {
                 let io = Local::unsafe_borrow::<IoFactoryObject>();
                 let stream = (*io).tcp_connect(addr).unwrap();
@@ -706,7 +698,7 @@ fn test_read_read_read() {
         let addr = next_test_ip4();
         static MAX: uint = 500000;

-        do spawntask_immediately {
+        do spawntask {
             unsafe {
                 let io = Local::unsafe_borrow::<IoFactoryObject>();
                 let mut listener = (*io).tcp_bind(addr).unwrap();
@@ -720,7 +712,7 @@ fn test_read_read_read() {
             }
         }

-        do spawntask_immediately {
+        do spawntask {
             unsafe {
                 let io = Local::unsafe_borrow::<IoFactoryObject>();
                 let stream = (*io).tcp_connect(addr).unwrap();
@@ -746,7 +738,7 @@ fn test_udp_twice() {
         let server_addr = next_test_ip4();
         let client_addr = next_test_ip4();

-        do spawntask_immediately {
+        do spawntask {
             unsafe {
                 let io = Local::unsafe_borrow::<IoFactoryObject>();
                 let client = (*io).udp_bind(client_addr).unwrap();
@@ -755,7 +747,7 @@ fn test_udp_twice() {
             }
         }

-        do spawntask_immediately {
+        do spawntask {
             unsafe {
                 let io = Local::unsafe_borrow::<IoFactoryObject>();
                 let server = (*io).udp_bind(server_addr).unwrap();
@@ -783,7 +775,7 @@ fn test_udp_many_read() {
         let client_in_addr = next_test_ip4();
         static MAX: uint = 500_000;

-        do spawntask_immediately {
+        do spawntask {
             unsafe {
                 let io = Local::unsafe_borrow::<IoFactoryObject>();
                 let server_out = (*io).udp_bind(server_out_addr).unwrap();
@@ -806,7 +798,7 @@ fn test_udp_many_read() {
             }
         }

-        do spawntask_immediately {
+        do spawntask {
             unsafe {
                 let io = Local::unsafe_borrow::<IoFactoryObject>();
                 let client_out = (*io).udp_bind(client_out_addr).unwrap();
diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs
index 2fec9858c88d8..f4134d721afb1 100644
--- a/src/libstd/task/mod.rs
+++ b/src/libstd/task/mod.rs
@@ -1181,4 +1181,3 @@ fn test_simple_newsched_spawn() {
         spawn(||())
     }
 }
-
diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs
index a08214ea40caf..f8b7ab9f0b620 100644
--- a/src/libstd/task/spawn.rs
+++ b/src/libstd/task/spawn.rs
@@ -91,7 +91,6 @@ use task::unkillable;
 use uint;
 use util;
 use unstable::sync::{Exclusive, exclusive};
-use rt::local::Local;
 use rt::task::Task;
 use iterator::IteratorUtil;
@@ -573,39 +572,26 @@ fn gen_child_taskgroup(linked: bool, supervised: bool)

 pub fn spawn_raw(opts: TaskOpts, f: ~fn()) {
     use rt::*;

-    match context() {
-        OldTaskContext => {
-            spawn_raw_oldsched(opts, f)
-        }
-        TaskContext => {
-            spawn_raw_newsched(opts, f)
-        }
-        SchedulerContext => {
-            fail!("can't spawn from scheduler context")
-        }
-        GlobalContext => {
-            fail!("can't spawn from global context")
-        }
+    // A hack to go with the context function. Just do the boolean
+    // check and let an interesting runtime error occur if it was one
+    // of the two bad cases.
+
+    if context() == OldTaskContext {
+        spawn_raw_oldsched(opts, f)
+    } else {
+        spawn_raw_newsched(opts, f)
     }
+
 }

 fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) {
     use rt::sched::*;

-    let f = Cell::new(f);
-
-    let mut task = unsafe {
-        let sched = Local::unsafe_borrow::<Scheduler>();
-        rtdebug!("unsafe borrowed sched");
-
-        if opts.linked {
-            do Local::borrow::<Task, ~Task>() |running_task| {
-                ~running_task.new_child(&mut (*sched).stack_pool, f.take())
-            }
-        } else {
-            // An unlinked task is a new root in the task tree
-            ~Task::new_root(&mut (*sched).stack_pool, f.take())
-        }
+    let mut task = if opts.linked {
+        Task::build_child(f)
+    } else {
+        // An unlinked task is a new root in the task tree
+        Task::build_root(f)
     };

     if opts.notify_chan.is_some() {
@@ -619,11 +605,9 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) {
         task.on_exit = Some(on_exit);
     }

-    rtdebug!("spawn about to take scheduler");
+    rtdebug!("spawn calling run_task");
+    Scheduler::run_task(task);

-    let sched = Local::take::<Scheduler>();
-    rtdebug!("took sched in spawn");
-    sched.schedule_task(task);
 }
diff --git a/src/libstd/unstable/lang.rs b/src/libstd/unstable/lang.rs
index 66bb46b8991a5..558198fcf5bb1 100644
--- a/src/libstd/unstable/lang.rs
+++ b/src/libstd/unstable/lang.rs
@@ -69,9 +69,6 @@ pub unsafe fn local_malloc(td: *c_char, size: uintptr_t) -> *c_char {
         _ => {
             let mut alloc = ::ptr::null();
             do Local::borrow::<Task, ()> |task| {
-                rtdebug!("task pointer: %x, heap pointer: %x",
-                         to_uint(task),
-                         to_uint(&task.heap));
                 alloc = task.heap.alloc(td as *c_void, size as uint) as *c_char;
             }
             return alloc;
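What `opts.linked` toggles in the rewritten `spawn_raw_newsched`, spelled out: a linked child joins the parent's `JoinLatch` tree through `build_child`, so its failure propagates upward, while an unlinked task starts a fresh root latch through `build_root`. A sketch of the decision in isolation, assuming an `opts: TaskOpts` in scope:

    // Sketch: linked tasks join the parent's failure tree, unlinked
    // tasks become new roots.
    let f: ~fn() = || ();
    let task = if opts.linked {
        Task::build_child(f)    // failure reaches the parent's latch
    } else {
        Task::build_root(f)     // independent root in the task tree
    };
    Scheduler::run_task(task);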