Skip to content

Commit 28be33e

Browse files
committed
Rewrite the old proxying API in terms of the new API
Rewrite the old threading.h proxying API used for internal system call implementations in terms of the new proxying API introduced in #15737.
1 parent e4b05eb commit 28be33e

File tree

4 files changed

+35
-212
lines changed

4 files changed

+35
-212
lines changed

src/library_pthread.js

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -250,9 +250,9 @@ var LibraryPThread = {
250250
return;
251251
}
252252

253-
if (cmd === 'processQueuedMainThreadWork') {
253+
if (cmd === 'processProxyingQueue') {
254254
// TODO: Must post message to main Emscripten thread in PROXY_TO_WORKER mode.
255-
_emscripten_main_thread_process_queued_calls();
255+
_emscripten_proxy_execute_queue(d['queue']);
256256
} else if (cmd === 'spawnThread') {
257257
spawnThread(d);
258258
} else if (cmd === 'cleanupThread') {
@@ -1149,29 +1149,6 @@ var LibraryPThread = {
11491149
return {{{ makeDynCall('ii', 'ptr') }}}(arg);
11501150
},
11511151

1152-
// This function is called internally to notify target thread ID that it has messages it needs to
1153-
// process in its message queue inside the Wasm heap. As a helper, the caller must also pass the
1154-
// ID of the main browser thread to this function, to avoid needlessly ping-ponging between JS and
1155-
// Wasm boundaries.
1156-
_emscripten_notify_thread_queue: function(targetThreadId, mainThreadId) {
1157-
if (targetThreadId == mainThreadId) {
1158-
postMessage({'cmd' : 'processQueuedMainThreadWork'});
1159-
} else if (ENVIRONMENT_IS_PTHREAD) {
1160-
postMessage({'targetThread': targetThreadId, 'cmd': 'processThreadQueue'});
1161-
} else {
1162-
var pthread = PThread.pthreads[targetThreadId];
1163-
var worker = pthread && pthread.worker;
1164-
if (!worker) {
1165-
#if ASSERTIONS
1166-
err('Cannot send message to thread with ID ' + targetThreadId + ', unknown thread ID!');
1167-
#endif
1168-
return /*0*/;
1169-
}
1170-
worker.postMessage({'cmd' : 'processThreadQueue'});
1171-
}
1172-
return 1;
1173-
},
1174-
11751152
_emscripten_notify_proxying_queue: function(targetThreadId, currThreadId, mainThreadId, queue) {
11761153
if (targetThreadId == currThreadId) {
11771154
setTimeout(function() { _emscripten_proxy_execute_queue(queue); });

src/worker.js

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -272,10 +272,6 @@ self.onmessage = function(e) {
272272
}
273273
} else if (e.data.target === 'setimmediate') {
274274
// no-op
275-
} else if (e.data.cmd === 'processThreadQueue') {
276-
if (Module['_pthread_self']()) { // If this thread is actually running?
277-
Module['_emscripten_current_thread_process_queued_calls']();
278-
}
279275
} else if (e.data.cmd === 'processProxyingQueue') {
280276
if (Module['_pthread_self']()) { // If this thread is actually running?
281277
Module['_emscripten_proxy_execute_queue'](e.data.queue);

system/lib/pthread/library_pthread.c

Lines changed: 30 additions & 183 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include <emscripten/stack.h>
3636

3737
#include "threading_internal.h"
38+
#include "proxying.h"
3839

3940
void __pthread_testcancel();
4041

@@ -183,7 +184,6 @@ void emscripten_async_waitable_close(em_queued_call* call) {
183184
}
184185

185186
extern double emscripten_receive_on_main_thread_js(int functionIndex, int numCallArgs, double* args);
186-
extern int _emscripten_notify_thread_queue(pthread_t targetThreadId, pthread_t mainThreadId);
187187
extern int __pthread_create_js(struct pthread *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
188188

189189
static void _do_call(void* arg) {
@@ -335,60 +335,6 @@ static void _do_call(void* arg) {
335335
}
336336
}
337337

338-
#define CALL_QUEUE_SIZE 128
339-
340-
// Shared data synchronized by call_queue_lock.
341-
typedef struct CallQueueEntry {
342-
void (*func)(void*);
343-
void* arg;
344-
} CallQueueEntry;
345-
346-
typedef struct CallQueue {
347-
void* target_thread;
348-
CallQueueEntry* call_queue;
349-
int call_queue_head;
350-
int call_queue_tail;
351-
struct CallQueue* next;
352-
} CallQueue;
353-
354-
// Currently global to the queue, but this can be improved to be per-queue specific. (TODO: with
355-
// lockfree list operations on callQueue_head, or removing the list by moving this data to
356-
// pthread_t)
357-
static pthread_mutex_t call_queue_lock = PTHREAD_MUTEX_INITIALIZER;
358-
static CallQueue* callQueue_head = 0;
359-
360-
// Not thread safe, call while having call_queue_lock obtained.
361-
static CallQueue* GetQueue(void* target) {
362-
assert(target);
363-
CallQueue* q = callQueue_head;
364-
while (q && q->target_thread != target)
365-
q = q->next;
366-
return q;
367-
}
368-
369-
// Not thread safe, call while having call_queue_lock obtained.
370-
static CallQueue* GetOrAllocateQueue(void* target) {
371-
CallQueue* q = GetQueue(target);
372-
if (q)
373-
return q;
374-
375-
q = (CallQueue*)malloc(sizeof(CallQueue));
376-
q->target_thread = target;
377-
q->call_queue = 0;
378-
q->call_queue_head = 0;
379-
q->call_queue_tail = 0;
380-
q->next = 0;
381-
if (callQueue_head) {
382-
CallQueue* last = callQueue_head;
383-
while (last->next)
384-
last = last->next;
385-
last->next = q;
386-
} else {
387-
callQueue_head = q;
388-
}
389-
return q;
390-
}
391-
392338
EMSCRIPTEN_RESULT emscripten_wait_for_call_v(em_queued_call* call, double timeoutMSecs) {
393339
int r;
394340

@@ -424,85 +370,43 @@ pthread_t emscripten_main_browser_thread_id() {
424370
return &__main_pthread;
425371
}
426372

427-
int _emscripten_do_dispatch_to_thread(pthread_t target_thread, em_queued_call* call) {
428-
assert(call);
429-
430-
// #if PTHREADS_DEBUG // TODO: Create a debug version of pthreads library
431-
// EM_ASM_INT({dump('thread ' + _pthread_self() + ' (ENVIRONMENT_IS_WORKER: ' +
432-
//ENVIRONMENT_IS_WORKER + '), queueing call of function enum=' + $0 + '/ptr=' + $1 + ' on thread '
433-
//+ $2 + '\n' + new Error().stack)}, call->functionEnum, call->functionPtr, target_thread);
434-
// #endif
435-
436-
// Can't be a null pointer here, and can't be
437-
// EM_CALLBACK_THREAD_CONTEXT_MAIN_BROWSER_THREAD either.
373+
static pthread_t normalize_thread(pthread_t target_thread) {
438374
assert(target_thread);
439-
if (target_thread == EM_CALLBACK_THREAD_CONTEXT_MAIN_BROWSER_THREAD)
440-
target_thread = emscripten_main_browser_thread_id();
441-
442-
// If we are the target recipient of this message, we can just call the operation directly.
443-
if (target_thread == EM_CALLBACK_THREAD_CONTEXT_CALLING_THREAD ||
444-
target_thread == pthread_self()) {
445-
_do_call(call);
446-
return 1;
375+
if (target_thread == EM_CALLBACK_THREAD_CONTEXT_MAIN_BROWSER_THREAD) {
376+
return emscripten_main_browser_thread_id();
447377
}
448-
449-
// Add the operation to the call queue of the main runtime thread.
450-
pthread_mutex_lock(&call_queue_lock);
451-
CallQueue* q = GetOrAllocateQueue(target_thread);
452-
if (!q->call_queue) {
453-
// Shared data synchronized by call_queue_lock.
454-
q->call_queue = malloc(sizeof(CallQueueEntry) * CALL_QUEUE_SIZE);
378+
if (target_thread == EM_CALLBACK_THREAD_CONTEXT_CALLING_THREAD) {
379+
return pthread_self();
455380
}
381+
return target_thread;
382+
}
456383

457-
int head = q->call_queue_head;
458-
int tail = q->call_queue_tail;
459-
int new_tail = (tail + 1) % CALL_QUEUE_SIZE;
460-
461-
while (new_tail == head) { // Queue is full?
462-
pthread_mutex_unlock(&call_queue_lock);
463-
464-
// If queue of the main browser thread is full, then we wait. (never drop messages for the main
465-
// browser thread)
466-
if (target_thread == emscripten_main_browser_thread_id()) {
467-
emscripten_futex_wait((void*)&q->call_queue_head, head, INFINITY);
468-
pthread_mutex_lock(&call_queue_lock);
469-
head = q->call_queue_head;
470-
tail = q->call_queue_tail;
471-
new_tail = (tail + 1) % CALL_QUEUE_SIZE;
472-
} else {
473-
// For the queues of other threads, just drop the message.
474-
// #if DEBUG TODO: a debug build of pthreads library?
475-
// EM_ASM(console.error('Pthread queue overflowed, dropping queued
476-
//message to thread. ' + new Error().stack));
477-
// #endif
478-
em_queued_call_free(call);
479-
return 0;
480-
}
384+
// Execute `call` and return 1 only if already on the `target_thread`. Otherwise
385+
// return 0.
386+
static int maybe_call_on_current_thread(pthread_t target_thread,
387+
em_queued_call* call) {
388+
if (pthread_equal(target_thread, pthread_self())) {
389+
_do_call(call);
390+
return 1;
481391
}
392+
return 0;
393+
}
482394

483-
q->call_queue[tail].func = _do_call;
484-
q->call_queue[tail].arg = call;
485-
486-
// If the call queue was empty, the main runtime thread is likely idle in the browser event loop,
487-
// so send a message to it to ensure that it wakes up to start processing the command we have
488-
// posted.
489-
if (head == tail) {
490-
int success = _emscripten_notify_thread_queue(target_thread, emscripten_main_browser_thread_id());
491-
// Failed to dispatch the thread, delete the crafted message.
492-
if (!success) {
493-
em_queued_call_free(call);
494-
pthread_mutex_unlock(&call_queue_lock);
495-
return 0;
496-
}
395+
// Execute or proxy `call`. Return 1 if the work was executed or otherwise
396+
// return 0.
397+
static int do_dispatch_to_thread(pthread_t target_thread,
398+
em_queued_call* call) {
399+
target_thread = normalize_thread(target_thread);
400+
if (maybe_call_on_current_thread(target_thread, call)) {
401+
return 1;
497402
}
498-
499-
q->call_queue_tail = new_tail;
500-
pthread_mutex_unlock(&call_queue_lock);
403+
emscripten_proxy_async(
404+
emscripten_proxy_get_system_queue(), target_thread, _do_call, call);
501405
return 0;
502406
}
503407

504408
void emscripten_async_run_in_main_thread(em_queued_call* call) {
505-
_emscripten_do_dispatch_to_thread(emscripten_main_browser_thread_id(), call);
409+
do_dispatch_to_thread(emscripten_main_browser_thread_id(), call);
506410
}
507411

508412
void emscripten_sync_run_in_main_thread(em_queued_call* call) {
@@ -603,50 +507,7 @@ void* emscripten_sync_run_in_main_thread_7(int function, void* arg1,
603507
}
604508

605509
void emscripten_current_thread_process_queued_calls() {
606-
// #if PTHREADS_DEBUG == 2
607-
// EM_ASM(console.error('thread ' + _pthread_self() + ':
608-
//emscripten_current_thread_process_queued_calls(), ' + new Error().stack));
609-
// #endif
610-
611-
static thread_local bool thread_is_processing_queued_calls = false;
612-
613-
// It is possible that when processing a queued call, the control flow leads back to calling this
614-
// function in a nested fashion! Therefore this scenario must explicitly be detected, and
615-
// processing the queue must be avoided if we are nesting, or otherwise the same queued calls
616-
// would be processed again and again.
617-
if (thread_is_processing_queued_calls)
618-
return;
619-
// This must be before pthread_mutex_lock(), since pthread_mutex_lock() can call back to this
620-
// function.
621-
thread_is_processing_queued_calls = true;
622-
623-
pthread_mutex_lock(&call_queue_lock);
624-
CallQueue* q = GetQueue(pthread_self());
625-
if (!q) {
626-
pthread_mutex_unlock(&call_queue_lock);
627-
thread_is_processing_queued_calls = false;
628-
return;
629-
}
630-
631-
int head = q->call_queue_head;
632-
int tail = q->call_queue_tail;
633-
while (head != tail) {
634-
// Assume that the call is heavy, so unlock access to the call queue while it is being
635-
// performed.
636-
pthread_mutex_unlock(&call_queue_lock);
637-
q->call_queue[head].func(q->call_queue[head].arg);
638-
pthread_mutex_lock(&call_queue_lock);
639-
640-
head = (head + 1) % CALL_QUEUE_SIZE;
641-
q->call_queue_head = head;
642-
tail = q->call_queue_tail;
643-
}
644-
pthread_mutex_unlock(&call_queue_lock);
645-
646-
// If the queue was full and we had waiters pending to get to put data to queue, wake them up.
647-
emscripten_futex_wake((void*)&q->call_queue_head, INT_MAX);
648-
649-
thread_is_processing_queued_calls = false;
510+
emscripten_proxy_execute_queue(emscripten_proxy_get_system_queue());
650511
}
651512

652513
// At times when we disallow the main thread to process queued calls, this will
@@ -747,17 +608,6 @@ em_queued_call* emscripten_async_waitable_run_in_main_runtime_thread_(
747608
return q;
748609
}
749610

750-
typedef struct DispatchToThreadArgs {
751-
pthread_t target_thread;
752-
em_queued_call* q;
753-
} DispatchToThreadArgs;
754-
755-
static void dispatch_to_thread_helper(void* user_data) {
756-
DispatchToThreadArgs* args = (DispatchToThreadArgs*)user_data;
757-
_emscripten_do_dispatch_to_thread(args->target_thread, args->q);
758-
free(user_data);
759-
}
760-
761611
int emscripten_dispatch_to_thread_args(pthread_t target_thread,
762612
EM_FUNC_SIGNATURE sig,
763613
void* func_ptr,
@@ -775,7 +625,7 @@ int emscripten_dispatch_to_thread_args(pthread_t target_thread,
775625

776626
// `q` will not be used after it is called, so let the call clean it up.
777627
q->calleeDelete = 1;
778-
return _emscripten_do_dispatch_to_thread(target_thread, q);
628+
return do_dispatch_to_thread(target_thread, q);
779629
}
780630

781631
int emscripten_dispatch_to_thread_(pthread_t target_thread,
@@ -806,10 +656,7 @@ int emscripten_dispatch_to_thread_async_args(pthread_t target_thread,
806656
q->calleeDelete = 1;
807657

808658
// Schedule the call to run later on this thread.
809-
DispatchToThreadArgs* args = malloc(sizeof(DispatchToThreadArgs));
810-
args->target_thread = target_thread;
811-
args->q = q;
812-
emscripten_set_timeout(dispatch_to_thread_helper, 0, args);
659+
emscripten_set_timeout(_do_call, 0, args);
813660
return 0;
814661
}
815662

tools/system_libs.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -776,6 +776,9 @@ class libc(DebugLibrary, AsanInstrumentedLibrary, MuslInternalLibrary, MTLibrary
776776
'-Wno-string-plus-int',
777777
'-Wno-pointer-sign']
778778

779+
# Include internal proxying header.
780+
cflags += [f'-I{utils.path_from_root("system/lib/pthread")}']
781+
779782
def get_files(self):
780783
libc_files = []
781784
musl_srcdir = utils.path_from_root('system/lib/libc/musl/src')

0 commit comments

Comments
 (0)