Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion base/Base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ const liblapack_name = libblas_name
# Note that `atomics.jl` here should be deprecated
Core.eval(Threads, :(include("atomics.jl")))
include("channels.jl")
include("partr.jl")
include("scheduler/scheduler.jl")
include("task.jl")
include("threads_overloads.jl")
include("weakkeydict.jl")
Expand Down
73 changes: 12 additions & 61 deletions base/partr.jl → base/scheduler/partr.jl
Original file line number Diff line number Diff line change
Expand Up @@ -19,63 +19,6 @@ const heap_d = UInt32(8)
const heaps = [Vector{taskheap}(undef, 0), Vector{taskheap}(undef, 0)]
const heaps_lock = [SpinLock(), SpinLock()]


"""
cong(max::UInt32)

Return a random UInt32 in the range `1:max` except if max is 0, in that case return 0.
"""
cong(max::UInt32) = iszero(max) ? UInt32(0) : rand_ptls(max) + UInt32(1) #TODO: make sure users don't use 0 and remove this check

# Fetch the current 64-bit RNG state from the runtime (`jl_get_ptls_rng`).
function get_ptls_rng()
    return ccall(:jl_get_ptls_rng, UInt64, ())
end

# Hand `seed` to the runtime as the new RNG state (`jl_set_ptls_rng`).
function set_ptls_rng(seed::UInt64)
    return ccall(:jl_set_ptls_rng, Cvoid, (UInt64,), seed)
end

"""
rand_ptls(max::UInt32)

Return a random UInt32 in the range `0:max-1` using the thread-local RNG
state. Max must be greater than 0.
"""
Base.@assume_effects :removable :inaccessiblememonly :notaskstate function rand_ptls(max::UInt32)
rngseed = get_ptls_rng()
val, seed = rand_uniform_max_int32(max, rngseed)
set_ptls_rng(seed)
return val % UInt32
end

# This implementation is based on OpenSSL's implementation of rand_uniform
# https://github.com/openssl/openssl/blob/1d2cbd9b5a126189d5e9bc78a3bdb9709427d02b/crypto/rand/rand_uniform.c#L13-L99
# Comments are vendored from their implementation as well.
# For the original developer check the PR to swift https://github.com/apple/swift/pull/39143.

# Essentially it boils down to incrementally generating a fixed point
# number on the interval [0, 1) and multiplying this number by the upper
# range limit. Once it is certain what the fractional part contributes to
# the integral part of the product, the algorithm has produced a definitive
# result.
"""
rand_uniform_max_int32(max::UInt32, seed::UInt64)

Return a random UInt32 in the range `0:max-1` using the given seed.
Max must be greater than 0.
"""
Base.@assume_effects :total function rand_uniform_max_int32(max::UInt32, seed::UInt64)
if max == UInt32(1)
return UInt32(0), seed
end
# We are generating a fixed point number on the interval [0, 1).
# Multiplying this by the range gives us a number on [0, upper).
# The high word of the multiplication result represents the integral part
# This is not completely unbiased as it's missing the fractional part of the original implementation but it's good enough for our purposes
seed = UInt64(69069) * seed + UInt64(362437)
prod = (UInt64(max)) * (seed % UInt32) # 64 bit product
i = prod >> 32 % UInt32 # integral part
return i % UInt32, seed
end



function multiq_sift_up(heap::taskheap, idx::Int32)
while idx > Int32(1)
parent = (idx - Int32(2)) ÷ heap_d + Int32(1)
Expand Down Expand Up @@ -147,10 +90,10 @@ function multiq_insert(task::Task, priority::UInt16)

task.priority = priority

rn = cong(heap_p)
rn = Base.Scheduler.cong(heap_p)
tpheaps = heaps[tp]
while !trylock(tpheaps[rn].lock)
rn = cong(heap_p)
rn = Base.Scheduler.cong(heap_p)
end

