Skip to content

Commit 5302cde

Browse files
author
Eric Holk
committed
Made task threads wait instead of sleep, so they can be woken up. This appears to give us much better parallel performance.
Also, commented out one more unsafe log and updated rust_kernel.cpp to compile under g++.
1 parent e697a52 commit 5302cde

8 files changed

+61
-22
lines changed

src/rt/circular_buffer.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ circular_buffer::dequeue(void *dst) {
123123
if (dst != NULL) {
124124
memcpy(dst, &_buffer[_next], unit_sz);
125125
}
126-
DLOG(sched, mem, "shifted data from index %d", _next);
126+
//DLOG(sched, mem, "shifted data from index %d", _next);
127127
_unread -= unit_sz;
128128
_next += unit_sz;
129129
if (_next == _buffer_sz) {

src/rt/rust_kernel.cpp

+11-5
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,13 @@ rust_kernel::destroy_scheduler(rust_scheduler *sched) {
5353
}
5454

5555
void rust_kernel::create_schedulers() {
56-
for(int i = 0; i < num_threads; ++i) {
56+
for(size_t i = 0; i < num_threads; ++i) {
5757
threads.push(create_scheduler(i));
5858
}
5959
}
6060

6161
void rust_kernel::destroy_schedulers() {
62-
for(int i = 0; i < num_threads; ++i) {
62+
for(size_t i = 0; i < num_threads; ++i) {
6363
destroy_scheduler(threads[i]);
6464
}
6565
}
@@ -106,7 +106,7 @@ rust_kernel::get_port_handle(rust_port *port) {
106106

107107
void
108108
rust_kernel::log_all_scheduler_state() {
109-
for(int i = 0; i < num_threads; ++i) {
109+
for(size_t i = 0; i < num_threads; ++i) {
110110
threads[i]->log_state();
111111
}
112112
}
@@ -252,12 +252,12 @@ rust_kernel::signal_kernel_lock() {
252252

253253
int rust_kernel::start_task_threads()
254254
{
255-
for(int i = 0; i < num_threads; ++i) {
255+
for(size_t i = 0; i < num_threads; ++i) {
256256
rust_scheduler *thread = threads[i];
257257
thread->start();
258258
}
259259

260-
for(int i = 0; i < num_threads; ++i) {
260+
for(size_t i = 0; i < num_threads; ++i) {
261261
rust_scheduler *thread = threads[i];
262262
thread->join();
263263
}
@@ -271,6 +271,12 @@ rust_kernel::create_task(rust_task *spawner, const char *name) {
271271
return threads[rand(&rctx) % num_threads]->create_task(spawner, name);
272272
}
273273

274+
void rust_kernel::wakeup_schedulers() {
275+
for(size_t i = 0; i < num_threads; ++i) {
276+
threads[i]->lock.signal_all();
277+
}
278+
}
279+
274280
#ifdef __WIN32__
275281
void
276282
rust_kernel::win32_require(LPCTSTR fn, BOOL ok) {

src/rt/rust_kernel.h

+1
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ class rust_kernel : public rust_thread {
106106
bool is_deadlocked();
107107

108108
void signal_kernel_lock();
109+
void wakeup_schedulers();
109110

110111
/**
111112
* Notifies the kernel whenever a message has been enqueued. This gives

src/rt/rust_scheduler.cpp

+7-9
Original file line numberDiff line numberDiff line change
@@ -87,17 +87,21 @@ rust_scheduler::reap_dead_tasks(int id) {
8787
I(this, lock.lock_held_by_current_thread());
8888
for (size_t i = 0; i < dead_tasks.length(); ) {
8989
rust_task *task = dead_tasks[i];
90+
task->lock.lock();
9091
// Make sure this task isn't still running somewhere else...
9192
if (task->ref_count == 0 && task->can_schedule(id)) {
9293
I(this, task->tasks_waiting_to_join.is_empty());
9394
dead_tasks.remove(task);
9495
DLOG(this, task,
9596
"deleting unreferenced dead task %s @0x%" PRIxPTR,
9697
task->name, task);
98+
task->lock.unlock();
9799
delete task;
98100
sync::decrement(kernel->live_tasks);
101+
kernel->wakeup_schedulers();
99102
continue;
100103
}
104+
task->lock.unlock();
101105
++i;
102106
}
103107
}
@@ -206,21 +210,15 @@ rust_scheduler::start_main_loop() {
206210

207211
rust_task *scheduled_task = schedule_task(id);
208212

209-
// The scheduler busy waits until a task is available for scheduling.
210-
// Eventually we'll want a smarter way to do this, perhaps sleep
211-
// for a minimum amount of time.
212-
213213
if (scheduled_task == NULL) {
214214
log_state();
215215
DLOG(this, task,
216216
"all tasks are blocked, scheduler id %d yielding ...",
217217
id);
218-
lock.unlock();
219-
sync::sleep(100);
220-
lock.lock();
221-
DLOG(this, task,
222-
"scheduler resuming ...");
218+
lock.timed_wait(100000);
223219
reap_dead_tasks(id);
220+
DLOG(this, task,
221+
"scheduler %d resuming ...", id);
224222
continue;
225223
}
226224

src/rt/rust_task.cpp

+7-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ rust_task::~rust_task()
105105

106106
/* FIXME: tighten this up, there are some more
107107
assertions that hold at task-lifecycle events. */
108-
// I(sched, ref_count == 0 ||
108+
I(sched, ref_count == 0); // ||
109109
// (ref_count == 1 && this == sched->root_task));
110110

111111
del_stk(this, stk);
@@ -167,6 +167,7 @@ rust_task::start(uintptr_t spawnee_fn,
167167

168168
yield_timer.reset_us(0);
169169
transition(&sched->newborn_tasks, &sched->running_tasks);
170+
sched->lock.signal();
170171
}
171172

172173
void
@@ -212,6 +213,8 @@ rust_task::kill() {
212213
if (NULL == supervisor && propagate_failure)
213214
sched->fail();
214215

216+
sched->lock.signal();
217+
215218
LOG(this, task, "preparing to unwind task: 0x%" PRIxPTR, this);
216219
// run_on_resume(rust_unwind_glue);
217220
}
@@ -442,12 +445,15 @@ rust_task::wakeup(rust_cond *from) {
442445
if(_on_wakeup) {
443446
_on_wakeup->on_wakeup();
444447
}
448+
449+
sched->lock.signal();
445450
}
446451

447452
void
448453
rust_task::die() {
449454
scoped_lock with(lock);
450455
transition(&sched->running_tasks, &sched->dead_tasks);
456+
sched->lock.signal();
451457
}
452458

453459
void

src/rt/sync/lock_and_signal.cpp

+30-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
#include "lock_and_signal.h"
1111

1212
#if defined(__WIN32__)
13-
lock_and_signal::lock_and_signal() {
13+
lock_and_signal::lock_and_signal()
14+
: alive(true)
15+
{
1416
// FIXME: In order to match the behavior of pthread_cond_broadcast on
1517
// Windows, we create manual reset events. This however breaks the
1618
// behavior of pthread_cond_signal, fixing this is quite involved:
@@ -22,7 +24,7 @@ lock_and_signal::lock_and_signal() {
2224

2325
#else
2426
lock_and_signal::lock_and_signal()
25-
: _locked(false)
27+
: _locked(false), alive(true)
2628
{
2729
CHECKED(pthread_cond_init(&_cond, NULL));
2830
CHECKED(pthread_mutex_init(&_mutex, NULL));
@@ -36,6 +38,7 @@ lock_and_signal::~lock_and_signal() {
3638
CHECKED(pthread_cond_destroy(&_cond));
3739
CHECKED(pthread_mutex_destroy(&_mutex));
3840
#endif
41+
alive = false;
3942
}
4043

4144
void lock_and_signal::lock() {
@@ -65,11 +68,14 @@ void lock_and_signal::wait() {
6568
timed_wait(0);
6669
}
6770

68-
void lock_and_signal::timed_wait(size_t timeout_in_ns) {
71+
bool lock_and_signal::timed_wait(size_t timeout_in_ns) {
72+
_locked = false;
73+
bool rv = true;
6974
#if defined(__WIN32__)
7075
LeaveCriticalSection(&_cs);
7176
WaitForSingleObject(_event, INFINITE);
7277
EnterCriticalSection(&_cs);
78+
_holding_thread = GetCurrentThreadId();
7379
#else
7480
if (timeout_in_ns == 0) {
7581
CHECKED(pthread_cond_wait(&_cond, &_mutex));
@@ -79,9 +85,29 @@ void lock_and_signal::timed_wait(size_t timeout_in_ns) {
7985
timespec time_spec;
8086
time_spec.tv_sec = time_val.tv_sec + 0;
8187
time_spec.tv_nsec = time_val.tv_usec * 1000 + timeout_in_ns;
82-
CHECKED(pthread_cond_timedwait(&_cond, &_mutex, &time_spec));
88+
if(time_spec.tv_nsec >= 1000000000) {
89+
time_spec.tv_sec++;
90+
time_spec.tv_nsec -= 1000000000;
91+
}
92+
int cond_wait_status
93+
= pthread_cond_timedwait(&_cond, &_mutex, &time_spec);
94+
switch(cond_wait_status) {
95+
case 0:
96+
// Woken before the timeout; the mutex has been re-acquired.
97+
break;
98+
case ETIMEDOUT:
99+
// Oops, we timed out.
100+
rv = false;
101+
break;
102+
default:
103+
// Error
104+
CHECKED(cond_wait_status);
105+
}
83106
}
107+
_holding_thread = pthread_self();
84108
#endif
109+
_locked = true;
110+
return rv;
85111
}
86112

87113
/**

src/rt/sync/lock_and_signal.h

+4-1
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,17 @@ class lock_and_signal {
1414
pthread_t _holding_thread;
1515
#endif
1616
bool _locked;
17+
18+
bool alive;
19+
1720
public:
1821
lock_and_signal();
1922
virtual ~lock_and_signal();
2023

2124
void lock();
2225
void unlock();
2326
void wait();
24-
void timed_wait(size_t timeout_in_ns);
27+
bool timed_wait(size_t timeout_in_ns);
2528
void signal();
2629
void signal_all();
2730

src/test/bench/task-perf-word-count.rs

-1
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,6 @@ mod map_reduce {
184184
let m;
185185
ctrl |> m;
186186

187-
188187
alt m {
189188
mapper_done. {
190189
// log_err "received mapper terminated.";

0 commit comments

Comments
 (0)