diff --git a/base/Base.jl b/base/Base.jl
index afa5a3d93d27c..710be9d098f75 100644
--- a/base/Base.jl
+++ b/base/Base.jl
@@ -149,7 +149,7 @@ const liblapack_name = libblas_name
 # Note that `atomics.jl` here should be deprecated
 Core.eval(Threads, :(include("atomics.jl")))
 include("channels.jl")
-include("partr.jl")
+include("scheduler/scheduler.jl")
 include("task.jl")
 include("threads_overloads.jl")
 include("weakkeydict.jl")
diff --git a/base/partr.jl b/base/scheduler/partr.jl
similarity index 66%
rename from base/partr.jl
rename to base/scheduler/partr.jl
index 6053a584af5ba..fd80065056e62 100644
--- a/base/partr.jl
+++ b/base/scheduler/partr.jl
@@ -19,63 +19,6 @@ const heap_d = UInt32(8)
 const heaps = [Vector{taskheap}(undef, 0), Vector{taskheap}(undef, 0)]
 const heaps_lock = [SpinLock(), SpinLock()]
 
-
-"""
-    cong(max::UInt32)
-
-Return a random UInt32 in the range `1:max` except if max is 0, in that case return 0.
-"""
-cong(max::UInt32) = iszero(max) ? UInt32(0) : rand_ptls(max) + UInt32(1) #TODO: make sure users don't use 0 and remove this check
-
-get_ptls_rng() = ccall(:jl_get_ptls_rng, UInt64, ())
-
-set_ptls_rng(seed::UInt64) = ccall(:jl_set_ptls_rng, Cvoid, (UInt64,), seed)
-
-"""
-    rand_ptls(max::UInt32)
-
-Return a random UInt32 in the range `0:max-1` using the thread-local RNG
-state. Max must be greater than 0.
-"""
-Base.@assume_effects :removable :inaccessiblememonly :notaskstate function rand_ptls(max::UInt32)
-    rngseed = get_ptls_rng()
-    val, seed = rand_uniform_max_int32(max, rngseed)
-    set_ptls_rng(seed)
-    return val % UInt32
-end
-
-# This implementation is based on OpenSSLs implementation of rand_uniform
-# https://github.com/openssl/openssl/blob/1d2cbd9b5a126189d5e9bc78a3bdb9709427d02b/crypto/rand/rand_uniform.c#L13-L99
-# Comments are vendored from their implementation as well.
-# For the original developer check the PR to swift https://github.com/apple/swift/pull/39143.
-
-# Essentially it boils down to incrementally generating a fixed point
-# number on the interval [0, 1) and multiplying this number by the upper
-# range limit. Once it is certain what the fractional part contributes to
-# the integral part of the product, the algorithm has produced a definitive
-# result.
-"""
-    rand_uniform_max_int32(max::UInt32, seed::UInt64)
-
-Return a random UInt32 in the range `0:max-1` using the given seed.
-Max must be greater than 0.
-"""
-Base.@assume_effects :total function rand_uniform_max_int32(max::UInt32, seed::UInt64)
-    if max == UInt32(1)
-        return UInt32(0), seed
-    end
-    # We are generating a fixed point number on the interval [0, 1).
-    # Multiplying this by the range gives us a number on [0, upper).
-    # The high word of the multiplication result represents the integral part
-    # This is not completely unbiased as it's missing the fractional part of the original implementation but it's good enough for our purposes
-    seed = UInt64(69069) * seed + UInt64(362437)
-    prod = (UInt64(max)) * (seed % UInt32) # 64 bit product
-    i = prod >> 32 % UInt32 # integral part
-    return i % UInt32, seed
-end
-
-
 function multiq_sift_up(heap::taskheap, idx::Int32)
     while idx > Int32(1)
         parent = (idx - Int32(2)) ÷ heap_d + Int32(1)
@@ -147,10 +90,10 @@ function multiq_insert(task::Task, priority::UInt16)
 
     task.priority = priority
 
-    rn = cong(heap_p)
+    rn = Base.Scheduler.cong(heap_p)
     tpheaps = heaps[tp]
     while !trylock(tpheaps[rn].lock)
-        rn = cong(heap_p)
+        rn = Base.Scheduler.cong(heap_p)
    end
 
    heap = tpheaps[rn]
@@ -190,8 +133,8 @@ function multiq_deletemin()
        if i == heap_p
            return nothing
        end
-        rn1 = cong(heap_p)
-        rn2 = cong(heap_p)
+        rn1 = Base.Scheduler.cong(heap_p)
+        rn2 = Base.Scheduler.cong(heap_p)
        prio1 = tpheaps[rn1].priority
        prio2 = tpheaps[rn2].priority
        if prio1 > prio2
@@ -235,6 +178,9 @@ function multiq_check_empty()
    if tp == 0 # Foreign thread
        return true
    end
+    if !isempty(Base.workqueue_for(tid))
+        return false
+    end
    for i = UInt32(1):length(heaps[tp])
        if heaps[tp][i].ntasks != 0
            return false
@@ -243,4 +189,9 @@ function multiq_check_empty()
    return true
 end
 
+
+enqueue!(t::Task) = multiq_insert(t, t.priority)
+dequeue!() = multiq_deletemin()
+checktaskempty() = multiq_check_empty()
+
 end
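
For readers new to partr: `multiq_insert` above picks one random sub-heap and retries until it can take that heap's lock, while `multiq_deletemin` samples two random sub-heaps and pops from the one with the better (numerically lower) priority, the classic power-of-two-choices trick. Below is a minimal sketch of that selection pattern, using hypothetical simplified types (`MiniHeap`, `mini_insert!`, `mini_deletemin!`) rather than partr's actual `taskheap`/`SpinLock` machinery:

```julia
# Sketch of partr's randomized heap selection (hypothetical simplified types,
# not the actual taskheap setup from base/scheduler/partr.jl).
struct MiniHeap
    lock::ReentrantLock
    tasks::Vector{Int}   # stand-in for queued tasks
    priority::UInt16     # stand-in for the heap's cached min-priority
end

heaps = [MiniHeap(ReentrantLock(), Int[], typemax(UInt16)) for _ in 1:8]

# Insert: pick a random heap, retry until one can be locked without blocking.
function mini_insert!(heaps, task)
    h = heaps[rand(1:length(heaps))]
    while !trylock(h.lock)
        h = heaps[rand(1:length(heaps))]
    end
    try
        push!(h.tasks, task)
    finally
        unlock(h.lock)
    end
end

# Delete-min: sample two heaps, prefer the numerically lower priority.
function mini_deletemin!(heaps)
    h1 = heaps[rand(1:length(heaps))]
    h2 = heaps[rand(1:length(heaps))]
    h = h1.priority <= h2.priority ? h1 : h2
    lock(h.lock) do
        isempty(h.tasks) ? nothing : popfirst!(h.tasks)
    end
end

mini_insert!(heaps, 1); mini_insert!(heaps, 2)
mini_deletemin!(heaps)   # returns one of the inserted items, or nothing
```

Sampling only two heaps keeps per-queue lock contention low while still bounding how imbalanced the sub-heaps can get.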
diff --git a/base/scheduler/scheduler.jl b/base/scheduler/scheduler.jl
new file mode 100644
index 0000000000000..b47e91aabba9a
--- /dev/null
+++ b/base/scheduler/scheduler.jl
@@ -0,0 +1,74 @@
+# This file is a part of Julia. License is MIT: https://julialang.org/license
+
+module Scheduler
+
+"""
+    cong(max::UInt32)
+
+Return a random UInt32 in the range `1:max`, or 0 if `max` is 0.
+"""
+cong(max::UInt32) = iszero(max) ? UInt32(0) : rand_ptls(max) + UInt32(1) #TODO: make sure users don't use 0 and remove this check
+
+get_ptls_rng() = ccall(:jl_get_ptls_rng, UInt64, ())
+
+set_ptls_rng(seed::UInt64) = ccall(:jl_set_ptls_rng, Cvoid, (UInt64,), seed)
+
+"""
+    rand_ptls(max::UInt32)
+
+Return a random UInt32 in the range `0:max-1` using the thread-local RNG
+state. `max` must be greater than 0.
+"""
+Base.@assume_effects :removable :inaccessiblememonly :notaskstate function rand_ptls(max::UInt32)
+    rngseed = get_ptls_rng()
+    val, seed = rand_uniform_max_int32(max, rngseed)
+    set_ptls_rng(seed)
+    return val % UInt32
+end
+
+# This implementation is based on OpenSSL's implementation of rand_uniform
+# https://github.com/openssl/openssl/blob/1d2cbd9b5a126189d5e9bc78a3bdb9709427d02b/crypto/rand/rand_uniform.c#L13-L99
+# Comments are vendored from their implementation as well.
+# For the original development history, see the PR to Swift: https://github.com/apple/swift/pull/39143.
+
+# Essentially it boils down to incrementally generating a fixed-point
+# number on the interval [0, 1) and multiplying this number by the upper
+# range limit. Once it is certain what the fractional part contributes to
+# the integral part of the product, the algorithm has produced a definitive
+# result.
+"""
+    rand_uniform_max_int32(max::UInt32, seed::UInt64)
+
+Return a random UInt32 in the range `0:max-1` using the given seed.
+`max` must be greater than 0.
+"""
+Base.@assume_effects :total function rand_uniform_max_int32(max::UInt32, seed::UInt64)
+    if max == UInt32(1)
+        return UInt32(0), seed
+    end
+    # We are generating a fixed-point number on the interval [0, 1).
+    # Multiplying this by the range gives us a number on [0, upper).
+    # The high word of the multiplication result represents the integral part.
+    # This is not completely unbiased, as it's missing the fractional part of the original implementation, but it's good enough for our purposes.
+    seed = UInt64(69069) * seed + UInt64(362437)
+    prod = (UInt64(max)) * (seed % UInt32) # 64-bit product
+    i = prod >> 32 % UInt32 # integral part
+    return i % UInt32, seed
+end
+
+include("scheduler/partr.jl")
+
+const ChosenScheduler = Partr
+
+
+# Scheduler interface:
+#     enqueue!        pushes a runnable Task into the scheduler
+#     dequeue!        pops a runnable Task from the scheduler
+#     checktaskempty  returns true if the scheduler has no available Tasks
+
+enqueue!(t::Task) = ChosenScheduler.enqueue!(t)
+dequeue!() = ChosenScheduler.dequeue!()
+checktaskempty() = ChosenScheduler.checktaskempty()
+
+end
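
The three forwarding definitions above are the entire pluggable surface: a scheduler backend is any module providing `enqueue!`, `dequeue!`, and `checktaskempty`. As a sketch (not part of this diff), a trivial single-queue backend satisfying the same interface could look like the following; the `FIFO` module name and its internals are hypothetical:

```julia
# Hypothetical drop-in backend for the Scheduler interface sketched above
# (enqueue!, dequeue!, checktaskempty). Not part of the PR.
module FIFO

const queue = Task[]
const queue_lock = Base.Threads.SpinLock()

function enqueue!(t::Task)
    lock(queue_lock) do
        push!(queue, t)
    end
    return nothing
end

function dequeue!()
    lock(queue_lock) do
        # Return `nothing` when empty, matching Partr.multiq_deletemin.
        isempty(queue) ? nothing : popfirst!(queue)
    end
end

# Racy by design; only used as a heuristic before a thread goes to sleep.
checktaskempty() = isempty(queue)

end
```

Selecting such a backend would then be a one-line change to `const ChosenScheduler` in scheduler.jl.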
+""" +Base.@assume_effects :total function rand_uniform_max_int32(max::UInt32, seed::UInt64) + if max == UInt32(1) + return UInt32(0), seed + end + # We are generating a fixed point number on the interval [0, 1). + # Multiplying this by the range gives us a number on [0, upper). + # The high word of the multiplication result represents the integral part + # This is not completely unbiased as it's missing the fractional part of the original implementation but it's good enough for our purposes + seed = UInt64(69069) * seed + UInt64(362437) + prod = (UInt64(max)) * (seed % UInt32) # 64 bit product + i = prod >> 32 % UInt32 # integral part + return i % UInt32, seed +end + +include("scheduler/partr.jl") + +const ChosenScheduler = Partr + + + +# Scheduler interface: + # enqueue! which pushes a runnable Task into it + # dequeue! which pops a runnable Task from it + # checktaskempty which returns true if the scheduler has no available Tasks + +enqueue!(t::Task) = ChosenScheduler.enqueue!(t) +dequeue!() = ChosenScheduler.dequeue!() +checktaskempty() = ChosenScheduler.checktaskempty() + +end diff --git a/base/task.jl b/base/task.jl index bc20ff8ac320a..d51af56f5e263 100644 --- a/base/task.jl +++ b/base/task.jl @@ -937,7 +937,6 @@ end function enq_work(t::Task) (t._state === task_state_runnable && t.queue === nothing) || error("schedule: Task not runnable") - # Sticky tasks go into their thread's work queue. if t.sticky tid = Threads.threadid(t) @@ -968,19 +967,26 @@ function enq_work(t::Task) ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1) push!(workqueue_for(tid), t) else - # Otherwise, put the task in the multiqueue. - Partr.multiq_insert(t, t.priority) + # Otherwise, push the task to the scheduler + Scheduler.enqueue!(t) tid = 0 end end - ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16) + + if (tid == 0) + ccall(:jl_wake_any_thread, Cvoid, (Any,), current_task()) + else + ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16) + end return t end function schedule(t::Task) # [task] created -scheduled-> wait_time maybe_record_enqueued!(t) + maybe_record_enqueued!(t) enq_work(t) + return t end """ @@ -1186,10 +1192,10 @@ function trypoptask(W::StickyWorkqueue) end return t end - return Partr.multiq_deletemin() + return Scheduler.dequeue!() end -checktaskempty = Partr.multiq_check_empty +checktaskempty = Scheduler.checktaskempty function wait() ct = current_task() diff --git a/src/jl_exported_funcs.inc b/src/jl_exported_funcs.inc index ea6f4c38f7f70..4b1402879c34c 100644 --- a/src/jl_exported_funcs.inc +++ b/src/jl_exported_funcs.inc @@ -441,6 +441,7 @@ XX(jl_tagged_gensym) \ XX(jl_take_buffer) \ XX(jl_task_get_next) \ + XX(jl_wake_any_thread) \ XX(jl_termios_size) \ XX(jl_test_cpu_feature) \ XX(jl_threadid) \ diff --git a/src/julia_threads.h b/src/julia_threads.h index dbe9166f288a9..6e24477999448 100644 --- a/src/julia_threads.h +++ b/src/julia_threads.h @@ -214,6 +214,7 @@ typedef struct _jl_tls_states_t { uint64_t uv_run_leave; uint64_t sleep_enter; uint64_t sleep_leave; + uint64_t woken_up; ) // some hidden state (usually just because we don't have the type's size declaration) diff --git a/src/scheduler.c b/src/scheduler.c index 731a0c5146605..63f1799195823 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -32,7 +32,11 @@ static const int16_t sleeping_like_the_dead JL_UNUSED = 2; // plus a running count of the number of in-flight wake-ups // n.b. 
diff --git a/src/scheduler.c b/src/scheduler.c
index 731a0c5146605..63f1799195823 100644
--- a/src/scheduler.c
+++ b/src/scheduler.c
@@ -32,7 +32,11 @@ static const int16_t sleeping_like_the_dead JL_UNUSED = 2;
 // plus a running count of the number of in-flight wake-ups
 // n.b. this may temporarily exceed jl_n_threads
 _Atomic(int) n_threads_running = 0;
-
+// Number of threads sleeping in the scheduler; this is very similar to n_threads_running,
+// but it's managed by the thread itself, so when there are in-flight wake-ups or threads
+// going to sleep that are not actually asleep yet, it may exceed the real number of
+// sleeping threads. This is fine: the only consequence is waking more threads than necessary.
+_Atomic(int) n_threads_idle = 0;
 // invariant: No thread is ever asleep unless sleep_check_state is sleeping (or we have a wakeup signal pending).
 // invariant: Any particular thread is not asleep unless that thread's sleep_check_state is sleeping.
 // invariant: The transition of a thread state to sleeping must be followed by a check that there wasn't work pending for it.
@@ -205,6 +209,7 @@ static int set_not_sleeping(jl_ptls_t ptls) JL_NOTSAFEPOINT
 {
     if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
         if (jl_atomic_exchange_relaxed(&ptls->sleep_check_state, not_sleeping) != not_sleeping) {
+            jl_atomic_fetch_add_relaxed(&n_threads_idle, -1);
             return 1;
         }
     }
@@ -220,6 +225,7 @@ static int wake_thread(int16_t tid) JL_NOTSAFEPOINT
     if (jl_atomic_load_relaxed(&ptls2->sleep_check_state) != not_sleeping) {
         int8_t state = sleeping;
         if (jl_atomic_cmpswap_relaxed(&ptls2->sleep_check_state, &state, not_sleeping)) {
+            JULIA_DEBUG_SLEEPWAKE( ptls2->woken_up = cycleclock() );
             int wasrunning = jl_atomic_fetch_add_relaxed(&n_threads_running, 1); // increment in-flight wakeup count
             assert(wasrunning); (void)wasrunning;
             JL_PROBE_RT_SLEEP_CHECK_WAKE(ptls2, state);
@@ -302,6 +308,44 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) JL_NOTSAFEPOINT
     wakeup_thread(ct, tid);
 }
 
+STATIC_INLINE void wake_any(jl_task_t *ct) JL_NOTSAFEPOINT
+{
+    jl_fence(); // [^store_buffering_1]
+    // Find a sleeping thread to wake up
+    int self = jl_atomic_load_relaxed(&ct->tid);
+    int nthreads = jl_atomic_load_acquire(&jl_n_threads);
+    int idle_threads = jl_atomic_load_relaxed(&n_threads_idle);
+    jl_task_t *uvlock = jl_atomic_load_relaxed(&jl_uv_mutex.owner);
+    if (uvlock == ct)
+        uv_stop(jl_global_event_loop());
+    if (idle_threads > 0) {
+        for (int i = 1; i < nthreads; i++) {
+            int tid = self + i;
+            if (tid >= nthreads)
+                tid -= nthreads;
+            if ((tid != self) && wake_thread(tid)) {
+                if (uvlock != ct) {
+                    jl_fence();
+                    jl_ptls_t other = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];
+                    jl_task_t *tid_task = jl_atomic_load_relaxed(&other->current_task);
+                    // now that we have changed the thread to not-sleeping, ensure that
+                    // either it has not yet acquired the libuv lock, or that it will
+                    // observe the change of state to not_sleeping
+                    if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) == tid_task)
+                        wake_libuv();
+                }
+                break;
+            }
+        }
+    }
+}
+
+JL_DLLEXPORT void jl_wake_any_thread(jl_task_t *ct)
+{
+    wake_any(ct);
+}
+
+
 // get the next runnable task
 static jl_task_t *get_next_task(jl_value_t *trypoptask, jl_value_t *q)
 {
@@ -366,8 +410,9 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
 
     while (1) {
         jl_task_t *task = get_next_task(trypoptask, q);
-        if (task)
+        if (task) {
             return task;
+        }
 
         // quick, race-y check to see if there seems to be any stuff in there
         jl_cpu_pause();
@@ -382,6 +427,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
 
             // acquire sleep-check lock
             assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
             jl_atomic_store_relaxed(&ptls->sleep_check_state, sleeping);
+            jl_atomic_fetch_add_relaxed(&n_threads_idle, 1);
             jl_fence(); // [^store_buffering_1]
             JL_PROBE_RT_SLEEP_CHECK_SLEEP(ptls);
             if (!check_empty(checkempty)) { // uses relaxed loads
@@ -437,7 +483,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
                 // responsibility, so need to make sure thread 0 will take care
                 // of us.
                 if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) == NULL) // aka trylock
-                    jl_wakeup_thread(jl_atomic_load_relaxed(&io_loop_tid));
+                    wakeup_thread(ct, jl_atomic_load_relaxed(&io_loop_tid));
             }
 
             if (uvlock) {
@@ -519,6 +565,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
                     // else should we warn the user of certain deadlock here if tid == 0 && n_threads_running == 0?
                     uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
                 }
+                jl_atomic_fetch_add_relaxed(&n_threads_idle, -1); // We got woken up by someone else, so we are no longer idle
                 assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
                 assert(jl_atomic_load_relaxed(&n_threads_running));
                 start_cycles = 0;
@@ -533,13 +580,15 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
             }
             JL_CATCH {
                 // probably SIGINT, but possibly a user mistake in trypoptask
                if (!isrunning) {
-                if (!isrunning)
+                if (!isrunning) {
                     jl_atomic_fetch_add_relaxed(&n_threads_running, 1);
+                }
                 set_not_sleeping(ptls);
                 jl_rethrow();
             }
-            if (task)
+            if (task) {
                 return task;
+            }
         }
         else {
             // maybe check the kernel for new messages too
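
Two details of `wake_any` are worth calling out. First, the scan is skipped entirely when the (deliberately over-approximate) `n_threads_idle` counter is zero, so enqueues on a fully busy system stay cheap. Second, the scan starts at `self + 1` and wraps around, spreading wake-ups across threads instead of always hitting the lowest tid first. A small sketch of that traversal in Julia, with a hypothetical `try_wake` predicate standing in for `wake_thread` (1-based indices here, unlike the 0-based C code):

```julia
# Circular scan starting after `self`, mirroring wake_any's loop.
function wake_any_sketch(self::Int, nthreads::Int, idle_count::Int, try_wake)
    idle_count > 0 || return false        # nobody seems asleep; skip the scan
    for i in 1:nthreads-1
        tid = self + i
        tid > nthreads && (tid -= nthreads)   # wrap around; tid never equals self
        try_wake(tid) && return true          # stop at the first thread we woke
    end
    return false
end

# Example: 8 threads, caller is thread 3; pretend only thread 6 is asleep.
wake_any_sketch(3, 8, 1, tid -> (println("trying thread ", tid); tid == 6))
```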
diff --git a/test/threads.jl b/test/threads.jl
index fa0b33a6352f3..747452906114c 100644
--- a/test/threads.jl
+++ b/test/threads.jl
@@ -338,7 +338,7 @@ end
 end
 
 @testset "rand_ptls underflow" begin
-    @test Base.Partr.cong(UInt32(0)) == 0
+    @test Base.Scheduler.cong(UInt32(0)) == 0
 end
 
 @testset "num_stack_mappings metric" begin
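
For readers puzzling over the "rand_ptls underflow" test: `cong(UInt32(0))` must return 0 rather than wrapping, and for nonzero `max` the LCG-plus-high-word trick maps a 32-bit sample into `0:max-1` with a single multiply. The following standalone sketch mirrors the arithmetic from `rand_uniform_max_int32` and `cong` for experimentation outside Base; `rand_uniform_max` and `cong_demo` are hypothetical names, though the LCG constants (69069, 362437) match the PR:

```julia
# Standalone copy of the bounded-RNG arithmetic, for experimentation.
function rand_uniform_max(max::UInt32, seed::UInt64)
    max == UInt32(1) && return UInt32(0), seed
    seed = UInt64(69069) * seed + UInt64(362437)   # step the LCG
    prod = UInt64(max) * (seed % UInt32)           # 64-bit product
    return (prod >> 32) % UInt32, seed             # high word lands in 0:max-1
end

function cong_demo(max::UInt32, seed::UInt64)
    iszero(max) && return UInt32(0), seed          # the underflow guard under test
    v, seed = rand_uniform_max(max, seed)
    return v + UInt32(1), seed                     # shift into 1:max
end

@assert cong_demo(UInt32(0), UInt64(42))[1] == 0           # tested edge case
@assert 1 <= cong_demo(UInt32(8), UInt64(42))[1] <= 8      # maps into 1:max
```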