@@ -2013,10 +2013,11 @@ struct ggml_threadpool {
2013
2013
// these are atomic as an annotation for thread-sanitizer
2014
2014
atomic_bool stop; // Used for stopping the threadpool altogether
2015
2015
atomic_bool pause; // Used for pausing the threadpool or individual threads
2016
+ atomic_bool abort; // Used for aborting processing of a graph
2016
2017
2017
2018
struct ggml_compute_state * workers; // per thread state
2018
2019
int n_threads_max; // number of threads in the pool
2019
- int n_threads_cur; // number of threads used in the current graph
2020
+ atomic_int n_threads_cur; // number of threads used in the current graph
2020
2021
2021
2022
int32_t prio; // Scheduling priority
2022
2023
uint32_t poll; // Polling level (0 - no polling)
@@ -3178,41 +3179,36 @@ inline static void ggml_critical_section_start(void) {
3178
3179
}
3179
3180
}
3180
3181
3181
- #ifdef GGML_USE_OPENMP
3182
- static void ggml_barrier(struct ggml_threadpool * threadpool) {
3183
- if (threadpool->n_threads_cur == 1) {
3182
+ static void ggml_barrier(struct ggml_threadpool * tp) {
3183
+ int n_threads = atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed);
3184
+ if (n_threads == 1) {
3184
3185
return;
3185
3186
}
3186
3187
3188
+ #ifdef GGML_USE_OPENMP
3187
3189
#pragma omp barrier
3188
- }
3189
3190
#else
3190
- static void ggml_barrier(struct ggml_threadpool * threadpool) {
3191
- if (threadpool->n_threads_cur == 1) {
3192
- return;
3193
- }
3194
-
3195
- atomic_int * n_barrier = &threadpool->n_barrier;
3196
- atomic_int * n_barrier_passed = &threadpool->n_barrier_passed;
3191
+ int n_passed = atomic_load_explicit(&tp->n_barrier_passed, memory_order_relaxed);
3197
3192
3198
- int n_threads = threadpool->n_threads_cur;
3199
- int passed_old = atomic_load_explicit(n_barrier_passed, memory_order_relaxed );
3193
+ // enter barrier (full seq-cst fence)
3194
+ int n_barrier = atomic_fetch_add_explicit(&tp->n_barrier, 1, memory_order_seq_cst );
3200
3195
3201
- if (atomic_fetch_add(n_barrier, 1) == n_threads - 1) {
3196
+ int last = 0;
3197
+ if (n_barrier == (n_threads - 1)) {
3202
3198
// last thread
3203
- atomic_store( n_barrier, 0);
3204
- atomic_fetch_add_explicit(n_barrier_passed, 1, memory_order_relaxed) ;
3199
+ atomic_store_explicit(&tp-> n_barrier, 0, memory_order_relaxed );
3200
+ last = 1 ;
3205
3201
} else {
3206
3202
// wait for other threads
3207
- while (true) {
3208
- if (atomic_load_explicit(n_barrier_passed, memory_order_relaxed) != passed_old) {
3209
- return;
3210
- }
3203
+ while (atomic_load_explicit(&tp->n_barrier_passed, memory_order_relaxed) == n_passed) {
3211
3204
ggml_thread_cpu_relax();
3212
3205
}
3213
3206
}
3214
- }
3207
+
3208
+ // exit barrier (full seq-cst fence)
3209
+ atomic_fetch_add_explicit(&tp->n_barrier_passed, last, memory_order_seq_cst);
3215
3210
#endif
3211
+ }
3216
3212
3217
3213
// TODO: make this somehow automatically executed
3218
3214
// some sort of "sentry" mechanism
@@ -19933,64 +19929,84 @@ struct ggml_cplan ggml_graph_plan(
19933
19929
19934
19930
static thread_ret_t ggml_graph_compute_thread(void * data) {
19935
19931
struct ggml_compute_state * state = (struct ggml_compute_state *) data;
19932
+ struct ggml_threadpool * tp = state->threadpool;
19936
19933
19937
- const struct ggml_cgraph * cgraph = state->threadpool ->cgraph;
19938
- const struct ggml_cplan * cplan = state->threadpool ->cplan;
19934
+ const struct ggml_cgraph * cgraph = tp ->cgraph;
19935
+ const struct ggml_cplan * cplan = tp ->cplan;
19939
19936
19940
19937
set_numa_thread_affinity(state->ith);
19941
19938
19942
19939
struct ggml_compute_params params = {
19943
19940
/*.ith =*/ state->ith,
19944
- /*.nth =*/ state->threadpool-> n_threads_cur,
19941
+ /*.nth =*/ atomic_load_explicit(&tp-> n_threads_cur, memory_order_relaxed) ,
19945
19942
/*.wsize =*/ cplan->work_size,
19946
19943
/*.wdata =*/ cplan->work_data,
19947
- /*.threadpool=*/ state->threadpool ,
19944
+ /*.threadpool=*/ tp ,
19948
19945
};
19949
19946
19950
- for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
19947
+ for (int node_n = 0; node_n < cgraph->n_nodes && !tp->abort ; node_n++) {
19951
19948
struct ggml_tensor * node = cgraph->nodes[node_n];
19952
19949
19953
19950
ggml_compute_forward(¶ms, node);
19954
19951
19955
- if (state->ith == 0 && cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
19956
- state->threadpool->ec = GGML_STATUS_ABORTED;
19952
+ if (state->ith == 0 && cplan->abort_callback &&
19953
+ cplan->abort_callback(cplan->abort_callback_data)) {
19954
+ tp->abort = true;
19955
+ tp->ec = GGML_STATUS_ABORTED;
19957
19956
}
19958
19957
19959
19958
ggml_barrier(state->threadpool);
19960
-
19961
- if (state->threadpool->ec != GGML_STATUS_SUCCESS) {
19962
- break;
19963
- }
19964
19959
}
19965
19960
19966
19961
return 0;
19967
19962
}
19968
19963
19969
19964
#ifndef GGML_USE_OPENMP
19970
19965
19971
- static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
19966
+ // check if thread is active
19967
+ static inline bool ggml_graph_compute_thread_active(struct ggml_compute_state * state) {
19968
+ struct ggml_threadpool * threadpool = state->threadpool;
19969
+ int n_threads = atomic_load_explicit(&threadpool->n_threads_cur, memory_order_relaxed);
19970
+ return (state->ith < n_threads);
19971
+ }
19972
+
19973
+ // check if thread is ready to proceed (exit from polling or sleeping)
19974
+ static inline bool ggml_graph_compute_thread_ready(struct ggml_compute_state * state) {
19972
19975
struct ggml_threadpool * threadpool = state->threadpool;
19973
19976
19974
19977
if (state->pending || threadpool->stop || threadpool->pause) { return true; }
19975
19978
19976
19979
// check for new graph/work
19977
19980
int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
19978
19981
if (new_graph != state->last_graph) {
19979
- state->pending = (state->ith < threadpool->n_threads_cur );
19982
+ state->pending = ggml_graph_compute_thread_active (state);
19980
19983
state->last_graph = new_graph;
19981
19984
}
19982
19985
19983
19986
return state->pending;
19984
19987
}
19985
19988
19989
+ // sync thread state after polling
19990
+ static inline void ggml_graph_compute_thread_sync(struct ggml_compute_state * state) {
19991
+ struct ggml_threadpool * threadpool = state->threadpool;
19992
+ // this should just be atomic_thread_fence(seq_cst) but it confuses thread-sanitizer
19993
+ // so instead we just use a dummy read-modify-write
19994
+ atomic_fetch_add_explicit(&threadpool->n_graph, 0, memory_order_seq_cst);
19995
+ }
19996
+
19986
19997
static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
19987
19998
struct ggml_threadpool * threadpool = state->threadpool;
19988
19999
20000
+ // Skip polling for unused threads
20001
+ if (!ggml_graph_compute_thread_active(state)) {
20002
+ return state->pending;
20003
+ }
20004
+
19989
20005
// This seems to make 0 ... 100 a decent range for polling level across modern processors.
19990
20006
// Perhaps, we can adjust it dynamically based on load and things.
19991
20007
const uint64_t n_rounds = 1024UL * 128 * threadpool->poll;
19992
20008
19993
- for (uint64_t i=0; !ggml_graph_compute_ready (state) && i< n_rounds; i++) {
20009
+ for (uint64_t i=0; !ggml_graph_compute_thread_ready (state) && i < n_rounds; i++) {
19994
20010
// No new work. Keep polling.
19995
20011
ggml_thread_cpu_relax();
19996
20012
}
@@ -20002,13 +20018,14 @@ static inline bool ggml_graph_compute_check_for_work(struct ggml_compute_state *
20002
20018
struct ggml_threadpool * threadpool = state->threadpool;
20003
20019
20004
20020
if (ggml_graph_compute_poll_for_work(state)) {
20021
+ ggml_graph_compute_thread_sync(state);
20005
20022
return state->pending;
20006
20023
}
20007
20024
20008
20025
ggml_mutex_lock_shared(&threadpool->mutex);
20009
- while (!ggml_graph_compute_ready (state)) {
20026
+ while (!ggml_graph_compute_thread_ready (state)) {
20010
20027
// No new work. Wait for the signal.
20011
- GGML_PRINT_DEBUG("thread #%d waiting for work\n", state->ith);
20028
+ GGML_PRINT_DEBUG("thread #%d waiting for work (sleeping) \n", state->ith);
20012
20029
ggml_cond_wait(&threadpool->cond, &threadpool->mutex);
20013
20030
}
20014
20031
ggml_mutex_unlock_shared(&threadpool->mutex);
@@ -20055,13 +20072,20 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
20055
20072
}
20056
20073
20057
20074
// Start processing new graph
20058
- static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool)
20075
+ static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool, int n_threads )
20059
20076
{
20060
- // always take the mutex here because the worker threads are doing hybrid poll/wait
20077
+ // Always take the mutex here because the worker threads are doing hybrid poll/wait
20061
20078
20062
20079
ggml_mutex_lock(&threadpool->mutex);
20063
20080
20064
- atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
20081
+ GGML_PRINT_DEBUG("threadpool: n_threads_cur %d n_threads %d\n", threadpool->n_threads_cur, n_threads);
20082
+
20083
+ // Update the number of active threads
20084
+ atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
20085
+
20086
+ // Indicate the graph is ready to be processed
20087
+ // We need the full seq-cst fence here because of the polling threads (used in thread_sync)
20088
+ atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_seq_cst);
20065
20089
20066
20090
if (threadpool->pause) {
20067
20091
// Update main thread prio and affinity to match the threadpool settings
@@ -20120,6 +20144,7 @@ static struct ggml_threadpool * ggml_threadpool_new_impl(
20120
20144
threadpool->current_chunk = 0;
20121
20145
threadpool->stop = false;
20122
20146
threadpool->pause = tpp->paused;
20147
+ threadpool->abort = false;
20123
20148
threadpool->workers = NULL;
20124
20149
threadpool->n_threads_max = tpp->n_threads;
20125
20150
threadpool->n_threads_cur = tpp->n_threads;
@@ -20195,15 +20220,11 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
20195
20220
// No worker threads should be accessing the parameters below at this stage
20196
20221
threadpool->cgraph = cgraph;
20197
20222
threadpool->cplan = cplan;
20198
- threadpool->n_threads_cur = n_threads;
20199
20223
threadpool->current_chunk = 0;
20224
+ threadpool->abort = false;
20200
20225
threadpool->ec = GGML_STATUS_SUCCESS;
20201
20226
}
20202
20227
20203
- if (n_threads > threadpool->n_threads_max) {
20204
- GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n");
20205
- }
20206
-
20207
20228
#ifdef GGML_USE_OPENMP
20208
20229
if (n_threads > 1) {
20209
20230
#pragma omp parallel num_threads(n_threads)
@@ -20212,7 +20233,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
20212
20233
{
20213
20234
// update the number of threads from the actual number of threads that we got from OpenMP
20214
20235
n_threads = omp_get_num_threads();
20215
- threadpool->n_threads_cur = n_threads;
20236
+ atomic_store_explicit(& threadpool->n_threads_cur, n_threads, memory_order_relaxed) ;
20216
20237
}
20217
20238
20218
20239
ggml_graph_compute_thread(&threadpool->workers[omp_get_thread_num()]);
@@ -20221,8 +20242,13 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
20221
20242
ggml_graph_compute_thread(&threadpool->workers[0]);
20222
20243
}
20223
20244
#else
20245
+ if (n_threads > threadpool->n_threads_max) {
20246
+ GGML_PRINT("WARNING: cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads_max);
20247
+ n_threads = threadpool->n_threads_max;
20248
+ }
20249
+
20224
20250
// Kick all threads to start the new graph
20225
- ggml_graph_compute_kickoff(threadpool);
20251
+ ggml_graph_compute_kickoff(threadpool, n_threads );
20226
20252
20227
20253
// This is a work thread too
20228
20254
ggml_graph_compute_thread(&threadpool->workers[0]);
0 commit comments