From f202cab424ed40227db0e5a791f56433188aec2e Mon Sep 17 00:00:00 2001 From: gbaraldi Date: Thu, 22 May 2025 14:09:17 -0300 Subject: [PATCH 1/3] Refactor wakeup of threads and scheduler file logic --- base/Base.jl | 2 +- base/{ => scheduler}/partr.jl | 73 ++++++---------------------------- base/scheduler/scheduler.jl | 74 +++++++++++++++++++++++++++++++++++ base/task.jl | 34 ++++++++++++---- src/jl_exported_funcs.inc | 1 + src/julia_threads.h | 1 + src/scheduler.c | 64 +++++++++++++++++++++++++++--- test.jl | 7 ++++ test/threads.jl | 2 +- 9 files changed, 182 insertions(+), 76 deletions(-) rename base/{ => scheduler}/partr.jl (66%) create mode 100644 base/scheduler/scheduler.jl create mode 100644 test.jl diff --git a/base/Base.jl b/base/Base.jl index afa5a3d93d27c..710be9d098f75 100644 --- a/base/Base.jl +++ b/base/Base.jl @@ -149,7 +149,7 @@ const liblapack_name = libblas_name # Note that `atomics.jl` here should be deprecated Core.eval(Threads, :(include("atomics.jl"))) include("channels.jl") -include("partr.jl") +include("scheduler/scheduler.jl") include("task.jl") include("threads_overloads.jl") include("weakkeydict.jl") diff --git a/base/partr.jl b/base/scheduler/partr.jl similarity index 66% rename from base/partr.jl rename to base/scheduler/partr.jl index 6053a584af5ba..fd80065056e62 100644 --- a/base/partr.jl +++ b/base/scheduler/partr.jl @@ -19,63 +19,6 @@ const heap_d = UInt32(8) const heaps = [Vector{taskheap}(undef, 0), Vector{taskheap}(undef, 0)] const heaps_lock = [SpinLock(), SpinLock()] - -""" - cong(max::UInt32) - -Return a random UInt32 in the range `1:max` except if max is 0, in that case return 0. -""" -cong(max::UInt32) = iszero(max) ? UInt32(0) : rand_ptls(max) + UInt32(1) #TODO: make sure users don't use 0 and remove this check - -get_ptls_rng() = ccall(:jl_get_ptls_rng, UInt64, ()) - -set_ptls_rng(seed::UInt64) = ccall(:jl_set_ptls_rng, Cvoid, (UInt64,), seed) - -""" - rand_ptls(max::UInt32) - -Return a random UInt32 in the range `0:max-1` using the thread-local RNG -state. Max must be greater than 0. -""" -Base.@assume_effects :removable :inaccessiblememonly :notaskstate function rand_ptls(max::UInt32) - rngseed = get_ptls_rng() - val, seed = rand_uniform_max_int32(max, rngseed) - set_ptls_rng(seed) - return val % UInt32 -end - -# This implementation is based on OpenSSLs implementation of rand_uniform -# https://github.com/openssl/openssl/blob/1d2cbd9b5a126189d5e9bc78a3bdb9709427d02b/crypto/rand/rand_uniform.c#L13-L99 -# Comments are vendored from their implementation as well. -# For the original developer check the PR to swift https://github.com/apple/swift/pull/39143. - -# Essentially it boils down to incrementally generating a fixed point -# number on the interval [0, 1) and multiplying this number by the upper -# range limit. Once it is certain what the fractional part contributes to -# the integral part of the product, the algorithm has produced a definitive -# result. -""" - rand_uniform_max_int32(max::UInt32, seed::UInt64) - -Return a random UInt32 in the range `0:max-1` using the given seed. -Max must be greater than 0. -""" -Base.@assume_effects :total function rand_uniform_max_int32(max::UInt32, seed::UInt64) - if max == UInt32(1) - return UInt32(0), seed - end - # We are generating a fixed point number on the interval [0, 1). - # Multiplying this by the range gives us a number on [0, upper). 
- # The high word of the multiplication result represents the integral part - # This is not completely unbiased as it's missing the fractional part of the original implementation but it's good enough for our purposes - seed = UInt64(69069) * seed + UInt64(362437) - prod = (UInt64(max)) * (seed % UInt32) # 64 bit product - i = prod >> 32 % UInt32 # integral part - return i % UInt32, seed -end - - - function multiq_sift_up(heap::taskheap, idx::Int32) while idx > Int32(1) parent = (idx - Int32(2)) รท heap_d + Int32(1) @@ -147,10 +90,10 @@ function multiq_insert(task::Task, priority::UInt16) task.priority = priority - rn = cong(heap_p) + rn = Base.Scheduler.cong(heap_p) tpheaps = heaps[tp] while !trylock(tpheaps[rn].lock) - rn = cong(heap_p) + rn = Base.Scheduler.cong(heap_p) end heap = tpheaps[rn] @@ -190,8 +133,8 @@ function multiq_deletemin() if i == heap_p return nothing end - rn1 = cong(heap_p) - rn2 = cong(heap_p) + rn1 = Base.Scheduler.cong(heap_p) + rn2 = Base.Scheduler.cong(heap_p) prio1 = tpheaps[rn1].priority prio2 = tpheaps[rn2].priority if prio1 > prio2 @@ -235,6 +178,9 @@ function multiq_check_empty() if tp == 0 # Foreign thread return true end + if !isempty(Base.workqueue_for(tid)) + return false + end for i = UInt32(1):length(heaps[tp]) if heaps[tp][i].ntasks != 0 return false @@ -243,4 +189,9 @@ function multiq_check_empty() return true end + +enqueue!(t::Task) = multiq_insert(t, t.priority) +dequeue!() = multiq_deletemin() +checktaskempty() = multiq_check_empty() + end diff --git a/base/scheduler/scheduler.jl b/base/scheduler/scheduler.jl new file mode 100644 index 0000000000000..b47e91aabba9a --- /dev/null +++ b/base/scheduler/scheduler.jl @@ -0,0 +1,74 @@ +# This file is a part of Julia. License is MIT: https://julialang.org/license + +module Scheduler + +""" + cong(max::UInt32) + +Return a random UInt32 in the range `1:max` except if max is 0, in that case return 0. +""" +cong(max::UInt32) = iszero(max) ? UInt32(0) : rand_ptls(max) + UInt32(1) #TODO: make sure users don't use 0 and remove this check + +get_ptls_rng() = ccall(:jl_get_ptls_rng, UInt64, ()) + +set_ptls_rng(seed::UInt64) = ccall(:jl_set_ptls_rng, Cvoid, (UInt64,), seed) + +""" + rand_ptls(max::UInt32) + +Return a random UInt32 in the range `0:max-1` using the thread-local RNG +state. Max must be greater than 0. +""" +Base.@assume_effects :removable :inaccessiblememonly :notaskstate function rand_ptls(max::UInt32) + rngseed = get_ptls_rng() + val, seed = rand_uniform_max_int32(max, rngseed) + set_ptls_rng(seed) + return val % UInt32 +end + +# This implementation is based on OpenSSLs implementation of rand_uniform +# https://github.com/openssl/openssl/blob/1d2cbd9b5a126189d5e9bc78a3bdb9709427d02b/crypto/rand/rand_uniform.c#L13-L99 +# Comments are vendored from their implementation as well. +# For the original developer check the PR to swift https://github.com/apple/swift/pull/39143. + +# Essentially it boils down to incrementally generating a fixed point +# number on the interval [0, 1) and multiplying this number by the upper +# range limit. Once it is certain what the fractional part contributes to +# the integral part of the product, the algorithm has produced a definitive +# result. +""" + rand_uniform_max_int32(max::UInt32, seed::UInt64) + +Return a random UInt32 in the range `0:max-1` using the given seed. +Max must be greater than 0. 
+""" +Base.@assume_effects :total function rand_uniform_max_int32(max::UInt32, seed::UInt64) + if max == UInt32(1) + return UInt32(0), seed + end + # We are generating a fixed point number on the interval [0, 1). + # Multiplying this by the range gives us a number on [0, upper). + # The high word of the multiplication result represents the integral part + # This is not completely unbiased as it's missing the fractional part of the original implementation but it's good enough for our purposes + seed = UInt64(69069) * seed + UInt64(362437) + prod = (UInt64(max)) * (seed % UInt32) # 64 bit product + i = prod >> 32 % UInt32 # integral part + return i % UInt32, seed +end + +include("scheduler/partr.jl") + +const ChosenScheduler = Partr + + + +# Scheduler interface: + # enqueue! which pushes a runnable Task into it + # dequeue! which pops a runnable Task from it + # checktaskempty which returns true if the scheduler has no available Tasks + +enqueue!(t::Task) = ChosenScheduler.enqueue!(t) +dequeue!() = ChosenScheduler.dequeue!() +checktaskempty() = ChosenScheduler.checktaskempty() + +end diff --git a/base/task.jl b/base/task.jl index cddf1fc854f4c..cf270b5429d28 100644 --- a/base/task.jl +++ b/base/task.jl @@ -937,7 +937,6 @@ end function enq_work(t::Task) (t._state === task_state_runnable && t.queue === nothing) || error("schedule: Task not runnable") - # Sticky tasks go into their thread's work queue. if t.sticky tid = Threads.threadid(t) @@ -968,19 +967,40 @@ function enq_work(t::Task) ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1) push!(workqueue_for(tid), t) else - # Otherwise, put the task in the multiqueue. - Partr.multiq_insert(t, t.priority) + # Otherwise, push the task to the scheduler + Scheduler.enqueue!(t) tid = 0 end end - ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16) + + if (tid == 0) + ccall(:jl_wake_any_thread, Cvoid, (Any,), current_task()) + else + ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16) + end return t end +const ChildFirst = false + function schedule(t::Task) # [task] created -scheduled-> wait_time maybe_record_enqueued!(t) - enq_work(t) + if ChildFirst + ct = current_task() + if ct.sticky || t.sticky + maybe_record_enqueued!(t) + enq_work(t) + else + maybe_record_enqueued!(t) + enq_work(ct) + yieldto(t) + end + else + maybe_record_enqueued!(t) + enq_work(t) + end + return t end """ @@ -1186,10 +1206,10 @@ function trypoptask(W::StickyWorkqueue) end return t end - return Partr.multiq_deletemin() + return Scheduler.dequeue!() end -checktaskempty = Partr.multiq_check_empty +checktaskempty = Scheduler.checktaskempty function wait() ct = current_task() diff --git a/src/jl_exported_funcs.inc b/src/jl_exported_funcs.inc index 7b204066b8c28..385612ca8b6b2 100644 --- a/src/jl_exported_funcs.inc +++ b/src/jl_exported_funcs.inc @@ -441,6 +441,7 @@ XX(jl_tagged_gensym) \ XX(jl_take_buffer) \ XX(jl_task_get_next) \ + XX(jl_wake_any_thread) \ XX(jl_termios_size) \ XX(jl_test_cpu_feature) \ XX(jl_threadid) \ diff --git a/src/julia_threads.h b/src/julia_threads.h index dbe9166f288a9..6e24477999448 100644 --- a/src/julia_threads.h +++ b/src/julia_threads.h @@ -214,6 +214,7 @@ typedef struct _jl_tls_states_t { uint64_t uv_run_leave; uint64_t sleep_enter; uint64_t sleep_leave; + uint64_t woken_up; ) // some hidden state (usually just because we don't have the type's size declaration) diff --git a/src/scheduler.c b/src/scheduler.c index 731a0c5146605..7556f39e81a5d 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -32,7 +32,8 @@ static const int16_t 
sleeping_like_the_dead JL_UNUSED = 2; // plus a running count of the number of in-flight wake-ups // n.b. this may temporarily exceed jl_n_threads _Atomic(int) n_threads_running = 0; - +// Number of threads sleeping in the scheduler, this number may be lower than the actual number +_Atomic(int) n_threads_idle = 0; // invariant: No thread is ever asleep unless sleep_check_state is sleeping (or we have a wakeup signal pending). // invariant: Any particular thread is not asleep unless that thread's sleep_check_state is sleeping. // invariant: The transition of a thread state to sleeping must be followed by a check that there wasn't work pending for it. @@ -220,6 +221,7 @@ static int wake_thread(int16_t tid) JL_NOTSAFEPOINT if (jl_atomic_load_relaxed(&ptls2->sleep_check_state) != not_sleeping) { int8_t state = sleeping; if (jl_atomic_cmpswap_relaxed(&ptls2->sleep_check_state, &state, not_sleeping)) { + JULIA_DEBUG_SLEEPWAKE( ptls2->woken_up = cycleclock() ); int wasrunning = jl_atomic_fetch_add_relaxed(&n_threads_running, 1); // increment in-flight wakeup count assert(wasrunning); (void)wasrunning; JL_PROBE_RT_SLEEP_CHECK_WAKE(ptls2, state); @@ -359,6 +361,43 @@ static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT } +STATIC_INLINE void wake_any(jl_task_t *ct) JL_NOTSAFEPOINT +{ + // Find sleeping thread to wake up + int self = jl_atomic_load_relaxed(&ct->tid); + int nthreads = jl_atomic_load_acquire(&jl_n_threads); + int idle_threads = jl_atomic_load_relaxed(&n_threads_idle); + jl_task_t *uvlock = jl_atomic_load_relaxed(&jl_uv_mutex.owner); + if (uvlock == ct) + uv_stop(jl_global_event_loop()); + if (idle_threads > 0) { + int anysleep = 0; + for (int tid = self + 1; tid < nthreads; tid++) { + if ((tid != self) && wake_thread(tid)) { + anysleep = 1; + break; + } + } + for (int tid = 0; tid < self; tid++) { + if ((tid != self) && wake_thread(tid)) { + anysleep = 1; + break; + } + } + if (anysleep) { + jl_fence(); // This fence is expensive but needed for libuv to do RUN_ONCE + if (uvlock != ct && jl_atomic_load_relaxed(&jl_uv_mutex.owner) != NULL) + wake_libuv(); + } + } +} + +JL_DLLEXPORT void jl_wake_any_thread(jl_task_t *ct) +{ + wake_any(ct); +} + + JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, jl_value_t *checkempty) { jl_task_t *ct = jl_current_task; @@ -366,8 +405,10 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, while (1) { jl_task_t *task = get_next_task(trypoptask, q); - if (task) + if (task) { + wake_any(ct); return task; + } // quick, race-y check to see if there seems to be any stuff in there jl_cpu_pause(); @@ -382,12 +423,14 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, // acquire sleep-check lock assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping); jl_atomic_store_relaxed(&ptls->sleep_check_state, sleeping); + jl_atomic_fetch_add_relaxed(&n_threads_idle, 1); jl_fence(); // [^store_buffering_1] JL_PROBE_RT_SLEEP_CHECK_SLEEP(ptls); if (!check_empty(checkempty)) { // uses relaxed loads if (set_not_sleeping(ptls)) { JL_PROBE_RT_SLEEP_CHECK_TASKQ_WAKE(ptls); } + jl_atomic_fetch_add_relaxed(&n_threads_idle, -1); continue; } volatile int isrunning = 1; @@ -399,12 +442,14 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, if (set_not_sleeping(ptls)) { JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls); } + jl_atomic_fetch_add_relaxed(&n_threads_idle, -1); continue; // jump to JL_CATCH } if (task) { if (set_not_sleeping(ptls)) { 
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls); } + jl_atomic_fetch_add_relaxed(&n_threads_idle, -1); continue; // jump to JL_CATCH } @@ -437,7 +482,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, // responsibility, so need to make sure thread 0 will take care // of us. if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) == NULL) // aka trylock - jl_wakeup_thread(jl_atomic_load_relaxed(&io_loop_tid)); + wakeup_thread(ct, jl_atomic_load_relaxed(&io_loop_tid)); } if (uvlock) { @@ -451,7 +496,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, uv_loop_t *loop = jl_global_event_loop(); loop->stop_flag = 0; JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_enter = cycleclock() ); - active = uv_run(loop, UV_RUN_ONCE); + active = uv_run(loop, UV_RUN_NOWAIT); JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_leave = cycleclock() ); jl_gc_safepoint(); } @@ -465,6 +510,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, if (set_not_sleeping(ptls)) { JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls); } + jl_atomic_fetch_add_relaxed(&n_threads_idle, -1); start_cycles = 0; continue; // jump to JL_CATCH } @@ -474,6 +520,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, if (set_not_sleeping(ptls)) { JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls); } + jl_atomic_fetch_add_relaxed(&n_threads_idle, -1); start_cycles = 0; continue; // jump to JL_CATCH } @@ -519,6 +566,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, // else should we warn the user of certain deadlock here if tid == 0 && n_threads_running == 0? uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock); } + jl_atomic_fetch_add_relaxed(&n_threads_idle, -1); assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping); assert(jl_atomic_load_relaxed(&n_threads_running)); start_cycles = 0; @@ -533,13 +581,17 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, } JL_CATCH { // probably SIGINT, but possibly a user mistake in trypoptask - if (!isrunning) + if (!isrunning) { + jl_atomic_fetch_add_relaxed(&n_threads_idle, -1); jl_atomic_fetch_add_relaxed(&n_threads_running, 1); + } set_not_sleeping(ptls); jl_rethrow(); } - if (task) + if (task) { + wake_any(ct); return task; + } } else { // maybe check the kernel for new messages too diff --git a/test.jl b/test.jl new file mode 100644 index 0000000000000..7a31d00c6b827 --- /dev/null +++ b/test.jl @@ -0,0 +1,7 @@ +function fib(n::Int) + n <= 1 && return n + t = Threads.@spawn fib(n - 2) + return fib(n - 1) + fetch(t)::Int + end + +fib(34) diff --git a/test/threads.jl b/test/threads.jl index fa0b33a6352f3..747452906114c 100644 --- a/test/threads.jl +++ b/test/threads.jl @@ -338,7 +338,7 @@ end end @testset "rand_ptls underflow" begin - @test Base.Partr.cong(UInt32(0)) == 0 + @test Base.Scheduler.cong(UInt32(0)) == 0 end @testset "num_stack_mappings metric" begin From 471333cea9c6b7ef4c1c52401f636eff65ae988c Mon Sep 17 00:00:00 2001 From: gbaraldi Date: Tue, 3 Jun 2025 11:43:12 -0300 Subject: [PATCH 2/3] Change loop logic slightly --- src/scheduler.c | 77 ++++++++++++++++++++++++------------------------- 1 file changed, 38 insertions(+), 39 deletions(-) diff --git a/src/scheduler.c b/src/scheduler.c index 7556f39e81a5d..d3510c171555e 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -304,6 +304,44 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) JL_NOTSAFEPOINT wakeup_thread(ct, tid); } +STATIC_INLINE void wake_any(jl_task_t *ct) JL_NOTSAFEPOINT +{ + 
jl_fence(); // [^store_buffering_1] + // Find sleeping thread to wake up + int self = jl_atomic_load_relaxed(&ct->tid); + int nthreads = jl_atomic_load_acquire(&jl_n_threads); + int idle_threads = jl_atomic_load_relaxed(&n_threads_idle); + jl_task_t *uvlock = jl_atomic_load_relaxed(&jl_uv_mutex.owner); + if (uvlock == ct) + uv_stop(jl_global_event_loop()); + if (idle_threads > 0) { + for (int i = 1; i < nthreads; i++) { + int tid = self + i; + if (tid >= nthreads) + tid -= nthreads; + if ((tid != self) && wake_thread(tid)) { + if (uvlock != ct) { + jl_fence(); + jl_ptls_t other = jl_atomic_load_relaxed(&jl_all_tls_states)[tid]; + jl_task_t *tid_task = jl_atomic_load_relaxed(&other->current_task); + // now that we have changed the thread to not-sleeping, ensure that + // either it has not yet acquired the libuv lock, or that it will + // observe the change of state to not_sleeping + if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) == tid_task) + wake_libuv(); + } + break; + } + } + } +} + +JL_DLLEXPORT void jl_wake_any_thread(jl_task_t *ct) +{ + wake_any(ct); +} + + // get the next runnable task static jl_task_t *get_next_task(jl_value_t *trypoptask, jl_value_t *q) { @@ -361,43 +399,6 @@ static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT } -STATIC_INLINE void wake_any(jl_task_t *ct) JL_NOTSAFEPOINT -{ - // Find sleeping thread to wake up - int self = jl_atomic_load_relaxed(&ct->tid); - int nthreads = jl_atomic_load_acquire(&jl_n_threads); - int idle_threads = jl_atomic_load_relaxed(&n_threads_idle); - jl_task_t *uvlock = jl_atomic_load_relaxed(&jl_uv_mutex.owner); - if (uvlock == ct) - uv_stop(jl_global_event_loop()); - if (idle_threads > 0) { - int anysleep = 0; - for (int tid = self + 1; tid < nthreads; tid++) { - if ((tid != self) && wake_thread(tid)) { - anysleep = 1; - break; - } - } - for (int tid = 0; tid < self; tid++) { - if ((tid != self) && wake_thread(tid)) { - anysleep = 1; - break; - } - } - if (anysleep) { - jl_fence(); // This fence is expensive but needed for libuv to do RUN_ONCE - if (uvlock != ct && jl_atomic_load_relaxed(&jl_uv_mutex.owner) != NULL) - wake_libuv(); - } - } -} - -JL_DLLEXPORT void jl_wake_any_thread(jl_task_t *ct) -{ - wake_any(ct); -} - - JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, jl_value_t *checkempty) { jl_task_t *ct = jl_current_task; @@ -406,7 +407,6 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, while (1) { jl_task_t *task = get_next_task(trypoptask, q); if (task) { - wake_any(ct); return task; } @@ -589,7 +589,6 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, jl_rethrow(); } if (task) { - wake_any(ct); return task; } } From 5c2534001cb01e461400f9d9c8891b01b82cc0cd Mon Sep 17 00:00:00 2001 From: gbaraldi Date: Thu, 12 Jun 2025 18:35:25 -0300 Subject: [PATCH 3/3] Small code cleanup --- base/task.jl | 18 ++---------------- src/scheduler.c | 16 +++++++--------- test.jl | 7 ------- 3 files changed, 9 insertions(+), 32 deletions(-) delete mode 100644 test.jl diff --git a/base/task.jl b/base/task.jl index 29f1564d68496..d51af56f5e263 100644 --- a/base/task.jl +++ b/base/task.jl @@ -981,25 +981,11 @@ function enq_work(t::Task) return t end -const ChildFirst = false - function schedule(t::Task) # [task] created -scheduled-> wait_time maybe_record_enqueued!(t) - if ChildFirst - ct = current_task() - if ct.sticky || t.sticky - maybe_record_enqueued!(t) - enq_work(t) - else - maybe_record_enqueued!(t) - enq_work(ct) - yieldto(t) - end - else - 
maybe_record_enqueued!(t)
-        enq_work(t)
-    end
+    maybe_record_enqueued!(t)
+    enq_work(t)
     return t
 end
 
 """
diff --git a/src/scheduler.c b/src/scheduler.c
index d3510c171555e..63f1799195823 100644
--- a/src/scheduler.c
+++ b/src/scheduler.c
@@ -32,7 +32,10 @@ static const int16_t sleeping_like_the_dead JL_UNUSED = 2;
 // plus a running count of the number of in-flight wake-ups
 // n.b. this may temporarily exceed jl_n_threads
 _Atomic(int) n_threads_running = 0;
-// Number of threads sleeping in the scheduler, this number may be lower than the actual number
+// Number of threads sleeping in the scheduler. This is very similar to n_threads_running,
+// but it is managed by the thread itself, so with in-flight wake-ups or threads that are going to sleep but not yet asleep,
+// it might be higher than the actual number of sleeping threads.
+// This is fine, because the only consequence is that we may try to wake more threads than necessary, which is not a problem.
 _Atomic(int) n_threads_idle = 0;
 // invariant: No thread is ever asleep unless sleep_check_state is sleeping (or we have a wakeup signal pending).
 // invariant: Any particular thread is not asleep unless that thread's sleep_check_state is sleeping.
 // invariant: The transition of a thread state to sleeping must be followed by a check that there wasn't work pending for it.
@@ -206,6 +209,7 @@ static int set_not_sleeping(jl_ptls_t ptls) JL_NOTSAFEPOINT
 {
     if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
         if (jl_atomic_exchange_relaxed(&ptls->sleep_check_state, not_sleeping) != not_sleeping) {
+            jl_atomic_fetch_add_relaxed(&n_threads_idle, -1);
             return 1;
         }
     }
@@ -430,7 +434,6 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
             if (set_not_sleeping(ptls)) {
                 JL_PROBE_RT_SLEEP_CHECK_TASKQ_WAKE(ptls);
             }
-            jl_atomic_fetch_add_relaxed(&n_threads_idle, -1);
             continue;
         }
         volatile int isrunning = 1;
@@ -442,14 +445,12 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
             if (set_not_sleeping(ptls)) {
                 JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
             }
-            jl_atomic_fetch_add_relaxed(&n_threads_idle, -1);
             continue; // jump to JL_CATCH
         }
         if (task) {
             if (set_not_sleeping(ptls)) {
                 JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
             }
-            jl_atomic_fetch_add_relaxed(&n_threads_idle, -1);
             continue; // jump to JL_CATCH
         }
 
@@ -496,7 +497,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
             uv_loop_t *loop = jl_global_event_loop();
             loop->stop_flag = 0;
             JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_enter = cycleclock() );
-            active = uv_run(loop, UV_RUN_NOWAIT);
+            active = uv_run(loop, UV_RUN_ONCE);
             JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_leave = cycleclock() );
             jl_gc_safepoint();
         }
@@ -510,7 +511,6 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
             if (set_not_sleeping(ptls)) {
                 JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls);
             }
-            jl_atomic_fetch_add_relaxed(&n_threads_idle, -1);
             start_cycles = 0;
             continue; // jump to JL_CATCH
         }
@@ -520,7 +520,6 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
             if (set_not_sleeping(ptls)) {
                 JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls);
             }
-            jl_atomic_fetch_add_relaxed(&n_threads_idle, -1);
             start_cycles = 0;
             continue; // jump to JL_CATCH
         }
@@ -566,7 +565,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
                 // else should we warn the user of certain deadlock here if tid == 0 && n_threads_running == 0?
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock); } - jl_atomic_fetch_add_relaxed(&n_threads_idle, -1); + jl_atomic_fetch_add_relaxed(&n_threads_idle, -1); // We got woken up by someone else so we are no longer idle assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping); assert(jl_atomic_load_relaxed(&n_threads_running)); start_cycles = 0; @@ -582,7 +581,6 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, JL_CATCH { // probably SIGINT, but possibly a user mistake in trypoptask if (!isrunning) { - jl_atomic_fetch_add_relaxed(&n_threads_idle, -1); jl_atomic_fetch_add_relaxed(&n_threads_running, 1); } set_not_sleeping(ptls); diff --git a/test.jl b/test.jl deleted file mode 100644 index 7a31d00c6b827..0000000000000 --- a/test.jl +++ /dev/null @@ -1,7 +0,0 @@ -function fib(n::Int) - n <= 1 && return n - t = Threads.@spawn fib(n - 2) - return fib(n - 1) + fetch(t)::Int - end - -fib(34)
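A note on the interface this series converges on: base/scheduler/scheduler.jl
dispatches through ChosenScheduler to exactly three functions (enqueue!,
dequeue!, checktaskempty), so any module defining those three can be swapped
in. Below is a minimal sketch of such a module; the FIFOScheduler name, the
Vector-backed queue, and the SpinLock are illustrative assumptions, not part
of these patches (the actual Partr module uses randomized per-threadpool
priority heaps, see base/scheduler/partr.jl):

module FIFOScheduler

using Base.Threads: SpinLock

const queue = Task[]
const queue_lock = SpinLock()

# enqueue! pushes a runnable Task into the scheduler.
function enqueue!(t::Task)
    lock(queue_lock) do
        push!(queue, t)
    end
    return t
end

# dequeue! pops a runnable Task, or returns nothing when none is available,
# matching what trypoptask in base/task.jl expects from Scheduler.dequeue!().
function dequeue!()
    lock(queue_lock) do
        isempty(queue) ? nothing : popfirst!(queue)
    end
end

# checktaskempty returns true if the scheduler has no available Tasks.
checktaskempty() = lock(() -> isempty(queue), queue_lock)

end # module FIFOScheduler

Swapping it in would then amount to including the file from scheduler.jl and
setting `const ChosenScheduler = FIFOScheduler`, since task.jl only ever calls
Scheduler.enqueue!, Scheduler.dequeue!, and Scheduler.checktaskempty.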
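And a note on the fixed-point sampling in rand_uniform_max_int32 above: the
updated seed's low 32 bits are treated as a fraction u/2^32 of the unit
interval, so the high word of the 64-bit product max * u is
floor(max * u / 2^32), which always lies in 0:max-1. A standalone sanity check
of that step (sample_uniform is a hypothetical name for illustration; the LCG
constants are the ones used in the patch):

# Mirrors the sampling step of rand_uniform_max_int32: advance the 64-bit
# LCG, treat its low 32 bits as a fraction u/2^32, and keep the high word
# of the 64-bit product max * u, which is an integer in 0:max-1.
function sample_uniform(max::UInt32, seed::UInt64)
    seed = UInt64(69069) * seed + UInt64(362437)  # same LCG as the patch
    prod = UInt64(max) * (seed % UInt32)          # 64-bit product
    return (prod >> 32) % UInt32, seed            # integral part, new seed
end

let seed = UInt64(42), max = UInt32(7)
    for _ in 1:10_000
        val, seed = sample_uniform(max, seed)
        @assert val < max  # every draw stays in 0:max-1
    end
end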