diff --git a/AUTHORS b/AUTHORS index cc470814e1cf9..cb2f7f00a09c9 100644 --- a/AUTHORS +++ b/AUTHORS @@ -509,3 +509,4 @@ a license to everyone to use it as detailed in LICENSE.) * Julien Jorge * Attila Oláh (copyright owned by Google, LLC) * Marat Dukhan (copyright owned by Google, LLC) +* Alexander Jährling diff --git a/src/library_pthread.js b/src/library_pthread.js index a7621c1f67bf3..5b833bb3fce99 100644 --- a/src/library_pthread.js +++ b/src/library_pthread.js @@ -858,7 +858,7 @@ var LibraryPThread = { __pthread_testcancel_js(); // In main runtime thread (the thread that initialized the Emscripten C runtime and launched main()), assist pthreads in performing operations // that they need to access the Emscripten main runtime for. - if (!ENVIRONMENT_IS_PTHREAD) _emscripten_main_thread_process_queued_calls(); + if (!ENVIRONMENT_IS_PTHREAD) _emscripten_current_thread_process_proxied_queued_calls(); _emscripten_futex_wait(thread + {{{ C_STRUCTS.pthread.threadStatus }}}, threadStatus, ENVIRONMENT_IS_PTHREAD ? 100 : 1); } }, @@ -1107,7 +1107,7 @@ var LibraryPThread = { _main_thread_futex_wait_address: '0', // Returns 0 on success, or one of the values -ETIMEDOUT, -EWOULDBLOCK or -EINVAL on error. - emscripten_futex_wait__deps: ['_main_thread_futex_wait_address', 'emscripten_main_thread_process_queued_calls'], + emscripten_futex_wait__deps: ['_main_thread_futex_wait_address', 'emscripten_current_thread_process_proxied_queued_calls'], emscripten_futex_wait: function(addr, val, timeout) { if (addr <= 0 || addr > HEAP8.length || addr&3 != 0) return -{{{ cDefine('EINVAL') }}}; if (ENVIRONMENT_IS_NODE || ENVIRONMENT_IS_WORKER) { @@ -1146,7 +1146,7 @@ var LibraryPThread = { #endif return -{{{ cDefine('ETIMEDOUT') }}}; } - _emscripten_main_thread_process_queued_calls(); // We are performing a blocking loop here, so must pump any pthreads if they want to perform operations that are proxied. + _emscripten_current_thread_process_proxied_queued_calls(); // We are performing a blocking loop here, so must pump any pthreads if they want to perform operations that are proxied. addr = Atomics.load(HEAP32, __main_thread_futex_wait_address >> 2); // Look for a worker thread waking us up. } #if PTHREADS_PROFILING diff --git a/system/include/emscripten/threading.h b/system/include/emscripten/threading.h index aaf02b0b2bb23..9b7b83cb1cda7 100644 --- a/system/include/emscripten/threading.h +++ b/system/include/emscripten/threading.h @@ -361,6 +361,8 @@ void emscripten_main_thread_process_queued_calls(void); void emscripten_current_thread_process_queued_calls(void); +void emscripten_current_thread_process_proxied_queued_calls(void); + pthread_t emscripten_main_browser_thread_id(void); // Synchronously sleeps the calling thread for the given number of milliseconds. diff --git a/system/lib/libc/musl/src/thread/__timedwait.c b/system/lib/libc/musl/src/thread/__timedwait.c index 71abe8e775b65..84bb1b24776ae 100644 --- a/system/lib/libc/musl/src/thread/__timedwait.c +++ b/system/lib/libc/musl/src/thread/__timedwait.c @@ -52,7 +52,7 @@ int __timedwait_cp(volatile int *addr, int val, return ECANCELED; } // Assist other threads by executing proxied operations that are effectively singlethreaded. - if (is_main_thread) emscripten_main_thread_process_queued_calls(); + if (is_main_thread) emscripten_current_thread_process_proxied_queued_calls(); // Must wait in slices in case this thread is cancelled in between. double waitMsecs = sleepUntilTime - emscripten_get_now(); if (waitMsecs <= 0) { diff --git a/system/lib/pthread/library_pthread.c b/system/lib/pthread/library_pthread.c index 6af1da45d6379..0ce937aa9e3c9 100644 --- a/system/lib/pthread/library_pthread.c +++ b/system/lib/pthread/library_pthread.c @@ -122,7 +122,7 @@ void emscripten_thread_sleep(double msecs) { __pthread_testcancel(); // pthreads spec: sleep is a cancellation point, so must test if this // thread is cancelled during the sleep. - emscripten_current_thread_process_queued_calls(); + emscripten_current_thread_process_proxied_queued_calls(); // If we have less than this many msecs left to wait, busy spin that instead. const double minimumTimeSliceToSleep = 0.1; @@ -137,7 +137,7 @@ void emscripten_thread_sleep(double msecs) { // Keep processing the main loop of the calling thread. __pthread_testcancel(); // pthreads spec: sleep is a cancellation point, so must test if this // thread is cancelled during the sleep. - emscripten_current_thread_process_queued_calls(); + emscripten_current_thread_process_proxied_queued_calls(); now = emscripten_get_now(); double msecsToSleep = target - now; @@ -200,6 +200,10 @@ int emscripten_builtin_pthread_create(void *thread, void *attr, static void _do_call(em_queued_call* q) { // C function pointer + if (q == 0) { + return; + } + assert(EM_FUNC_SIG_NUM_FUNC_ARGUMENTS(q->functionEnum) <= EM_QUEUED_CALL_MAX_ARGS); switch (q->functionEnum) { case EM_PROXIED_PTHREAD_CREATE: @@ -651,6 +655,51 @@ void* EMSCRIPTEN_KEEPALIVE emscripten_sync_run_in_main_thread_7(int function, vo return q.returnValue.vp; } +void EMSCRIPTEN_KEEPALIVE emscripten_current_thread_process_proxied_queued_calls() { + // this call should just handled proxied calls + pthread_mutex_lock(&call_queue_lock); + CallQueue* q = GetQueue(pthread_self()); + if (!q) { + pthread_mutex_unlock(&call_queue_lock); + return; + } + + if (!emscripten_is_main_browser_thread()) { + EM_ASM({console.log('!!!emscripten_current_thread_process_proxied_queued_calls called from worker: thread ' + _pthread_self() + "\n" + new Error().stack)}); + } + + int head = emscripten_atomic_load_u32((void*)&q->call_queue_head); + int tail = emscripten_atomic_load_u32((void*)&q->call_queue_tail); + while (head != tail) { + // Assume that the call is heavy, so unlock access to the call queue while it is being + // performed. + em_queued_call* call = q->call_queue[head]; + if (call == 0) { + // already processed + head = (head + 1) % CALL_QUEUE_SIZE; + continue; + } + if ((call->functionEnum & EM_FUNC_SIG_SPECIAL_INTERNAL) == 0) { + head = (head + 1) % CALL_QUEUE_SIZE; + continue; + } + pthread_mutex_unlock(&call_queue_lock); + _do_call(call); + pthread_mutex_lock(&call_queue_lock); + + // remove from queue + q->call_queue[head] = 0; + + head = (head + 1) % CALL_QUEUE_SIZE; + //emscripten_atomic_store_u32((void*)&q->call_queue_head, head); + tail = emscripten_atomic_load_u32((void*)&q->call_queue_tail); + } + pthread_mutex_unlock(&call_queue_lock); + + // If the queue was full and we had waiters pending to get to put data to queue, wake them up. + emscripten_futex_wake((void*)&q->call_queue_head, 0x7FFFFFFF); +} + void EMSCRIPTEN_KEEPALIVE emscripten_current_thread_process_queued_calls() { // #if PTHREADS_DEBUG == 2 // EM_ASM(console.error('thread ' + _pthread_self() + ': diff --git a/system/lib/pthread/library_pthread_stub.c b/system/lib/pthread/library_pthread_stub.c index 513759f384d65..31023e7801bc1 100644 --- a/system/lib/pthread/library_pthread_stub.c +++ b/system/lib/pthread/library_pthread_stub.c @@ -258,6 +258,10 @@ void emscripten_main_thread_process_queued_calls() { // nop } +void emscripten_current_thread_process_proxied_queued_calls() { + // nop +} + void emscripten_current_thread_process_queued_calls() { // nop }