From 4886f5001d9f6fc9089f0640cb79030c5a2b7e50 Mon Sep 17 00:00:00 2001
From: slaren
Date: Tue, 18 Jun 2024 18:38:49 +0200
Subject: [PATCH 1/5] ggml : synchronize using openmp barriers

---
 ggml.c | 190 ++++++++++-----------------------------------------------
 1 file changed, 31 insertions(+), 159 deletions(-)

diff --git a/ggml.c b/ggml.c
index d5d33c2ba1029..ad6172c023d4c 100644
--- a/ggml.c
+++ b/ggml.c
@@ -1753,9 +1753,9 @@ struct ggml_compute_state_shared {
     int n_threads;
 
     // synchronization primitives
-    atomic_int n_active; // num active threads
-    atomic_int node_n; // active graph node
-    atomic_int node_task; // active graph node task phase
+    //atomic_int n_active; // num active threads
+    //atomic_int node_n; // active graph node
+    //atomic_int node_task; // active graph node task phase
 
     ggml_abort_callback abort_callback; // abort ggml_graph_compute when true
     void* abort_callback_data;
@@ -18972,184 +18972,60 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads, int n_cur_
     return n_tasks;
 }
 
-static void ggml_graph_compute_thread_sync_node(int * node_n, struct ggml_compute_state * state, const bool do_yield) {
-    // wait for other threads to finish
-    const int last_node_n = * node_n;
-
-    while (true) {
-        if (do_yield) {
-            sched_yield();
-        }
-
-        *node_n = atomic_load(&state->shared->node_n);
-        if (*node_n != last_node_n) {
-            break;
-        }
-
-#if defined(__SSE3__)
-        // Tell the processor we're spinning. It's a processor hint for spinlocks.
-        _mm_pause();
-#endif
-    }
-}
-
-static void ggml_graph_compute_thread_sync_task(int * task_phase, struct ggml_compute_state * state, const bool do_yield) {
-    // wait for other threads to finish
-    const int last_task_phase = *task_phase;
-
-    while (true) {
-        if (do_yield) {
-            sched_yield();
-        }
-
-        *task_phase = atomic_load(&state->shared->node_task);
-        if (*task_phase != last_task_phase) {
-            break;
-        }
-
-#if defined(__SSE3__)
-        // Tell the processor we're spinning. It's a processor hint for spinlocks.
-        _mm_pause();
-#endif
-    }
-}
-
 static thread_ret_t ggml_graph_compute_thread(void * data) {
     struct ggml_compute_state * state = (struct ggml_compute_state *) data;
 
     const struct ggml_cgraph * cgraph = state->shared->cgraph;
     const struct ggml_cplan * cplan = state->shared->cplan;
 
-    const int n_threads = state->shared->n_threads;
+    const int ith = state->ith;
+    const int n_threads = state->shared->n_threads;
 
-    set_numa_thread_affinity(state->ith);
+    set_numa_thread_affinity(ith);
 
-    int node_n = -1;
-    int task_phase = GGML_TASK_TYPE_FINALIZE;
+    struct ggml_compute_params params = {
+        /*.type =*/ GGML_TASK_TYPE_INIT,
+        /*.ith =*/ ith,
+        /*.nth =*/ state->shared->n_threads,
+        /*.wsize =*/ cplan->work_size,
+        /*.wdata =*/ cplan->work_data,
+    };
 
-    while (true) {
+    for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
         if (cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
-            state->shared->node_n += 1;
             state->ec = GGML_STATUS_ABORTED;
             return 0;
         }
 
-        if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
-            // all other threads are finished and spinning
-            // do finalize and init here so we don't have synchronize again
-            struct ggml_compute_params params = {
-                /*.type =*/ GGML_TASK_TYPE_FINALIZE,
-                /*.ith =*/ 0,
-                /*.nth =*/ 0,
-                /*.wsize =*/ cplan->work_size,
-                /*.wdata =*/ cplan->work_data,
-            };
-
-            if (node_n != -1) {
-                /* FINALIZE */
-                struct ggml_tensor * node = cgraph->nodes[node_n];
-                if (GGML_OP_HAS_FINALIZE[node->op]) {
-                    params.nth = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
-                    ggml_compute_forward(&params, node, state);
-                }
-                ggml_graph_compute_perf_stats_node(node, state->shared);
-            }
-
-            // distribute new work or execute it direct if 1T
-            while (++node_n < cgraph->n_nodes) {
-                GGML_PRINT_DEBUG_5("%s: %d/%d\n", __func__, node_n, cgraph->n_nodes);
-                struct ggml_tensor * node = cgraph->nodes[node_n];
-                const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
-
-                state->shared->perf_node_start_cycles = ggml_perf_cycles();
-                state->shared->perf_node_start_time_us = ggml_perf_time_us();
-
-                params.nth = n_tasks;
-
-                if (n_tasks == 1) {
-                    /* INIT */
-                    if (GGML_OP_HAS_INIT[node->op]) {
-                        params.type = GGML_TASK_TYPE_INIT;
-                        ggml_compute_forward(&params, node, state);
-                    }
-
-                    // TODO: maybe push node_n to the atomic but if other threads see n_tasks is 1,
-                    // they do something more efficient than spinning (?)
-                    params.type = GGML_TASK_TYPE_COMPUTE;
-                    ggml_compute_forward(&params, node, state);
-
-                    if (GGML_OP_HAS_FINALIZE[node->op]) {
-                        params.type = GGML_TASK_TYPE_FINALIZE;
-                        ggml_compute_forward(&params, node, state);
-                    }
-
-                    ggml_graph_compute_perf_stats_node(node, state->shared);
-                } else {
-                    break;
-                }
-
-                if (cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
-                    break;
-                }
-            }
-
-            task_phase = GGML_TASK_TYPE_INIT;
-            atomic_store(&state->shared->n_active, n_threads);
-            atomic_store(&state->shared->node_n, node_n);
-            atomic_store(&state->shared->node_task, task_phase);
-        } else {
-            ggml_graph_compute_thread_sync_node(&node_n, state, false);
-            ggml_graph_compute_thread_sync_task(&task_phase, state, false);
-        }
-
-        // check if we should stop
-        if (node_n >= cgraph->n_nodes) break;
-
-        /* INIT & COMPUTE */
         struct ggml_tensor * node = cgraph->nodes[node_n];
         const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
 
-        struct ggml_compute_params params = {
-            /*.type =*/ GGML_TASK_TYPE_INIT,
-            /*.ith =*/ state->ith,
-            /*.nth =*/ n_tasks,
-            /*.wsize =*/ cplan->work_size,
-            /*.wdata =*/ cplan->work_data,
-        };
+        params.nth = n_tasks;
 
-        if (state->ith < n_tasks) {
-            if (GGML_OP_HAS_INIT[node->op]) {
+        /* INIT */
+        if (GGML_OP_HAS_INIT[node->op]) {
+            if (ith < n_tasks) {
+                params.type = GGML_TASK_TYPE_INIT;
                 ggml_compute_forward(&params, node, state);
             }
+            #pragma omp barrier
         }
 
-        if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
-            task_phase = GGML_TASK_TYPE_COMPUTE;
-            atomic_store(&state->shared->n_active, n_threads);
-            atomic_store(&state->shared->node_task, task_phase);
-        }
-        else {
-            // TODO: this sched_yield can have significant impact on the performance - either positive or negative
-            // depending on the workload and the operating system.
-            // since it is not clear what is the best approach, it should potentially become user-configurable
-            // ref: https://github.com/ggerganov/ggml/issues/291
-            // UPD: adding the do_yield flag seems to resolve the issue universally
-            const bool do_yield = node_n < 0 || cgraph->nodes[node_n]->op == GGML_OP_MUL_MAT;
-            ggml_graph_compute_thread_sync_task(&task_phase, state, do_yield);
-        }
-
-        if (state->ith < n_tasks) {
+        /* COMPUTE */
+        if (ith < n_tasks) {
             params.type = GGML_TASK_TYPE_COMPUTE;
             ggml_compute_forward(&params, node, state);
         }
 
-        if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
-            task_phase = GGML_TASK_TYPE_FINALIZE;
-            atomic_store(&state->shared->n_active, n_threads);
-            atomic_store(&state->shared->node_task, task_phase);
-        }
-        else {
-            ggml_graph_compute_thread_sync_task(&task_phase, state, false);
+        #pragma omp barrier
+
+        /* FINALIZE */
+        if (GGML_OP_HAS_FINALIZE[node->op]) {
+            if (params.ith == 0) {
+                params.type = GGML_TASK_TYPE_FINALIZE;
+                ggml_compute_forward(&params, node, state);
+            }
+            #pragma omp barrier
         }
     }
 
@@ -19336,7 +19212,6 @@ static enum ggml_status ggml_graph_compute_parallel(struct ggml_compute_state *
             // update the number of threads from the actual number of threads that we got from OpenMP
             n_threads = omp_get_num_threads();
             workers[0].shared->n_threads = n_threads;
-            workers[0].shared->n_active = n_threads;
         }
         ggml_graph_compute_thread(&workers[omp_get_thread_num()]);
     }
@@ -19399,9 +19274,6 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
         /*.perf_node_start_cycles =*/ 0,
         /*.perf_node_start_time_us =*/ 0,
         /*.n_threads =*/ n_threads,
-        /*.n_active =*/ n_threads,
-        /*.node_n =*/ -1,
-        /*.node_task =*/ GGML_TASK_TYPE_FINALIZE,
         /*.abort_callback =*/ NULL,
         /*.abort_callback_data =*/ NULL,
         /*.current_chunk; =*/ 0,

From e4643ad4d448a2c50ee047018ff710f34bb1b340 Mon Sep 17 00:00:00 2001
From: slaren
Date: Tue, 18 Jun 2024 19:55:17 +0200
Subject: [PATCH 2/5] add implementation without openmp

---
 ggml.c | 50 ++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 44 insertions(+), 6 deletions(-)

diff --git a/ggml.c b/ggml.c
index ad6172c023d4c..c33385f2e369a 100644
--- a/ggml.c
+++ b/ggml.c
@@ -1753,9 +1753,8 @@ struct ggml_compute_state_shared {
     int n_threads;
 
     // synchronization primitives
-    //atomic_int n_active; // num active threads
-    //atomic_int node_n; // active graph node
-    //atomic_int node_task; // active graph node task phase
+    atomic_int n_barrier;
+    atomic_int n_barrier_passed;
 
     ggml_abort_callback abort_callback; // abort ggml_graph_compute when true
     void* abort_callback_data;
@@ -18972,6 +18971,43 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads, int n_cur_
     return n_tasks;
 }
 
+#ifdef GGML_USE_OPENMP
+static void ggml_barrier(struct ggml_compute_state * state) {
+    #pragma omp barrier
+    UNUSED(state);
+}
+#else
+static void ggml_barrier(struct ggml_compute_state * state) {
+    atomic_int * n_barrier = &state->shared->n_barrier;
+    atomic_int * n_barrier_passed = &state->shared->n_barrier_passed;
+
+    int n_threads = state->shared->n_threads;
+    int passed_old = atomic_load(n_barrier_passed);
+
+    if (atomic_fetch_add(n_barrier, 1) == n_threads - 1) {
+        // last thread
+        atomic_store(n_barrier, 0);
+        atomic_fetch_add(n_barrier_passed, 1);
+    } else {
+        // wait for other threads
+        //while (atomic_load(n_barrier_passed) == passed_old) {
+        //}
+        const int n_spin_before_sleep = 100;
+        while (true) {
+            for (int i = 0; i < n_spin_before_sleep; i++) {
+                if (atomic_load(n_barrier_passed) != passed_old) {
+                    return;
+                }
+                #if defined(__SSE3__)
+                _mm_pause();
+                #endif
+            }
+            sched_yield();
+        }
+    }
+}
+#endif
+
 static thread_ret_t ggml_graph_compute_thread(void * data) {
     struct ggml_compute_state * state = (struct ggml_compute_state *) data;
 
@@ -19008,7 +19044,7 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
                 params.type = GGML_TASK_TYPE_INIT;
                 ggml_compute_forward(&params, node, state);
             }
-            #pragma omp barrier
+            ggml_barrier(state);
         }
 
         /* COMPUTE */
@@ -19017,7 +19053,7 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
             params.type = GGML_TASK_TYPE_COMPUTE;
             ggml_compute_forward(&params, node, state);
         }
 
-        #pragma omp barrier
+        ggml_barrier(state);
 
         /* FINALIZE */
         if (GGML_OP_HAS_FINALIZE[node->op]) {
@@ -19025,7 +19061,7 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
                 params.type = GGML_TASK_TYPE_FINALIZE;
                 ggml_compute_forward(&params, node, state);
             }
-            #pragma omp barrier
+            ggml_barrier(state);
         }
     }
 
@@ -19274,6 +19310,8 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
         /*.perf_node_start_cycles =*/ 0,
        /*.perf_node_start_time_us =*/ 0,
         /*.n_threads =*/ n_threads,
+        /*.n_barrier =*/ 0,
+        /*.n_barrier_passed =*/ 0,
         /*.abort_callback =*/ NULL,
         /*.abort_callback_data =*/ NULL,
         /*.current_chunk; =*/ 0,

From 7226483b06fc8579f8b016b4a8d13c2a878c4d70 Mon Sep 17 00:00:00 2001
From: slaren
Date: Tue, 18 Jun 2024 20:09:51 +0200
Subject: [PATCH 3/5] spin more

---
 ggml.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ggml.c b/ggml.c
index c33385f2e369a..797bfa70ddc07 100644
--- a/ggml.c
+++ b/ggml.c
@@ -18992,7 +18992,7 @@ static void ggml_barrier(struct ggml_compute_state * state) {
         // wait for other threads
        //while (atomic_load(n_barrier_passed) == passed_old) {
        //}
-        const int n_spin_before_sleep = 100;
+        const int n_spin_before_sleep = 100000;
         while (true) {
             for (int i = 0; i < n_spin_before_sleep; i++) {
                 if (atomic_load(n_barrier_passed) != passed_old) {

From e5c0c4e30d42f9679f1a9b4fc930676e126a2634 Mon Sep 17 00:00:00 2001
From: slaren
Date: Tue, 18 Jun 2024 20:54:02 +0200
Subject: [PATCH 4/5] server ci : do not use openmp with tsan

ggml-ci
---
 .github/workflows/server.yml | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/.github/workflows/server.yml b/.github/workflows/server.yml
index 1fee9ac281943..6155e94156e42 100644
--- a/.github/workflows/server.yml
+++ b/.github/workflows/server.yml
@@ -87,8 +87,22 @@ jobs:
             exit 1
           fi
 
+      - name: Build (no OpenMP)
+        id: cmake_build_no_openmp
+        if: ${{ matrix.sanitizer == 'THREAD' }}
+        run: |
+          cmake -B build \
+            -DLLAMA_NATIVE=OFF \
+            -DLLAMA_BUILD_SERVER=ON \
+            -DLLAMA_CURL=ON \
+            -DCMAKE_BUILD_TYPE=${{ matrix.build_type }} \
+            -DLLAMA_SANITIZE_${{ matrix.sanitizer }}=ON \
+            -DLLAMA_OPENMP=OFF ;
+          cmake --build build --config ${{ matrix.build_type }} -j $(nproc) --target llama-server
+
       - name: Build
         id: cmake_build
+        if: ${{ matrix.sanitizer != 'THREAD' }}
         run: |
           cmake -B build \
             -DLLAMA_NATIVE=OFF \

From 4d8a0c2b9f95dfcc2f0b78bddd3962733502f966 Mon Sep 17 00:00:00 2001
From: slaren
Date: Wed, 19 Jun 2024 02:05:44 +0200
Subject: [PATCH 5/5] skip barriers with 1 threads

---
 ggml.c | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/ggml.c b/ggml.c
index 797bfa70ddc07..778ca3fdf1f8f 100644
--- a/ggml.c
+++ b/ggml.c
@@ -18973,11 +18973,18 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads, int n_cur_
 
 #ifdef GGML_USE_OPENMP
 static void ggml_barrier(struct ggml_compute_state * state) {
+    if (state->shared->n_threads == 1) {
+        return;
+    }
+
     #pragma omp barrier
-    UNUSED(state);
 }
 #else
 static void ggml_barrier(struct ggml_compute_state * state) {
+    if (state->shared->n_threads == 1) {
+        return;
+    }
+
     atomic_int * n_barrier = &state->shared->n_barrier;
     atomic_int * n_barrier_passed = &state->shared->n_barrier_passed;
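
Illustration (not part of the patch series): the non-OpenMP ggml_barrier introduced in PATCH 2 and tuned in PATCH 3/5 is a counting barrier with a round ("epoch") counter. Every thread increments n_barrier on arrival; the last arrival resets n_barrier and increments n_barrier_passed, which the waiting threads poll, first by spinning and then by yielding. The sketch below rebuilds the same idea as a self-contained C11/pthreads program so it can be compiled and tested outside ggml; the names spin_barrier, worker, N_THREADS and N_NODES are invented for the example, and the SSE _mm_pause hint is omitted.

/*
 * spin_barrier_sketch.c - standalone sketch of the epoch-counting barrier.
 * Build with: cc -O2 -pthread spin_barrier_sketch.c
 */
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdio.h>

enum { N_THREADS = 4, N_NODES = 8 };

typedef struct {
    atomic_int n_barrier;        // threads that have arrived at the barrier this round
    atomic_int n_barrier_passed; // number of completed rounds (only ever increases)
    int        n_threads;
} spin_barrier;

static spin_barrier g_barrier = { 0, 0, N_THREADS };

static void spin_barrier_wait(spin_barrier * b) {
    if (b->n_threads == 1) {
        return; // single thread: nothing to wait for (the PATCH 5 early-out)
    }

    int passed_old = atomic_load(&b->n_barrier_passed);

    if (atomic_fetch_add(&b->n_barrier, 1) == b->n_threads - 1) {
        // last thread to arrive: reset the arrival counter, then release the others
        atomic_store(&b->n_barrier, 0);
        atomic_fetch_add(&b->n_barrier_passed, 1);
    } else {
        // spin on the round counter for a while, then fall back to yielding the CPU
        const int n_spin_before_sleep = 100000;
        while (1) {
            for (int i = 0; i < n_spin_before_sleep; i++) {
                if (atomic_load(&b->n_barrier_passed) != passed_old) {
                    return;
                }
            }
            sched_yield();
        }
    }
}

static void * worker(void * arg) {
    const int ith = (int)(size_t) arg;
    for (int node = 0; node < N_NODES; node++) {
        // each thread would compute its slice of graph node `node` here
        printf("thread %d done with node %d\n", ith, node);
        // no thread may start the next node until every thread has finished this one
        spin_barrier_wait(&g_barrier);
    }
    return NULL;
}

int main(void) {
    pthread_t threads[N_THREADS];
    for (int i = 0; i < N_THREADS; i++) {
        pthread_create(&threads[i], NULL, worker, (void *)(size_t) i);
    }
    for (int i = 0; i < N_THREADS; i++) {
        pthread_join(threads[i], NULL);
    }
    return 0;
}

The spin count (raised from 100 to 100000 in PATCH 3) controls how long a waiting thread burns cycles before calling sched_yield(), trading CPU time for wake-up latency; the single-thread early-out from PATCH 5 skips the atomics entirely when there is nothing to synchronize with.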