Skip to content

Add sync and async_as_sync dispatch_to_thread variants #15705

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 51 additions & 3 deletions system/include/emscripten/threading.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,9 @@ int emscripten_dispatch_to_thread_(pthread_t target_thread,
emscripten_dispatch_to_thread_( \
(target_thread), (sig), (void*)(func), (satellite), ##__VA_ARGS__)

// Similar to emscripten_dispatch_to_thread, but always runs the
// function asynchronously, even if on the same thread. This is less efficient
// but may be simpler to reason about in some cases.
// Similar to emscripten_dispatch_to_thread, but always runs the function
// asynchronously, even if on the same thread. This is less efficient but may be
// simpler to reason about in some cases.
int emscripten_dispatch_to_thread_async_ptr(pthread_t target_thread,
void (*func)(void*),
void* arg);
Expand All @@ -299,6 +299,54 @@ int emscripten_dispatch_to_thread_async_(pthread_t target_thread,
emscripten_dispatch_to_thread_async_( \
(target_thread), (sig), (void*)(func), (satellite), ##__VA_ARGS__)

// Similar to emscripten_dispatch_to_thread, but waits on the dispatching thread
// until `func` has returned on the target thread.
int emscripten_dispatch_to_thread_sync_ptr(pthread_t target_thread,
void (*func)(void*),
void* arg);
int emscripten_dispatch_to_thread_sync_args(pthread_t target_thread,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other function we have in this file that do sync and async versions are of the following form:

emscripten_sync_run_in_main_runtime_thread
emscripten_async_run_in_main_runtime_thread

Should these therefore be called emscripten_sync_dispatch_to_thread_*?

I also wonder if they should be called something completely different such as emscripten_run_in_thread (really should emscripten_run_on_thread I guess). The word dispatch sounds inherently async to me.. maybe maybe I'm wrong about that?

EM_FUNC_SIGNATURE sig,
void* func,
void* satellite,
va_list args);
int emscripten_dispatch_to_thread_sync_(pthread_t target_thread,
EM_FUNC_SIGNATURE sig,
void* func,
void* satellite,
...);
#define emscripten_dispatch_to_thread_sync( \
target_thread, sig, func, satellite, ...) \
emscripten_dispatch_to_thread_sync_( \
(target_thread), (sig), (void*)(func), (satellite), ##__VA_ARGS__)

// Similar to emscripten_dispatch_to_thread, but waits on the dispatching thread
// until `emscripten_async_as_sync_finish` is called on the
// `em_async_as_sync_ctx` context pointer supplied to `func`. The first argument
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the term async_as_sync to be a bit of a mouthful... but I don't have any great suggestions for something better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe adding some text here to explain the motivation would help: that it can do an async operation on the called thread, while behaving sync on the calling thread, and that async Web APIs are a common use case for it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing emscripten_dispatch_to_thread_sync() function allows one to dispatch a task to another thread, and the calling/dispatching thread synchronously waits until the other thread runs the function and returns from that function call.

If I understand correctly, the idea with async_as_sync is to be able to delay the finish condition of the dispatched task? I.e. instead of the event of returning from the dispatched function signaling completion, the target thread will instead some time later manually call emscripten_async_as_sync_finish() to mark the dispatched call as finished? (to accommodate for async continuations from event handlers?)

If so, there does not seem to be a need to bake this feature as built-in to the proxying queue itself? It does have an effect of complicating the API quite a bit.

The major complication is the question about what should happen with the queue while there are one or more of these kinds of long-running async tasks pending? Should the queue retain them until they are asynchronously signaled to be finished? Or should they be removed from the async call queue already before they are signaled to be finished?

Users should be able to manually implement this kind of delayed completion signaling on top of synchronous or asynchronous dispatches via code like

int longRunningAsyncTaskOnCalledThread(int *notifyAddressOnCompletion)
{
   // do long running async stuff, capture notifyAddressOnCompletion somewhere.
   // ...
   // done: wake all waiters.
   __atomic_store_n(notifyAddressOnCompletion, 1, __ATOMIC_SEQ_CST);
   emscripten_futex_wake(notifyAddressOnCompletion, INT_MAX);
}

int callingThread()
{
  int addressNotifiedOnCompletion = 0;
  emscripten_dispatch_to_thread(target_thread, EM_FUNC_SIG_VI, longRunningAsyncTaskOnCalledThread, 0, &addressNotifiedOnCompletion);
  emscripten_futex_wait(&addressNotifiedOnCompletion, 0, INFINITY);
}

i.e. users can get this behavior through the call queue, so this kind of long running async task could be treated orthogonally and would not need to be a feature of the dispatch queue itself?

Copy link
Member Author

@tlively tlively Dec 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's exactly right. OTOH, the normal sync version of the API can be built on top of the async version of the API in the same way, but we still consider it useful enough to provide as a utility. It makes sense to provide a utility to satisfy this async/long running use case as well rather than forcing each user to implement it themselves. There is no appreciable complexity cost due to the layered nature of the implementation.

// type in `sig` must match the provided context pointer.
// `emscripten_async_as_sync_finish` may be called before or after `func`
// returns unless `target_thread` is the current thread, in which case it must
// be called before `func` returns to avoid waiting forever.
typedef struct em_async_as_sync_ctx em_async_as_sync_ctx;
int emscripten_dispatch_to_thread_async_as_sync_ptr(
pthread_t target_thread,
void (*func)(struct em_async_as_sync_ctx*, void*),
void* arg);
int emscripten_dispatch_to_thread_async_as_sync_args(pthread_t target_thread,
EM_FUNC_SIGNATURE sig,
void* func,
void* satellite,
va_list args);
int emscripten_dispatch_to_thread_async_as_sync_(pthread_t target_thread,
EM_FUNC_SIGNATURE sig,
void* func,
void* satellite,
...);
#define emscripten_dispatch_to_thread_async_as_sync( \
target_thread, sig, func, satellite, ...) \
emscripten_dispatch_to_thread_async_as_sync_( \
(target_thread), (sig), (void*)(func), (satellite), ##__VA_ARGS__)
void emscripten_async_as_sync_finish(struct em_async_as_sync_ctx* ctx);

// Returns 1 if the current thread is the thread that hosts the Emscripten runtime.
int emscripten_is_main_runtime_thread(void);

Expand Down
258 changes: 231 additions & 27 deletions system/lib/pthread/library_pthread.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,38 +137,39 @@ static void em_queued_call_free(em_queued_call* call) {

static void init_em_queued_call_args(em_queued_call* q,
EM_FUNC_SIGNATURE sig,
int start,
va_list args) {
EM_FUNC_SIGNATURE argumentsType = sig & EM_FUNC_SIG_ARGUMENTS_TYPE_MASK;
int numArguments = EM_FUNC_SIG_NUM_FUNC_ARGUMENTS(sig);
for (int i = 0; i < numArguments; ++i) {
switch ((argumentsType & EM_FUNC_SIG_ARGUMENT_TYPE_SIZE_MASK)) {
case EM_FUNC_SIG_PARAM_I:
q->args[i].i = va_arg(args, int);
break;
case EM_FUNC_SIG_PARAM_I64:
q->args[i].i64 = va_arg(args, int64_t);
break;
case EM_FUNC_SIG_PARAM_F:
q->args[i].f = (float)va_arg(args, double);
break;
case EM_FUNC_SIG_PARAM_D:
q->args[i].d = va_arg(args, double);
break;
if (i >= start) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not have the loop on the previous line start from i = start?

switch ((argumentsType & EM_FUNC_SIG_ARGUMENT_TYPE_SIZE_MASK)) {
case EM_FUNC_SIG_PARAM_I:
q->args[i].i = va_arg(args, int);
break;
case EM_FUNC_SIG_PARAM_I64:
q->args[i].i64 = va_arg(args, int64_t);
break;
case EM_FUNC_SIG_PARAM_F:
q->args[i].f = (float)va_arg(args, double);
break;
case EM_FUNC_SIG_PARAM_D:
q->args[i].d = va_arg(args, double);
break;
}
}
argumentsType >>= EM_FUNC_SIG_ARGUMENT_TYPE_SIZE_SHIFT;
}
}

static em_queued_call* em_queued_call_create(EM_FUNC_SIGNATURE sig,
void* func,
void* satellite,
va_list args) {
static em_queued_call* em_queued_call_create(
EM_FUNC_SIGNATURE sig, void* func, void* satellite, int start, va_list args) {
em_queued_call* call = em_queued_call_malloc();
if (call) {
call->functionEnum = sig;
call->functionPtr = func;
call->satelliteData = satellite;
init_em_queued_call_args(call, sig, args);
init_em_queued_call_args(call, sig, start, args);
}
return call;
}
Expand Down Expand Up @@ -586,7 +587,7 @@ int emscripten_sync_run_in_main_runtime_thread_(EM_FUNC_SIGNATURE sig, void* fun

va_list args;
va_start(args, func_ptr);
init_em_queued_call_args(&q, sig, args);
init_em_queued_call_args(&q, sig, 0, args);
va_end(args);
emscripten_sync_run_in_main_thread(&q);
return q.returnValue.i;
Expand Down Expand Up @@ -647,7 +648,7 @@ em_queued_call* emscripten_async_waitable_run_in_main_runtime_thread_(

va_list args;
va_start(args, func_ptr);
init_em_queued_call_args(q, sig, args);
init_em_queued_call_args(q, sig, 0, args);
va_end(args);
// 'async waitable' runs are waited on by the caller, so the call object needs to remain alive for
// the caller to access it after the operation is done. The caller is responsible in cleaning up
Expand Down Expand Up @@ -728,13 +729,19 @@ int emscripten_dispatch_to_thread_args(pthread_t target_thread,
void* func_ptr,
void* satellite,
va_list args) {
em_queued_call* q = em_queued_call_create(sig, func_ptr, satellite, args);
if (!q)
em_queued_call* q = em_queued_call_create(sig, func_ptr, satellite, 0, args);
if (!q) {
return 0;
}

// `q` will not be used after it is called, so let the call clean it up.
q->calleeDelete = 1;
return emscripten_dispatch_to_thread_ptr(target_thread, _do_call, q);
if (emscripten_dispatch_to_thread_ptr(target_thread, _do_call, q)) {
return 1;
} else {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No else after return.

em_queued_call_free(q);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new free is not part of a new API - it looks like a bugfix for an existing API, is that right?

return 0;
}
}

int emscripten_dispatch_to_thread_(pthread_t target_thread,
Expand Down Expand Up @@ -770,16 +777,18 @@ int emscripten_dispatch_to_thread_async_args(pthread_t target_thread,
void* satellite,
va_list args) {
// Setup is the same as in emscripten_dispatch_to_thread_args.
em_queued_call* q = em_queued_call_create(sig, func, satellite, args);
if (!q)
em_queued_call* q = em_queued_call_create(sig, func, satellite, 0, args);
if (!q) {
return 0;
}

q->calleeDelete = 1;
if (!emscripten_dispatch_to_thread_async_ptr(target_thread, _do_call, q)) {
if (emscripten_dispatch_to_thread_async_ptr(target_thread, _do_call, q)) {
return 1;
} else {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

em_queued_call_free(q);
return 0;
}
return 1;
}

int emscripten_dispatch_to_thread_async_(pthread_t target_thread,
Expand All @@ -795,6 +804,201 @@ int emscripten_dispatch_to_thread_async_(pthread_t target_thread,
return ret;
}

typedef struct em_sync_ctx {
void (*func)(void*);
void* arg;
} em_sync_ctx;

// Helper for performing the user-provided function then synchronously calling
// `emscripten_async_as_sync_finish`. This lets us reuse the waiting logic from
// `emscripten_dispatch_to_thread_async_as_sync` without unnecessarily exposing
// the `em_async_as_sync_ctx` to the user code.
static void do_sync_call(em_async_as_sync_ctx* ctx, void* arg) {
em_sync_ctx* sync = (em_sync_ctx*)arg;
sync->func(sync->arg);
emscripten_async_as_sync_finish(ctx);
}

// Dispatch `func` to `target_thread` and wait until it has finished executing.
// Return 1 if the work was completed or 0 if it was not successfully
// dispatched.
int emscripten_dispatch_to_thread_sync_ptr(pthread_t target_thread,
void (*func)(void*),
void* arg) {
em_sync_ctx ctx = {func, arg};
return emscripten_dispatch_to_thread_async_as_sync_ptr(
target_thread, do_sync_call, &ctx);
}

int emscripten_dispatch_to_thread_sync_args(pthread_t target_thread,
EM_FUNC_SIGNATURE sig,
void* func,
void* satellite,
va_list args) {
// TODO: Stack allocate `q` but make sure its satellite data is still freed.
em_queued_call* q = em_queued_call_create(sig, func, satellite, 0, args);
if (!q) {
return 0;
}

q->calleeDelete = 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this is calleeDelete, why do we free on line 848?

if (emscripten_dispatch_to_thread_sync_ptr(target_thread, _do_call, q)) {
return 1;
} else {
em_queued_call_free(q);
return 0;
}
}

int emscripten_dispatch_to_thread_sync_(pthread_t target_thread,
EM_FUNC_SIGNATURE sig,
void* func,
void* satellite,
...) {
va_list args;
va_start(args, satellite);
int ret = emscripten_dispatch_to_thread_sync_args(
target_thread, sig, func, satellite, args);
va_end(args);
return ret;
}

struct em_async_as_sync_ctx {
// The function being dispatched and its argument.
void (*func)(struct em_async_as_sync_ctx*, void*);
void* arg;
// Allow the dispatching thread to wait for the work to be finished.
pthread_mutex_t mutex;
pthread_cond_t cond;
// Set to 1 when the work is finished.
int done;
};

static void init_em_async_as_sync_ctx(em_async_as_sync_ctx* ctx,
void (*func)(struct em_async_as_sync_ctx*,
void*),
void* arg) {
ctx->func = func;
ctx->arg = arg;
pthread_mutex_init(&ctx->mutex, NULL);
pthread_cond_init(&ctx->cond, NULL);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if these take non-trivial time we could store them as thread local perhaps. TODO?

ctx->done = 0;
}

static void destroy_em_async_as_sync_ctx(em_async_as_sync_ctx* ctx) {
pthread_mutex_destroy(&ctx->mutex);
pthread_cond_destroy(&ctx->cond);
}

// Helper for exposing the `em_async_as_sync_ctx` to the user-provided async
// work function.
static void do_async_as_sync_call(void* arg) {
em_async_as_sync_ctx* ctx = (em_async_as_sync_ctx*)arg;
ctx->func(ctx, ctx->arg);
}

// Dispatch `func` to `target_thread` and wait until
// `emscripten_async_as_sync_finish` is called on the
// `em_async_as_sync_ctx*` passed to `func`, possibly at some point after `func`
// returns. Return 1 if the work was completed or 0 if it was not successfully
// dispatched.
int emscripten_dispatch_to_thread_async_as_sync_ptr(
pthread_t target_thread,
void (*func)(em_async_as_sync_ctx*, void*),
void* arg) {
// Initialize the context that will be used to wait for the result of the work
// on the original thread.
em_async_as_sync_ctx ctx;
init_em_async_as_sync_ctx(&ctx, func, arg);

// Schedule `func` to run on the target thread.
if (!emscripten_dispatch_to_thread_ptr(
target_thread, do_async_as_sync_call, &ctx)) {
destroy_em_async_as_sync_ctx(&ctx);
return 0;
}

// Wait for the work to be marked done by `emscripten_async_as_sync_finish`.
pthread_mutex_lock(&ctx.mutex);
while (!ctx.done) {
// A thread cannot both perform asynchronous work and synchronously wait for
// that work to be finished. If we were proxying to the current thread, the
// work must have been synchronous and should already be done.
assert(!pthread_equal(target_thread, pthread_self()));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this assert no just be done once at the very top of the function?

pthread_cond_wait(&ctx.cond, &ctx.mutex);
}
pthread_mutex_unlock(&ctx.mutex);

// The work has been finished. Clean up and return.
destroy_em_async_as_sync_ctx(&ctx);
return 1;
}

// Helper for injecting a `em_async_as_sync_ctx` argument into an
// `em_queued_call` and calling it.
static void do_set_ctx_and_call(em_async_as_sync_ctx* ctx, void* arg) {
em_queued_call* q = (em_queued_call*)arg;
// Set the first argument to be the `ctx` pointer.
#ifdef __wasm32__
q->args[0].i = (int)ctx;
#else
#ifdef __wasm64__
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

elif defined

q->args[0].i64 = (int64_t)ctx;
#else
#error "expected either __wasm32__ or __wasm64__"
#endif
#endif

// `q` is only used to kick off the async work, but its satellite data might
// need to live for the entirety of the async work, so we need to defer
// freeing `q` until after the async work has been completed.
_do_call(q);
}

int emscripten_dispatch_to_thread_async_as_sync_args(pthread_t target_thread,
EM_FUNC_SIGNATURE sig,
void* func,
void* satellite,
va_list args) {
// Leave argument 0 uninitialized; it will later be filled in with the pointer
// to the `em_async_as_sync_ctx`.
// TODO: Stack allocate `q` but make sure its satellite data is still freed.
em_queued_call* q = em_queued_call_create(sig, func, satellite, 1, args);
if (!q) {
return 0;
}

// `q` is only used to kick off the async work, but its satellite data might
// need to live for the entirety of the async work, so we need to defer
// freeing `q` until after the async work has been completed.
q->calleeDelete = 0;
int success = emscripten_dispatch_to_thread_async_as_sync_ptr(
target_thread, do_set_ctx_and_call, q);
em_queued_call_free(q);
return success;
}

int emscripten_dispatch_to_thread_async_as_sync_(pthread_t target_thread,
EM_FUNC_SIGNATURE sig,
void* func,
void* satellite,
...) {
va_list args;
va_start(args, satellite);
int ret = emscripten_dispatch_to_thread_async_as_sync_args(
target_thread, sig, func, satellite, args);
va_end(args);
return ret;
}

void emscripten_async_as_sync_finish(em_async_as_sync_ctx* ctx) {
// Mark this work as done and wake up the invoking thread.
pthread_mutex_lock(&ctx->mutex);
ctx->done = 1;
pthread_mutex_unlock(&ctx->mutex);
pthread_cond_signal(&ctx->cond);
}

// Stores the memory address that the main thread is waiting on, if any. If
// the main thread is waiting, we wake it up before waking up any workers.
EMSCRIPTEN_KEEPALIVE void* _emscripten_main_thread_futex;
Expand Down
Loading