#undef NDEBUG
#include <assert.h>
#include <node_api.h>
#include <stdio.h>
#include <stdlib.h>
#include <uv.h>
#include "../../js-native-api/common.h"
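
// This test saturates the libuv thread pool: it queues threadpool_size + 1
// async work items and checks, via the counters below, that at most
// threadpool_size Execute callbacks run concurrently.

// State shared by all tasks; every field is guarded by `mutex`.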
typedef struct {
  uv_mutex_t mutex;
  uint32_t threadpool_size;
  uint32_t n_tasks_started;
  uint32_t n_tasks_exited;
  uint32_t n_tasks_finalized;
  bool observed_saturation;
} async_shared_data;
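
// Per-task data passed to napi_create_async_work and received as the `data`
// argument of both Execute and Complete.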
typedef struct {
  uint32_t task_id;
  async_shared_data* shared_data;
  napi_async_work request;
} async_carrier;
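
// Helper predicates over the shared counters; callers must hold `mutex`.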
static inline bool all_tasks_started(async_shared_data* d) {
  assert(d->n_tasks_started <= d->threadpool_size + 1);
  return d->n_tasks_started == d->threadpool_size + 1;
}

static inline bool all_tasks_exited(async_shared_data* d) {
  assert(d->n_tasks_exited <= d->n_tasks_started);
  return all_tasks_started(d) && d->n_tasks_exited == d->n_tasks_started;
}

static inline bool all_tasks_finalized(async_shared_data* d) {
  assert(d->n_tasks_finalized <= d->n_tasks_exited);
  return all_tasks_exited(d) && d->n_tasks_finalized == d->n_tasks_exited;
}

static inline bool still_saturating(async_shared_data* d) {
  return d->n_tasks_started < d->threadpool_size;
}

static inline bool threadpool_saturated(async_shared_data* d) {
  return d->n_tasks_started == d->threadpool_size && d->n_tasks_exited == 0;
}

static inline bool threadpool_desaturating(async_shared_data* d) {
  return d->n_tasks_started >= d->threadpool_size && d->n_tasks_exited != 0;
}

static inline void print_info(const char* label, async_carrier* c) {
  async_shared_data* d = c->shared_data;
  printf("%s task_id=%u n_tasks_started=%u n_tasks_exited=%u "
         "n_tasks_finalized=%u observed_saturation=%d\n",
         label,
         c->task_id,
         d->n_tasks_started,
         d->n_tasks_exited,
         d->n_tasks_finalized,
         d->observed_saturation);
}
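
// Runs on a libuv thread pool thread. Each task registers itself, waits for
// the pool to saturate, and asserts invariants about how many tasks can be
// in flight at once.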
static void Execute(napi_env env, void* data) {
  async_carrier* c = (async_carrier*)data;
  async_shared_data* d = c->shared_data;

  // As long as fewer than threadpool_size async tasks have been started, more
  // should (eventually) be started. Only once threadpool_size tasks are
  // running should newly scheduled tasks remain queued.
  uv_mutex_lock(&d->mutex);
  bool should_be_concurrent = still_saturating(d);
  d->n_tasks_started++;
  assert(d->n_tasks_started <= d->threadpool_size + 1);

  print_info("start", c);

  if (should_be_concurrent) {
    // Wait for the thread pool to be saturated. Polling is not an elegant way
    // of doing so, but it really does not matter much here.
    while (still_saturating(d)) {
      print_info("waiting", c);
      uv_mutex_unlock(&d->mutex);
      uv_sleep(100);
      uv_mutex_lock(&d->mutex);
    }

    // Exactly one async task will observe that the thread pool is saturated,
    // that is, that threadpool_size tasks have been started and none have
    // exited yet. That task will be the first to exit.
    if (!d->observed_saturation) {
      assert(threadpool_saturated(d));
      d->observed_saturation = true;
    } else {
      assert(threadpool_saturated(d) || threadpool_desaturating(d));
    }
  } else {
    // If this task is not among the first threadpool_size tasks, it should
    // not have been started unless other tasks have already finished.
    assert(threadpool_desaturating(d));
  }

  print_info("exit", c);

  // Allow other tasks to access the shared data. If the thread pool is
  // actually larger than threadpool_size, this gives an extraneous task the
  // chance to start, which will lead to an assertion failure.
  uv_mutex_unlock(&d->mutex);
  uv_sleep(1000);
  uv_mutex_lock(&d->mutex);

  d->n_tasks_exited++;
  uv_mutex_unlock(&d->mutex);
}
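
// Runs on the main event loop after the corresponding Execute has returned.
// The last task to be finalized tears down the shared state.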
static void Complete(napi_env env, napi_status status, void* data) {
  async_carrier* c = (async_carrier*)data;
  async_shared_data* d = c->shared_data;

  if (status != napi_ok) {
    napi_throw_type_error(env, NULL, "Execute callback failed.");
    return;
  }

  uv_mutex_lock(&d->mutex);
  assert(threadpool_desaturating(d));
  d->n_tasks_finalized++;
  print_info("finalize", c);
  if (all_tasks_finalized(d)) {
    uv_mutex_unlock(&d->mutex);
    uv_mutex_destroy(&d->mutex);
    free(d);
  } else {
    uv_mutex_unlock(&d->mutex);
  }

  NODE_API_CALL_RETURN_VOID(env, napi_delete_async_work(env, c->request));
  free(c);
}
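
// Binding: test(threadpool_size). Queues threadpool_size + 1 async work
// items, i.e. one more than the pool can run concurrently.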
static napi_value Test(napi_env env, napi_callback_info info) {
  size_t argc = 1;
  napi_value argv[1];
  napi_value this;
  void* data;
  NODE_API_CALL(env, napi_get_cb_info(env, info, &argc, argv, &this, &data));
  NODE_API_ASSERT(env, argc >= 1, "Not enough arguments, expected 1.");

  async_shared_data* shared_data = calloc(1, sizeof(async_shared_data));
  assert(shared_data != NULL);
  int ret = uv_mutex_init(&shared_data->mutex);
  assert(ret == 0);

  napi_valuetype t;
  NODE_API_CALL(env, napi_typeof(env, argv[0], &t));
  NODE_API_ASSERT(
      env, t == napi_number, "Wrong first argument, integer expected.");
  NODE_API_CALL(
      env, napi_get_value_uint32(env, argv[0], &shared_data->threadpool_size));

  napi_value resource_name;
  NODE_API_CALL(env,
                napi_create_string_utf8(
                    env, "TestResource", NAPI_AUTO_LENGTH, &resource_name));

  // Queue threadpool_size + 1 tasks: one more than the pool can run at once.
  for (uint32_t i = 0; i <= shared_data->threadpool_size; i++) {
    async_carrier* carrier = malloc(sizeof(async_carrier));
    assert(carrier != NULL);
    carrier->task_id = i;
    carrier->shared_data = shared_data;
    NODE_API_CALL(env,
                  napi_create_async_work(env,
                                         NULL,
                                         resource_name,
                                         Execute,
                                         Complete,
                                         carrier,
                                         &carrier->request));
    NODE_API_CALL(env, napi_queue_async_work(env, carrier->request));
  }

  return NULL;
}
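
// Expose test() on the add-on's exports.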
static napi_value Init(napi_env env, napi_value exports) {
  napi_property_descriptor desc = DECLARE_NODE_API_PROPERTY("test", Test);
  NODE_API_CALL(env, napi_define_properties(env, exports, 1, &desc));
  return exports;
}

NAPI_MODULE(NODE_GYP_MODULE_NAME, Init)