Skip to content

Commit 06ac262

Browse files
runtime: initial scheduler changes for timers on P's
Add support to the main scheduler loop for handling timers on P's. This is not used yet, as timers are not yet put on P's. Updates #6239 Updates #27707 Change-Id: I6a359df408629f333a9232142ce19e8be8496dae Reviewed-on: https://go-review.googlesource.com/c/go/+/171826 Run-TryBot: Ian Lance Taylor <[email protected]> TryBot-Result: Gobot Gobot <[email protected]> Reviewed-by: Michael Knyszek <[email protected]>
1 parent ff9f7bc commit 06ac262

File tree

3 files changed

+186
-15
lines changed

3 files changed

+186
-15
lines changed

src/runtime/proc.go

+153-13
Original file line numberDiff line numberDiff line change
@@ -2221,6 +2221,9 @@ top:
22212221
if _p_.runSafePointFn != 0 {
22222222
runSafePointFn()
22232223
}
2224+
2225+
now, pollUntil, _ := checkTimers(_p_, 0)
2226+
22242227
if fingwait && fingwake {
22252228
if gp := wakefing(); gp != nil {
22262229
ready(gp, 0, true)
@@ -2266,12 +2269,7 @@ top:
22662269

22672270
// Steal work from other P's.
22682271
procs := uint32(gomaxprocs)
2269-
if atomic.Load(&sched.npidle) == procs-1 {
2270-
// Either GOMAXPROCS=1 or everybody, except for us, is idle already.
2271-
// New work can appear from returning syscall/cgocall, network or timers.
2272-
// Neither of that submits to local run queues, so no point in stealing.
2273-
goto stop
2274-
}
2272+
ranTimer := false
22752273
// If number of spinning M's >= number of busy P's, block.
22762274
// This is necessary to prevent excessive CPU consumption
22772275
// when GOMAXPROCS>>1 but the program parallelism is low.
@@ -2288,11 +2286,48 @@ top:
22882286
goto top
22892287
}
22902288
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
2291-
if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
2289+
p2 := allp[enum.position()]
2290+
if _p_ == p2 {
2291+
continue
2292+
}
2293+
if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
22922294
return gp, false
22932295
}
2296+
2297+
// Consider stealing timers from p2.
2298+
// This call to checkTimers is the only place where
2299+
// we hold a lock on a different P's timers.
2300+
// Lock contention can be a problem here, so avoid
2301+
// grabbing the lock if p2 is running and not marked
2302+
// for preemption. If p2 is running and not being
2303+
// preempted we assume it will handle its own timers.
2304+
if i > 2 && shouldStealTimers(p2) {
2305+
tnow, w, ran := checkTimers(p2, now)
2306+
now = tnow
2307+
if w != 0 && (pollUntil == 0 || w < pollUntil) {
2308+
pollUntil = w
2309+
}
2310+
if ran {
2311+
// Running the timers may have
2312+
// made an arbitrary number of G's
2313+
// ready and added them to this P's
2314+
// local run queue. That invalidates
2315+
// the assumption of runqsteal
2316+
// that it always has room to add
2317+
// stolen G's. So check now if there
2318+
// is a local G to run.
2319+
if gp, inheritTime := runqget(_p_); gp != nil {
2320+
return gp, inheritTime
2321+
}
2322+
ranTimer = true
2323+
}
2324+
}
22942325
}
22952326
}
2327+
if ranTimer {
2328+
// Running a timer may have made some goroutine ready.
2329+
goto top
2330+
}
22962331

22972332
stop:
22982333

@@ -2309,6 +2344,12 @@ stop:
23092344
return gp, false
23102345
}
23112346

2347+
delta := int64(-1)
2348+
if pollUntil != 0 {
2349+
// checkTimers ensures that pollUntil > now.
2350+
delta = pollUntil - now
2351+
}
2352+
23122353
// wasm only:
23132354
// If a callback returned and no other goroutine is awake,
23142355
// then pause execution until a callback was triggered.
@@ -2400,14 +2441,16 @@ stop:
24002441
}
24012442

