Skip to content

Commit aa474c5

Browse files
committed
runtime/sched: revise for go1.12
Update #3
1 parent a740361 commit aa474c5

36 files changed

+1473
-978
lines changed

content/11-pkg/syscall/syscall.md

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -346,8 +346,11 @@ func reentersyscall(pc, sp uintptr) {
346346
_g_.m.syscalltick = _g_.m.p.ptr().syscalltick
347347
_g_.sysblocktraced = true
348348
_g_.m.mcache = nil
349-
_g_.m.p.ptr().m = 0
350-
atomic.Store(&_g_.m.p.ptr().status, _Psyscall)
349+
pp := _g_.m.p.ptr()
350+
pp.m = 0
351+
_g_.m.oldp.set(pp)
352+
_g_.m.p = 0
353+
atomic.Store(&pp.status, _Psyscall)
351354
if sched.gcwaiting != 0 {
352355
systemstack(entersyscall_gcwait)
353356
save(pc, sp)
@@ -362,7 +365,7 @@ TODO:
362365

363366
```go
364367
//go:nosplit
365-
func exitsyscallfast() bool {
368+
func exitsyscallfast(oldp *p) bool {
366369
_g_ := getg()
367370

368371
// Freezetheworld sets stopwait but does not retake P's.
@@ -375,6 +378,7 @@ func exitsyscallfast() bool {
375378
// Try to re-acquire the last P.
376379
if _g_.m.p != 0 && _g_.m.p.ptr().status == _Psyscall && atomic.Cas(&_g_.m.p.ptr().status, _Psyscall, _Prunning) {
377380
// There's a cpu for us, so we can run.
381+
wirep(oldp)
378382
exitsyscallfast_reacquired()
379383
return true
380384
}
@@ -427,8 +431,9 @@ func exitsyscall() {
427431
}
428432

429433
_g_.waitsince = 0
430-
oldp := _g_.m.p.ptr()
431-
if exitsyscallfast() {
434+
oldp := _g_.m.oldp.ptr()
435+
_g_.m.oldp = 0
436+
if exitsyscallfast(oldp) {
432437
if _g_.m.mcache == nil {
433438
throw("lost mcache")
434439
}
@@ -454,6 +459,12 @@ func exitsyscall() {
454459
_g_.stackguard0 = _g_.stack.lo + _StackGuard
455460
}
456461
_g_.throwsplit = false
462+
463+
if sched.disable.user && !schedEnabled(_g_) {
464+
// Scheduling of this goroutine is disabled.
465+
Gosched()
466+
}
467+
457468
return
458469
}
459470

content/3-main.md

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,7 @@ func main() {
7777
}
7878
}()
7979

80-
// 记录程序的启动时间,必须在 runtime.init 之后调用
81-
// 因为 nanotime 在某些平台上依赖于 startNano。
80+
// 记录程序的启动时间
8281
runtimeInitTime = nanotime()
8382

8483
// 启动垃圾回收器后台操作
@@ -196,7 +195,7 @@ func runtime_init()
196195
// 有关可用的 cpu 功能的信息。
197196
// 在 runtime.cpuinit 中启动时设置。
198197
// 运行时之外的包不应使用这些包因为它们不是外部 api。
199-
// TODO: deprecate these; use internal/cpu directly.
198+
// 启动时在 asm_{386,amd64,amd64p32}.s 中设置
200199
processorVersionInfo uint32
201200
isIntel bool
202201
lfenceBeforeRdtsc bool

content/4-sched/basic.md

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,14 @@ type m struct {
9797
caughtsig guintptr // goroutine 在 fatal signal 中运行
9898
p puintptr // attached p for executing go code (nil if not executing go code)
9999
nextp puintptr
100+
oldp puintptr // 执行系统调用之前绑定的 p
100101
id int64
101102
mallocing int32
102103
throwing int32
103104
preemptoff string // 如果不为空串 "",继续让当前 g 运行在该 M 上
104105
locks int32
105106
dying int32
106107
profilehz int32
107-
helpgc int32
108108
spinning bool // m 当前没有运行 work 且正处于寻找 work 的活跃状态
109109
blocked bool // m 阻塞在一个 note 上
110110
inwb bool // m 正在执行 write barrier
@@ -210,8 +210,10 @@ type p struct {
210210
runnext guintptr
211211

212212
// 有效的 G (状态 == Gdead)
213-
gfree *g
214-
gfreecnt int32
213+
gfree struct {
214+
gList
215+
n int32
216+
}
215217

216218
sudogcache []*sudog
217219
sudogbuf [128]*sudog
@@ -247,7 +249,7 @@ type p struct {
247249

248250
runSafePointFn uint32 // 如果为 1, 则在下一个 safe-point 运行 sched.safePointFn
249251

250-
pad [sys.CacheLineSize]byte // 无实际用处,只是用于防止 P 的 false sharing
252+
pad cpu.CacheLinePad
251253
}
252254
```
253255

@@ -404,15 +406,28 @@ type schedt struct {
404406
nmspinning uint32 // 见 proc.go 中关于 "工作线程 parking/unparking" 的注释.
405407

406408
// 全局 runnable G 队列
407-
runqhead guintptr
408-
runqtail guintptr
409+
runq gQueue
409410
runqsize int32
410411

412+
// disable 控制了选择性的禁止调度器
413+
//
414+
// 使用 schedEnableUser 来控制此这个
415+
//
416+
// disable 受到 sched.lock 保护
417+
disable struct {
418+
// 用户禁用用户 goroutine 的调度
419+
user bool
420+
runnable gQueue // 即将发生的 runable Gs
421+
n int32 // runable 的数量
422+
}
423+
411424
// 有效 dead G 的全局缓存.
412-
gflock mutex
413-
gfreeStack *g
414-
gfreeNoStack *g
415-
ngfree int32
425+
gFree struct {
426+
lock mutex
427+
stack gList // 包含栈的 Gs
428+
noStack gList // 没有栈的 Gs
429+
n int32
430+
}
416431

417432
// sudog 结构的集中缓存
418433
sudoglock mutex

content/4-sched/exec.md

Lines changed: 44 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ func mstart() {
4343
mstart1()
4444

4545
// 退出线程
46-
if GOOS == "windows" || GOOS == "solaris" || GOOS == "plan9" || GOOS == "darwin" {
47-
// 由于 windows, solaris, darwin 和 plan9 总是系统分配的栈,在在 mstart 之前放进 _g_.stack 的
46+
if GOOS == "windows" || GOOS == "solaris" || GOOS == "plan9" || GOOS == "darwin" || GOOS == "aix" {
47+
// 由于 windows, solaris, darwin, aix 和 plan9 总是系统分配的栈,在在 mstart 之前放进 _g_.stack 的
4848
// 因此上面的逻辑还没有设置 osStack。
4949
osStack = true
5050
}
@@ -86,12 +86,8 @@ func mstart1() {
8686
fn()
8787
}
8888

89-
// GC startTheWorld 会检查 spinning M 是否少于并发标记需求
90-
// 新建 m,设置 m.helpgc = -1,加入闲置队列等待唤醒
91-
if _g_.m.helpgc != 0 {
92-
_g_.m.helpgc = 0
93-
stopm()
94-
} else if _g_.m != &m0 {
89+
// 如果当前 m 并非 m0,则要求绑定 p
90+
if _g_.m != &m0 {
9591
// 绑定 p
9692
acquirep(_g_.m.nextp.ptr())
9793
_g_.m.nextp = 0
@@ -137,27 +133,27 @@ m 与 p 的绑定过程只是简单的将 p 链表中的 p ,保存到 m 中的
137133
//go:yeswritebarrierrec
138134
func acquirep(_p_ *p) {
139135
// 此处不允许 write barrier
140-
acquirep1(_p_)
136+
wirep(_p_)
141137

142138
// 已经获取了 p,因此之后允许 write barrier
143-
_g_ := getg()
144-
145-
// 递交 mcache 给 m
146-
_g_.m.mcache = _p_.mcache
139+
//
140+
// 在 P 可以从一个潜在设置的 mcache 分配前执行偏好的 mcache flush
141+
_p_.mcache.prepareForSweep()
147142

148143
if trace.enabled {
149144
traceProcStart()
150145
}
151146
}
152-
// acquirep1 为 acquirep 的实际获取 p 的第一步。
147+
// wirep 为 acquirep 的实际获取 p 的第一步,它关联了当前的 M 到 P 上
153148
// 之所以进行拆分是因为我们可以为这个部分驳回 write barrier
154149
//go:nowritebarrierrec
155-
func acquirep1(_p_ *p) {
150+
//go:nosplit
151+
func wirep(_p_ *p) {
156152
_g_ := getg()
157153

158-
// 检查确实没有 p
154+
// 检查 确实没有 p
159155
if _g_.m.p != 0 || _g_.m.mcache != nil {
160-
throw("acquirep: already in go")
156+
throw("wirep: already in go")
161157
}
162158

163159
// 检查 m 是否正常,并检查要获取的 p 的状态
@@ -166,8 +162,8 @@ func acquirep1(_p_ *p) {
166162
if _p_.m != 0 {
167163
id = _p_.m.ptr().id
168164
}
169-
print("acquirep: p->m=", _p_.m, "(", id, ") p->status=", _p_.status, "\n")
170-
throw("acquirep: invalid p state")
165+
print("wirep: p->m=", _p_.m, "(", id, ") p->status=", _p_.status, "\n")
166+
throw("wirep: invalid p state")
171167
}
172168

173169
// 正式获取 p
@@ -207,7 +203,6 @@ func stopm() {
207203
throw("stopm spinning")
208204
}
209205

210-
retry:
211206
// 将 m 放回到 空闲列表中,因为我们马上就要 park 了
212207
lock(&sched.lock)
213208
mput(_g_.m)
@@ -219,17 +214,6 @@ retry:
219214
// 清除 unpark 的 note
220215
noteclear(&_g_.m.park)
221216

222-
// 如果需要 helpgc
223-
if _g_.m.helpgc != 0 {
224-
// helpgc() 会设置 _g_.m.p 与 _g_.m.mcache,因此我们会 acquire 一个 P 进行
225-
gchelper()
226-
// 撤销 helpgc() 的影响
227-
_g_.m.helpgc = 0
228-
_g_.m.mcache = nil
229-
_g_.m.p = 0
230-
goto retry
231-
}
232-
233217
// 此时已经被 unpark,说明有任务要执行
234218
// 立即 acquire P
235219
acquirep(_g_.m.nextp.ptr())
@@ -238,10 +222,7 @@ retry:
238222
```
239223

240224
它的流程也非常简单,将 m 放回至空闲列表中,而后使用 note 注册一个 park 通知,
241-
阻塞到它重新被 unpark;如果在阻塞结束后,恰好需要 helpgc,则会重新被阻塞。
242-
243-
至此,可以看出 helpgc 已经对 m 的 park/unpark 以及 mstart 产生影响,好在下一个版本中
244-
helpgc 的机制会被移除,我们留到后面的垃圾回收器中再详细讨论。
225+
阻塞到它重新被 unpark。
245226

246227
## 核心调度
247228

@@ -332,6 +313,23 @@ top:
332313
resetspinning()
333314
}
334315

316+
if sched.disable.user && !schedEnabled(gp) {
317+
// Scheduling of this goroutine is disabled. Put it on
318+
// the list of pending runnable goroutines for when we
319+
// re-enable user scheduling and look again.
320+
lock(&sched.lock)
321+
if schedEnabled(gp) {
322+
// Something re-enabled scheduling while we
323+
// were acquiring the lock.
324+
unlock(&sched.lock)
325+
} else {
326+
sched.disable.runnable.pushBack(gp)
327+
sched.disable.n++
328+
unlock(&sched.lock)
329+
goto top
330+
}
331+
}
332+
335333
if gp.lockedm != 0 {
336334
// 如果 g 需要 lock 到 m 上,则会将当前的 p
337335
// 给这个要 lock 的 g
@@ -616,7 +614,7 @@ func goexit0(gp *g) {
616614

617615
// 切换当前的 g 为 _Gdead
618616
casgstatus(gp, _Grunning, _Gdead)
619-
if isSystemGoroutine(gp) {
617+
if isSystemGoroutine(gp, false) {
620618
atomic.Xadd(&sched.ngsys, -1)
621619
}
622620

@@ -720,25 +718,14 @@ func globrunqget(_p_ *p, max int32) *g {
720718

721719
// 修改本地队列的剩余空间
722720
sched.runqsize -= n
723-
724-
// 如果已经确定要放满,那么队尾指针置空
725-
if sched.runqsize == 0 {
726-
sched.runqtail = 0
727-
}
728-
729721
// 拿到全局队列队头 g
730-
gp := sched.runqhead.ptr()
731-
732-
// 修改全局队列队头指针
733-
sched.runqhead = gp.schedlink
734-
722+
gp := sched.runq.pop()
735723
// 计数
736724
n--
737725

738726
// 继续取剩下的 n-1 个全局队列放入本地队列
739727
for ; n > 0; n-- {
740-
gp1 := sched.runqhead.ptr()
741-
sched.runqhead = gp1.schedlink
728+
gp1 := sched.runq.pop()
742729
runqput(_p_, gp1, false)
743730
}
744731
return gp
@@ -773,15 +760,15 @@ func runqget(_p_ *p) (gp *g, inheritTime bool) {
773760
// 没有 next
774761
for {
775762
// 本地队列是空,返回 nil
776-
h := atomic.Load(&_p_.runqhead) // load-acquire, 与其他消费者同步
763+
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, 与其他消费者同步
777764
t := _p_.runqtail
778765
if t == h {
779766
return nil, false
780767
}
781768

782769
// 从本地队列中以 cas 方式拿一个
783770
gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
784-
if atomic.Cas(&_p_.runqhead, h, h+1) { // cas-release, 提交消费
771+
if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, 提交消费
785772
return gp, false
786773
}
787774
}
@@ -844,8 +831,8 @@ top:
844831
// 如果有任何类型的逻辑竞争与被阻塞的线程(例如它已经从 netpoll 返回,但尚未设置 lastpoll)
845832
// 该线程无论如何都将阻塞 netpoll。
846833
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
847-
if gp := netpoll(false); gp != nil { // 无阻塞
848-
// netpoll 返回 schedlink 连接的 goroutine 链表
834+
if list := netpoll(false); !list.empty() { // 无阻塞
835+
gp := list.pop()
849836
injectglist(gp.schedlink.ptr())
850837
casgstatus(gp, _Gwaiting, _Grunnable)
851838
if trace.enabled {
@@ -907,10 +894,10 @@ stop:
907894
}
908895

909896
// 仅限于 wasm
910-
// 检查一个 goroutine 是否在等待 WebAssembly 宿主机回调
911-
// 如果是,暂停执行直到触发回调
912-
if pauseSchedulerUntilCallback() {
913-
// 回调触发后,至少一个 goroutine 被唤醒
897+
// 如果一个回调返回后没有其他 goroutine 是苏醒的
898+
// 则暂停执行直到回调被触发。
899+
if beforeIdle() {
900+
// 至少一个 goroutine 被唤醒
914901
goto top
915902
}
916903

0 commit comments

Comments
 (0)