From 8824130ff338afb9c3cb7e1733420309711076b5 Mon Sep 17 00:00:00 2001
From: Vladimir
Date: Fri, 31 Mar 2023 21:03:48 +0200
Subject: [PATCH 1/2] lock instead of spinlock

---
 ggml.c | 224 ++++++++++++++++++++++++++++++++++++++++-----------------
 1 file changed, 160 insertions(+), 64 deletions(-)

diff --git a/ggml.c b/ggml.c
index b6dd3f3cf74e3..104d55e606253 100644
--- a/ggml.c
+++ b/ggml.c
@@ -9112,6 +9112,19 @@ typedef pthread_t ggml_thread_t;
 #define ggml_thread_create pthread_create
 #define ggml_thread_join   pthread_join
 
+typedef pthread_mutex_t ggml_mutex_t;
+typedef pthread_cond_t  ggml_cond_t;
+
+#define ggml_mutex_init     pthread_mutex_init
+#define ggml_mutex_destroy  pthread_mutex_destroy
+#define ggml_cond_init      pthread_cond_init
+#define ggml_cond_destroy   pthread_cond_destroy
+
+#define ggml_mutex_lock     pthread_mutex_lock
+#define ggml_mutex_unlock   pthread_mutex_unlock
+#define ggml_cond_broadcast pthread_cond_broadcast
+#define ggml_cond_wait      pthread_cond_wait
+
 #else
 
 //typedef pthread_spinlock_t ggml_lock_t;
@@ -9135,17 +9148,31 @@ typedef pthread_t ggml_thread_t;
 #define ggml_thread_create pthread_create
 #define ggml_thread_join   pthread_join
 
+typedef pthread_mutex_t ggml_mutex_t;
+typedef pthread_cond_t  ggml_cond_t;
+
+#define ggml_mutex_init     pthread_mutex_init
+#define ggml_mutex_destroy  pthread_mutex_destroy
+#define ggml_cond_init      pthread_cond_init
+#define ggml_cond_destroy   pthread_cond_destroy
+
+#define ggml_mutex_lock     pthread_mutex_lock
+#define ggml_mutex_unlock   pthread_mutex_unlock
+#define ggml_cond_broadcast pthread_cond_broadcast
+#define ggml_cond_wait      pthread_cond_wait
+
 #endif
 
 struct ggml_compute_state_shared {
-    ggml_lock_t spin;
-
     int n_threads;
 
     // synchronization primitives
-    atomic_int  n_ready;
-    atomic_bool has_work;
-    atomic_bool stop; // stop all threads
+    int  n_ready;
+    bool has_work;
+    bool stop; // stop all threads
+
+    ggml_mutex_t mutex;
+    ggml_cond_t  cond;
 };
 
 struct ggml_compute_state {
@@ -9161,43 +9188,57 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
     struct ggml_compute_state * state = (struct ggml_compute_state *) data;
 
     const int n_threads = state->shared->n_threads;
-
     while (true) {
-        if (atomic_fetch_add(&state->shared->n_ready, 1) == n_threads - 1) {
-            atomic_store(&state->shared->has_work, false);
+        ggml_mutex_lock(&state->shared->mutex);
+        if (state->shared->n_ready++ == n_threads - 1) {
+            state->shared->has_work = false;
+            ggml_cond_broadcast(&state->shared->cond);
         } else {
-            while (atomic_load(&state->shared->has_work)) {
-                if (atomic_load(&state->shared->stop)) {
+            while (state->shared->has_work) {
+                if (state->shared->stop) {
+                    ggml_mutex_unlock(&state->shared->mutex);
+                    return 0;
+                }
+                ggml_cond_wait(&state->shared->cond, &state->shared->mutex);
+                if (state->shared->stop) {
+                    ggml_mutex_unlock(&state->shared->mutex);
                     return 0;
                 }
-                ggml_lock_lock  (&state->shared->spin);
-                ggml_lock_unlock(&state->shared->spin);
             }
         }
+        ggml_mutex_unlock(&state->shared->mutex);
 
-        atomic_fetch_sub(&state->shared->n_ready, 1);
+        ggml_mutex_lock(&state->shared->mutex);
+        state->shared->n_ready--;
+        ggml_cond_broadcast(&state->shared->cond);
+        ggml_mutex_unlock(&state->shared->mutex);
 
         // wait for work
-        while (!atomic_load(&state->shared->has_work)) {
-            if (atomic_load(&state->shared->stop)) {
-                return 0;
+        ggml_mutex_lock(&state->shared->mutex);
+        while (!state->shared->has_work && !state->shared->stop) {
+            if (state->shared->stop) {
+                ggml_mutex_unlock(&state->shared->mutex);
+                return 0;
             }
-            ggml_lock_lock  (&state->shared->spin);
-            ggml_lock_unlock(&state->shared->spin);
+            ggml_cond_wait(&state->shared->cond, &state->shared->mutex);
         }
+        ggml_mutex_unlock(&state->shared->mutex);
 
         // check if we should stop
-        if (atomic_load(&state->shared->stop)) {
+        ggml_mutex_lock(&state->shared->mutex);
+        if (state->shared->stop) {
+            ggml_mutex_unlock(&state->shared->mutex);
             break;
         }
+        ggml_mutex_unlock(&state->shared->mutex);
 
         if (state->node) {
            if (state->params.ith < state->params.nth) {
                ggml_compute_forward(&state->params, state->node);
            }
-
            state->node = NULL;
        } else {
+            ggml_mutex_unlock(&state->shared->mutex);
            break;
        }
    }
@@ -9209,19 +9250,32 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
     const int n_threads = cgraph->n_threads;
 
     struct ggml_compute_state_shared state_shared = {
-        /*.spin      =*/ GGML_LOCK_INITIALIZER,
         /*.n_threads =*/ n_threads,
         /*.n_ready   =*/ 0,
         /*.has_work  =*/ false,
         /*.stop      =*/ false,
+        /*.mutex     =*/ {0},
+        /*.cond      =*/ {0},
     };
+    {
+        int rc = ggml_mutex_init(&state_shared.mutex, NULL);
+        GGML_ASSERT(rc == 0);
+        UNUSED(rc);
+    }
+    {
+        int rc = ggml_cond_init(&state_shared.cond, NULL);
+        GGML_ASSERT(rc == 0);
+        UNUSED(rc);
+    }
     struct ggml_compute_state * workers = n_threads > 1 ? alloca(sizeof(struct ggml_compute_state)*(n_threads - 1)) : NULL;
 
     // create thread pool
     if (n_threads > 1) {
-        ggml_lock_init(&state_shared.spin);
 
-        atomic_store(&state_shared.has_work, true);
+        ggml_mutex_lock(&state_shared.mutex);
+        state_shared.has_work = true;
+        ggml_cond_broadcast(&state_shared.cond);
+        ggml_mutex_unlock(&state_shared.mutex);
 
         for (int j = 0; j < n_threads - 1; j++) {
             workers[j] = (struct ggml_compute_state) {
@@ -9477,14 +9531,18 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
 
         // COMPUTE
         if (node->n_tasks > 1) {
-            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
-                atomic_store(&state_shared.has_work, false);
+            ggml_mutex_lock(&state_shared.mutex);
+            if (state_shared.n_ready++ == n_threads - 1) {
+                state_shared.has_work = false;
+                ggml_cond_broadcast(&state_shared.cond);
             }
+            ggml_mutex_unlock(&state_shared.mutex);
 
-            while (atomic_load(&state_shared.has_work)) {
-                ggml_lock_lock  (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
+            ggml_mutex_lock(&state_shared.mutex);
+            while (state_shared.has_work) {
+                ggml_cond_wait(&state_shared.cond, &state_shared.mutex);
             }
+            ggml_mutex_unlock(&state_shared.mutex);
 
             // launch thread pool
             for (int j = 0; j < n_threads - 1; j++) {
@@ -9498,14 +9556,22 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
                 workers[j].node = node;
             }
 
-            atomic_fetch_sub(&state_shared.n_ready, 1);
+            ggml_mutex_lock(&state_shared.mutex);
+            state_shared.n_ready--;
+            ggml_cond_broadcast(&state_shared.cond);
+            ggml_mutex_unlock(&state_shared.mutex);
 
-            while (atomic_load(&state_shared.n_ready) > 0) {
-                ggml_lock_lock  (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
+            ggml_mutex_lock(&state_shared.mutex);
+            while (state_shared.n_ready > 0) {
+                ggml_cond_wait(&state_shared.cond, &state_shared.mutex);
             }
+            ggml_mutex_unlock(&state_shared.mutex);
 
-            atomic_store(&state_shared.has_work, true);
+
+            ggml_mutex_lock(&state_shared.mutex);
+            state_shared.has_work = true;
+            ggml_cond_broadcast(&state_shared.cond);
+            ggml_mutex_unlock(&state_shared.mutex);
         }
 
         params.type = GGML_TASK_COMPUTE;
@@ -9513,33 +9579,45 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
 
         // wait for thread pool
        if (node->n_tasks > 1) {
-            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
-                atomic_store(&state_shared.has_work, false);
+            ggml_mutex_lock(&state_shared.mutex);
+            if (state_shared.n_ready++ == n_threads - 1) {
+                state_shared.has_work = false;
+                ggml_cond_broadcast(&state_shared.cond);
             }
+            ggml_mutex_unlock(&state_shared.mutex);
 
-            while (atomic_load(&state_shared.has_work)) {
-                ggml_lock_lock  (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
+            ggml_mutex_lock(&state_shared.mutex);
+            while (state_shared.has_work) {
+                ggml_cond_wait(&state_shared.cond, &state_shared.mutex);
             }
+            ggml_mutex_unlock(&state_shared.mutex);
 
-            atomic_fetch_sub(&state_shared.n_ready, 1);
+            ggml_mutex_lock(&state_shared.mutex);
+            state_shared.n_ready--;
+            ggml_cond_broadcast(&state_shared.cond);
+            ggml_mutex_unlock(&state_shared.mutex);
 
-            while (atomic_load(&state_shared.n_ready) != 0) {
-                ggml_lock_lock  (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
+            ggml_mutex_lock(&state_shared.mutex);
+            while (state_shared.n_ready != 0) {
+                ggml_cond_wait(&state_shared.cond, &state_shared.mutex);
             }
+            ggml_mutex_unlock(&state_shared.mutex);
         }
 
         // FINALIZE
         if (node->n_tasks > 1) {
-            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
-                atomic_store(&state_shared.has_work, false);
+            ggml_mutex_lock(&state_shared.mutex);
+            if (state_shared.n_ready++ == n_threads - 1) {
+                state_shared.has_work = false;
+                ggml_cond_broadcast(&state_shared.cond);
             }
+            ggml_mutex_unlock(&state_shared.mutex);
 
-            while (atomic_load(&state_shared.has_work)) {
-                ggml_lock_lock  (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
+            ggml_mutex_lock(&state_shared.mutex);
+            while (state_shared.has_work) {
+                ggml_cond_wait(&state_shared.cond, &state_shared.mutex);
             }
+            ggml_mutex_unlock(&state_shared.mutex);
 
             // launch thread pool
             for (int j = 0; j < n_threads - 1; j++) {
@@ -9553,14 +9631,21 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
                 workers[j].node = node;
             }
 
-            atomic_fetch_sub(&state_shared.n_ready, 1);
+            ggml_mutex_lock(&state_shared.mutex);
+            state_shared.n_ready -= 1;
+            ggml_cond_broadcast(&state_shared.cond);
+            ggml_mutex_unlock(&state_shared.mutex);
 
-            while (atomic_load(&state_shared.n_ready) > 0) {
-                ggml_lock_lock  (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
+            ggml_mutex_lock(&state_shared.mutex);
+            while (state_shared.n_ready > 0) {
+                ggml_cond_wait(&state_shared.cond, &state_shared.mutex);
             }
+            ggml_mutex_unlock(&state_shared.mutex);
 
-            atomic_store(&state_shared.has_work, true);
+            ggml_mutex_lock(&state_shared.mutex);
+            state_shared.has_work = true;
+            ggml_cond_broadcast(&state_shared.cond);
+            ggml_mutex_unlock(&state_shared.mutex);
         }
 
         params.type = GGML_TASK_FINALIZE;
@@ -9568,21 +9653,29 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
 
         // wait for thread pool
         if (node->n_tasks > 1) {
-            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
-                atomic_store(&state_shared.has_work, false);
+            ggml_mutex_lock(&state_shared.mutex);
+            if (state_shared.n_ready++ == n_threads - 1) {
+                state_shared.has_work = false;
+                ggml_cond_broadcast(&state_shared.cond);
             }
+            ggml_mutex_unlock(&state_shared.mutex);
 
-            while (atomic_load(&state_shared.has_work)) {
-                ggml_lock_lock  (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
+            ggml_mutex_lock(&state_shared.mutex);
+            while (state_shared.has_work) {
+                ggml_cond_wait(&state_shared.cond, &state_shared.mutex);
             }
+            ggml_mutex_unlock(&state_shared.mutex);
 
-            atomic_fetch_sub(&state_shared.n_ready, 1);
+            ggml_mutex_lock(&state_shared.mutex);
+            state_shared.n_ready--;
+            ggml_cond_broadcast(&state_shared.cond);
+            ggml_mutex_unlock(&state_shared.mutex);
 
-            while (atomic_load(&state_shared.n_ready) != 0) {
-                ggml_lock_lock  (&state_shared.spin);
-                ggml_lock_unlock(&state_shared.spin);
+            ggml_mutex_lock(&state_shared.mutex);
+            while (state_shared.n_ready != 0) {
+                ggml_cond_wait(&state_shared.cond, &state_shared.mutex);
             }
+            ggml_mutex_unlock(&state_shared.mutex);
         }
 
         // performance stats (node)
@@ -9598,16 +9691,19 @@ void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph)
 
     // join thread pool
     if (n_threads > 1) {
-        atomic_store(&state_shared.stop, true);
-        atomic_store(&state_shared.has_work, true);
-
+        ggml_mutex_lock(&state_shared.mutex);
+        state_shared.stop = true;
+        state_shared.has_work = true;
+        ggml_cond_broadcast(&state_shared.cond);
+        ggml_mutex_unlock(&state_shared.mutex);
         for (int j = 0; j < n_threads - 1; j++) {
             int rc = ggml_thread_join(workers[j].thrd, NULL);
             GGML_ASSERT(rc == 0);
             UNUSED(rc);
         }
 
-        ggml_lock_destroy(&state_shared.spin);
+        ggml_cond_destroy(&state_shared.cond);
+        ggml_mutex_destroy(&state_shared.mutex);
     }
 
     // performance stats (graph)
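Taken together, patch 1 replaces the spin-wait on atomics with the classic condition-variable idiom: the shared flags n_ready, has_work and stop are now read and written under the mutex, waiters re-check their predicate in a while loop around ggml_cond_wait, and every state change is followed by ggml_cond_broadcast so no wakeup is lost. The payoff is that ggml_cond_wait atomically releases the mutex and puts the thread to sleep, so idle workers cost no CPU, unlike the ggml_lock_lock/ggml_lock_unlock busy-wait it replaces. Below is a standalone sketch of that idiom in plain pthreads; it is illustrative only, not part of the patch, and the names N_WORKERS, worker, has_work and n_done are invented for the demo.

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

#define N_WORKERS 4

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond  = PTHREAD_COND_INITIALIZER;
static bool has_work = false; // guarded by mutex, like state_shared.has_work
static int  n_done   = 0;     // guarded by mutex, like state_shared.n_ready

static void * worker(void * arg) {
    (void) arg;
    pthread_mutex_lock(&mutex);
    while (!has_work) {
        pthread_cond_wait(&cond, &mutex); // atomically unlocks and sleeps
    }
    n_done++;
    pthread_cond_broadcast(&cond);        // wake whoever is waiting on n_done
    pthread_mutex_unlock(&mutex);
    return NULL;
}

int main(void) {
    pthread_t threads[N_WORKERS];
    for (int i = 0; i < N_WORKERS; i++) {
        pthread_create(&threads[i], NULL, worker, NULL);
    }

    // publish work: mutate shared state under the mutex, then broadcast
    pthread_mutex_lock(&mutex);
    has_work = true;
    pthread_cond_broadcast(&cond);
    pthread_mutex_unlock(&mutex);

    // wait until every worker has checked in, the same loop shape as the patch
    pthread_mutex_lock(&mutex);
    while (n_done != N_WORKERS) {
        pthread_cond_wait(&cond, &mutex);
    }
    pthread_mutex_unlock(&mutex);

    for (int i = 0; i < N_WORKERS; i++) {
        pthread_join(threads[i], NULL);
    }
    printf("all %d workers done\n", N_WORKERS);
    return 0;
}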
From c55222e4f798dbfb1b54240a18d2fba1726a7209 Mon Sep 17 00:00:00 2001
From: Vladimir
Date: Sat, 1 Apr 2023 17:12:59 +0200
Subject: [PATCH 2/2] windows fixed

---
 ggml.c | 53 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 53 insertions(+)

diff --git a/ggml.c b/ggml.c
index 104d55e606253..961440212a842 100644
--- a/ggml.c
+++ b/ggml.c
@@ -72,6 +72,59 @@ static int sched_yield (void) {
     Sleep (0);
     return 0;
 }
+
+typedef struct pthread_mutex_tag {
+    CRITICAL_SECTION critical_section;
+} pthread_mutex_t;
+
+typedef struct pthread_mutexattr_tag {
+    int attr;
+} pthread_mutexattr_t;
+
+int pthread_mutex_init(pthread_mutex_t * mutex, const pthread_mutexattr_t * attr) {
+    InitializeCriticalSection (&mutex->critical_section);
+    return 0;
+}
+
+int pthread_mutex_destroy(pthread_mutex_t * mutex) {
+    DeleteCriticalSection(&mutex->critical_section);
+    return 0;
+}
+
+
+int pthread_mutex_lock(pthread_mutex_t * mutex) {
+    EnterCriticalSection(&mutex->critical_section);
+    return 0;
+}
+
+int pthread_mutex_unlock(pthread_mutex_t * mutex) {
+    LeaveCriticalSection(&mutex->critical_section);
+    return 0;
+}
+
+typedef struct pthread_cond_tag {
+    CONDITION_VARIABLE cond;
+} pthread_cond_t;
+
+int pthread_cond_init(pthread_cond_t * cond, void * unused) {
+    InitializeConditionVariable (&cond->cond);
+    return 0;
+}
+
+int pthread_cond_destroy(pthread_cond_t * cond) {
+    return 0;
+}
+
+int pthread_cond_wait(pthread_cond_t * cond, pthread_mutex_t * mutex) {
+    SleepConditionVariableCS(&cond->cond, &mutex->critical_section, INFINITE);
+    return 0;
+}
+
+int pthread_cond_broadcast(pthread_cond_t * cond) {
+    WakeAllConditionVariable(&cond->cond);
+    return 0;
+}
+
 #else
 #include <pthread.h>
 #include <stdatomic.h>
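Patch 2 makes the same code build on Windows by backing the pthread names with Win32 primitives: each pthread_mutex_t wraps a CRITICAL_SECTION and each pthread_cond_t wraps a CONDITION_VARIABLE. SleepConditionVariableCS atomically leaves the critical section and sleeps, which is exactly the guarantee pthread_cond_wait needs, and Win32 condition variables require no teardown, which is why the pthread_cond_destroy shim does nothing. A minimal smoke test for the shim layer is sketched below; it is hypothetical (not part of the patch), Windows-only, and the names waiter and ready are invented for the demo.

#include <windows.h>
#include <stdbool.h>
#include <stdio.h>

static CRITICAL_SECTION   cs;  // what the pthread_mutex_t shim wraps
static CONDITION_VARIABLE cv;  // what the pthread_cond_t shim wraps
static bool ready = false;     // guarded by cs

static DWORD WINAPI waiter(LPVOID arg) {
    (void) arg;
    EnterCriticalSection(&cs);            // shim: pthread_mutex_lock
    while (!ready) {
        // atomically leaves cs, sleeps, re-enters cs on wakeup
        SleepConditionVariableCS(&cv, &cs, INFINITE);
    }
    LeaveCriticalSection(&cs);            // shim: pthread_mutex_unlock
    printf("waiter saw ready\n");
    return 0;
}

int main(void) {
    InitializeCriticalSection(&cs);       // shim: pthread_mutex_init
    InitializeConditionVariable(&cv);     // shim: pthread_cond_init

    HANDLE h = CreateThread(NULL, 0, waiter, NULL, 0, NULL);

    EnterCriticalSection(&cs);
    ready = true;
    WakeAllConditionVariable(&cv);        // shim: pthread_cond_broadcast
    LeaveCriticalSection(&cs);

    WaitForSingleObject(h, INFINITE);
    CloseHandle(h);
    DeleteCriticalSection(&cs);           // shim: pthread_mutex_destroy
    return 0;                             // no teardown needed for cv
}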