heap = tpheaps[rn]
Expand Down Expand Up @@ -190,8 +133,8 @@ function multiq_deletemin()
if i == heap_p
return nothing
end
rn1 = cong(heap_p)
rn2 = cong(heap_p)
rn1 = Base.Scheduler.cong(heap_p)
rn2 = Base.Scheduler.cong(heap_p)
prio1 = tpheaps[rn1].priority
prio2 = tpheaps[rn2].priority
if prio1 > prio2
Expand Down Expand Up @@ -235,6 +178,9 @@ function multiq_check_empty()
if tp == 0 # Foreign thread
return true
end
if !isempty(Base.workqueue_for(tid))
return false
end
for i = UInt32(1):length(heaps[tp])
if heaps[tp][i].ntasks != 0
return false
Expand All @@ -243,4 +189,9 @@ function multiq_check_empty()
return true
end


# Insert `t` via `multiq_insert`, using the task's own `priority` field.
function enqueue!(t::Task)
    return multiq_insert(t, t.priority)
end

# Forward to `multiq_deletemin` and return whatever it returns.
function dequeue!()
    return multiq_deletemin()
end

# Forward to `multiq_check_empty` and return whatever it returns.
function checktaskempty()
    return multiq_check_empty()
end

end
74 changes: 74 additions & 0 deletions base/scheduler/scheduler.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

module Scheduler

"""
cong(max::UInt32)

Return a random UInt32 in the range `1:max` except if max is 0, in that case return 0.
"""
cong(max::UInt32) = iszero(max) ? UInt32(0) : rand_ptls(max) + UInt32(1) #TODO: make sure users don't use 0 and remove this check

# Fetch the current 64-bit RNG state from the runtime (`jl_get_ptls_rng`).
function get_ptls_rng()
    return ccall(:jl_get_ptls_rng, UInt64, ())
end

# Hand `seed` to the runtime as the new RNG state (`jl_set_ptls_rng`).
function set_ptls_rng(seed::UInt64)
    return ccall(:jl_set_ptls_rng, Cvoid, (UInt64,), seed)
end

"""
rand_ptls(max::UInt32)

Return a random UInt32 in the range `0:max-1` using the thread-local RNG
state. Max must be greater than 0.
"""
Base.@assume_effects :removable :inaccessiblememonly :notaskstate function rand_ptls(max::UInt32)
rngseed = get_ptls_rng()
val, seed = rand_uniform_max_int32(max, rngseed)
set_ptls_rng(seed)
return val % UInt32
end

# This implementation is based on OpenSSL's implementation of rand_uniform
# https://github.com/openssl/openssl/blob/1d2cbd9b5a126189d5e9bc78a3bdb9709427d02b/crypto/rand/rand_uniform.c#L13-L99
# Comments are vendored from their implementation as well.
# For the original developer check the PR to swift https://github.com/apple/swift/pull/39143.

# Essentially it boils down to incrementally generating a fixed point
# number on the interval [0, 1) and multiplying this number by the upper
# range limit. Once it is certain what the fractional part contributes to
# the integral part of the product, the algorithm has produced a definitive
# result.
"""
rand_uniform_max_int32(max::UInt32, seed::UInt64)

Return a random UInt32 in the range `0:max-1` using the given seed.
Max must be greater than 0.
"""
Base.@assume_effects :total function rand_uniform_max_int32(max::UInt32, seed::UInt64)
if max == UInt32(1)
return UInt32(0), seed
end
# We are generating a fixed point number on the interval [0, 1).
# Multiplying this by the range gives us a number on [0, upper).
# The high word of the multiplication result represents the integral part
# This is not completely unbiased as it's missing the fractional part of the original implementation but it's good enough for our purposes
seed = UInt64(69069) * seed + UInt64(362437)
prod = (UInt64(max)) * (seed % UInt32) # 64 bit product
i = prod >> 32 % UInt32 # integral part
return i % UInt32, seed
end

include("scheduler/partr.jl")

const ChosenScheduler = Partr



# Scheduler interface. Each entry point forwards to the module bound to
# `ChosenScheduler`, which must provide:
#   enqueue!       - pushes a runnable Task into the scheduler
#   dequeue!       - pops a runnable Task from the scheduler
#   checktaskempty - returns true if the scheduler has no available Tasks

function enqueue!(t::Task)
    return ChosenScheduler.enqueue!(t)
end

function dequeue!()
    return ChosenScheduler.dequeue!()
end

function checktaskempty()
    return ChosenScheduler.checktaskempty()
end

end
18 changes: 12 additions & 6 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,6 @@ end

function enq_work(t::Task)
(t._state === task_state_runnable && t.queue === nothing) || error("schedule: Task not runnable")

