Skip to content

Fix a rare pthreads main thread deadlock that worsened in 2.0.2 #12318

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 119 additions & 20 deletions src/library_pthread.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ var LibraryPThread = {
$PThread__deps: ['$registerPthreadPtr',
'$ERRNO_CODES', 'emscripten_futex_wake', '$killThread',
'$cancelThread', '$cleanupThread',
'_main_thread_futex_wait_address'
#if USE_ASAN || USE_LSAN
, '$withBuiltinMalloc'
#endif
Expand All @@ -28,6 +27,9 @@ var LibraryPThread = {
runningWorkers: [],
// Points to a pthread_t structure in the Emscripten main heap, allocated on demand if/when first needed.
// mainThreadBlock: undefined,
// Stores the memory address that the main thread is waiting on, if any. If
// the main thread is waiting, we wake it up before waking up any workers.
// mainThreadFutex: undefined,
initMainThreadBlock: function() {
#if ASSERTIONS
assert(!ENVIRONMENT_IS_PTHREAD);
Expand Down Expand Up @@ -69,7 +71,7 @@ var LibraryPThread = {
Atomics.store(HEAPU32, (PThread.mainThreadBlock + {{{ C_STRUCTS.pthread.tid }}} ) >> 2, PThread.mainThreadBlock); // Main thread ID.
Atomics.store(HEAPU32, (PThread.mainThreadBlock + {{{ C_STRUCTS.pthread.pid }}} ) >> 2, {{{ PROCINFO.pid }}}); // Process ID.

__main_thread_futex_wait_address = _malloc(4);
PThread.initShared();

#if PTHREADS_PROFILING
PThread.createProfilerBlock(PThread.mainThreadBlock);
Expand All @@ -87,6 +89,7 @@ var LibraryPThread = {
_emscripten_register_main_browser_thread_id(PThread.mainThreadBlock);
},
initWorker: function() {
PThread.initShared();
#if EMBIND
// Embind must initialize itself on all threads, as it generates support JS.
Module['___embind_register_native_and_builtin_types']();
Expand All @@ -106,6 +109,12 @@ var LibraryPThread = {
PThread['setThreadStatus'] = PThread.setThreadStatus;
PThread['threadCancel'] = PThread.threadCancel;
PThread['threadExit'] = PThread.threadExit;
#endif
},
// Initialization shared by the main thread and workers: caches the value of
// Module['_main_thread_futex'] in PThread.mainThreadFutex, which the futex
// wait/wake code uses as the address of the word recording what the main
// thread is currently waiting on.
// NOTE(review): presumably this is the address of the `main_thread_futex`
// global exported from the native pthread library - confirm.
initShared: function() {
PThread.mainThreadFutex = Module['_main_thread_futex'];
#if ASSERTIONS
// A zero/undefined value would mean the export was not available yet.
assert(PThread.mainThreadFutex > 0);
#endif
},
// Maps pthread_t to pthread info objects
Expand Down Expand Up @@ -1115,14 +1124,12 @@ var LibraryPThread = {
return 0;
},

// Stores the memory address that the main thread is waiting on, if any.
_main_thread_futex_wait_address: '0',

// Returns 0 on success, or one of the values -ETIMEDOUT, -EWOULDBLOCK or -EINVAL on error.
emscripten_futex_wait__deps: ['_main_thread_futex_wait_address', 'emscripten_main_thread_process_queued_calls'],
emscripten_futex_wait__deps: ['emscripten_main_thread_process_queued_calls'],
emscripten_futex_wait: function(addr, val, timeout) {
if (addr <= 0 || addr > HEAP8.length || addr&3 != 0) return -{{{ cDefine('EINVAL') }}};
if (ENVIRONMENT_IS_NODE || ENVIRONMENT_IS_WORKER) {
// We can do a normal blocking wait anywhere but on the main browser thread.
if (!ENVIRONMENT_IS_WEB) {
#if PTHREADS_PROFILING
PThread.setThreadStatusConditional(_pthread_self(), {{{ cDefine('EM_THREAD_STATUS_RUNNING') }}}, {{{ cDefine('EM_THREAD_STATUS_WAITFUTEX') }}});
#endif
Expand All @@ -1135,31 +1142,114 @@ var LibraryPThread = {
if (ret === 'ok') return 0;
throw 'Atomics.wait returned an unexpected value ' + ret;
} else {
// Atomics.wait is not available in the main browser thread, so simulate it via busy spinning.
var loadedVal = Atomics.load(HEAP32, addr >> 2);
if (val != loadedVal) return -{{{ cDefine('EWOULDBLOCK') }}};
// First, check if the value is correct for us to wait on.
if (Atomics.load(HEAP32, addr >> 2) != val) {
return -{{{ cDefine('EWOULDBLOCK') }}};
}

// Atomics.wait is not available in the main browser thread, so simulate it via busy spinning.
var tNow = performance.now();
var tEnd = tNow + timeout;

#if PTHREADS_PROFILING
PThread.setThreadStatusConditional(_pthread_self(), {{{ cDefine('EM_THREAD_STATUS_RUNNING') }}}, {{{ cDefine('EM_THREAD_STATUS_WAITFUTEX') }}});
#endif
// Register globally which address the main thread is simulating to be
// waiting on. When zero, the main thread is not waiting on anything, and on
// nonzero, the contents of the address pointed by PThread.mainThreadFutex
// tell which address the main thread is simulating its wait on.
// We need to be careful of recursion here: If we wait on a futex, and
// then call _emscripten_main_thread_process_queued_calls() below, that
// will call code that takes the proxying mutex - which can once more
// reach this code in a nested call. To avoid interference between the
// two (there is just a single mainThreadFutex at a time), unmark
// ourselves before calling the potentially-recursive call. See below for
// how we handle the case of our futex being notified during the time in
// between when we are not set as the value of mainThreadFutex.
#if ASSERTIONS
assert(PThread.mainThreadFutex > 0);
#endif
var lastAddr = Atomics.exchange(HEAP32, PThread.mainThreadFutex >> 2, addr);
#if ASSERTIONS
// We must not have already been waiting.
assert(lastAddr == 0);
#endif

// Register globally which address the main thread is simulating to be waiting on. When zero, main thread is not waiting on anything,
// and on nonzero, the contents of address pointed by __main_thread_futex_wait_address tell which address the main thread is simulating its wait on.
Atomics.store(HEAP32, __main_thread_futex_wait_address >> 2, addr);
var ourWaitAddress = addr; // We may recursively re-enter this function while processing queued calls, in which case we'll do a spurious wakeup of the older wait operation.
while (addr == ourWaitAddress) {
while (1) {
// Check for a timeout.
tNow = performance.now();
if (tNow > tEnd) {
#if PTHREADS_PROFILING
PThread.setThreadStatusConditional(_pthread_self(), {{{ cDefine('EM_THREAD_STATUS_RUNNING') }}}, {{{ cDefine('EM_THREAD_STATUS_WAITFUTEX') }}});
#endif
// We timed out, so stop marking ourselves as waiting.
lastAddr = Atomics.exchange(HEAP32, PThread.mainThreadFutex >> 2, 0);
#if ASSERTIONS
// The current value must have been our address which we set, or
// in a race it was set to 0 which means another thread just allowed
// us to run, but (tragically) that happened just a bit too late.
assert(lastAddr == addr || lastAddr == 0);
#endif
return -{{{ cDefine('ETIMEDOUT') }}};
}
_emscripten_main_thread_process_queued_calls(); // We are performing a blocking loop here, so must pump any pthreads if they want to perform operations that are proxied.
addr = Atomics.load(HEAP32, __main_thread_futex_wait_address >> 2); // Look for a worker thread waking us up.
// We are performing a blocking loop here, so we must handle proxied
// events from pthreads, to avoid deadlocks.
// Note that we have to do so carefully, as we may take a lock while
// doing so, which can recurse into this function; stop marking
// ourselves as waiting while we do so.
lastAddr = Atomics.exchange(HEAP32, PThread.mainThreadFutex >> 2, 0);
#if ASSERTIONS
assert(lastAddr == addr || lastAddr == 0);
#endif
if (lastAddr == 0) {
// We were told to stop waiting, so stop.
break;
}
_emscripten_main_thread_process_queued_calls();

// Check the value, as if we were starting the futex all over again.
// This handles the following case:
//
// * wait on futex A
// * recurse into emscripten_main_thread_process_queued_calls(),
// which waits on futex B. that sets the mainThreadFutex address to
// futex B, and there is no longer any mention of futex A.
// * a worker is done with futex A. it checks mainThreadFutex but does
// not see A, so it does nothing special for the main thread.
// * a worker is done with futex B. it flips mainThreadFutex from B
// to 0, ending the wait on futex B.
// * we return to the wait on futex A. mainThreadFutex is 0, but that
// is because of futex B being done - we can't tell from
// mainThreadFutex whether A is done or not. therefore, check the
// memory value of the futex.
//
// That case motivates the design here. Given that, checking the memory
// address is also necessary for other reasons: we unset and re-set our
// address in mainThreadFutex around calls to
// emscripten_main_thread_process_queued_calls(), and a worker could
// attempt to wake us up right before/after such times.
//
// Note that checking the memory value of the futex is valid to do: we
// could easily have been delayed (relative to the worker holding on
// to futex A), which means we could be starting all of our work at the
// later time when there is no need to block. The only "odd" thing is
// that we may have caused side effects in that "delay" time. But the
// only side effects we can have are to call
// emscripten_main_thread_process_queued_calls(). That is always ok to
// do on the main thread (it's why it is ok for us to call it in the
// middle of this function, and elsewhere). So if we check the value
// here and return, it's the same as if what happened on the main thread
// was the same as calling emscripten_main_thread_process_queued_calls()
// a few times before calling emscripten_futex_wait().
if (Atomics.load(HEAP32, addr >> 2) != val) {
return -{{{ cDefine('EWOULDBLOCK') }}};
}

// Mark us as waiting once more, and continue the loop.
lastAddr = Atomics.exchange(HEAP32, PThread.mainThreadFutex >> 2, addr);
#if ASSERTIONS
assert(lastAddr == 0);
#endif
}
#if PTHREADS_PROFILING
PThread.setThreadStatusConditional(_pthread_self(), {{{ cDefine('EM_THREAD_STATUS_RUNNING') }}}, {{{ cDefine('EM_THREAD_STATUS_WAITFUTEX') }}});
Expand All @@ -1170,7 +1260,6 @@ var LibraryPThread = {

// Returns the number of threads (>= 0) woken up, or the value -EINVAL on error.
// Pass count == INT_MAX to wake up all threads.
emscripten_futex_wake__deps: ['_main_thread_futex_wait_address'],
emscripten_futex_wake: function(addr, count) {
if (addr <= 0 || addr > HEAP8.length || addr&3 != 0 || count < 0) return -{{{ cDefine('EINVAL') }}};
if (count == 0) return 0;
Expand All @@ -1181,10 +1270,20 @@ var LibraryPThread = {
// See if main thread is waiting on this address? If so, wake it up by resetting its wake location to zero.
// Note that this is not a fair procedure, since we always wake main thread first before any workers, so
// this scheme does not adhere to real queue-based waiting.
var mainThreadWaitAddress = Atomics.load(HEAP32, __main_thread_futex_wait_address >> 2);
#if ASSERTIONS
assert(PThread.mainThreadFutex > 0);
#endif
var mainThreadWaitAddress = Atomics.load(HEAP32, PThread.mainThreadFutex >> 2);
var mainThreadWoken = 0;
if (mainThreadWaitAddress == addr) {
var loadedAddr = Atomics.compareExchange(HEAP32, __main_thread_futex_wait_address >> 2, mainThreadWaitAddress, 0);
#if ASSERTIONS
// We only use mainThreadFutex on the main browser thread, where we
// cannot block while we wait. Therefore we should only see it set from
// other threads, and not on the main thread itself. In other words, the
// main thread must never try to wake itself up!
assert(!ENVIRONMENT_IS_WEB);
#endif
var loadedAddr = Atomics.compareExchange(HEAP32, PThread.mainThreadFutex >> 2, mainThreadWaitAddress, 0);
if (loadedAddr == mainThreadWaitAddress) {
--count;
mainThreadWoken = 1;
Expand Down
2 changes: 1 addition & 1 deletion src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ this.onmessage = function(e) {
}
} catch(ex) {
err('worker.js onmessage() captured an uncaught exception: ' + ex);
if (ex.stack) err(ex.stack);
if (ex && ex.stack) err(ex.stack);
throw ex;
}
};
Expand Down
5 changes: 5 additions & 0 deletions system/lib/pthread/library_pthread.c
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,11 @@ int llvm_atomic_load_add_i32_p0i32(int* ptr, int delta) {
return emscripten_atomic_add_u32(ptr, delta);
}

// Stores the memory address that the main thread is waiting on, if any. If
// the main thread is waiting, we wake it up before waking up any workers.
EMSCRIPTEN_KEEPALIVE
void* main_thread_futex;

typedef struct main_args {
int argc;
char** argv;
Expand Down
75 changes: 75 additions & 0 deletions tests/pthread/test_pthread_proxy_hammer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#include <errno.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <emscripten.h>
#include <stdio.h>

// Small random-number source backed by a file descriptor. The descriptor is
// opened by the constructor and closed by the destructor; each call to
// operator() reads sizeof(unsigned) bytes from it.
class random_device
{
public:
  // Lifetime management of the underlying descriptor.
  explicit random_device();
  ~random_device();

  // Produces one unsigned value read from the descriptor.
  unsigned operator()();

private:
  // File descriptor the random bytes are read from.
  int __f_;
};

// Opens /dev/urandom read-only and aborts the process if that fails.
random_device::random_device()
  : __f_(open("/dev/urandom", O_RDONLY))
{
  if (__f_ < 0) {
    abort();
  }
}

// Releases the file descriptor opened by the constructor. The return value of
// close() is not checked.
random_device::~random_device()
{
close(__f_);
}

// Fills an unsigned value one byte at a time from the descriptor and returns
// it. Aborts on end-of-file or on any read error other than EINTR; an
// interrupted read is simply retried.
unsigned
random_device::operator()()
{
  unsigned value;
  char* cursor = reinterpret_cast<char*>(&value);
  size_t remaining = sizeof(value);
  while (remaining > 0)
  {
    // Deliberately request a single byte per read() call.
    const ssize_t got = read(__f_, cursor, 1);
    if (got == 0) {
      abort();
    }
    if (got == -1)
    {
      if (errno != EINTR) {
        abort();
      }
      continue;
    }
    const size_t advanced = static_cast<size_t>(got);
    remaining -= advanced;
    cursor += advanced;
  }
  return value;
}

int main() {
int total = 0;
for (int i = 0; i < ITERATIONS; i++) {
// printf causes proxying
printf("%d %d\n", i, total);
for (int j = 0; j < 1024; j++) {
// allocation uses a mutex
auto* rd = new random_device();
// reading data causes proxying
for (int k = 0; k < 4; k++) {
total += (*rd)();
}
// make sure the optimizer doesn't remove the allocation
EM_ASM({ out("iter") }, rd);
delete rd;
}
}
printf("done: %d", total);
#ifdef REPORT_RESULT
REPORT_RESULT(0);
#endif
return 0;
}

8 changes: 6 additions & 2 deletions tests/pthread/test_pthread_proxying_in_futex_wait.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// University of Illinois/NCSA Open Source License. Both these licenses can be
// found in the LICENSE file.

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
Expand Down Expand Up @@ -47,9 +48,12 @@ int main()
int rc = pthread_create(&thread, &attr, ThreadMain, 0);
assert(rc == 0);
rc = emscripten_futex_wait(&main_thread_wait_val, 1, 15 * 1000);
if (rc != 0)
// An rc of 0 means no error, and of EWOULDBLOCK means that the value is
// not the expected one, which can happen if the pthread manages to set it
// before we reach the futex_wait.
if (rc != 0 && rc != -EWOULDBLOCK)
{
printf("ERROR! futex wait timed out!\n");
printf("ERROR! futex wait errored %d!\n", rc);
result = 2;
#ifdef REPORT_RESULT
REPORT_RESULT(result);
Expand Down
20 changes: 10 additions & 10 deletions tests/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1356,12 +1356,12 @@ def assert_out_queue_empty(self, who):
self.harness_out_queue.get()
raise Exception('excessive responses from %s' % who)

# @param tries_left: how many more times to try this test, if it fails. browser tests have
# many more causes of flakiness (in particular, they do not run
# synchronously, so we have a timeout, which can be hit if the VM
# we run on stalls temporarily), so we let each test try more than
# once by default
def run_browser(self, html_file, message, expectedResult=None, timeout=None, tries_left=1):
# @param extra_tries: how many more times to try this test, if it fails. browser tests have
# many more causes of flakiness (in particular, they do not run
# synchronously, so we have a timeout, which can be hit if the VM
# we run on stalls temporarily), so we let each test try more than
# once by default
def run_browser(self, html_file, message, expectedResult=None, timeout=None, extra_tries=1):
if not has_browser():
return
if BrowserCore.unresponsive_tests >= BrowserCore.MAX_UNRESPONSIVE_TESTS:
Expand Down Expand Up @@ -1400,10 +1400,10 @@ def run_browser(self, html_file, message, expectedResult=None, timeout=None, tri
try:
self.assertIdenticalUrlEncoded(expectedResult, output)
except Exception as e:
if tries_left > 0:
if extra_tries > 0:
print('[test error (see below), automatically retrying]')
print(e)
return self.run_browser(html_file, message, expectedResult, timeout, tries_left - 1)
return self.run_browser(html_file, message, expectedResult, timeout, extra_tries - 1)
else:
raise e
finally:
Expand Down Expand Up @@ -1541,7 +1541,7 @@ def btest(self, filename, expected=None, reference=None, force_c=False,
reference_slack=0, manual_reference=False, post_build=None,
args=[], outfile='test.html', message='.', also_proxied=False,
url_suffix='', timeout=None, also_asmjs=False,
manually_trigger_reftest=False):
manually_trigger_reftest=False, extra_tries=1):
assert expected or reference, 'a btest must either expect an output, or have a reference image'
# if we are provided the source and not a path, use that
filename_is_src = '\n' in filename
Expand Down Expand Up @@ -1575,7 +1575,7 @@ def btest(self, filename, expected=None, reference=None, force_c=False,
post_build()
if not isinstance(expected, list):
expected = [expected]
self.run_browser(outfile + url_suffix, message, ['/report_result?' + e for e in expected], timeout=timeout)
self.run_browser(outfile + url_suffix, message, ['/report_result?' + e for e in expected], timeout=timeout, extra_tries=extra_tries)

# Tests can opt into being run under asmjs as well
if 'WASM=0' not in args and (also_asmjs or self.also_asmjs):
Expand Down
Loading