24022443
// poll network
2403-
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
2444+
if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
2445+
atomic.Store64(&sched.pollUntil, uint64(pollUntil))
24042446
if _g_.m.p != 0 {
24052447
throw("findrunnable: netpoll with p")
24062448
}
24072449
if _g_.m.spinning {
24082450
throw("findrunnable: netpoll with spinning")
24092451
}
2410-
list := netpoll(-1) // block until new work is available
2452+
list := netpoll(delta) // block until new work is available
2453+
atomic.Store64(&sched.pollUntil, 0)
24112454
atomic.Store64(&sched.lastpoll, uint64(nanotime()))
24122455
lock(&sched.lock)
24132456
_p_ = pidleget()
@@ -2431,6 +2474,11 @@ stop:
24312474
}
24322475
goto top
24332476
}
2477+
} else if pollUntil != 0 && netpollinited() {
2478+
pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
2479+
if pollerPollUntil == 0 || pollerPollUntil > pollUntil {
2480+
netpollBreak()
2481+
}
24342482
}
24352483
stopm()
24362484
goto top
@@ -2457,6 +2505,22 @@ func pollWork() bool {
24572505
return false
24582506
}
24592507

2508+
// wakeNetPoller wakes up the thread sleeping in the network poller,
2509+
// if there is one, and if it isn't going to wake up anyhow before
2510+
// the when argument.
2511+
func wakeNetPoller(when int64) {
2512+
if atomic.Load64(&sched.lastpoll) == 0 {
2513+
// In findrunnable we ensure that when polling the pollUntil
2514+
// field is either zero or the time to which the current
2515+
// poll is expected to run. This can have a spurious wakeup
2516+
// but should never miss a wakeup.
2517+
pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
2518+
if pollerPollUntil == 0 || pollerPollUntil > when {
2519+
netpollBreak()
2520+
}
2521+
}
2522+
}
2523+
24602524
func resetspinning() {
24612525
_g_ := getg()
24622526
if !_g_.m.spinning {
@@ -2525,10 +2589,20 @@ top:
25252589
gcstopm()
25262590
goto top
25272591
}
2528-
if _g_.m.p.ptr().runSafePointFn != 0 {
2592+
pp := _g_.m.p.ptr()
2593+
if pp.runSafePointFn != 0 {
25292594
runSafePointFn()
25302595
}
25312596

2597+
// Sanity check: if we are spinning, the run queue should be empty.
2598+
// Check this before calling checkTimers, as that might call
2599+
// goready to put a ready goroutine on the local run queue.
2600+
if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
2601+
throw("schedule: spinning with local work")
2602+
}
2603+
2604+
checkTimers(pp, 0)
2605+
25322606
var gp *g
25332607
var inheritTime bool
25342608

@@ -2560,9 +2634,8 @@ top:
25602634
}
25612635
if gp == nil {
25622636
gp, inheritTime = runqget(_g_.m.p.ptr())
2563-
if gp != nil && _g_.m.spinning {
2564-
throw("schedule: spinning with local work")
2565-
}
2637+
// We can see gp != nil here even if the M is spinning,
2638+
// if checkTimers added a local goroutine via goready.
25662639
}
25672640
if gp == nil {
25682641
gp, inheritTime = findrunnable() // blocks until work is available
@@ -2623,6 +2696,60 @@ func dropg() {
26232696
setGNoWB(&_g_.m.curg, nil)
26242697
}
26252698

2699+
// checkTimers runs any timers for the P that are ready.
2700+
// If now is not 0 it is the current time.
2701+
// It returns the current time or 0 if it is not known,
2702+
// and the time when the next timer should run or 0 if there is no next timer,
2703+
// and reports whether it ran any timers.
2704+
// If the time when the next timer should run is not 0,
2705+
// it is always larger than the returned time.
2706+
// We pass now in and out to avoid extra calls of nanotime.
2707+
//go:yeswritebarrierrec
2708+
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
2709+
lock(&pp.timersLock)
2710+
2711+
adjusttimers(pp)
2712+
2713+
rnow = now
2714+
if len(pp.timers) > 0 {
2715+
if rnow == 0 {
2716+
rnow = nanotime()
2717+
}
2718+
for len(pp.timers) > 0 {
2719+
if tw := runtimer(pp, rnow); tw != 0 {
2720+
if tw > 0 {
2721+
pollUntil = tw
2722+
}
2723+
break
2724+
}
2725+
ran = true
2726+
}
2727+
}
2728+
2729+
unlock(&pp.timersLock)
2730+
2731+
return rnow, pollUntil, ran
2732+
}
2733+
2734+
// shouldStealTimers reports whether we should try stealing the timers from p2.
2735+
// We don't steal timers from a running P that is not marked for preemption,
2736+
// on the assumption that it will run its own timers. This reduces
2737+
// contention on the timers lock.
2738+
func shouldStealTimers(p2 *p) bool {
2739+
if p2.status != _Prunning {
2740+
return true
2741+
}
2742+
mp := p2.m.ptr()
2743+
if mp == nil || mp.locks > 0 {
2744+
return false
2745+
}
2746+
gp := mp.curg
2747+
if gp == nil || gp.atomicstatus != _Grunning || !gp.preempt {
2748+
return false
2749+
}
2750+
return true
2751+
}
2752+
26262753
func parkunlock_c(gp *g, lock unsafe.Pointer) bool {
26272754
unlock((*mutex)(lock))
26282755
return true
@@ -4305,6 +4432,13 @@ func checkdead() {
43054432
return
43064433
}
43074434

4435+
// There are no goroutines running, so we can look at the P's.
4436+
for _, _p_ := range allp {
4437+
if len(_p_.timers) > 0 {
4438+
return
4439+
}
4440+
}
4441+
43084442
getg().m.throwing = -1 // do not dump full stacks
43094443
throw("all goroutines are asleep - deadlock!")
43104444
}
@@ -4392,6 +4526,12 @@ func sysmon() {
43924526
incidlelocked(1)
43934527
}
43944528
}
4529+
if timeSleepUntil() < now {
4530+
// There are timers that should have already run,
4531+
// perhaps because there is an unpreemptible P.
4532+
// Try to start an M to run them.
4533+
startm(nil, false)
4534+
}
43954535
// retake P's blocked in syscalls
43964536
// and preempt long running G's
43974537
if retake(now) != 0 {

src/runtime/runtime2.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -598,13 +598,23 @@ type p struct {
598598

599599
runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point
600600

601+
// Lock for timers. We normally access the timers while running
602+
// on this P, but the scheduler can also do it from a different P.
603+
timersLock mutex
604+
605+
// Actions to take at some time. This is used to implement the
606+
// standard library's time package.
607+
// Must hold timersLock to access.
608+
timers []*timer
609+
601610
pad cpu.CacheLinePad
602611
}
603612

604613
type schedt struct {
605614
// accessed atomically. keep at top to ensure alignment on 32-bit systems.
606-
goidgen uint64
607-
lastpoll uint64
615+
goidgen uint64
616+
lastpoll uint64 // time of last network poll, 0 if currently polling
617+
pollUntil uint64 // time to which current poll is sleeping
608618

609619
lock mutex
610620

src/runtime/time.go

+21
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,27 @@ func timerproc(tb *timersBucket) {
325325
}
326326
}
327327

328+
// adjusttimers looks through the timers in the current P's heap for
329+
// any timers that have been modified to run earlier, and puts them in
330+
// the correct place in the heap.
331+
// The caller must have locked the timers for pp.
332+
func adjusttimers(pp *p) {
333+
if len(pp.timers) == 0 {
334+
return
335+
}
336+
throw("adjusttimers: not yet implemented")
337+
}
338+
339+
// runtimer examines the first timer in timers. If it is ready based on now,
340+
// it runs the timer and removes or updates it.
341+
// Returns 0 if it ran a timer, -1 if there are no more timers, or the time
342+
// when the first timer should run.
343+
// The caller must have locked the timers for pp.
344+
func runtimer(pp *p, now int64) int64 {
345+
throw("runtimer: not yet implemented")
346+
return -1
347+
}
348+
328349
func timejump() *g {
329350
if faketime == 0 {
330351
return nil

0 commit comments

Comments
 (0)