# Sticky tasks go into their thread's work queue.
if t.sticky
tid = Threads.threadid(t)
Expand Down Expand Up @@ -968,19 +967,26 @@ function enq_work(t::Task)
ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1)
push!(workqueue_for(tid), t)
else
# Otherwise, put the task in the multiqueue.
Partr.multiq_insert(t, t.priority)
# Otherwise, push the task to the scheduler
Scheduler.enqueue!(t)
tid = 0
end
end
ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16)

if (tid == 0)
ccall(:jl_wake_any_thread, Cvoid, (Any,), current_task())
else
ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16)
end
return t
end

# Record that `t` is being enqueued, hand it to `enq_work`, and return `t`.
function schedule(t::Task)
# [task] created -scheduled-> wait_time
# NOTE(review): the call below appears twice in this view; it looks like the
# removed/added pair of a whitespace-only diff line rather than two real calls.
# Confirm that only one call remains in the actual file.
maybe_record_enqueued!(t)
maybe_record_enqueued!(t)
enq_work(t)
return t
end

"""
Expand Down Expand Up @@ -1186,10 +1192,10 @@ function trypoptask(W::StickyWorkqueue)
end
return t
end
return Partr.multiq_deletemin()
return Scheduler.dequeue!()
end

checktaskempty = Partr.multiq_check_empty
checktaskempty = Scheduler.checktaskempty

function wait()
ct = current_task()
Expand Down
1 change: 1 addition & 0 deletions src/jl_exported_funcs.inc
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@
XX(jl_tagged_gensym) \
XX(jl_take_buffer) \
XX(jl_task_get_next) \
XX(jl_wake_any_thread) \
XX(jl_termios_size) \
XX(jl_test_cpu_feature) \
XX(jl_threadid) \
Expand Down
1 change: 1 addition & 0 deletions src/julia_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ typedef struct _jl_tls_states_t {
uint64_t uv_run_leave;
uint64_t sleep_enter;
uint64_t sleep_leave;
uint64_t woken_up;
)

// some hidden state (usually just because we don't have the type's size declaration)
Expand Down
59 changes: 54 additions & 5 deletions src/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ static const int16_t sleeping_like_the_dead JL_UNUSED = 2;
// plus a running count of the number of in-flight wake-ups
// n.b. this may temporarily exceed jl_n_threads
_Atomic(int) n_threads_running = 0;

// Number of threads sleeping in the scheduler. This is very similar to n_threads_running,
// but it is managed by the thread itself, so if there are in-flight wake-ups, or threads that
// are going to sleep but are not actually asleep yet, it might be higher than the actual
// number of threads sleeping.
// This is fine because the only consequence is that we may try to wake more threads than necessary.
_Atomic(int) n_threads_idle = 0;
// invariant: No thread is ever asleep unless sleep_check_state is sleeping (or we have a wakeup signal pending).
// invariant: Any particular thread is not asleep unless that thread's sleep_check_state is sleeping.
// invariant: The transition of a thread state to sleeping must be followed by a check that there wasn't work pending for it.
Expand Down Expand Up @@ -205,6 +209,7 @@ static int set_not_sleeping(jl_ptls_t ptls) JL_NOTSAFEPOINT
{
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
if (jl_atomic_exchange_relaxed(&ptls->sleep_check_state, not_sleeping) != not_sleeping) {
jl_atomic_fetch_add_relaxed(&n_threads_idle, -1);
return 1;
}
}
Expand All @@ -220,6 +225,7 @@ static int wake_thread(int16_t tid) JL_NOTSAFEPOINT
if (jl_atomic_load_relaxed(&ptls2->sleep_check_state) != not_sleeping) {
int8_t state = sleeping;
if (jl_atomic_cmpswap_relaxed(&ptls2->sleep_check_state, &state, not_sleeping)) {
JULIA_DEBUG_SLEEPWAKE( ptls2->woken_up = cycleclock() );
int wasrunning = jl_atomic_fetch_add_relaxed(&n_threads_running, 1); // increment in-flight wakeup count
assert(wasrunning); (void)wasrunning;
JL_PROBE_RT_SLEEP_CHECK_WAKE(ptls2, state);
Expand Down Expand Up @@ -302,6 +308,44 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) JL_NOTSAFEPOINT
wakeup_thread(ct, tid);
}

// Try to wake a single thread other than the caller's.
// `ct` is the calling task; its `tid` is the starting point of the scan.
// Nothing is scanned unless the `n_threads_idle` counter reads greater than zero.
STATIC_INLINE void wake_any(jl_task_t *ct) JL_NOTSAFEPOINT
{
jl_fence(); // [^store_buffering_1]
// Find sleeping thread to wake up
int self = jl_atomic_load_relaxed(&ct->tid);
int nthreads = jl_atomic_load_acquire(&jl_n_threads);
int idle_threads = jl_atomic_load_relaxed(&n_threads_idle);
jl_task_t *uvlock = jl_atomic_load_relaxed(&jl_uv_mutex.owner);
// If the calling task is the recorded owner of jl_uv_mutex, stop the global event loop.
if (uvlock == ct)
uv_stop(jl_global_event_loop());
if (idle_threads > 0) {
// Visit the other thread ids in order self+1, self+2, ..., wrapping around at nthreads.
for (int i = 1; i < nthreads; i++) {
int tid = self + i;
if (tid >= nthreads)
tid -= nthreads;
// NOTE(review): with i in 1..nthreads-1 and a valid `self`, tid should never
// equal self, so the first test looks purely defensive -- confirm.
if ((tid != self) && wake_thread(tid)) {
if (uvlock != ct) {
jl_fence();
jl_ptls_t other = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];
jl_task_t *tid_task = jl_atomic_load_relaxed(&other->current_task);
// now that we have changed the thread to not-sleeping, ensure that
// either it has not yet acquired the libuv lock, or that it will
// observe the change of state to not_sleeping
if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) == tid_task)
wake_libuv();
}
// Stop after the first thread for which wake_thread reported success.
break;
}
}
}
}

// Exported wrapper around wake_any(); `ct` is passed through unchanged.
JL_DLLEXPORT void jl_wake_any_thread(jl_task_t *ct)
{
wake_any(ct);
}


// get the next runnable task
static jl_task_t *get_next_task(jl_value_t *trypoptask, jl_value_t *q)
{
Expand Down Expand Up @@ -366,8 +410,9 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,

while (1) {
jl_task_t *task = get_next_task(trypoptask, q);
if (task)
if (task) {
return task;
}

// quick, race-y check to see if there seems to be any stuff in there
jl_cpu_pause();
Expand All @@ -382,6 +427,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
// acquire sleep-check lock
assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
jl_atomic_store_relaxed(&ptls->sleep_check_state, sleeping);
jl_atomic_fetch_add_relaxed(&n_threads_idle, 1);
jl_fence(); // [^store_buffering_1]
JL_PROBE_RT_SLEEP_CHECK_SLEEP(ptls);
if (!check_empty(checkempty)) { // uses relaxed loads
Expand Down Expand Up @@ -437,7 +483,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
// responsibility, so need to make sure thread 0 will take care
// of us.
if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) == NULL) // aka trylock
jl_wakeup_thread(jl_atomic_load_relaxed(&io_loop_tid));
wakeup_thread(ct, jl_atomic_load_relaxed(&io_loop_tid));

}
if (uvlock) {
Expand Down Expand Up @@ -519,6 +565,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
// else should we warn the user of certain deadlock here if tid == 0 && n_threads_running == 0?
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
}
jl_atomic_fetch_add_relaxed(&n_threads_idle, -1); // We got woken up by someone else so we are no longer idle
assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
assert(jl_atomic_load_relaxed(&n_threads_running));
start_cycles = 0;
Expand All @@ -533,13 +580,15 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
}
JL_CATCH {
// probably SIGINT, but possibly a user mistake in trypoptask
if (!isrunning)
if (!isrunning) {
jl_atomic_fetch_add_relaxed(&n_threads_running, 1);
}
set_not_sleeping(ptls);
jl_rethrow();
}
if (task)
if (task) {
return task;
}
}
else {
// maybe check the kernel for new messages too
Expand Down
2 changes: 1 addition & 1 deletion test/threads.jl
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ end
end

@testset "rand_ptls underflow" begin
@test Base.Partr.cong(UInt32(0)) == 0
@test Base.Scheduler.cong(UInt32(0)) == 0
end

@testset "num_stack_mappings metric" begin
Expand Down