Skip to content

Commit 6650d80

Browse files
authored
Rewrite the old proxying API in terms of the new API (#15880)
Rewrite the old threading.h proxying API used for internal system call implementations in terms of the new proxying API introduced in #15737.
1 parent d4873eb commit 6650d80

8 files changed

+61
-222
lines changed

src/library_pthread.js

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -263,9 +263,9 @@ var LibraryPThread = {
263263
return;
264264
}
265265

266-
if (cmd === 'processQueuedMainThreadWork') {
266+
if (cmd === 'processProxyingQueue') {
267267
// TODO: Must post message to main Emscripten thread in PROXY_TO_WORKER mode.
268-
_emscripten_main_thread_process_queued_calls();
268+
_emscripten_proxy_execute_queue(d['queue']);
269269
} else if (cmd === 'spawnThread') {
270270
spawnThread(d);
271271
} else if (cmd === 'cleanupThread') {
@@ -1040,29 +1040,6 @@ var LibraryPThread = {
10401040
return {{{ makeDynCall('ii', 'ptr') }}}(arg);
10411041
},
10421042

1043-
// This function is called internally to notify target thread ID that it has messages it needs to
1044-
// process in its message queue inside the Wasm heap. As a helper, the caller must also pass the
1045-
// ID of the main browser thread to this function, to avoid needlessly ping-ponging between JS and
1046-
// Wasm boundaries.
1047-
_emscripten_notify_thread_queue: function(targetThreadId, mainThreadId) {
1048-
if (targetThreadId == mainThreadId) {
1049-
postMessage({'cmd' : 'processQueuedMainThreadWork'});
1050-
} else if (ENVIRONMENT_IS_PTHREAD) {
1051-
postMessage({'targetThread': targetThreadId, 'cmd': 'processThreadQueue'});
1052-
} else {
1053-
var pthread = PThread.pthreads[targetThreadId];
1054-
var worker = pthread && pthread.worker;
1055-
if (!worker) {
1056-
#if ASSERTIONS
1057-
err('Cannot send message to thread with ID ' + targetThreadId + ', unknown thread ID!');
1058-
#endif
1059-
return /*0*/;
1060-
}
1061-
worker.postMessage({'cmd' : 'processThreadQueue'});
1062-
}
1063-
return 1;
1064-
},
1065-
10661043
_emscripten_notify_proxying_queue: function(targetThreadId, currThreadId, mainThreadId, queue) {
10671044
if (targetThreadId == currThreadId) {
10681045
setTimeout(function() { _emscripten_proxy_execute_queue(queue); });

src/worker.js

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -285,10 +285,6 @@ self.onmessage = (e) => {
285285
}
286286
} else if (e.data.target === 'setimmediate') {
287287
// no-op
288-
} else if (e.data.cmd === 'processThreadQueue') {
289-
if (Module['_pthread_self']()) { // If this thread is actually running?
290-
Module['_emscripten_current_thread_process_queued_calls']();
291-
}
292288
} else if (e.data.cmd === 'processProxyingQueue') {
293289
if (Module['_pthread_self']()) { // If this thread is actually running?
294290
Module['_emscripten_proxy_execute_queue'](e.data.queue);

system/lib/pthread/library_pthread.c

Lines changed: 30 additions & 183 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include <emscripten/stack.h>
3636

3737
#include "threading_internal.h"
38+
#include "proxying.h"
3839

3940
int emscripten_pthread_attr_gettransferredcanvases(const pthread_attr_t* a, const char** str) {
4041
*str = a->_a_transferredcanvases;
@@ -177,7 +178,6 @@ void emscripten_async_waitable_close(em_queued_call* call) {
177178

178179
extern EMSCRIPTEN_RESULT _emscripten_set_offscreencanvas_size(const char *target, int width, int height);
179180
extern double emscripten_receive_on_main_thread_js(int functionIndex, int numCallArgs, double* args);
180-
extern int _emscripten_notify_thread_queue(pthread_t targetThreadId, pthread_t mainThreadId);
181181

182182
static void _do_call(void* arg) {
183183
em_queued_call* q = (em_queued_call*)arg;
@@ -321,60 +321,6 @@ static void _do_call(void* arg) {
321321
}
322322
}
323323

324-
#define CALL_QUEUE_SIZE 128
325-
326-
// Shared data synchronized by call_queue_lock.
327-
typedef struct CallQueueEntry {
328-
void (*func)(void*);
329-
void* arg;
330-
} CallQueueEntry;
331-
332-
typedef struct CallQueue {
333-
void* target_thread;
334-
CallQueueEntry* call_queue;
335-
int call_queue_head;
336-
int call_queue_tail;
337-
struct CallQueue* next;
338-
} CallQueue;
339-
340-
// Currently global to the queue, but this can be improved to be per-queue specific. (TODO: with
341-
// lockfree list operations on callQueue_head, or removing the list by moving this data to
342-
// pthread_t)
343-
static pthread_mutex_t call_queue_lock = PTHREAD_MUTEX_INITIALIZER;
344-
static CallQueue* callQueue_head = 0;
345-
346-
// Not thread safe, call while having call_queue_lock obtained.
347-
static CallQueue* GetQueue(void* target) {
348-
assert(target);
349-
CallQueue* q = callQueue_head;
350-
while (q && q->target_thread != target)
351-
q = q->next;
352-
return q;
353-
}
354-
355-
// Not thread safe, call while having call_queue_lock obtained.
356-
static CallQueue* GetOrAllocateQueue(void* target) {
357-
CallQueue* q = GetQueue(target);
358-
if (q)
359-
return q;
360-
361-
q = (CallQueue*)malloc(sizeof(CallQueue));
362-
q->target_thread = target;
363-
q->call_queue = 0;
364-
q->call_queue_head = 0;
365-
q->call_queue_tail = 0;
366-
q->next = 0;
367-
if (callQueue_head) {
368-
CallQueue* last = callQueue_head;
369-
while (last->next)
370-
last = last->next;
371-
last->next = q;
372-
} else {
373-
callQueue_head = q;
374-
}
375-
return q;
376-
}
377-
378324
EMSCRIPTEN_RESULT emscripten_wait_for_call_v(em_queued_call* call, double timeoutMSecs) {
379325
int r;
380326

@@ -410,85 +356,43 @@ pthread_t emscripten_main_browser_thread_id() {
410356
return &__main_pthread;
411357
}
412358

413-
int _emscripten_do_dispatch_to_thread(pthread_t target_thread, em_queued_call* call) {
414-
assert(call);
415-
416-
// #if PTHREADS_DEBUG // TODO: Create a debug version of pthreads library
417-
// EM_ASM_INT({dump('thread ' + _pthread_self() + ' (ENVIRONMENT_IS_WORKER: ' +
418-
//ENVIRONMENT_IS_WORKER + '), queueing call of function enum=' + $0 + '/ptr=' + $1 + ' on thread '
419-
//+ $2 + '\n' + new Error().stack)}, call->functionEnum, call->functionPtr, target_thread);
420-
// #endif
421-
422-
// Can't be a null pointer here, and can't be
423-
// EM_CALLBACK_THREAD_CONTEXT_MAIN_BROWSER_THREAD either.
359+
static pthread_t normalize_thread(pthread_t target_thread) {
424360
assert(target_thread);
425-
if (target_thread == EM_CALLBACK_THREAD_CONTEXT_MAIN_BROWSER_THREAD)
426-
target_thread = emscripten_main_browser_thread_id();
427-
428-
// If we are the target recipient of this message, we can just call the operation directly.
429-
if (target_thread == EM_CALLBACK_THREAD_CONTEXT_CALLING_THREAD ||
430-
target_thread == pthread_self()) {
431-
_do_call(call);
432-
return 1;
361+
if (target_thread == EM_CALLBACK_THREAD_CONTEXT_MAIN_BROWSER_THREAD) {
362+
return emscripten_main_browser_thread_id();
433363
}
434-
435-
// Add the operation to the call queue of the main runtime thread.
436-
pthread_mutex_lock(&call_queue_lock);
437-
CallQueue* q = GetOrAllocateQueue(target_thread);
438-
if (!q->call_queue) {
439-
// Shared data synchronized by call_queue_lock.
440-
q->call_queue = malloc(sizeof(CallQueueEntry) * CALL_QUEUE_SIZE);
364+
if (target_thread == EM_CALLBACK_THREAD_CONTEXT_CALLING_THREAD) {
365+
return pthread_self();
441366
}
367+
return target_thread;
368+
}
442369

443-
int head = q->call_queue_head;
444-
int tail = q->call_queue_tail;
445-
int new_tail = (tail + 1) % CALL_QUEUE_SIZE;
446-
447-
while (new_tail == head) { // Queue is full?
448-
pthread_mutex_unlock(&call_queue_lock);
449-
450-
// If queue of the main browser thread is full, then we wait. (never drop messages for the main
451-
// browser thread)
452-
if (target_thread == emscripten_main_browser_thread_id()) {
453-
emscripten_futex_wait((void*)&q->call_queue_head, head, INFINITY);
454-
pthread_mutex_lock(&call_queue_lock);
455-
head = q->call_queue_head;
456-
tail = q->call_queue_tail;
457-
new_tail = (tail + 1) % CALL_QUEUE_SIZE;
458-
} else {
459-
// For the queues of other threads, just drop the message.
460-
// #if DEBUG TODO: a debug build of pthreads library?
461-
// EM_ASM(console.error('Pthread queue overflowed, dropping queued
462-
//message to thread. ' + new Error().stack));
463-
// #endif
464-
em_queued_call_free(call);
465-
return 0;
466-
}
370+
// Execute `call` and return 1 only if already on the `target_thread`. Otherwise
371+
// return 0.
372+
static int maybe_call_on_current_thread(pthread_t target_thread,
373+
em_queued_call* call) {
374+
if (pthread_equal(target_thread, pthread_self())) {
375+
_do_call(call);
376+
return 1;
467377
}
378+
return 0;
379+
}
468380

469-
q->call_queue[tail].func = _do_call;
470-
q->call_queue[tail].arg = call;
471-
472-
// If the call queue was empty, the main runtime thread is likely idle in the browser event loop,
473-
// so send a message to it to ensure that it wakes up to start processing the command we have
474-
// posted.
475-
if (head == tail) {
476-
int success = _emscripten_notify_thread_queue(target_thread, emscripten_main_browser_thread_id());
477-
// Failed to dispatch the thread, delete the crafted message.
478-
if (!success) {
479-
em_queued_call_free(call);
480-
pthread_mutex_unlock(&call_queue_lock);
481-
return 0;
482-
}
381+
// Execute or proxy `call`. Return 1 if the work was executed or otherwise
382+
// return 0.
383+
static int do_dispatch_to_thread(pthread_t target_thread,
384+
em_queued_call* call) {
385+
target_thread = normalize_thread(target_thread);
386+
if (maybe_call_on_current_thread(target_thread, call)) {
387+
return 1;
483388
}
484-
485-
q->call_queue_tail = new_tail;
486-
pthread_mutex_unlock(&call_queue_lock);
389+
emscripten_proxy_async(
390+
emscripten_proxy_get_system_queue(), target_thread, _do_call, call);
487391
return 0;
488392
}
489393

490394
void emscripten_async_run_in_main_thread(em_queued_call* call) {
491-
_emscripten_do_dispatch_to_thread(emscripten_main_browser_thread_id(), call);
395+
do_dispatch_to_thread(emscripten_main_browser_thread_id(), call);
492396
}
493397

494398
void emscripten_sync_run_in_main_thread(em_queued_call* call) {
@@ -589,50 +493,7 @@ void* emscripten_sync_run_in_main_thread_7(int function, void* arg1,
589493
}
590494

591495
void emscripten_current_thread_process_queued_calls() {
592-
// #if PTHREADS_DEBUG == 2
593-
// EM_ASM(console.error('thread ' + _pthread_self() + ':
594-
//emscripten_current_thread_process_queued_calls(), ' + new Error().stack));
595-
// #endif
596-
597-
static thread_local bool thread_is_processing_queued_calls = false;
598-
599-
// It is possible that when processing a queued call, the control flow leads back to calling this
600-
// function in a nested fashion! Therefore this scenario must explicitly be detected, and
601-
// processing the queue must be avoided if we are nesting, or otherwise the same queued calls
602-
// would be processed again and again.
603-
if (thread_is_processing_queued_calls)
604-
return;
605-
// This must be before pthread_mutex_lock(), since pthread_mutex_lock() can call back to this
606-
// function.
607-
thread_is_processing_queued_calls = true;
608-
609-
pthread_mutex_lock(&call_queue_lock);
610-
CallQueue* q = GetQueue(pthread_self());
611-
if (!q) {
612-
pthread_mutex_unlock(&call_queue_lock);
613-
thread_is_processing_queued_calls = false;
614-
return;
615-
}
616-
617-
int head = q->call_queue_head;
618-
int tail = q->call_queue_tail;
619-
while (head != tail) {
620-
// Assume that the call is heavy, so unlock access to the call queue while it is being
621-
// performed.
622-
pthread_mutex_unlock(&call_queue_lock);
623-
q->call_queue[head].func(q->call_queue[head].arg);
624-
pthread_mutex_lock(&call_queue_lock);
625-
626-
head = (head + 1) % CALL_QUEUE_SIZE;
627-
q->call_queue_head = head;
628-
tail = q->call_queue_tail;
629-
}
630-
pthread_mutex_unlock(&call_queue_lock);
631-
632-
// If the queue was full and we had waiters pending to get to put data to queue, wake them up.
633-
emscripten_futex_wake((void*)&q->call_queue_head, INT_MAX);
634-
635-
thread_is_processing_queued_calls = false;
496+
emscripten_proxy_execute_queue(emscripten_proxy_get_system_queue());
636497
}
637498

638499
// At times when we disallow the main thread to process queued calls, this will
@@ -733,17 +594,6 @@ em_queued_call* emscripten_async_waitable_run_in_main_runtime_thread_(
733594
return q;
734595
}
735596

736-
typedef struct DispatchToThreadArgs {
737-
pthread_t target_thread;
738-
em_queued_call* q;
739-
} DispatchToThreadArgs;
740-
741-
static void dispatch_to_thread_helper(void* user_data) {
742-
DispatchToThreadArgs* args = (DispatchToThreadArgs*)user_data;
743-
_emscripten_do_dispatch_to_thread(args->target_thread, args->q);
744-
free(user_data);
745-
}
746-
747597
int emscripten_dispatch_to_thread_args(pthread_t target_thread,
748598
EM_FUNC_SIGNATURE sig,
749599
void* func_ptr,
@@ -761,7 +611,7 @@ int emscripten_dispatch_to_thread_args(pthread_t target_thread,
761611

762612
// `q` will not be used after it is called, so let the call clean it up.
763613
q->calleeDelete = 1;
764-
return _emscripten_do_dispatch_to_thread(target_thread, q);
614+
return do_dispatch_to_thread(target_thread, q);
765615
}
766616

767617
int emscripten_dispatch_to_thread_(pthread_t target_thread,
@@ -792,10 +642,7 @@ int emscripten_dispatch_to_thread_async_args(pthread_t target_thread,
792642
q->calleeDelete = 1;
793643

794644
// Schedule the call to run later on this thread.
795-
DispatchToThreadArgs* args = malloc(sizeof(DispatchToThreadArgs));
796-
args->target_thread = target_thread;
797-
args->q = q;
798-
emscripten_set_timeout(dispatch_to_thread_helper, 0, args);
645+
emscripten_set_timeout(_do_call, 0, q);
799646
return 0;
800647
}
801648

system/lib/pthread/proxying.c

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,13 +206,26 @@ static task_queue* get_or_add_tasks_for_thread(em_proxying_queue* q,
206206
EMSCRIPTEN_KEEPALIVE
207207
void emscripten_proxy_execute_queue(em_proxying_queue* q) {
208208
assert(q != NULL);
209+
210+
// Recursion guard to avoid infinite recursion when we arrive here from the
211+
// pthread_lock call below that executes the system queue. The per-task_queue
212+
// recursion lock can't catch these recursions because it can only be checked
213+
// after the lock has been acquired.
214+
static _Thread_local int executing_system_queue = 0;
215+
int is_system_queue = q == &system_proxying_queue;
216+
if (is_system_queue) {
217+
if (executing_system_queue) {
218+
return;
219+
}
220+
executing_system_queue = 1;
221+
}
222+
209223
pthread_mutex_lock(&q->mutex);
210224
int tasks_index = get_tasks_index_for_thread(q, pthread_self());
211225
task_queue* tasks = tasks_index == -1 ? NULL : &q->task_queues[tasks_index];
212226
if (tasks == NULL || tasks->processing) {
213227
// No tasks for this thread or they are already being processed.
214-
pthread_mutex_unlock(&q->mutex);
215-
return;
228+
goto end;
216229
}
217230
// Found the task queue; process the tasks.
218231
tasks->processing = 1;
@@ -227,7 +240,12 @@ void emscripten_proxy_execute_queue(em_proxying_queue* q) {
227240
tasks = &q->task_queues[tasks_index];
228241
}
229242
tasks->processing = 0;
243+
244+
end:
230245
pthread_mutex_unlock(&q->mutex);
246+
if (is_system_queue) {
247+
executing_system_queue = 0;
248+
}
231249
}
232250

233251
int emscripten_proxy_async(em_proxying_queue* q,

0 commit comments

Comments
 (0)