@@ -2014,10 +2014,11 @@ struct ggml_threadpool {
2014
2014
// these are atomic as an annotation for thread-sanitizer
2015
2015
atomic_bool stop; // Used for stopping the threadpool altogether
2016
2016
atomic_bool pause; // Used for pausing the threadpool or individual threads
2017
+ atomic_bool abort; // Used for aborting processing of a graph
2017
2018
2018
2019
struct ggml_compute_state * workers; // per thread state
2019
2020
int n_threads_max; // number of threads in the pool
2020
- int n_threads_cur; // number of threads used in the current graph
2021
+ atomic_int n_threads_cur; // number of threads used in the current graph
2021
2022
2022
2023
int32_t prio; // Scheduling priority
2023
2024
uint32_t poll; // Polling level (0 - no polling)
@@ -3181,41 +3182,36 @@ inline static void ggml_critical_section_start(void) {
3181
3182
}
3182
3183
}
3183
3184
3184
- #ifdef GGML_USE_OPENMP
3185
- static void ggml_barrier(struct ggml_threadpool * threadpool) {
3186
- if (threadpool->n_threads_cur == 1) {
3185
+ static void ggml_barrier(struct ggml_threadpool * tp) {
3186
+ int n_threads = atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed);
3187
+ if (n_threads == 1) {
3187
3188
return;
3188
3189
}
3189
3190
3191
+ #ifdef GGML_USE_OPENMP
3190
3192
#pragma omp barrier
3191
- }
3192
3193
#else
3193
- static void ggml_barrier(struct ggml_threadpool * threadpool) {
3194
- if (threadpool->n_threads_cur == 1) {
3195
- return;
3196
- }
3197
-
3198
- atomic_int * n_barrier = &threadpool->n_barrier;
3199
- atomic_int * n_barrier_passed = &threadpool->n_barrier_passed;
3194
+ int n_passed = atomic_load_explicit(&tp->n_barrier_passed, memory_order_relaxed);
3200
3195
3201
- int n_threads = threadpool->n_threads_cur;
3202
- int passed_old = atomic_load_explicit(n_barrier_passed, memory_order_relaxed );
3196
+ // enter barrier (full seq-cst fence)
3197
+ int n_barrier = atomic_fetch_add_explicit(&tp->n_barrier, 1, memory_order_seq_cst );
3203
3198
3204
- if (atomic_fetch_add(n_barrier, 1) == n_threads - 1) {
3199
+ int last = 0;
3200
+ if (n_barrier == (n_threads - 1)) {
3205
3201
// last thread
3206
- atomic_store( n_barrier, 0);
3207
- atomic_fetch_add_explicit(n_barrier_passed, 1, memory_order_relaxed) ;
3202
+ atomic_store_explicit(&tp-> n_barrier, 0, memory_order_relaxed );
3203
+ last = 1 ;
3208
3204
} else {
3209
3205
// wait for other threads
3210
- while (true) {
3211
- if (atomic_load_explicit(n_barrier_passed, memory_order_relaxed) != passed_old) {
3212
- return;
3213
- }
3206
+ while (atomic_load_explicit(&tp->n_barrier_passed, memory_order_relaxed) == n_passed) {
3214
3207
ggml_thread_cpu_relax();
3215
3208
}
3216
3209
}
3217
- }
3210
+
3211
+ // exit barrier (full seq-cst fence)
3212
+ atomic_fetch_add_explicit(&tp->n_barrier_passed, last, memory_order_seq_cst);
3218
3213
#endif
3214
+ }
3219
3215
3220
3216
// TODO: make this somehow automatically executed
3221
3217
// some sort of "sentry" mechanism
@@ -20185,64 +20181,84 @@ struct ggml_cplan ggml_graph_plan(
20185
20181
20186
20182
static thread_ret_t ggml_graph_compute_thread(void * data) {
20187
20183
struct ggml_compute_state * state = (struct ggml_compute_state *) data;
20184
+ struct ggml_threadpool * tp = state->threadpool;
20188
20185
20189
- const struct ggml_cgraph * cgraph = state->threadpool ->cgraph;
20190
- const struct ggml_cplan * cplan = state->threadpool ->cplan;
20186
+ const struct ggml_cgraph * cgraph = tp ->cgraph;
20187
+ const struct ggml_cplan * cplan = tp ->cplan;
20191
20188
20192
20189
set_numa_thread_affinity(state->ith);
20193
20190
20194
20191
struct ggml_compute_params params = {
20195
20192
/*.ith =*/ state->ith,
20196
- /*.nth =*/ state->threadpool-> n_threads_cur,
20193
+ /*.nth =*/ atomic_load_explicit(&tp-> n_threads_cur, memory_order_relaxed) ,
20197
20194
/*.wsize =*/ cplan->work_size,
20198
20195
/*.wdata =*/ cplan->work_data,
20199
- /*.threadpool=*/ state->threadpool ,
20196
+ /*.threadpool=*/ tp ,
20200
20197
};
20201
20198
20202
- for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
20199
+ for (int node_n = 0; node_n < cgraph->n_nodes && !tp->abort ; node_n++) {
20203
20200
struct ggml_tensor * node = cgraph->nodes[node_n];
20204
20201
20205
20202
ggml_compute_forward(&params, node);
20206
20203
20207
- if (state->ith == 0 && cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
20208
- state->threadpool->ec = GGML_STATUS_ABORTED;
20204
+ if (state->ith == 0 && cplan->abort_callback &&
20205
+ cplan->abort_callback(cplan->abort_callback_data)) {
20206
+ tp->abort = true;
20207
+ tp->ec = GGML_STATUS_ABORTED;
20209
20208
}
20210
20209
20211
20210
ggml_barrier(state->threadpool);
20212
-
20213
- if (state->threadpool->ec != GGML_STATUS_SUCCESS) {
20214
- break;
20215
- }
20216
20211
}
20217
20212
20218
20213
return 0;
20219
20214
}
20220
20215
20221
20216
#ifndef GGML_USE_OPENMP
20222
20217
20223
- static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
20218
+ // check if thread is active
20219
+ static inline bool ggml_graph_compute_thread_active(struct ggml_compute_state * state) {
20220
+ struct ggml_threadpool * threadpool = state->threadpool;
20221
+ int n_threads = atomic_load_explicit(&threadpool->n_threads_cur, memory_order_relaxed);
20222
+ return (state->ith < n_threads);
20223
+ }
20224
+
20225
+ // check if thread is ready to proceed (exit from polling or sleeping)
20226
+ static inline bool ggml_graph_compute_thread_ready(struct ggml_compute_state * state) {
20224
20227
struct ggml_threadpool * threadpool = state->threadpool;
20225
20228
20226
20229
if (state->pending || threadpool->stop || threadpool->pause) { return true; }
20227
20230
20228
20231
// check for new graph/work
20229
20232
int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
20230
20233
if (new_graph != state->last_graph) {
20231
- state->pending = (state->ith < threadpool->n_threads_cur );
20234
+ state->pending = ggml_graph_compute_thread_active (state);
20232
20235
state->last_graph = new_graph;
20233
20236
}
20234
20237
20235
20238
return state->pending;
20236
20239
}
20237
20240
20241
+ // sync thread state after polling
20242
+ static inline void ggml_graph_compute_thread_sync(struct ggml_compute_state * state) {
20243
+ struct ggml_threadpool * threadpool = state->threadpool;
20244
+ // this should just be atomic_thread_fence(seq_cst) but it confuses thread-sanitizer
20245
+ // so instead we just use a dummy read-modify-write
20246
+ atomic_fetch_add_explicit(&threadpool->n_graph, 0, memory_order_seq_cst);
20247
+ }
20248
+
20238
20249
static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
20239
20250
struct ggml_threadpool * threadpool = state->threadpool;
20240
20251
20252
+ // Skip polling for unused threads
20253
+ if (!ggml_graph_compute_thread_active(state)) {
20254
+ return state->pending;
20255
+ }
20256
+
20241
20257
// This seems to make 0 ... 100 a decent range for polling level across modern processors.
20242
20258
// Perhaps, we can adjust it dynamically based on load and things.
20243
20259
const uint64_t n_rounds = 1024UL * 128 * threadpool->poll;
20244
20260
20245
- for (uint64_t i=0; !ggml_graph_compute_ready (state) && i< n_rounds; i++) {
20261
+ for (uint64_t i=0; !ggml_graph_compute_thread_ready (state) && i < n_rounds; i++) {
20246
20262
// No new work. Keep polling.
20247
20263
ggml_thread_cpu_relax();
20248
20264
}
@@ -20254,13 +20270,14 @@ static inline bool ggml_graph_compute_check_for_work(struct ggml_compute_state *
20254
20270
struct ggml_threadpool * threadpool = state->threadpool;
20255
20271
20256
20272
if (ggml_graph_compute_poll_for_work(state)) {
20273
+ ggml_graph_compute_thread_sync(state);
20257
20274
return state->pending;
20258
20275
}
20259
20276
20260
20277
ggml_mutex_lock_shared(&threadpool->mutex);
20261
- while (!ggml_graph_compute_ready (state)) {
20278
+ while (!ggml_graph_compute_thread_ready (state)) {
20262
20279
// No new work. Wait for the signal.
20263
- GGML_PRINT_DEBUG("thread #%d waiting for work\n", state->ith);
20280
+ GGML_PRINT_DEBUG("thread #%d waiting for work (sleeping) \n", state->ith);
20264
20281
ggml_cond_wait(&threadpool->cond, &threadpool->mutex);
20265
20282
}
20266
20283
ggml_mutex_unlock_shared(&threadpool->mutex);
@@ -20307,13 +20324,20 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
20307
20324
}
20308
20325
20309
20326
// Start processing new graph
20310
- static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool)
20327
+ static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool, int n_threads )
20311
20328
{
20312
- // always take the mutex here because the worker threads are doing hybrid poll/wait
20329
+ // Always take the mutex here because the worker threads are doing hybrid poll/wait
20313
20330
20314
20331
ggml_mutex_lock(&threadpool->mutex);
20315
20332
20316
- atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
20333
+ GGML_PRINT_DEBUG("threadpool: n_threads_cur %d n_threads %d\n", threadpool->n_threads_cur, n_threads);
20334
+
20335
+ // Update the number of active threads
20336
+ atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
20337
+
20338
+ // Indicate the graph is ready to be processed
20339
+ // We need the full seq-cst fence here because of the polling threads (used in thread_sync)
20340
+ atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_seq_cst);
20317
20341
20318
20342
if (threadpool->pause) {
20319
20343
// Update main thread prio and affinity to match the threadpool settings
@@ -20372,6 +20396,7 @@ static struct ggml_threadpool * ggml_threadpool_new_impl(
20372
20396
threadpool->current_chunk = 0;
20373
20397
threadpool->stop = false;
20374
20398
threadpool->pause = tpp->paused;
20399
+ threadpool->abort = false;
20375
20400
threadpool->workers = NULL;
20376
20401
threadpool->n_threads_max = tpp->n_threads;
20377
20402
threadpool->n_threads_cur = tpp->n_threads;
@@ -20447,15 +20472,11 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
20447
20472
// No worker threads should be accessing the parameters below at this stage
20448
20473
threadpool->cgraph = cgraph;
20449
20474
threadpool->cplan = cplan;
20450
- threadpool->n_threads_cur = n_threads;
20451
20475
threadpool->current_chunk = 0;
20476
+ threadpool->abort = false;
20452
20477
threadpool->ec = GGML_STATUS_SUCCESS;
20453
20478
}
20454
20479
20455
- if (n_threads > threadpool->n_threads_max) {
20456
- GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n");
20457
- }
20458
-
20459
20480
#ifdef GGML_USE_OPENMP
20460
20481
if (n_threads > 1) {
20461
20482
#pragma omp parallel num_threads(n_threads)
@@ -20464,7 +20485,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
20464
20485
{
20465
20486
// update the number of threads from the actual number of threads that we got from OpenMP
20466
20487
n_threads = omp_get_num_threads();
20467
- threadpool->n_threads_cur = n_threads;
20488
+ atomic_store_explicit(& threadpool->n_threads_cur, n_threads, memory_order_relaxed) ;
20468
20489
}
20469
20490
20470
20491
ggml_graph_compute_thread(&threadpool->workers[omp_get_thread_num()]);
@@ -20474,8 +20495,13 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
20474
20495
ggml_graph_compute_thread(&threadpool->workers[0]);
20475
20496
}
20476
20497
#else
20498
+ if (n_threads > threadpool->n_threads_max) {
20499
+ GGML_PRINT("WARNING: cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads_max);
20500
+ n_threads = threadpool->n_threads_max;
20501
+ }
20502
+
20477
20503
// Kick all threads to start the new graph
20478
- ggml_graph_compute_kickoff(threadpool);
20504
+ ggml_graph_compute_kickoff(threadpool, n_threads );
20479
20505
20480
20506
// This is a work thread too
20481
20507
ggml_graph_compute_thread(&threadpool->workers[0]);
0 commit comments