From b1d402d5fbc2f53e29a65d2122c8d64e10e52662 Mon Sep 17 00:00:00 2001 From: mqy Date: Wed, 28 Jun 2023 03:26:39 +0800 Subject: [PATCH 1/7] work stealing chunked task allocator example for issue #291 --- examples/CMakeLists.txt | 1 + examples/task-allocator/CMakeLists.txt | 7 + examples/task-allocator/task-allocator.c | 433 +++++++++++++++++++++++ 3 files changed, 441 insertions(+) create mode 100644 examples/task-allocator/CMakeLists.txt create mode 100644 examples/task-allocator/task-allocator.c diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index cf9c4a2231337..2c4dd1a630a4e 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -39,6 +39,7 @@ else() add_subdirectory(baby-llama) add_subdirectory(train-text-from-scratch) add_subdirectory(simple) + add_subdirectory(task-allocator) if (LLAMA_METAL) add_subdirectory(metal) endif() diff --git a/examples/task-allocator/CMakeLists.txt b/examples/task-allocator/CMakeLists.txt new file mode 100644 index 0000000000000..3d10456a12e66 --- /dev/null +++ b/examples/task-allocator/CMakeLists.txt @@ -0,0 +1,7 @@ +set(TARGET task-allocator) +add_executable(${TARGET} task-allocator.c) +target_link_libraries(${TARGET} PRIVATE) +target_compile_features(${TARGET} PRIVATE c_std_11) +if(TARGET BUILD_INFO) + add_dependencies(${TARGET} BUILD_INFO) +endif() diff --git a/examples/task-allocator/task-allocator.c b/examples/task-allocator/task-allocator.c new file mode 100644 index 0000000000000..4d6957f0d74a5 --- /dev/null +++ b/examples/task-allocator/task-allocator.c @@ -0,0 +1,433 @@ +// https://github.com/ggerganov/ggml/issues/291 +// https://github.com/ggerganov/llama.cpp/pull/1507 + +#include +#include +#include +#include + +#if defined(_MSC_VER) || defined(__MINGW32__) +#include // using malloc.h with MSC/MINGW +#elif !defined(__FreeBSD__) && !defined(__NetBSD__) && !defined(__OpenBSD__) +#include +#endif + +#define GGML_ASSERT(x) \ + do { \ + if (!(x)) { \ + fprintf(stderr, "GGML_ASSERT: %s:%d: %s\n", __FILE__, __LINE__, \ + #x); \ + abort(); \ + } \ + } while (0) + +#define GGML_DEBUG 1 +#if (GGML_DEBUG >= 1) +#define GGML_PRINT_DEBUG(...) printf(__VA_ARGS__) +#else +#define GGML_PRINT_DEBUG(...) +#endif + +#if (GGML_DEBUG >= 5) +#define GGML_PRINT_DEBUG_5(...) printf(__VA_ARGS__) +#else +#define GGML_PRINT_DEBUG_5(...) +#endif + +#define UNUSED(x) (void)(x) +#define MIN(a, b) ((a) < (b) ? 
(a) : (b)) + +#if defined(_WIN32) + +#include + +typedef volatile LONG atomic_int; +typedef atomic_int atomic_bool; + +typedef HANDLE pthread_t; +typedef int thread_ret_t; + +static void atomic_store(atomic_int *ptr, LONG val) { + Intechan_lockedExchange(ptr, val); +} + +static LONG atomic_load(atomic_int *ptr) { + return Intechan_lockedCompareExchange(ptr, 0, 0); +} + +static LONG atomic_fetch_add(atomic_int *ptr, LONG inc) { + return Intechan_lockedExchangeAdd(ptr, inc); +} + +static LONG atomic_fetch_sub(atomic_int *ptr, LONG dec) { + return atomic_fetch_add(ptr, -(dec)); +} + +static int pthread_create(pthread_t *out, void *unused, + thread_ret_t (*func)(void *), void *arg) { + (void)unused; + HANDLE handle = + CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)func, arg, 0, NULL); + if (handle == NULL) { + return EAGAIN; + } + + *out = handle; + return 0; +} + +static int pthread_join(pthread_t thread, void *unused) { + (void)unused; + return (int)WaitForSingleObject(thread, INFINITE); +} + +static int sched_yield(void) { + // https://learn.microsoft.com/en-us/windows/win32/api/winnt/nf-winnt-yieldprocessor + YieldProcessor(); + return 0; +} + +#else // ! _WIN32 + +typedef void *thread_ret_t; + +#include +#include +#include + +#endif + +typedef pthread_t ggml_thread_t; + +//----------------------------------------------------------------------------- +/// Most of the above codes are taken from +/// https://github.com/ggerganov/llama.cpp/tree/master/ggml.c +/// Copyright original authors. +//----------------------------------------------------------------------------- + +_Thread_local int32_t thread_local_id; + +#define MAX_THREADS 16 + +struct task_allocator { + int nth; + + int n_multiplier; // >= 1 + + atomic_int lock; // 0 unlocked, 1 locked + + // total assigned. + atomic_int global_counter; + + atomic_int thread_queue_heads[MAX_THREADS]; + atomic_int thread_queue_tails[MAX_THREADS]; +}; + +static void task_allocator_reset(struct task_allocator *a) { + for (int i = 0; i < a->nth; ++i) { + atomic_store(&a->thread_queue_heads[i], 0); + atomic_store(&a->thread_queue_tails[i], a->n_multiplier); + } + + atomic_store(&a->lock, 0); + atomic_store(&a->global_counter, 0); +} + +static void task_allocator_init(struct task_allocator *a, int nth, + int n_multiplier) { + GGML_ASSERT(nth <= MAX_THREADS); + a->nth = nth; + a->n_multiplier = n_multiplier; + task_allocator_reset(a); +} + +static void allocate_chunk(struct task_allocator *a, int ith, int *chunk_idx, + int *n_chunks) { + GGML_ASSERT(a->nth > 0); + GGML_ASSERT(a->n_multiplier > 0); + + *chunk_idx = -1; + *n_chunks = 0; + + while (atomic_fetch_add(&a->lock, 1) != 0) { // lock + atomic_fetch_sub(&a->lock, 1); + } + + int M = a->n_multiplier; + int nth = a->nth; + int total_chunks = M * nth; + + // all assigned? + if (atomic_load(&a->global_counter) == total_chunks) { + GGML_PRINT_DEBUG_5("[#_%d] %s(): nothing to do.\n", thread_local_id, + __func__); + atomic_fetch_sub(&a->lock, 1); // unlock + return; + } + + // try take its own, pop front. 
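+    // Descriptive note: the owner always takes from the head of its own
+    // queue, while the steal loop further down takes from the tail; both
+    // paths run under the spin lock acquired above, so head and tail
+    // updates never race.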
+ { + int head = atomic_load(&a->thread_queue_heads[ith]); + int tail = atomic_load(&a->thread_queue_tails[ith]); + + GGML_PRINT_DEBUG_5("[#_%d] %s(): head: %d, tail: %d.\n", + thread_local_id, __func__, head, tail); + + if (head < tail) { + int idx = ith * M + head; + + atomic_fetch_add(&a->thread_queue_heads[ith], 1); + atomic_fetch_add(&a->global_counter, 1); + + GGML_PRINT_DEBUG("[#_%d] %s(): take the %3d-th trunk of its own.\n", + thread_local_id, __func__, head + 1); + + *chunk_idx = idx; + *n_chunks = total_chunks; + atomic_fetch_sub(&a->lock, 1); // unlock + return; + } + } + + // steal from others. + for (int i = 0; i < nth; ++i) { + if (i == ith) { + continue; + } + + int tail = atomic_load(&a->thread_queue_tails[i]); + if (tail == atomic_load(&a->thread_queue_heads[i])) { + continue; + } + + // pop back + int idx = i * M + tail; + atomic_fetch_sub(&a->thread_queue_tails[i], 1); + atomic_fetch_add(&a->global_counter, 1); + + GGML_PRINT_DEBUG("[#_%d] %s(): steal the %d-th trunk from #_%d\n", + thread_local_id, __func__, tail, i); + + *chunk_idx = idx; + *n_chunks = total_chunks; + atomic_fetch_sub(&a->lock, 1); // unlock + return; + } + + fprintf(stderr, "%s:%d should be unreachable!\n", __FILE__, __LINE__); + abort(); +} + +struct state_shared { + int n_threads; + int n_multiplier; + + int n_nodes; + struct ggml_tensor *nodes; + + // thread done counter for single node + atomic_int done_counter; + + struct task_allocator task_allocator; +}; + +struct state { + ggml_thread_t thrd; + int ith; + struct state_shared *shared; +}; + +// simulate tensor that can be compute in parallel +struct ggml_tensor { + // simulate actual compute workload, e.g. src0 rows + int n_compute_units; +}; + +struct params { + int ith; + int nth; + + // simulate performance jitters related to: OS workload, thread affinity, + // economic cores, ... + int jitter_percent; + + struct task_allocator *task_allocator; +}; + +void compute_tensor(struct params params, struct ggml_tensor *node) { + GGML_PRINT_DEBUG_5("[#_%d] %s(): enter.\n", thread_local_id, __func__); + + const int ith = params.ith; + int chunk_idx; + int n_chunks; + + while (true) { + allocate_chunk(params.task_allocator, ith, &chunk_idx, &n_chunks); + if (chunk_idx < 0 || n_chunks <= 0) { + break; + } + + const int nr = node->n_compute_units; + const int dr = (nr + n_chunks - 1) / n_chunks; + const int ir0 = dr * chunk_idx; + const int ir1 = MIN(ir0 + dr, nr); + const int n_loops = 10000 * (100 + params.jitter_percent); + + volatile int64_t x = 0; + + for (int i = ir0; i <= ir1; ++i) { + for (int j = 0; j < n_loops; ++j) { + ++x; + } + } + UNUSED(x); + } + + GGML_PRINT_DEBUG_5("[#_%d] %s(): exit.\n", thread_local_id, __func__); +} + +static thread_ret_t demo_compute_thread(void *data) { + struct state *state = (struct state *)data; + GGML_ASSERT(state); + + struct state_shared *shared = state->shared; + GGML_ASSERT(shared); + + struct task_allocator *allocator = &shared->task_allocator; + GGML_ASSERT(allocator); + + int ith = state->ith; + int n_threads = shared->n_threads; + + thread_local_id = ith; + + atomic_int *done_counter = &shared->done_counter; + + for (int i = 0; i < shared->n_nodes; ++i) { + // Just slow down the last thread. + struct params params = { + .ith = state->ith, + .nth = n_threads, // suppose parallel + .task_allocator = allocator, + .jitter_percent = ith + 1 < n_threads ? 
0 : 50, + }; + + struct ggml_tensor *node = &shared->nodes[i]; + + compute_tensor(params, node); + atomic_fetch_add(done_counter, 1); + + while (atomic_load(done_counter) != n_threads) { + sched_yield(); + // main: go here --> later, main saw cond matched, break out loop + // --> reset counter + // current: go here --stall for a thousand years --> check condition + // OOPS! will never break out. + + // So we have to check if the counter has been reset. + if (atomic_load(done_counter) == 0) { + break; + } + } + + GGML_PRINT_DEBUG_5( + "[#_%d] %s(): saw all threads finished computing the node.\n", + thread_local_id, __func__); + + if (ith == 0) { + task_allocator_reset(allocator); + atomic_store(done_counter, 0); + } else { + while (atomic_load(done_counter) != 0) { + sched_yield(); + } + } + } + + GGML_PRINT_DEBUG_5("[#_%d] %s(): exited\n", thread_local_id, __func__); + + return 0; +} + +static void test_task_allocator(int n_threads, int n_nodes, int n_compute_units, + int n_multiplier) { + fprintf(stderr, + "\n[#_%d] %s(): n_threads: %d, n_nodes: %d, n_compute_units: %d, " + "n_multiplier: %d ===>\n\n", + thread_local_id, __func__, n_threads, n_nodes, n_compute_units, + n_multiplier); + + struct ggml_tensor *nodes = alloca(n_nodes * sizeof(struct ggml_tensor)); + + for (int i = 0; i < n_nodes; ++i) { + nodes[i].n_compute_units = n_compute_units; + } + + struct state_shared shared = { + .n_threads = n_threads, + .n_nodes = n_nodes, + .nodes = nodes, + .done_counter = 0, + }; + + task_allocator_init(&shared.task_allocator, n_threads, n_multiplier); + + struct state *workers = alloca(n_threads * sizeof(struct state)); + + for (int i = 0; i < n_threads; ++i) { + workers[i].ith = i; + workers[i].shared = &shared; + if (i > 0) { + pthread_create(&workers[i].thrd, NULL, demo_compute_thread, + &workers[i]); + } + } + + demo_compute_thread(&workers[0]); + + for (int i = 1; i < n_threads; ++i) { + pthread_join(workers[i].thrd, NULL); + } +} + +// +// Conclusions: +// +// - Given workers A and B, and the accumulated time T_a and T_b: +// B can steal a chunk from A only if T_a > T_b + T_b_per_chunk. +// - Saw this situation: A steal B, B steal C. +// - n_chunks plays a key role, similar to choosing the best n_threads, it's +// difficult choose the ideal n_chunks value. Performance drops with per-chunk +// compute time exceeds the scheduling overhead. +// - Work stealing chunked task allocator can save the response time +// significantly when the majority threads runs fast but a few suddenly or +// constantly slow. +// +int main(void) { + if (false) { // the most simple one: only main thread, one node + int n_threads = 1; + int n_nodes = 1; + int n_multiplier = 1; // trunks per thread. + int n_compute_units = 1; + + test_task_allocator(n_threads, n_nodes, n_compute_units, n_multiplier); + } + + if (false) { + int n_threads = 2; + int n_nodes = 2; + int n_multiplier = 1; // trunks per thread. + int n_compute_units = 2; + + test_task_allocator(n_threads, n_nodes, n_compute_units, n_multiplier); + } + + if (true) { + int n_threads = 4; + int n_nodes = 2; + int n_multiplier = 8; // trunks per thread. 
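+        // 4 threads x 8 chunks each = 32 chunks per node, so with the 32
+        // compute units below every chunk covers exactly one unit.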
+ int n_compute_units = 32; + + test_task_allocator(n_threads, n_nodes, n_compute_units, n_multiplier); + } +} From fef9eac856056ef3284947a94f713ffc0ca4395a Mon Sep 17 00:00:00 2001 From: mqy Date: Wed, 28 Jun 2023 04:32:44 +0800 Subject: [PATCH 2/7] fix windows build error caused by mis-replacing text --- examples/task-allocator/task-allocator.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/task-allocator/task-allocator.c b/examples/task-allocator/task-allocator.c index 4d6957f0d74a5..fc596a27a5477 100644 --- a/examples/task-allocator/task-allocator.c +++ b/examples/task-allocator/task-allocator.c @@ -48,15 +48,15 @@ typedef HANDLE pthread_t; typedef int thread_ret_t; static void atomic_store(atomic_int *ptr, LONG val) { - Intechan_lockedExchange(ptr, val); + InterlockedExchange(ptr, val); } static LONG atomic_load(atomic_int *ptr) { - return Intechan_lockedCompareExchange(ptr, 0, 0); + return InterlockedCompareExchange(ptr, 0, 0); } static LONG atomic_fetch_add(atomic_int *ptr, LONG inc) { - return Intechan_lockedExchangeAdd(ptr, inc); + return InterlockedExchangeAdd(ptr, inc); } static LONG atomic_fetch_sub(atomic_int *ptr, LONG dec) { From 767d1db097e7d55385f7787c1258126b2072a343 Mon Sep 17 00:00:00 2001 From: mqy Date: Wed, 28 Jun 2023 04:51:55 +0800 Subject: [PATCH 3/7] remove thread local variable: Windows does not recogonize it --- examples/task-allocator/task-allocator.c | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/examples/task-allocator/task-allocator.c b/examples/task-allocator/task-allocator.c index fc596a27a5477..8c376c90b995a 100644 --- a/examples/task-allocator/task-allocator.c +++ b/examples/task-allocator/task-allocator.c @@ -105,8 +105,6 @@ typedef pthread_t ggml_thread_t; /// Copyright original authors. //----------------------------------------------------------------------------- -_Thread_local int32_t thread_local_id; - #define MAX_THREADS 16 struct task_allocator { @@ -159,8 +157,7 @@ static void allocate_chunk(struct task_allocator *a, int ith, int *chunk_idx, // all assigned? 
if (atomic_load(&a->global_counter) == total_chunks) { - GGML_PRINT_DEBUG_5("[#_%d] %s(): nothing to do.\n", thread_local_id, - __func__); + GGML_PRINT_DEBUG("[#_%d] %s(): nothing to do.\n", ith, __func__); atomic_fetch_sub(&a->lock, 1); // unlock return; } @@ -180,7 +177,7 @@ static void allocate_chunk(struct task_allocator *a, int ith, int *chunk_idx, atomic_fetch_add(&a->global_counter, 1); GGML_PRINT_DEBUG("[#_%d] %s(): take the %3d-th trunk of its own.\n", - thread_local_id, __func__, head + 1); + ith, __func__, head + 1); *chunk_idx = idx; *n_chunks = total_chunks; @@ -205,8 +202,8 @@ static void allocate_chunk(struct task_allocator *a, int ith, int *chunk_idx, atomic_fetch_sub(&a->thread_queue_tails[i], 1); atomic_fetch_add(&a->global_counter, 1); - GGML_PRINT_DEBUG("[#_%d] %s(): steal the %d-th trunk from #_%d\n", - thread_local_id, __func__, tail, i); + GGML_PRINT_DEBUG("[#_%d] %s(): steal the %d-th trunk from #_%d\n", ith, + __func__, tail, i); *chunk_idx = idx; *n_chunks = total_chunks; @@ -299,8 +296,6 @@ static thread_ret_t demo_compute_thread(void *data) { int ith = state->ith; int n_threads = shared->n_threads; - thread_local_id = ith; - atomic_int *done_counter = &shared->done_counter; for (int i = 0; i < shared->n_nodes; ++i) { @@ -352,10 +347,9 @@ static thread_ret_t demo_compute_thread(void *data) { static void test_task_allocator(int n_threads, int n_nodes, int n_compute_units, int n_multiplier) { fprintf(stderr, - "\n[#_%d] %s(): n_threads: %d, n_nodes: %d, n_compute_units: %d, " + "\n%s(): n_threads: %d, n_nodes: %d, n_compute_units: %d, " "n_multiplier: %d ===>\n\n", - thread_local_id, __func__, n_threads, n_nodes, n_compute_units, - n_multiplier); + __func__, n_threads, n_nodes, n_compute_units, n_multiplier); struct ggml_tensor *nodes = alloca(n_nodes * sizeof(struct ggml_tensor)); From a1306ce6c6b7a976e30ca48c464fabb337df0d72 Mon Sep 17 00:00:00 2001 From: mqy Date: Wed, 28 Jun 2023 05:59:25 +0800 Subject: [PATCH 4/7] prevent deadlock; cleanup --- examples/task-allocator/task-allocator.c | 32 ++++++++++-------------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/examples/task-allocator/task-allocator.c b/examples/task-allocator/task-allocator.c index 8c376c90b995a..b09186021a40c 100644 --- a/examples/task-allocator/task-allocator.c +++ b/examples/task-allocator/task-allocator.c @@ -167,8 +167,8 @@ static void allocate_chunk(struct task_allocator *a, int ith, int *chunk_idx, int head = atomic_load(&a->thread_queue_heads[ith]); int tail = atomic_load(&a->thread_queue_tails[ith]); - GGML_PRINT_DEBUG_5("[#_%d] %s(): head: %d, tail: %d.\n", - thread_local_id, __func__, head, tail); + GGML_PRINT_DEBUG_5("[#_%d] %s(): head: %d, tail: %d.\n", ith, __func__, + head, tail); if (head < tail) { int idx = ith * M + head; @@ -252,7 +252,7 @@ struct params { }; void compute_tensor(struct params params, struct ggml_tensor *node) { - GGML_PRINT_DEBUG_5("[#_%d] %s(): enter.\n", thread_local_id, __func__); + GGML_PRINT_DEBUG_5("[#_%d] %s(): enter.\n", params.ith, __func__); const int ith = params.ith; int chunk_idx; @@ -280,7 +280,7 @@ void compute_tensor(struct params params, struct ggml_tensor *node) { UNUSED(x); } - GGML_PRINT_DEBUG_5("[#_%d] %s(): exit.\n", thread_local_id, __func__); + GGML_PRINT_DEBUG_5("[#_%d] %s(): exit.\n", ith, __func__); } static thread_ret_t demo_compute_thread(void *data) { @@ -312,24 +312,18 @@ static thread_ret_t demo_compute_thread(void *data) { compute_tensor(params, node); atomic_fetch_add(done_counter, 1); - while 
(atomic_load(done_counter) != n_threads) { - sched_yield(); - // main: go here --> later, main saw cond matched, break out loop - // --> reset counter - // current: go here --stall for a thousand years --> check condition - // OOPS! will never break out. + GGML_PRINT_DEBUG_5("[#_%d] %s(): finished computing the node.\n", ith, + __func__); - // So we have to check if the counter has been reset. - if (atomic_load(done_counter) == 0) { - break; + if (ith == 0) { + while (atomic_load(done_counter) != n_threads) { + sched_yield(); } - } - GGML_PRINT_DEBUG_5( - "[#_%d] %s(): saw all threads finished computing the node.\n", - thread_local_id, __func__); + GGML_PRINT_DEBUG_5( + "[#_%d] %s(): saw all threads finished computing the node.\n", + ith, __func__); - if (ith == 0) { task_allocator_reset(allocator); atomic_store(done_counter, 0); } else { @@ -339,7 +333,7 @@ static thread_ret_t demo_compute_thread(void *data) { } } - GGML_PRINT_DEBUG_5("[#_%d] %s(): exited\n", thread_local_id, __func__); + GGML_PRINT_DEBUG_5("[#_%d] %s(): exited\n", ith, __func__); return 0; } From e300a91104cbef16e9646d17a722b167cad1d55a Mon Sep 17 00:00:00 2001 From: mqy Date: Wed, 28 Jun 2023 06:09:19 +0800 Subject: [PATCH 5/7] stdbool.h --- examples/task-allocator/task-allocator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/task-allocator/task-allocator.c b/examples/task-allocator/task-allocator.c index b09186021a40c..13acc2a3e6eed 100644 --- a/examples/task-allocator/task-allocator.c +++ b/examples/task-allocator/task-allocator.c @@ -5,6 +5,7 @@ #include #include #include +#include #if defined(_MSC_VER) || defined(__MINGW32__) #include // using malloc.h with MSC/MINGW @@ -93,7 +94,6 @@ typedef void *thread_ret_t; #include #include -#include #endif From 4afb12fbb3f780155f039c63d67fcc558e2de4f4 Mon Sep 17 00:00:00 2001 From: mqy Date: Wed, 28 Jun 2023 06:20:59 +0800 Subject: [PATCH 6/7] stdint.h --- examples/task-allocator/task-allocator.c | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/task-allocator/task-allocator.c b/examples/task-allocator/task-allocator.c index 13acc2a3e6eed..7a96774a9ec52 100644 --- a/examples/task-allocator/task-allocator.c +++ b/examples/task-allocator/task-allocator.c @@ -6,6 +6,7 @@ #include #include #include +#include #if defined(_MSC_VER) || defined(__MINGW32__) #include // using malloc.h with MSC/MINGW From 76e5e2719da1851ccefebd20bc5a2d8707fc85d5 Mon Sep 17 00:00:00 2001 From: mqy Date: Wed, 28 Jun 2023 17:00:34 +0800 Subject: [PATCH 7/7] corner case: when nth is 1, n_multiplier should be 1 --- examples/task-allocator/task-allocator.c | 197 ++++++++++++++++++++--- 1 file changed, 173 insertions(+), 24 deletions(-) diff --git a/examples/task-allocator/task-allocator.c b/examples/task-allocator/task-allocator.c index 7a96774a9ec52..f9b7e3febffd0 100644 --- a/examples/task-allocator/task-allocator.c +++ b/examples/task-allocator/task-allocator.c @@ -1,12 +1,12 @@ // https://github.com/ggerganov/ggml/issues/291 // https://github.com/ggerganov/llama.cpp/pull/1507 +#include +#include #include #include #include #include -#include -#include #if defined(_MSC_VER) || defined(__MINGW32__) #include // using malloc.h with MSC/MINGW @@ -132,30 +132,36 @@ static void task_allocator_reset(struct task_allocator *a) { atomic_store(&a->global_counter, 0); } +// NOTE: when nth == 1, n_multiplier is actually useless. 
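+// With a single worker there is nobody to steal from, so extra chunks would
+// only add allocator round trips; task_allocator_init() therefore forces
+// n_multiplier to 1 in that case.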
static void task_allocator_init(struct task_allocator *a, int nth, int n_multiplier) { + GGML_ASSERT(nth > 0); GGML_ASSERT(nth <= MAX_THREADS); + GGML_ASSERT(n_multiplier > 0); + a->nth = nth; - a->n_multiplier = n_multiplier; + a->n_multiplier = nth == 1 ? 1 : n_multiplier; task_allocator_reset(a); } +// ith: worker id (start from 0). +// chunk_idx and n_chunks will be updated. +// chunk_idx is set as -1 when nothing to do. static void allocate_chunk(struct task_allocator *a, int ith, int *chunk_idx, int *n_chunks) { - GGML_ASSERT(a->nth > 0); - GGML_ASSERT(a->n_multiplier > 0); + GGML_ASSERT(ith >= 0 && ith < a->nth); + + int M = a->n_multiplier; + int nth = a->nth; + int total_chunks = M * nth; *chunk_idx = -1; - *n_chunks = 0; + *n_chunks = total_chunks; while (atomic_fetch_add(&a->lock, 1) != 0) { // lock atomic_fetch_sub(&a->lock, 1); } - int M = a->n_multiplier; - int nth = a->nth; - int total_chunks = M * nth; - // all assigned? if (atomic_load(&a->global_counter) == total_chunks) { GGML_PRINT_DEBUG("[#_%d] %s(): nothing to do.\n", ith, __func__); @@ -177,7 +183,7 @@ static void allocate_chunk(struct task_allocator *a, int ith, int *chunk_idx, atomic_fetch_add(&a->thread_queue_heads[ith], 1); atomic_fetch_add(&a->global_counter, 1); - GGML_PRINT_DEBUG("[#_%d] %s(): take the %3d-th trunk of its own.\n", + GGML_PRINT_DEBUG("[#_%d] %s(): take the %3d-th chunk of its own.\n", ith, __func__, head + 1); *chunk_idx = idx; @@ -188,6 +194,7 @@ static void allocate_chunk(struct task_allocator *a, int ith, int *chunk_idx, } // steal from others. + // TODO: optimize: steal from the slowest one. for (int i = 0; i < nth; ++i) { if (i == ith) { continue; @@ -203,7 +210,7 @@ static void allocate_chunk(struct task_allocator *a, int ith, int *chunk_idx, atomic_fetch_sub(&a->thread_queue_tails[i], 1); atomic_fetch_add(&a->global_counter, 1); - GGML_PRINT_DEBUG("[#_%d] %s(): steal the %d-th trunk from #_%d\n", ith, + GGML_PRINT_DEBUG("[#_%d] %s(): steal the %d-th chunk from #_%d\n", ith, __func__, tail, i); *chunk_idx = idx; @@ -261,7 +268,7 @@ void compute_tensor(struct params params, struct ggml_tensor *node) { while (true) { allocate_chunk(params.task_allocator, ith, &chunk_idx, &n_chunks); - if (chunk_idx < 0 || n_chunks <= 0) { + if (chunk_idx < 0) { break; } @@ -339,6 +346,144 @@ static thread_ret_t demo_compute_thread(void *data) { return 0; } +static void test_task_allocator_init(void) { + struct task_allocator a; + + task_allocator_init(&a, 1, 2); + GGML_ASSERT(a.nth == 1); + GGML_ASSERT(a.n_multiplier == 1); // when nth == 1, force n_multiplier as 1 + + task_allocator_init(&a, 2, 2); + GGML_ASSERT(a.nth == 2); + GGML_ASSERT(a.n_multiplier == 2); // ok +} + +static void task_allocator_unit_test_no_steal(void) { + int chunk_idx; // out + int n_chunks; // out + + int n_threads = 2; + int n_multiplier = 2; + const int expected_n_slots = n_threads * n_multiplier; + + struct task_allocator a; + task_allocator_init(&a, n_threads, n_multiplier); + + struct test_data_t { + int ith; // call by + int chunk_idx; // expected + int n_chunks; // expected + }; + + struct test_data_t test_data[] = { + //////////////////// clang format ///////////////////////// + { + .ith = 0, + .chunk_idx = 0, + }, + { + .ith = 1, + .chunk_idx = 2, + }, + { + .ith = 0, + .chunk_idx = 1, + }, + { + .ith = 1, + .chunk_idx = 3, + }, + { + .ith = 0, + .chunk_idx = -1, + }, + { + .ith = 1, + .chunk_idx = -1, + }}; + + int t_len = sizeof(test_data) / sizeof(struct test_data_t); + + for (int i = 0; i < t_len; i++) { + 
allocate_chunk(&a, test_data[i].ith, &chunk_idx, &n_chunks); + if (chunk_idx != test_data[i].chunk_idx) { + fprintf(stderr, + "%s(): chunk_idx mismatch. i: %d, actual: %d, expected: %d\n", + __func__, i, chunk_idx, test_data[i].chunk_idx); + abort(); + } + if (n_chunks != expected_n_slots) { + fprintf(stderr, + "%s(): n_chunks mismatch. i: %d, actual: %d, expected: %d\n", + __func__, i, n_chunks, expected_n_slots); + abort(); + } + } +} + +static void task_allocator_unit_test_steal(void) { + int chunk_idx; // out + int n_chunks; // out + + int n_threads = 2; + int n_multiplier = 2; + const int expected_n_slots = n_threads * n_multiplier; + + struct task_allocator a; + task_allocator_init(&a, n_threads, n_multiplier); + + struct test_data_t { + int ith; // call by + int chunk_idx; // expected + }; + + struct test_data_t test_data[] = { + //////////////////// clang format ///////////////////////// + { + .ith = 0, + .chunk_idx = 0, + }, + { + .ith = 0, + .chunk_idx = 1, + }, + { + .ith = 1, + .chunk_idx = 2, + }, + { + .ith = 0, + .chunk_idx = 4, // steal from tail + }, + { + .ith = 0, + .chunk_idx = -1, + }, + { + .ith = 1, + .chunk_idx = -1, + }}; + + int t_len = sizeof(test_data) / sizeof(struct test_data_t); + + for (int i = 0; i < t_len; i++) { + allocate_chunk(&a, test_data[i].ith, &chunk_idx, &n_chunks); + if (chunk_idx != test_data[i].chunk_idx) { + fprintf(stderr, + "%s(): chunk_idx mismatch. i: %d, actual: %d, expected: %d\n", + __func__, i, chunk_idx, test_data[i].chunk_idx); + abort(); + } + if (n_chunks != expected_n_slots) { + fprintf(stderr, + "%s(): n_chunks mismatch. i: %d, actual: %d, expected: %d\n", + __func__, i, n_chunks, expected_n_slots); + abort(); + } + } +} + +// Integration test. static void test_task_allocator(int n_threads, int n_nodes, int n_compute_units, int n_multiplier) { fprintf(stderr, @@ -386,36 +531,40 @@ static void test_task_allocator(int n_threads, int n_nodes, int n_compute_units, // B can steal a chunk from A only if T_a > T_b + T_b_per_chunk. // - Saw this situation: A steal B, B steal C. // - n_chunks plays a key role, similar to choosing the best n_threads, it's -// difficult choose the ideal n_chunks value. Performance drops with per-chunk -// compute time exceeds the scheduling overhead. +// difficult to choose the ideal n_chunks value. Performance drops when +// per-chunk compute time exceeds the scheduling overhead. // - Work stealing chunked task allocator can save the response time // significantly when the majority threads runs fast but a few suddenly or // constantly slow. // int main(void) { - if (false) { // the most simple one: only main thread, one node + test_task_allocator_init(); + task_allocator_unit_test_no_steal(); + task_allocator_unit_test_steal(); + + // Integration tests + const int n_compute_units = 64; + + if (false) { int n_threads = 1; int n_nodes = 1; - int n_multiplier = 1; // trunks per thread. - int n_compute_units = 1; + int n_multiplier = 2; // equivalent to 1 test_task_allocator(n_threads, n_nodes, n_compute_units, n_multiplier); } - if (false) { + if (true) { int n_threads = 2; int n_nodes = 2; - int n_multiplier = 1; // trunks per thread. - int n_compute_units = 2; + int n_multiplier = 1; test_task_allocator(n_threads, n_nodes, n_compute_units, n_multiplier); } if (true) { - int n_threads = 4; + int n_threads = 2; int n_nodes = 2; - int n_multiplier = 8; // trunks per thread. - int n_compute_units = 32; + int n_multiplier = 8; test_task_allocator(n_threads, n_nodes, n_compute_units, n_multiplier); }
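
For reference, a minimal sketch (not part of the patches above) of the per-node
loop a worker is expected to run: keep asking the allocator for chunks and map
each chunk index onto a half-open range of compute units, loosely mirroring
compute_tensor above. worker_loop and process_unit are hypothetical names used
only for illustration.

    static void worker_loop(struct params p, struct ggml_tensor *node) {
        int chunk_idx, n_chunks;
        for (;;) {
            allocate_chunk(p.task_allocator, p.ith, &chunk_idx, &n_chunks);
            if (chunk_idx < 0) {
                break; // every chunk of this node has been claimed
            }
            const int nr  = node->n_compute_units;
            const int dr  = (nr + n_chunks - 1) / n_chunks; // units per chunk
            const int ir0 = dr * chunk_idx;
            const int ir1 = MIN(ir0 + dr, nr);
            for (int ir = ir0; ir < ir1; ++ir) {
                // process_unit(node, ir); // hypothetical per-unit work
            }
        }
    }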