diff --git a/src/comp/back/upcall.rs b/src/comp/back/upcall.rs index b12ceba8f0dd6..aa50b75e8b7d9 100644 --- a/src/comp/back/upcall.rs +++ b/src/comp/back/upcall.rs @@ -57,6 +57,8 @@ type upcalls = vec_append: ValueRef, get_type_desc: ValueRef, new_task: ValueRef, + take_task: ValueRef, + drop_task: ValueRef, start_task: ValueRef, ivec_resize: ValueRef, ivec_spill: ValueRef, @@ -129,6 +131,8 @@ fn declare_upcalls(tn: type_names, tydesc_type: TypeRef, ~[T_ptr(T_nil()), T_size_t(), T_size_t(), T_size_t(), T_ptr(T_ptr(tydesc_type))], T_ptr(tydesc_type)), new_task: d("new_task", ~[T_ptr(T_str())], taskptr_type), + take_task: dv("take_task", ~[taskptr_type]), + drop_task: dv("drop_task", ~[taskptr_type]), start_task: d("start_task", ~[taskptr_type, T_int(), T_int(), T_size_t()], taskptr_type), diff --git a/src/comp/middle/trans.rs b/src/comp/middle/trans.rs index 1475887ae4d05..6b1a091adc368 100644 --- a/src/comp/middle/trans.rs +++ b/src/comp/middle/trans.rs @@ -1221,7 +1221,13 @@ fn make_copy_glue(cx: &@block_ctxt, v: ValueRef, t: &ty::t) { // NB: v is an *alias* of type t here, not a direct value. let bcx; - if ty::type_is_boxed(bcx_tcx(cx), t) { + + if ty::type_is_task(bcx_tcx(cx), t) { + let task_ptr = cx.build.Load(v); + cx.build.Call(bcx_ccx(cx).upcalls.take_task, + ~[cx.fcx.lltaskptr, task_ptr]); + bcx = cx; + } else if ty::type_is_boxed(bcx_tcx(cx), t) { bcx = incr_refcnt_of_boxed(cx, cx.build.Load(v)).bcx; } else if (ty::type_is_structural(bcx_tcx(cx), t)) { bcx = duplicate_heap_parts_if_necessary(cx, v, t).bcx; @@ -1381,7 +1387,12 @@ fn make_drop_glue(cx: &@block_ctxt, v0: ValueRef, t: &ty::t) { ty::ty_box(_) { decr_refcnt_maybe_free(cx, v0, v0, t) } ty::ty_port(_) { decr_refcnt_maybe_free(cx, v0, v0, t) } ty::ty_chan(_) { decr_refcnt_maybe_free(cx, v0, v0, t) } - ty::ty_task. { decr_refcnt_maybe_free(cx, v0, v0, t) } + ty::ty_task. { + let task_ptr = cx.build.Load(v0); + {bcx: cx, + val: cx.build.Call(bcx_ccx(cx).upcalls.drop_task, + ~[cx.fcx.lltaskptr, task_ptr])} + } ty::ty_obj(_) { let box_cell = cx.build.GEP(v0, ~[C_int(0), C_int(abi::obj_field_box)]); diff --git a/src/comp/middle/ty.rs b/src/comp/middle/ty.rs index 674e5c9b9cbad..cbf8062a8146b 100644 --- a/src/comp/middle/ty.rs +++ b/src/comp/middle/ty.rs @@ -161,6 +161,7 @@ export type_is_bot; export type_is_box; export type_is_boxed; export type_is_chan; +export type_is_task; export type_is_fp; export type_is_integral; export type_is_native; @@ -839,6 +840,10 @@ fn type_is_chan(cx: &ctxt, ty: &t) -> bool { alt struct(cx, ty) { ty_chan(_) { ret true; } _ { ret false; } } } +fn type_is_task(cx: &ctxt, ty: &t) -> bool { + alt struct(cx, ty) { ty_task. { ret true; } _ { ret false; } } +} + fn type_is_structural(cx: &ctxt, ty: &t) -> bool { alt struct(cx, ty) { ty_rec(_) { ret true; } diff --git a/src/lib/task.rs b/src/lib/task.rs index 5968c456909a1..1936cf2097193 100644 --- a/src/lib/task.rs +++ b/src/lib/task.rs @@ -8,6 +8,8 @@ native "rust" mod rustrt { fn clone_chan(c: *rust_chan) -> *rust_chan; type rust_chan; + + fn set_min_stack(stack_size: uint); } /** @@ -40,6 +42,10 @@ fn send[T](c: chan[T], v: &T) { c <| v; } fn recv[T](p: port[T]) -> T { let v; p |> v; v } +fn set_min_stack(stack_size : uint) { + rustrt::set_min_stack(stack_size); +} + // Local Variables: // mode: rust; // fill-column: 78; diff --git a/src/rt/circular_buffer.cpp b/src/rt/circular_buffer.cpp index b645a08e5638e..ba098bbee3f20 100644 --- a/src/rt/circular_buffer.cpp +++ b/src/rt/circular_buffer.cpp @@ -5,7 +5,6 @@ #include "rust_internal.h" circular_buffer::circular_buffer(rust_kernel *kernel, size_t unit_sz) : - sched(kernel->sched), kernel(kernel), unit_sz(unit_sz), _buffer_sz(initial_size()), @@ -13,26 +12,26 @@ circular_buffer::circular_buffer(rust_kernel *kernel, size_t unit_sz) : _unread(0), _buffer((uint8_t *)kernel->malloc(_buffer_sz, "circular_buffer")) { - A(sched, unit_sz, "Unit size must be larger than zero."); + // A(sched, unit_sz, "Unit size must be larger than zero."); - DLOG(sched, mem, "new circular_buffer(buffer_sz=%d, unread=%d)" - "-> circular_buffer=0x%" PRIxPTR, - _buffer_sz, _unread, this); + // DLOG(sched, mem, "new circular_buffer(buffer_sz=%d, unread=%d)" + // "-> circular_buffer=0x%" PRIxPTR, + // _buffer_sz, _unread, this); - A(sched, _buffer, "Failed to allocate buffer."); + // A(sched, _buffer, "Failed to allocate buffer."); } circular_buffer::~circular_buffer() { - DLOG(sched, mem, "~circular_buffer 0x%" PRIxPTR, this); - I(sched, _buffer); - W(sched, _unread == 0, - "freeing circular_buffer with %d unread bytes", _unread); + // DLOG(sched, mem, "~circular_buffer 0x%" PRIxPTR, this); + // I(sched, _buffer); + // W(sched, _unread == 0, + // "freeing circular_buffer with %d unread bytes", _unread); kernel->free(_buffer); } size_t circular_buffer::initial_size() { - I(sched, unit_sz > 0); + // I(sched, unit_sz > 0); return INITIAL_CIRCULAR_BUFFER_SIZE_IN_UNITS * unit_sz; } @@ -41,8 +40,8 @@ circular_buffer::initial_size() { */ void circular_buffer::transfer(void *dst) { - I(sched, dst); - I(sched, _unread <= _buffer_sz); + // I(sched, dst); + // I(sched, _unread <= _buffer_sz); uint8_t *ptr = (uint8_t *) dst; @@ -54,13 +53,13 @@ circular_buffer::transfer(void *dst) { } else { head_sz = _buffer_sz - _next; } - I(sched, _next + head_sz <= _buffer_sz); + // I(sched, _next + head_sz <= _buffer_sz); memcpy(ptr, _buffer + _next, head_sz); // Then copy any other items from the beginning of the buffer - I(sched, _unread >= head_sz); + // I(sched, _unread >= head_sz); size_t tail_sz = _unread - head_sz; - I(sched, head_sz + tail_sz <= _buffer_sz); + // I(sched, head_sz + tail_sz <= _buffer_sz); memcpy(ptr + head_sz, _buffer, tail_sz); } @@ -70,21 +69,21 @@ circular_buffer::transfer(void *dst) { */ void circular_buffer::enqueue(void *src) { - I(sched, src); - I(sched, _unread <= _buffer_sz); - I(sched, _buffer); + // I(sched, src); + // I(sched, _unread <= _buffer_sz); + // I(sched, _buffer); // Grow if necessary. if (_unread == _buffer_sz) { grow(); } - DLOG(sched, mem, "circular_buffer enqueue " - "unread: %d, next: %d, buffer_sz: %d, unit_sz: %d", - _unread, _next, _buffer_sz, unit_sz); + // DLOG(sched, mem, "circular_buffer enqueue " + // "unread: %d, next: %d, buffer_sz: %d, unit_sz: %d", + // _unread, _next, _buffer_sz, unit_sz); - I(sched, _unread < _buffer_sz); - I(sched, _unread + unit_sz <= _buffer_sz); + // I(sched, _unread < _buffer_sz); + // I(sched, _unread + unit_sz <= _buffer_sz); // Copy data size_t dst_idx = _next + _unread; @@ -92,15 +91,15 @@ circular_buffer::enqueue(void *src) { if (dst_idx >= _buffer_sz) { dst_idx -= _buffer_sz; - I(sched, _next >= unit_sz); - I(sched, dst_idx <= _next - unit_sz); + // I(sched, _next >= unit_sz); + // I(sched, dst_idx <= _next - unit_sz); } - I(sched, dst_idx + unit_sz <= _buffer_sz); + // I(sched, dst_idx + unit_sz <= _buffer_sz); memcpy(&_buffer[dst_idx], src, unit_sz); _unread += unit_sz; - DLOG(sched, mem, "circular_buffer pushed data at index: %d", dst_idx); + // DLOG(sched, mem, "circular_buffer pushed data at index: %d", dst_idx); } /** @@ -110,21 +109,21 @@ circular_buffer::enqueue(void *src) { */ void circular_buffer::dequeue(void *dst) { - I(sched, unit_sz > 0); - I(sched, _unread >= unit_sz); - I(sched, _unread <= _buffer_sz); - I(sched, _buffer); + // I(sched, unit_sz > 0); + // I(sched, _unread >= unit_sz); + // I(sched, _unread <= _buffer_sz); + // I(sched, _buffer); - DLOG(sched, mem, - "circular_buffer dequeue " - "unread: %d, next: %d, buffer_sz: %d, unit_sz: %d", - _unread, _next, _buffer_sz, unit_sz); + // DLOG(sched, mem, + // "circular_buffer dequeue " + // "unread: %d, next: %d, buffer_sz: %d, unit_sz: %d", + // _unread, _next, _buffer_sz, unit_sz); - I(sched, _next + unit_sz <= _buffer_sz); + // I(sched, _next + unit_sz <= _buffer_sz); if (dst != NULL) { memcpy(dst, &_buffer[_next], unit_sz); } - DLOG(sched, mem, "shifted data from index %d", _next); + //DLOG(sched, mem, "shifted data from index %d", _next); _unread -= unit_sz; _next += unit_sz; if (_next == _buffer_sz) { @@ -140,8 +139,9 @@ circular_buffer::dequeue(void *dst) { void circular_buffer::grow() { size_t new_buffer_sz = _buffer_sz * 2; - I(sched, new_buffer_sz <= MAX_CIRCULAR_BUFFER_SIZE); - DLOG(sched, mem, "circular_buffer is growing to %d bytes", new_buffer_sz); + // I(sched, new_buffer_sz <= MAX_CIRCULAR_BUFFER_SIZE); + // DLOG(sched, mem, "circular_buffer is growing to %d bytes", + // new_buffer_sz); void *new_buffer = kernel->malloc(new_buffer_sz, "new circular_buffer (grow)"); transfer(new_buffer); @@ -154,9 +154,9 @@ circular_buffer::grow() { void circular_buffer::shrink() { size_t new_buffer_sz = _buffer_sz / 2; - I(sched, initial_size() <= new_buffer_sz); - DLOG(sched, mem, "circular_buffer is shrinking to %d bytes", - new_buffer_sz); + // I(sched, initial_size() <= new_buffer_sz); + // DLOG(sched, mem, "circular_buffer is shrinking to %d bytes", + // new_buffer_sz); void *new_buffer = kernel->malloc(new_buffer_sz, "new circular_buffer (shrink)"); transfer(new_buffer); diff --git a/src/rt/memory_region.cpp b/src/rt/memory_region.cpp index 6e6515901974b..44924218e570a 100644 --- a/src/rt/memory_region.cpp +++ b/src/rt/memory_region.cpp @@ -4,7 +4,7 @@ // NB: please do not commit code with this uncommented. It's // hugely expensive and should only be used as a last resort. // -// #define TRACK_ALLOCATIONS +#define TRACK_ALLOCATIONS #define MAGIC 0xbadc0ffe diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp index 029532363c8ef..df1486952ebf3 100644 --- a/src/rt/rust.cpp +++ b/src/rt/rust.cpp @@ -140,14 +140,16 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) { update_log_settings(crate_map, getenv("RUST_LOG")); enable_claims(getenv("CHECK_CLAIMS")); + int num_threads = get_num_threads(); rust_srv *srv = new rust_srv(); - rust_kernel *kernel = new rust_kernel(srv); + rust_kernel *kernel = new rust_kernel(srv, num_threads); kernel->start(); - rust_scheduler *sched = kernel->get_scheduler(); + rust_task *root_task = kernel->create_task(NULL, "main"); + rust_scheduler *sched = root_task->sched; command_line_args *args = new (kernel, "main command line args") - command_line_args(sched->root_task, argc, argv); + command_line_args(root_task, argc, argv); DLOG(sched, dom, "startup: %d args in 0x%" PRIxPTR, args->argc, (uintptr_t)args->args); @@ -155,13 +157,11 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) { DLOG(sched, dom, "startup: arg[%d] = '%s'", i, args->argv[i]); } - sched->root_task->start(main_fn, (uintptr_t)args->args); - - int num_threads = get_num_threads(); + root_task->start(main_fn, (uintptr_t)args->args); DLOG(sched, dom, "Using %d worker threads.", num_threads); - int ret = kernel->start_task_threads(num_threads); + int ret = kernel->start_task_threads(); delete args; delete kernel; delete srv; diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index eefc6dac8cebc..4870c9fe54598 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -856,6 +856,13 @@ clone_chan(rust_task *task, rust_chan *chan) { return chan->clone(task); } +// defined in rust_task.cpp +extern size_t g_min_stack_size; +extern "C" CDECL void +set_min_stack(rust_task *task, uintptr_t stack_size) { + g_min_stack_size = stack_size; +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index 470be6e6a37a8..d77b196fd35c0 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -13,16 +13,16 @@ rust_chan::rust_chan(rust_kernel *kernel, maybe_proxy *port, if (port) { associate(port); } - DLOG(kernel->sched, comm, "new rust_chan(task=0x%" PRIxPTR + KLOG(kernel, comm, "new rust_chan(task=0x%" PRIxPTR ", port=0x%" PRIxPTR ") -> chan=0x%" PRIxPTR, (uintptr_t) task, (uintptr_t) port, (uintptr_t) this); } rust_chan::~rust_chan() { - DLOG(kernel->sched, comm, "del rust_chan(task=0x%" PRIxPTR ")", + KLOG(kernel, comm, "del rust_chan(task=0x%" PRIxPTR ")", (uintptr_t) this); - A(kernel->sched, is_associated() == false, + A(kernel, is_associated() == false, "Channel must be disassociated before being freed."); } @@ -33,7 +33,7 @@ void rust_chan::associate(maybe_proxy *port) { this->port = port; if (port->is_proxy() == false) { scoped_lock with(port->referent()->lock); - DLOG(kernel->sched, task, + KLOG(kernel, task, "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR, this, port); ++this->ref_count; @@ -51,16 +51,16 @@ bool rust_chan::is_associated() { * Unlink this channel from its associated port. */ void rust_chan::disassociate() { - A(kernel->sched, is_associated(), + A(kernel, is_associated(), "Channel must be associated with a port."); if (port->is_proxy() == false) { scoped_lock with(port->referent()->lock); - DLOG(kernel->sched, task, - "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR, - this, port->referent()); + KLOG(kernel, task, + "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR, + this, port->referent()); --this->ref_count; - --this->task->ref_count; + task->deref(); this->task = NULL; port->referent()->chans.swap_delete(this); } @@ -73,8 +73,7 @@ void rust_chan::disassociate() { * Attempt to send data to the associated port. */ void rust_chan::send(void *sptr) { - rust_scheduler *sched = kernel->sched; - I(sched, !port->is_proxy()); + I(kernel, !port->is_proxy()); rust_port *target_port = port->referent(); // TODO: We can probably avoid this lock by using atomic operations in @@ -84,12 +83,12 @@ void rust_chan::send(void *sptr) { buffer.enqueue(sptr); if (!is_associated()) { - W(sched, is_associated(), + W(kernel, is_associated(), "rust_chan::transmit with no associated port."); return; } - A(sched, !buffer.is_empty(), + A(kernel, !buffer.is_empty(), "rust_chan::transmit with nothing to send."); if (port->is_proxy()) { @@ -98,7 +97,7 @@ void rust_chan::send(void *sptr) { buffer.dequeue(NULL); } else { if (target_port->task->blocked_on(target_port)) { - DLOG(sched, comm, "dequeued in rendezvous_ptr"); + KLOG(kernel, comm, "dequeued in rendezvous_ptr"); buffer.dequeue(target_port->task->rendezvous_ptr); target_port->task->rendezvous_ptr = 0; target_port->task->wakeup(target_port); @@ -109,22 +108,10 @@ void rust_chan::send(void *sptr) { return; } -rust_chan *rust_chan::clone(maybe_proxy *target) { +rust_chan *rust_chan::clone(rust_task *target) { size_t unit_sz = buffer.unit_sz; maybe_proxy *port = this->port; - rust_task *target_task = NULL; - if (target->is_proxy() == false) { - port = this->port; - target_task = target->referent(); - } else { - rust_handle *handle = - task->sched->kernel->get_port_handle(port->as_referent()); - maybe_proxy *proxy = new rust_proxy (handle); - DLOG(kernel->sched, mem, "new proxy: " PTR, proxy); - port = proxy; - target_task = target->as_proxy()->handle()->referent(); - } - return new (target_task->kernel, "cloned chan") + return new (target->kernel, "cloned chan") rust_chan(kernel, port, unit_sz); } @@ -133,7 +120,7 @@ rust_chan *rust_chan::clone(maybe_proxy *target) { * appear to be live, causing modify-after-free errors. */ void rust_chan::destroy() { - A(kernel->sched, ref_count == 0, + A(kernel, ref_count == 0, "Channel's ref count should be zero."); if (is_associated()) { diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h index 056d70cebe4d4..68cdd31b3cc92 100644 --- a/src/rt/rust_chan.h +++ b/src/rt/rust_chan.h @@ -22,7 +22,7 @@ class rust_chan : public kernel_owned, void send(void *sptr); - rust_chan *clone(maybe_proxy *target); + rust_chan *clone(rust_task *target); // Called whenever the channel's ref count drops to zero. void destroy(); diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index d15d52ba431cd..8839f7fe6af8b 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -1,44 +1,46 @@ #include "rust_internal.h" -#define KLOG(...) \ - do { \ - if (log_rt_kern >= log_note) { \ - log(log_note, __VA_ARGS__); \ - } \ - } while (0) - -rust_kernel::rust_kernel(rust_srv *srv) : +#define KLOG_(...) \ + KLOG(this, kern, __VA_ARGS__) +#define KLOG_ERR_(field, ...) \ + KLOG_LVL(this, field, log_err, __VA_ARGS__) + +rust_kernel::rust_kernel(rust_srv *srv, size_t num_threads) : _region(srv, true), _log(srv, NULL), - _srv(srv), - _interrupt_kernel_loop(FALSE) + srv(srv), + _interrupt_kernel_loop(FALSE), + num_threads(num_threads), + rval(0), + live_tasks(0) { - sched = create_scheduler("main"); + isaac_init(this, &rctx); + create_schedulers(); } rust_scheduler * -rust_kernel::create_scheduler(const char *name) { +rust_kernel::create_scheduler(int id) { _kernel_lock.lock(); rust_message_queue *message_queue = - new (this, "rust_message_queue") rust_message_queue(_srv, this); - rust_srv *srv = _srv->clone(); + new (this, "rust_message_queue") rust_message_queue(srv, this); + rust_srv *srv = this->srv->clone(); rust_scheduler *sched = new (this, "rust_scheduler") - rust_scheduler(this, message_queue, srv, name); + rust_scheduler(this, message_queue, srv, id); rust_handle *handle = internal_get_sched_handle(sched); message_queue->associate(handle); message_queues.append(message_queue); - KLOG("created scheduler: " PTR ", name: %s, index: %d", - sched, name, sched->list_index); + KLOG_("created scheduler: " PTR ", id: %d, index: %d", + sched, id, sched->list_index); _kernel_lock.signal_all(); _kernel_lock.unlock(); return sched; } void -rust_kernel::destroy_scheduler() { +rust_kernel::destroy_scheduler(rust_scheduler *sched) { _kernel_lock.lock(); - KLOG("deleting scheduler: " PTR ", name: %s, index: %d", + KLOG_("deleting scheduler: " PTR ", name: %s, index: %d", sched, sched->name, sched->list_index); sched->message_queue->disassociate(); rust_srv *srv = sched->srv; @@ -48,6 +50,18 @@ rust_kernel::destroy_scheduler() { _kernel_lock.unlock(); } +void rust_kernel::create_schedulers() { + for(size_t i = 0; i < num_threads; ++i) { + threads.push(create_scheduler(i)); + } +} + +void rust_kernel::destroy_schedulers() { + for(size_t i = 0; i < num_threads; ++i) { + destroy_scheduler(threads[i]); + } +} + rust_handle * rust_kernel::internal_get_sched_handle(rust_scheduler *sched) { rust_handle *handle = NULL; @@ -59,14 +73,6 @@ rust_kernel::internal_get_sched_handle(rust_scheduler *sched) { return handle; } -rust_handle * -rust_kernel::get_sched_handle(rust_scheduler *sched) { - _kernel_lock.lock(); - rust_handle *handle = internal_get_sched_handle(sched); - _kernel_lock.unlock(); - return handle; -} - rust_handle * rust_kernel::get_task_handle(rust_task *task) { _kernel_lock.lock(); @@ -98,7 +104,9 @@ rust_kernel::get_port_handle(rust_port *port) { void rust_kernel::log_all_scheduler_state() { - sched->log_state(); + for(size_t i = 0; i < num_threads; ++i) { + threads[i]->log_state(); + } } /** @@ -156,21 +164,21 @@ rust_kernel::start_kernel_loop() { void rust_kernel::run() { - KLOG("started kernel loop"); + KLOG_("started kernel loop"); start_kernel_loop(); - KLOG("finished kernel loop"); + KLOG_("finished kernel loop"); } void rust_kernel::terminate_kernel_loop() { - KLOG("terminating kernel loop"); + KLOG_("terminating kernel loop"); _interrupt_kernel_loop = true; signal_kernel_lock(); join(); } rust_kernel::~rust_kernel() { - destroy_scheduler(); + destroy_schedulers(); terminate_kernel_loop(); @@ -180,20 +188,20 @@ rust_kernel::~rust_kernel() { // messages. pump_message_queues(); - KLOG("freeing handles"); + KLOG_("freeing handles"); free_handles(_task_handles); - KLOG("..task handles freed"); + KLOG_("..task handles freed"); free_handles(_port_handles); - KLOG("..port handles freed"); + KLOG_("..port handles freed"); free_handles(_sched_handles); - KLOG("..sched handles freed"); + KLOG_("..sched handles freed"); - KLOG("freeing queues"); + KLOG_("freeing queues"); rust_message_queue *queue = NULL; while (message_queues.pop(&queue)) { - K(_srv, queue->is_empty(), "Kernel message queue should be empty " + K(srv, queue->is_empty(), "Kernel message queue should be empty " "before killing the kernel."); delete queue; } @@ -218,7 +226,7 @@ rust_kernel::free_handles(hash_map* > &map) { T* key; rust_handle *value; while (map.pop(&key, &value)) { - KLOG("...freeing " PTR, value); + KLOG_("...freeing " PTR, value); delete value; } } @@ -240,25 +248,30 @@ rust_kernel::signal_kernel_lock() { _kernel_lock.unlock(); } -int rust_kernel::start_task_threads(int num_threads) +int rust_kernel::start_task_threads() { - rust_task_thread *thread = NULL; - - // -1, because this thread will also be a thread. - for(int i = 0; i < num_threads - 1; ++i) { - thread = new rust_task_thread(i + 1, this); + for(size_t i = 0; i < num_threads; ++i) { + rust_scheduler *thread = threads[i]; thread->start(); - threads.push(thread); } - sched->start_main_loop(0); - - while(threads.pop(&thread)) { + for(size_t i = 0; i < num_threads; ++i) { + rust_scheduler *thread = threads[i]; thread->join(); - delete thread; } - return sched->rval; + return rval; +} + +rust_task * +rust_kernel::create_task(rust_task *spawner, const char *name) { + return threads[rand(&rctx) % num_threads]->create_task(spawner, name); +} + +void rust_kernel::wakeup_schedulers() { + for(size_t i = 0; i < num_threads; ++i) { + threads[i]->lock.signal_all(); + } } #ifdef __WIN32__ @@ -273,23 +286,13 @@ rust_kernel::win32_require(LPCTSTR fn, BOOL ok) { NULL, err, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPTSTR) &buf, 0, NULL ); - DLOG_ERR(sched, dom, "%s failed with error %ld: %s", fn, err, buf); + KLOG_ERR_(dom, "%s failed with error %ld: %s", fn, err, buf); LocalFree((HLOCAL)buf); - I(sched, ok); + I(this, ok); } } #endif -rust_task_thread::rust_task_thread(int id, rust_kernel *owner) - : id(id), owner(owner) -{ -} - -void rust_task_thread::run() -{ - owner->sched->start_main_loop(id); -} - // // Local Variables: // mode: C++ diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index 6edb0f38dd5f9..cf9d88e00164a 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -45,7 +45,10 @@ class rust_task_thread; class rust_kernel : public rust_thread { memory_region _region; rust_log _log; - rust_srv *_srv; + +public: + rust_srv *srv; +private: /** * Task proxy objects are kernel owned handles to Rust objects. @@ -62,20 +65,29 @@ class rust_kernel : public rust_thread { lock_and_signal _kernel_lock; + const size_t num_threads; + void terminate_kernel_loop(); void pump_message_queues(); rust_handle * internal_get_sched_handle(rust_scheduler *sched); - array_list threads; + array_list threads; - rust_scheduler *create_scheduler(const char *name); - void destroy_scheduler(); + randctx rctx; + + rust_scheduler *create_scheduler(int id); + void destroy_scheduler(rust_scheduler *sched); + + void create_schedulers(); + void destroy_schedulers(); public: - rust_scheduler *sched; - lock_and_signal scheduler_lock; + + int rval; + + volatile int live_tasks; /** * Message queues are kernel objects and are associated with domains. @@ -86,15 +98,15 @@ class rust_kernel : public rust_thread { */ indexed_list message_queues; - rust_handle *get_sched_handle(rust_scheduler *sched); rust_handle *get_task_handle(rust_task *task); rust_handle *get_port_handle(rust_port *port); - rust_kernel(rust_srv *srv); + rust_kernel(rust_srv *srv, size_t num_threads); bool is_deadlocked(); void signal_kernel_lock(); + void wakeup_schedulers(); /** * Notifies the kernel whenever a message has been enqueued . This gives @@ -113,24 +125,13 @@ class rust_kernel : public rust_thread { void *realloc(void *mem, size_t size); void free(void *mem); - // FIXME: this should go away - inline rust_scheduler *get_scheduler() const { return sched; } - - int start_task_threads(int num_threads); + int start_task_threads(); #ifdef __WIN32__ void win32_require(LPCTSTR fn, BOOL ok); #endif -}; - -class rust_task_thread : public rust_thread { - int id; - rust_kernel *owner; - -public: - rust_task_thread(int id, rust_kernel *owner); - virtual void run(); + rust_task *create_task(rust_task *spawner, const char *name); }; #endif /* RUST_KERNEL_H */ diff --git a/src/rt/rust_log.h b/src/rt/rust_log.h index ce0d8f593efce..d14cb022bfdaa 100644 --- a/src/rt/rust_log.h +++ b/src/rt/rust_log.h @@ -23,6 +23,15 @@ const uint32_t log_note = 1; } \ } while (0) +#define KLOG(k, field, ...) \ + KLOG_LVL(k, field, log_note, __VA_ARGS__) +#define KLOG_LVL(k, field, lvl, ...) \ + do { \ + if (log_rt_##field >= lvl) { \ + (k)->log(lvl, __VA_ARGS__); \ + } \ + } while (0) + struct rust_scheduler; struct rust_task; diff --git a/src/rt/rust_message.cpp b/src/rt/rust_message.cpp index 59645d6d5bd9f..f8001a17193b1 100644 --- a/src/rt/rust_message.cpp +++ b/src/rt/rust_message.cpp @@ -61,8 +61,8 @@ void notify_message::process() { break; case JOIN: { if (task->dead() == false) { - rust_proxy *proxy = new rust_proxy(_source); - task->tasks_waiting_to_join.append(proxy); + // FIXME: this should be dead code. + assert(false); } else { send(WAKEUP, "wakeup", _target, _source); } diff --git a/src/rt/rust_scheduler.cpp b/src/rt/rust_scheduler.cpp index 18ac66beb5ee3..245ee3d5fce70 100644 --- a/src/rt/rust_scheduler.cpp +++ b/src/rt/rust_scheduler.cpp @@ -4,23 +4,23 @@ #include "globals.h" rust_scheduler::rust_scheduler(rust_kernel *kernel, - rust_message_queue *message_queue, rust_srv *srv, - const char *name) : + rust_message_queue *message_queue, + rust_srv *srv, + int id) : interrupt_flag(0), _log(srv, this), log_lvl(log_note), srv(srv), - name(name), + // TODO: calculate a per scheduler name. + name("main"), newborn_tasks(this, "newborn"), running_tasks(this, "running"), blocked_tasks(this, "blocked"), dead_tasks(this, "dead"), cache(this), - root_task(NULL), - curr_task(NULL), - rval(0), kernel(kernel), - message_queue(message_queue) + message_queue(message_queue), + id(id) { LOGPTR(this, "new dom", (uintptr_t)this); isaac_init(this, &rctx); @@ -29,7 +29,6 @@ rust_scheduler::rust_scheduler(rust_kernel *kernel, pthread_attr_setstacksize(&attr, 1024 * 1024); pthread_attr_setdetachstate(&attr, true); #endif - root_task = create_task(NULL, name); } rust_scheduler::~rust_scheduler() { @@ -50,9 +49,9 @@ rust_scheduler::activate(rust_task *task) { task->ctx.next = &ctx; DLOG(this, task, "descheduling..."); - kernel->scheduler_lock.unlock(); + lock.unlock(); task->ctx.swap(ctx); - kernel->scheduler_lock.lock(); + lock.lock(); DLOG(this, task, "task has returned"); } @@ -70,8 +69,8 @@ void rust_scheduler::fail() { log(NULL, log_err, "domain %s @0x%" PRIxPTR " root task failed", name, this); - I(this, rval == 0); - rval = 1; + I(this, kernel->rval == 0); + kernel->rval = 1; exit(1); } @@ -85,19 +84,24 @@ rust_scheduler::number_of_live_tasks() { */ void rust_scheduler::reap_dead_tasks(int id) { - I(this, kernel->scheduler_lock.lock_held_by_current_thread()); + I(this, lock.lock_held_by_current_thread()); for (size_t i = 0; i < dead_tasks.length(); ) { rust_task *task = dead_tasks[i]; + task->lock.lock(); // Make sure this task isn't still running somewhere else... - if (task->ref_count == 0 && task->can_schedule(id)) { + if (task->can_schedule(id)) { I(this, task->tasks_waiting_to_join.is_empty()); dead_tasks.remove(task); DLOG(this, task, "deleting unreferenced dead task %s @0x%" PRIxPTR, task->name, task); - delete task; + task->lock.unlock(); + task->deref(); + sync::decrement(kernel->live_tasks); + kernel->wakeup_schedulers(); continue; } + task->lock.unlock(); ++i; } } @@ -170,9 +174,8 @@ rust_scheduler::log_state() { if (!dead_tasks.is_empty()) { log(NULL, log_note, "dead tasks:"); for (size_t i = 0; i < dead_tasks.length(); i++) { - log(NULL, log_note, "\t task: %s 0x%" PRIxPTR ", ref_count: %d", - dead_tasks[i]->name, dead_tasks[i], - dead_tasks[i]->ref_count); + log(NULL, log_note, "\t task: %s 0x%" PRIxPTR, + dead_tasks[i]->name, dead_tasks[i]); } } } @@ -183,9 +186,9 @@ rust_scheduler::log_state() { * Returns once no more tasks can be scheduled and all task ref_counts * drop to zero. */ -int -rust_scheduler::start_main_loop(int id) { - kernel->scheduler_lock.lock(); +void +rust_scheduler::start_main_loop() { + lock.lock(); // Make sure someone is watching, to pull us out of infinite loops. // @@ -196,45 +199,38 @@ rust_scheduler::start_main_loop(int id) { DLOG(this, dom, "started domain loop %d", id); - while (number_of_live_tasks() > 0) { + while (kernel->live_tasks > 0) { A(this, kernel->is_deadlocked() == false, "deadlock"); - DLOG(this, dom, "worker %d, number_of_live_tasks = %d", - id, number_of_live_tasks()); + DLOG(this, dom, "worker %d, number_of_live_tasks = %d, total = %d", + id, number_of_live_tasks(), kernel->live_tasks); drain_incoming_message_queue(true); rust_task *scheduled_task = schedule_task(id); - // The scheduler busy waits until a task is available for scheduling. - // Eventually we'll want a smarter way to do this, perhaps sleep - // for a minimum amount of time. - if (scheduled_task == NULL) { log_state(); DLOG(this, task, "all tasks are blocked, scheduler id %d yielding ...", id); - kernel->scheduler_lock.unlock(); - sync::sleep(100); - kernel->scheduler_lock.lock(); + lock.timed_wait(100000); + reap_dead_tasks(id); DLOG(this, task, - "scheduler resuming ..."); + "scheduler %d resuming ...", id); continue; } I(this, scheduled_task->running()); DLOG(this, task, - "activating task %s 0x%" PRIxPTR - ", sp=0x%" PRIxPTR - ", ref_count=%d" - ", state: %s", - scheduled_task->name, - (uintptr_t)scheduled_task, - scheduled_task->rust_sp, - scheduled_task->ref_count, - scheduled_task->state->name); + "activating task %s 0x%" PRIxPTR + ", sp=0x%" PRIxPTR + ", state: %s", + scheduled_task->name, + (uintptr_t)scheduled_task, + scheduled_task->rust_sp, + scheduled_task->state->name); interrupt_flag = 0; @@ -267,19 +263,18 @@ rust_scheduler::start_main_loop(int id) { "scheduler yielding ...", dead_tasks.length()); log_state(); - kernel->scheduler_lock.unlock(); + lock.unlock(); sync::yield(); - kernel->scheduler_lock.lock(); + lock.lock(); } else { drain_incoming_message_queue(true); } reap_dead_tasks(id); } - DLOG(this, dom, "finished main-loop %d (dom.rval = %d)", id, rval); + DLOG(this, dom, "finished main-loop %d", id); - kernel->scheduler_lock.unlock(); - return rval; + lock.unlock(); } rust_crate_cache * @@ -299,9 +294,16 @@ rust_scheduler::create_task(rust_task *spawner, const char *name) { task->on_wakeup(spawner->_on_wakeup); } newborn_tasks.append(task); + + sync::increment(kernel->live_tasks); + return task; } +void rust_scheduler::run() { + this->start_main_loop(); +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_scheduler.h b/src/rt/rust_scheduler.h index b1b9d51db5682..9289883ab1e6b 100644 --- a/src/rt/rust_scheduler.h +++ b/src/rt/rust_scheduler.h @@ -27,7 +27,8 @@ rust_crate_cache }; struct rust_scheduler : public kernel_owned, - rc_base + rc_base, + rust_thread { // Fields known to the compiler: uintptr_t interrupt_flag; @@ -46,9 +47,6 @@ struct rust_scheduler : public kernel_owned, rust_crate_cache cache; randctx rctx; - rust_task *root_task; - rust_task *curr_task; - int rval; rust_kernel *kernel; int32_t list_index; @@ -59,6 +57,10 @@ struct rust_scheduler : public kernel_owned, // Incoming messages from other domains. rust_message_queue *message_queue; + const int id; + + lock_and_signal lock; + #ifndef __WIN32__ pthread_attr_t attr; #endif @@ -66,8 +68,8 @@ struct rust_scheduler : public kernel_owned, // Only a pointer to 'name' is kept, so it must live as long as this // domain. rust_scheduler(rust_kernel *kernel, - rust_message_queue *message_queue, rust_srv *srv, - const char *name); + rust_message_queue *message_queue, rust_srv *srv, + int id); ~rust_scheduler(); void activate(rust_task *task); void log(rust_task *task, uint32_t level, char const *fmt, ...); @@ -82,11 +84,19 @@ struct rust_scheduler : public kernel_owned, void reap_dead_tasks(int id); rust_task *schedule_task(int id); - int start_main_loop(int id); + void start_main_loop(); void log_state(); rust_task *create_task(rust_task *spawner, const char *name); + + virtual void run(); + +#ifdef __WIN32__ + inline void win32_require(LPCTSTR fn, BOOL ok) { + kernel->win32_require(fn, ok); + } +#endif }; inline rust_log & diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index af55208582e69..a144879cc040b 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -14,6 +14,7 @@ // FIXME (issue #151): This should be 0x300; the change here is for // practicality's sake until stack growth is working. +size_t g_min_stack_size = 0x300000; static size_t get_min_stk_size() { char *stack_size = getenv("RUST_MIN_STACK"); @@ -21,7 +22,7 @@ static size_t get_min_stk_size() { return strtol(stack_size, NULL, 0); } else { - return 0x300000; + return g_min_stack_size; } } @@ -63,7 +64,7 @@ size_t const callee_save_fp = 0; rust_task::rust_task(rust_scheduler *sched, rust_task_list *state, rust_task *spawner, const char *name) : - maybe_proxy(this), + ref_count(1), stk(NULL), runtime_sp(0), rust_sp(0), @@ -83,17 +84,14 @@ rust_task::rust_task(rust_scheduler *sched, rust_task_list *state, pinned_on(-1), local_region(&sched->srv->local_region), _on_wakeup(NULL), - failed(false) + failed(false), + propagate_failure(true) { LOGPTR(sched, "new task", (uintptr_t)this); DLOG(sched, task, "sizeof(task) = %d (0x%x)", sizeof *this, sizeof *this); stk = new_stk(this, 0); rust_sp = stk->limit; - - if (spawner == NULL) { - ref_count = 0; - } } rust_task::~rust_task() @@ -103,8 +101,8 @@ rust_task::~rust_task() /* FIXME: tighten this up, there are some more assertions that hold at task-lifecycle events. */ - I(sched, ref_count == 0 || - (ref_count == 1 && this == sched->root_task)); + I(sched, ref_count == 0); // || + // (ref_count == 1 && this == sched->root_task)); del_stk(this, stk); } @@ -129,10 +127,6 @@ void task_start_wrapper(spawn_args *a) LOG(task, task, "task exited with value %d", rval); - - LOG(task, task, "task ref_count: %d", task->ref_count); - A(task->sched, task->ref_count >= 0, - "Task ref_count should not be negative on exit!"); task->die(); task->lock.lock(); task->notify_tasks_waiting_to_join(); @@ -165,6 +159,7 @@ rust_task::start(uintptr_t spawnee_fn, yield_timer.reset_us(0); transition(&sched->newborn_tasks, &sched->running_tasks); + sched->lock.signal(); } void @@ -207,9 +202,11 @@ rust_task::kill() { // Unblock the task so it can unwind. unblock(); - if (this == sched->root_task) + if (NULL == supervisor && propagate_failure) sched->fail(); + sched->lock.signal(); + LOG(this, task, "preparing to unwind task: 0x%" PRIxPTR, this); // run_on_resume(rust_unwind_glue); } @@ -229,7 +226,7 @@ rust_task::fail() { supervisor->kill(); } // FIXME: implement unwinding again. - if (this == sched->root_task) + if (NULL == supervisor && propagate_failure) sched->fail(); failed = true; } @@ -250,6 +247,7 @@ rust_task::unsupervise() " disconnecting from supervisor %s @0x%" PRIxPTR, name, this, supervisor->name, supervisor); supervisor = NULL; + propagate_failure = false; } void @@ -257,17 +255,10 @@ rust_task::notify_tasks_waiting_to_join() { while (tasks_waiting_to_join.is_empty() == false) { LOG(this, task, "notify_tasks_waiting_to_join: %d", tasks_waiting_to_join.size()); - maybe_proxy *waiting_task = 0; + rust_task *waiting_task = 0; tasks_waiting_to_join.pop(&waiting_task); - if (waiting_task->is_proxy()) { - notify_message::send(notify_message::WAKEUP, "wakeup", - get_handle(), waiting_task->as_proxy()->handle()); - delete waiting_task; - } else { - rust_task *task = waiting_task->referent(); - if (task->blocked() == true) { - task->wakeup(this); - } + if (waiting_task->blocked() == true) { + waiting_task->wakeup(this); } } } @@ -399,8 +390,8 @@ rust_task::free(void *p, bool is_gc) void rust_task::transition(rust_task_list *src, rust_task_list *dst) { - I(sched, !kernel->scheduler_lock.lock_held_by_current_thread()); - scoped_lock with(kernel->scheduler_lock); + I(sched, !sched->lock.lock_held_by_current_thread()); + scoped_lock with(sched->lock); DLOG(sched, task, "task %s " PTR " state change '%s' -> '%s' while in '%s'", name, (uintptr_t)this, src->name, dst->name, state->name); @@ -439,12 +430,15 @@ rust_task::wakeup(rust_cond *from) { if(_on_wakeup) { _on_wakeup->on_wakeup(); } + + sched->lock.signal(); } void rust_task::die() { scoped_lock with(lock); transition(&sched->running_tasks, &sched->dead_tasks); + sched->lock.signal(); } void diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index 13b5537d5d9bb..8b55c0028a972 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -34,10 +34,19 @@ struct gc_alloc { } }; + struct -rust_task : public maybe_proxy, - public kernel_owned +rust_task : public kernel_owned, rust_cond { + // This block could be pulled out into something like a + // RUST_ATOMIC_REFCOUNTED macro. +private: + intptr_t ref_count; +public: + void ref() { sync::increment(ref_count); } + void deref() { if(0 == sync::decrement(ref_count)) { delete this; } } + + // Fields known to the compiler. stk_seg *stk; uintptr_t runtime_sp; // Runtime sp while task running. @@ -69,7 +78,7 @@ rust_task : public maybe_proxy, uintptr_t* rendezvous_ptr; // List of tasks waiting for this task to finish. - array_list *> tasks_waiting_to_join; + array_list tasks_waiting_to_join; rust_handle *handle; @@ -91,6 +100,7 @@ rust_task : public maybe_proxy, // Indicates that the task ended in failure bool failed; + bool propagate_failure; lock_and_signal lock; @@ -123,9 +133,6 @@ rust_task : public maybe_proxy, void die(); void unblock(); - void check_active() { I(sched, sched->curr_task == this); } - void check_suspended() { I(sched, sched->curr_task != this); } - // Print a backtrace, if the "bt" logging option is on. void backtrace(); diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index 8313399130ca9..794bbc9c2443a 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -6,12 +6,10 @@ #define LOG_UPCALL_ENTRY(task) \ LOG(task, upcall, \ "> UPCALL %s - task: %s 0x%" PRIxPTR \ - " retpc: x%" PRIxPTR \ - " ref_count: %d", \ + " retpc: x%" PRIxPTR, \ __FUNCTION__, \ (task)->name, (task), \ - __builtin_return_address(0), \ - (task->ref_count)); + __builtin_return_address(0)); #else #define LOG_UPCALL_ENTRY(task) \ LOG(task, upcall, "> UPCALL task: %s @x%" PRIxPTR, \ @@ -114,8 +112,8 @@ upcall_del_port(rust_task *task, rust_port *port) { I(task->sched, !port->ref_count); delete port; - // FIXME: We shouldn't ever directly manipulate the ref count. - --task->ref_count; + // FIXME: this should happen in the port. + task->deref(); } /** @@ -162,7 +160,7 @@ void upcall_del_chan(rust_task *task, rust_chan *chan) { * has its own copy of the channel. */ extern "C" CDECL rust_chan * -upcall_clone_chan(rust_task *task, maybe_proxy *target, +upcall_clone_chan(rust_task *task, rust_task *target, rust_chan *chan) { LOG_UPCALL_ENTRY(task); return chan->clone(target); @@ -247,18 +245,10 @@ upcall_fail(rust_task *task, * Called whenever a task's ref count drops to zero. */ extern "C" CDECL void -upcall_kill(rust_task *task, maybe_proxy *target) { +upcall_kill(rust_task *task, rust_task *target) { LOG_UPCALL_ENTRY(task); - if (target->is_proxy()) { - notify_message:: - send(notify_message::KILL, "kill", task->get_handle(), - target->as_proxy()->handle()); - // The proxy ref_count dropped to zero, delete it here. - delete target->as_proxy(); - } else { - target->referent()->kill(); - } + target->kill(); } /** @@ -267,9 +257,6 @@ upcall_kill(rust_task *task, maybe_proxy *target) { extern "C" CDECL void upcall_exit(rust_task *task) { LOG_UPCALL_ENTRY(task); - LOG(task, task, "task ref_count: %d", task->ref_count); - A(task->sched, task->ref_count >= 0, - "Task ref_count should not be negative on exit!"); task->die(); task->notify_tasks_waiting_to_join(); task->yield(1); @@ -541,12 +528,29 @@ extern "C" CDECL rust_task * upcall_new_task(rust_task *spawner, rust_vec *name) { // name is a rust string structure. LOG_UPCALL_ENTRY(spawner); - scoped_lock with(spawner->kernel->scheduler_lock); - rust_scheduler *sched = spawner->sched; - rust_task *task = sched->create_task(spawner, (const char *)name->data); + scoped_lock with(spawner->sched->lock); + rust_task *task = + spawner->kernel->create_task(spawner, (const char *)name->data); + task->ref(); return task; } +extern "C" CDECL void +upcall_take_task(rust_task *task, rust_task *target) { + LOG_UPCALL_ENTRY(task); + if(target) { + target->ref(); + } +} + +extern "C" CDECL void +upcall_drop_task(rust_task *task, rust_task *target) { + LOG_UPCALL_ENTRY(task); + if(target) { + target->deref(); + } +} + extern "C" CDECL rust_task * upcall_start_task(rust_task *spawner, rust_task *task, @@ -584,7 +588,7 @@ upcall_ivec_resize_shared(rust_task *task, rust_ivec *v, size_t newsz) { LOG_UPCALL_ENTRY(task); - scoped_lock with(task->kernel->scheduler_lock); + scoped_lock with(task->sched->lock); I(task->sched, !v->fill); size_t new_alloc = next_power_of_two(newsz); @@ -604,7 +608,7 @@ upcall_ivec_spill_shared(rust_task *task, rust_ivec *v, size_t newsz) { LOG_UPCALL_ENTRY(task); - scoped_lock with(task->kernel->scheduler_lock); + scoped_lock with(task->sched->lock); size_t new_alloc = next_power_of_two(newsz); rust_ivec_heap *heap_part = (rust_ivec_heap *) diff --git a/src/rt/rust_util.h b/src/rt/rust_util.h index e1644e9f3cfdc..c46f191657377 100644 --- a/src/rt/rust_util.h +++ b/src/rt/rust_util.h @@ -126,23 +126,24 @@ next_power_of_two(size_t s) // Initialization helper for ISAAC RNG +template static inline void -isaac_init(rust_scheduler *sched, randctx *rctx) +isaac_init(sched_or_kernel *sched, randctx *rctx) { memset(rctx, 0, sizeof(randctx)); #ifdef __WIN32__ { HCRYPTPROV hProv; - sched->kernel->win32_require + sched->win32_require (_T("CryptAcquireContext"), CryptAcquireContext(&hProv, NULL, NULL, PROV_RSA_FULL, CRYPT_VERIFYCONTEXT|CRYPT_SILENT)); - sched->kernel->win32_require + sched->win32_require (_T("CryptGenRandom"), CryptGenRandom(hProv, sizeof(rctx->randrsl), (BYTE*)(&rctx->randrsl))); - sched->kernel->win32_require + sched->win32_require (_T("CryptReleaseContext"), CryptReleaseContext(hProv, 0)); } diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index c6d31a47f4c82..d6c218d936d8c 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -37,6 +37,7 @@ rust_ptr_eq rust_run_program rust_start rust_getcwd +set_min_stack size_of squareroot str_alloc @@ -58,6 +59,7 @@ upcall_chan_target_task upcall_clone_chan upcall_del_chan upcall_del_port +upcall_drop_task upcall_dup_str upcall_exit upcall_fail @@ -86,6 +88,7 @@ upcall_shared_malloc upcall_shared_free upcall_sleep upcall_start_task +upcall_take_task upcall_trace_str upcall_trace_word upcall_vec_append diff --git a/src/rt/sync/lock_and_signal.cpp b/src/rt/sync/lock_and_signal.cpp index 3d0d101388908..df596c65cb2f7 100644 --- a/src/rt/sync/lock_and_signal.cpp +++ b/src/rt/sync/lock_and_signal.cpp @@ -10,7 +10,9 @@ #include "lock_and_signal.h" #if defined(__WIN32__) -lock_and_signal::lock_and_signal() { +lock_and_signal::lock_and_signal() + : alive(true) +{ // FIXME: In order to match the behavior of pthread_cond_broadcast on // Windows, we create manual reset events. This however breaks the // behavior of pthread_cond_signal, fixing this is quite involved: @@ -22,7 +24,7 @@ lock_and_signal::lock_and_signal() { #else lock_and_signal::lock_and_signal() - : _locked(false) + : _locked(false), alive(true) { CHECKED(pthread_cond_init(&_cond, NULL)); CHECKED(pthread_mutex_init(&_mutex, NULL)); @@ -36,6 +38,7 @@ lock_and_signal::~lock_and_signal() { CHECKED(pthread_cond_destroy(&_cond)); CHECKED(pthread_mutex_destroy(&_mutex)); #endif + alive = false; } void lock_and_signal::lock() { @@ -65,11 +68,15 @@ void lock_and_signal::wait() { timed_wait(0); } -void lock_and_signal::timed_wait(size_t timeout_in_ns) { +bool lock_and_signal::timed_wait(size_t timeout_in_ns) { + _locked = false; + bool rv = true; #if defined(__WIN32__) LeaveCriticalSection(&_cs); - WaitForSingleObject(_event, INFINITE); + DWORD timeout = timeout_in_ns == 0 ? INFINITE : timeout_in_ns / 1000000; + rv = WaitForSingleObject(_event, timeout) != WAIT_TIMEOUT; EnterCriticalSection(&_cs); + _holding_thread = GetCurrentThreadId(); #else if (timeout_in_ns == 0) { CHECKED(pthread_cond_wait(&_cond, &_mutex)); @@ -79,9 +86,29 @@ void lock_and_signal::timed_wait(size_t timeout_in_ns) { timespec time_spec; time_spec.tv_sec = time_val.tv_sec + 0; time_spec.tv_nsec = time_val.tv_usec * 1000 + timeout_in_ns; - CHECKED(pthread_cond_timedwait(&_cond, &_mutex, &time_spec)); + if(time_spec.tv_nsec >= 1000000000) { + time_spec.tv_sec++; + time_spec.tv_nsec -= 1000000000; + } + int cond_wait_status + = pthread_cond_timedwait(&_cond, &_mutex, &time_spec); + switch(cond_wait_status) { + case 0: + // successfully grabbed the lock. + break; + case ETIMEDOUT: + // Oops, we timed out. + rv = false; + break; + default: + // Error + CHECKED(cond_wait_status); + } } + _holding_thread = pthread_self(); #endif + _locked = true; + return rv; } /** diff --git a/src/rt/sync/lock_and_signal.h b/src/rt/sync/lock_and_signal.h index 60c22958342fb..6e656017115d2 100644 --- a/src/rt/sync/lock_and_signal.h +++ b/src/rt/sync/lock_and_signal.h @@ -14,6 +14,9 @@ class lock_and_signal { pthread_t _holding_thread; #endif bool _locked; + + bool alive; + public: lock_and_signal(); virtual ~lock_and_signal(); @@ -21,7 +24,7 @@ class lock_and_signal { void lock(); void unlock(); void wait(); - void timed_wait(size_t timeout_in_ns); + bool timed_wait(size_t timeout_in_ns); void signal(); void signal_all(); diff --git a/src/rt/sync/sync.h b/src/rt/sync/sync.h index a932ef1c2cab4..8298f4028818d 100644 --- a/src/rt/sync/sync.h +++ b/src/rt/sync/sync.h @@ -1,4 +1,4 @@ -// -*- c++-mode -*- +// -*- c++ -*- #ifndef SYNC_H #define SYNC_H diff --git a/src/rt/test/rust_test_runtime.cpp b/src/rt/test/rust_test_runtime.cpp index 8acfe45c9e651..1e7c10944a763 100644 --- a/src/rt/test/rust_test_runtime.cpp +++ b/src/rt/test/rust_test_runtime.cpp @@ -11,17 +11,16 @@ rust_test_runtime::~rust_test_runtime() { void rust_domain_test::worker::run() { - rust_scheduler *handle = kernel->get_scheduler(); for (int i = 0; i < TASKS; i++) { - handle->create_task(NULL, "child"); + kernel->create_task(NULL, "child"); } - sync::sleep(rand(&handle->rctx) % 1000); + //sync::sleep(rand(&handle->rctx) % 1000); } bool rust_domain_test::run() { rust_srv srv; - rust_kernel kernel(&srv); + rust_kernel kernel(&srv, 1); array_list workers; for (int i = 0; i < DOMAINS; i++) { @@ -45,15 +44,15 @@ void task_entry() { void rust_task_test::worker::run() { - rust_scheduler *scheduler = kernel->get_scheduler(); - scheduler->root_task->start((uintptr_t)&task_entry, (uintptr_t)NULL); - scheduler->start_main_loop(0); + rust_task *root_task = kernel->create_task(NULL, "main"); + root_task->start((uintptr_t)&task_entry, (uintptr_t)NULL); + root_task->sched->start_main_loop(); } bool rust_task_test::run() { rust_srv srv; - rust_kernel kernel(&srv); + rust_kernel kernel(&srv, 1); array_list workers; for (int i = 0; i < DOMAINS; i++) { @@ -62,6 +61,6 @@ rust_task_test::run() { worker->start(); } - sync::sleep(rand(&kernel.sched->rctx) % 1000); + //sync::sleep(rand(&kernel.sched->rctx) % 1000); return true; } diff --git a/src/test/bench/task-perf-word-count.rs b/src/test/bench/task-perf-word-count.rs index 6827ea55804ac..0a6b94c7f2ed2 100644 --- a/src/test/bench/task-perf-word-count.rs +++ b/src/test/bench/task-perf-word-count.rs @@ -1,6 +1,3 @@ -// xfail-stage1 -// xfail-stage2 -// xfail-stage3 /** A parallel word-frequency counting program. @@ -187,7 +184,6 @@ mod map_reduce { let m; ctrl |> m; - alt m { mapper_done. { // log_err "received mapper terminated."; @@ -233,10 +229,19 @@ fn main(argv: vec[str]) { let out = io::stdout(); out.write_line(#fmt("Usage: %s ...", argv.(0))); - fail; + + // TODO: run something just to make sure the code hasn't + // broken yet. This is the unit test mode of this program. + + ret; } + // We can get by with 8k stacks, and we'll probably exhaust our + // address space otherwise. + task::set_min_stack(8192u); + let start = time::precise_time_ns(); + map_reduce::map_reduce(vec::slice(argv, 1u, vec::len(argv))); let stop = time::precise_time_ns(); @@ -342,4 +347,4 @@ fn is_alpha_upper(c: char) -> bool { fn is_alpha(c: char) -> bool { is_alpha_upper(c) || is_alpha_lower(c) } -fn is_word_char(c: char) -> bool { is_alpha(c) || is_digit(c) || c == '_' } \ No newline at end of file +fn is_word_char(c: char) -> bool { is_alpha(c) || is_digit(c) || c == '_' }