1212#include " tracing/traced_value.h"
1313#include " util-inl.h"
1414
15+ #include < atomic>
1516#include < memory>
1617
1718struct node_napi_env__ : public napi_env__ {
@@ -136,6 +137,7 @@ class ThreadSafeFunction : public node::AsyncResource {
136137 *v8::String::Utf8Value (env_->isolate, name)),
137138 thread_count(thread_count_),
138139 is_closing(false ),
140+ dispatch_state(kDispatchIdle ),
139141 context(context_),
140142 max_queue_size(max_queue_size_),
141143 env(env_),
@@ -175,10 +177,8 @@ class ThreadSafeFunction : public node::AsyncResource {
175177 return napi_closing;
176178 }
177179 } else {
178- if (uv_async_send (&async) != 0 ) {
179- return napi_generic_failure;
180- }
181180 queue.push (data);
181+ Send ();
182182 return napi_ok;
183183 }
184184 }
@@ -210,9 +210,7 @@ class ThreadSafeFunction : public node::AsyncResource {
210210 if (is_closing && max_queue_size > 0 ) {
211211 cond->Signal (lock);
212212 }
213- if (uv_async_send (&async) != 0 ) {
214- return napi_generic_failure;
215- }
213+ Send ();
216214 }
217215 }
218216
@@ -237,7 +235,6 @@ class ThreadSafeFunction : public node::AsyncResource {
237235 cond = std::make_unique<node::ConditionVariable>();
238236 }
239237 if (max_queue_size == 0 || cond) {
240- CHECK_EQ (0 , uv_idle_init (loop, &idle));
241238 return napi_ok;
242239 }
243240
@@ -262,21 +259,46 @@ class ThreadSafeFunction : public node::AsyncResource {
262259
263260 napi_status Unref () {
264261 uv_unref (reinterpret_cast <uv_handle_t *>(&async));
265- uv_unref (reinterpret_cast <uv_handle_t *>(&idle));
266262
267263 return napi_ok;
268264 }
269265
270266 napi_status Ref () {
271267 uv_ref (reinterpret_cast <uv_handle_t *>(&async));
272- uv_ref (reinterpret_cast <uv_handle_t *>(&idle));
273268
274269 return napi_ok;
275270 }
276271
277- void DispatchOne () {
272+ inline void * Context () {
273+ return context;
274+ }
275+
276+ protected:
277+ void Dispatch () {
278+ bool has_more = true ;
279+
280+ // Limit maximum synchronous iteration count to prevent event loop
281+ // starvation. See `src/node_messaging.cc` for an inspiration.
282+ unsigned int iterations_left = kMaxIterationCount ;
283+ while (has_more && --iterations_left != 0 ) {
284+ dispatch_state = kDispatchRunning ;
285+ has_more = DispatchOne ();
286+
287+ // Send() was called while we were executing the JS function
288+ if (dispatch_state.exchange (kDispatchIdle ) != kDispatchRunning ) {
289+ has_more = true ;
290+ }
291+ }
292+
293+ if (has_more) {
294+ Send ();
295+ }
296+ }
297+
298+ bool DispatchOne () {
278299 void * data = nullptr ;
279300 bool popped_value = false ;
301+ bool has_more = false ;
280302
281303 {
282304 node::Mutex::ScopedLock lock (this ->mutex );
@@ -301,9 +323,9 @@ class ThreadSafeFunction : public node::AsyncResource {
301323 cond->Signal (lock);
302324 }
303325 CloseHandlesAndMaybeDelete ();
304- } else {
305- CHECK_EQ (0 , uv_idle_stop (&idle));
306326 }
327+ } else {
328+ has_more = true ;
307329 }
308330 }
309331 }
@@ -321,6 +343,8 @@ class ThreadSafeFunction : public node::AsyncResource {
321343 call_js_cb (env, js_callback, context, data);
322344 });
323345 }
346+
347+ return has_more;
324348 }
325349
326350 void Finalize () {
@@ -334,10 +358,6 @@ class ThreadSafeFunction : public node::AsyncResource {
334358 EmptyQueueAndDelete ();
335359 }
336360
337- inline void * Context () {
338- return context;
339- }
340-
341361 void CloseHandlesAndMaybeDelete (bool set_closing = false ) {
342362 v8::HandleScope scope (env->isolate );
343363 if (set_closing) {
@@ -357,18 +377,20 @@ class ThreadSafeFunction : public node::AsyncResource {
357377 ThreadSafeFunction* ts_fn =
358378 node::ContainerOf (&ThreadSafeFunction::async,
359379 reinterpret_cast <uv_async_t *>(handle));
360- v8::HandleScope scope (ts_fn->env ->isolate );
361- ts_fn->env ->node_env ()->CloseHandle (
362- reinterpret_cast <uv_handle_t *>(&ts_fn->idle ),
363- [](uv_handle_t * handle) -> void {
364- ThreadSafeFunction* ts_fn =
365- node::ContainerOf (&ThreadSafeFunction::idle,
366- reinterpret_cast <uv_idle_t *>(handle));
367- ts_fn->Finalize ();
368- });
380+ ts_fn->Finalize ();
369381 });
370382 }
371383
384+ void Send () {
385+ // Ask currently running Dispatch() to make one more iteration
386+ unsigned char current_state = dispatch_state.fetch_or (kDispatchPending );
387+ if ((current_state & kDispatchRunning ) == kDispatchRunning ) {
388+ return ;
389+ }
390+
391+ CHECK_EQ (0 , uv_async_send (&async));
392+ }
393+
372394 // Default way of calling into JavaScript. Used when ThreadSafeFunction is
373395 // without a call_js_cb_.
374396 static void CallJs (napi_env env, napi_value cb, void * context, void * data) {
@@ -392,16 +414,10 @@ class ThreadSafeFunction : public node::AsyncResource {
392414 }
393415 }
394416
395- static void IdleCb (uv_idle_t * idle) {
396- ThreadSafeFunction* ts_fn =
397- node::ContainerOf (&ThreadSafeFunction::idle, idle);
398- ts_fn->DispatchOne ();
399- }
400-
401417 static void AsyncCb (uv_async_t * async) {
402418 ThreadSafeFunction* ts_fn =
403419 node::ContainerOf (&ThreadSafeFunction::async, async);
404- CHECK_EQ ( 0 , uv_idle_start (& ts_fn->idle , IdleCb) );
420+ ts_fn->Dispatch ( );
405421 }
406422
407423 static void Cleanup (void * data) {
@@ -410,14 +426,20 @@ class ThreadSafeFunction : public node::AsyncResource {
410426 }
411427
412428 private:
429+ static const unsigned char kDispatchIdle = 0 ;
430+ static const unsigned char kDispatchRunning = 1 << 0 ;
431+ static const unsigned char kDispatchPending = 1 << 1 ;
432+
433+ static const unsigned int kMaxIterationCount = 1000 ;
434+
413435 // These are variables protected by the mutex.
414436 node::Mutex mutex;
415437 std::unique_ptr<node::ConditionVariable> cond;
416438 std::queue<void *> queue;
417439 uv_async_t async;
418- uv_idle_t idle;
419440 size_t thread_count;
420441 bool is_closing;
442+ std::atomic_uchar dispatch_state;
421443
422444 // These are variables set once, upon creation, and then never again, which
423445 // means we don't need the mutex to read them.
0 commit comments