Skip to content

Commit 0c1c3ca

Browse files
committed
cmd/coordinator: start of a scheduler, not yet enabled
Updates golang/go#19178 Change-Id: I24aa368df01a85259b53d6cfb08de7ab3a80e4fe Reviewed-on: https://go-review.googlesource.com/132076 Reviewed-by: Dmitri Shuralyov <[email protected]>
1 parent d4c3f10 commit 0c1c3ca

File tree

6 files changed

+302
-3
lines changed

6 files changed

+302
-3
lines changed

cmd/coordinator/coordinator.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ var (
8585
processID = "P" + randHex(9)
8686
)
8787

88+
var sched = NewScheduler()
89+
8890
var Version string // set by linker -X
8991

9092
// devPause is a debug option to pause for 5 minutes after the build
@@ -1391,6 +1393,12 @@ type BuildletPool interface {
13911393
// and highPriorityOpt.
13921394
GetBuildlet(ctx context.Context, hostType string, lg logger) (*buildlet.Client, error)
13931395

1396+
// HasCapacity reports whether the buildlet pool has
1397+
// quota/capacity to create a buildlet of the provided host
1398+
// type. This should return as fast as possible and err on
1399+
// the side of returning false.
1400+
HasCapacity(hostType string) bool
1401+
13941402
String() string // TODO(bradfitz): more status stuff
13951403
}
13961404

@@ -1683,7 +1691,12 @@ func (st *buildStatus) build() error {
16831691

16841692
sp = st.CreateSpan("get_buildlet")
16851693
pool := st.buildletPool()
1686-
bc, err := pool.GetBuildlet(st.ctx, st.conf.HostType, st)
1694+
bc, err := sched.GetBuildlet(st.ctx, st, &SchedItem{
1695+
HostType: st.conf.HostType,
1696+
IsTry: st.trySet != nil,
1697+
Pool: pool,
1698+
BuilderRev: st.BuilderRev,
1699+
})
16871700
sp.Done(err)
16881701
if err != nil {
16891702
err = fmt.Errorf("failed to get a buildlet: %v", err)
@@ -1932,7 +1945,12 @@ func (st *buildStatus) crossCompileMakeAndSnapshot(config *crossCompileConfig) (
19321945
ctx, cancel := context.WithCancel(st.ctx)
19331946
defer cancel()
19341947
sp := st.CreateSpan("get_buildlet_cross")
1935-
kubeBC, err := kubePool.GetBuildlet(ctx, config.Buildlet, st)
1948+
kubeBC, err := sched.GetBuildlet(ctx, st, &SchedItem{
1949+
HostType: config.Buildlet,
1950+
IsTry: st.trySet != nil,
1951+
Pool: kubePool,
1952+
BuilderRev: st.BuilderRev,
1953+
})
19361954
sp.Done(err)
19371955
if err != nil {
19381956
err = fmt.Errorf("cross-compile and snapshot: failed to get a buildlet: %v", err)

cmd/coordinator/gce.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,13 +411,32 @@ func (p *gceBuildletPool) awaitVMCountQuota(ctx context.Context, numCPU int) err
411411
}
412412
}
413413

414+
func (p *gceBuildletPool) HasCapacity(hostType string) bool {
415+
hconf, ok := dashboard.Hosts[hostType]
416+
if !ok {
417+
return false
418+
}
419+
numCPU := hconf.GCENumCPU()
420+
p.mu.Lock()
421+
defer p.mu.Unlock()
422+
return p.haveQuotaLocked(numCPU)
423+
}
424+
425+
// haveQuotaLocked reports whether the current GCE quota permits
// starting numCPU more CPUs.
//
// precondition: p.mu must be held.
func (p *gceBuildletPool) haveQuotaLocked(numCPU int) bool {
	// All four budgets need room: remaining CPUs, remaining
	// instance quota, and the instance/address caps (both bounded
	// by maxInstances).
	return p.cpuLeft >= numCPU && p.instLeft >= 1 && len(p.inst) < maxInstances && p.addrUsage < maxInstances
}
432+
414433
func (p *gceBuildletPool) tryAllocateQuota(numCPU int) bool {
415434
p.mu.Lock()
416435
defer p.mu.Unlock()
417436
if p.disabled {
418437
return false
419438
}
420-
if p.cpuLeft >= numCPU && p.instLeft >= 1 && len(p.inst) < maxInstances && p.addrUsage < maxInstances {
439+
if p.haveQuotaLocked(numCPU) {
421440
p.cpuUsage += numCPU
422441
p.cpuLeft -= numCPU
423442
p.instLeft--

cmd/coordinator/kube.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,16 @@ func (p *kubeBuildletPool) pollCapacity(ctx context.Context) {
205205

206206
}
207207

208+
// HasCapacity implements BuildletPool for the Kubernetes pool.
//
// TODO: implement. But for now we don't care because we only
// use the kubePool for the cross-compiled builds and we have
// very few hostTypes for those, and only one (ARM) that's
// used day-to-day. So it's okay if we lie here and always try
// to create buildlets. The scheduler will still give created
// buildlets to the highest priority waiter.
func (p *kubeBuildletPool) HasCapacity(hostType string) bool {
	return true
}
217+
208218
func (p *kubeBuildletPool) GetBuildlet(ctx context.Context, hostType string, lg logger) (*buildlet.Client, error) {
209219
hconf, ok := dashboard.Hosts[hostType]
210220
if !ok || !hconf.IsContainer() {

cmd/coordinator/remote_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ func (tp *TestBuildletPool) Remove(hostType string) {
5656

5757
func (tp *TestBuildletPool) String() string { return "test" }
5858

59+
// HasCapacity implements BuildletPool; the test pool always has room.
func (tp *TestBuildletPool) HasCapacity(string) bool { return true }
60+
5961
var testPool = &TestBuildletPool{}
6062

6163
func TestHandleBuildletCreateWrongMethod(t *testing.T) {

cmd/coordinator/reverse.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,21 @@ func (p *reverseBuildletPool) updateWaiterCounter(hostType string, delta int) {
258258
p.waiters[hostType] += delta
259259
}
260260

261+
func (p *reverseBuildletPool) HasCapacity(hostType string) bool {
262+
p.mu.Lock()
263+
defer p.mu.Unlock()
264+
for _, b := range p.buildlets {
265+
if b.hostType != hostType {
266+
continue
267+
}
268+
if b.inUse {
269+
continue
270+
}
271+
return true
272+
}
273+
return false
274+
}
275+
261276
func (p *reverseBuildletPool) GetBuildlet(ctx context.Context, hostType string, lg logger) (*buildlet.Client, error) {
262277
p.updateWaiterCounter(hostType, 1)
263278
defer p.updateWaiterCounter(hostType, -1)

cmd/coordinator/sched.go

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
// Copyright 2018 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package main
6+
7+
import (
8+
"context"
9+
"sort"
10+
"sync"
11+
"time"
12+
13+
"golang.org/x/build/buildlet"
14+
"golang.org/x/build/internal/buildgo"
15+
)
16+
17+
// useScheduler controls whether we actually use the scheduler. This
// is temporarily false during development. Once we're happy with it
// we'll delete this const.
//
// If false, any GetBuildlet call to the scheduler delegates directly
// to the BuildletPool's GetBuildlet and we make a bunch of callers
// fight over a mutex and a random one wins, like we used to do it.
const useScheduler = false
25+
26+
// The Scheduler prioritizes access to buildlets. It accepts requests
// for buildlets, starts the creation of buildlets from BuildletPools,
// and prioritizes which callers gets them first when they're ready.
type Scheduler struct {
	// mu guards paused and waiting.
	mu      sync.Mutex
	paused  bool
	waiting []*SchedWaiter // index 0 is highest priority

	// readyc receives newly created, ready-to-use buildlets;
	// drained by assignLoop when the scheduler is enabled.
	readyc chan ReadyBuildlet

	// launching is not yet read or written in this snapshot;
	// presumably it will track waiters with a buildlet creation
	// in flight — TODO confirm once schedule() is implemented.
	launching map[*SchedWaiter]bool
}
38+
39+
// A ReadyBuildlet is a buildlet that was just created and is up and
// is ready to be assigned to a caller based on priority.
type ReadyBuildlet struct {
	// Pool is the BuildletPool the buildlet came from.
	Pool BuildletPool
	// HostType is the host type the buildlet was created for.
	HostType string
	// Client is the connected client for the new buildlet.
	Client *buildlet.Client
}
46+
47+
// NewScheduler returns a new scheduler.
48+
func NewScheduler() *Scheduler {
49+
s := &Scheduler{
50+
readyc: make(chan ReadyBuildlet, 8),
51+
}
52+
if useScheduler {
53+
go s.assignLoop()
54+
}
55+
return s
56+
}
57+
58+
// assignLoop waits for the successful creation of buildlets and
59+
// assigns them the highest priority waiter.
60+
//
61+
// TODO: probably also need to deal with buildlet creation failures to
62+
// at least re-nudge the scheduler to kick off new buildlet creations
63+
// if still necessary.
64+
func (s *Scheduler) assignLoop() {
65+
for {
66+
rb := <-s.readyc
67+
bestWaiter, ok := s.matchWaiter(rb)
68+
if !ok {
69+
go rb.Client.Close()
70+
continue
71+
}
72+
select {
73+
case bestWaiter.Res <- rb.Client:
74+
// Normal happy case. Something gets its buildlet.
75+
default:
76+
// Wait went away. (context timeout?)
77+
go rb.Client.Close()
78+
}
79+
}
80+
}
81+
82+
// pause pauses the scheduler.
83+
func (s *Scheduler) pause(v bool) {
84+
if !useScheduler {
85+
return
86+
}
87+
s.mu.Lock()
88+
s.paused = true
89+
s.mu.Unlock()
90+
}
91+
92+
// unpause unpauses the scheduler and runs schedule.
func (s *Scheduler) unpause() {
	if !useScheduler {
		return
	}
	// Clear the flag under the lock, then kick the scheduler
	// outside the lock (schedule acquires s.mu itself).
	s.mu.Lock()
	s.paused = false
	s.mu.Unlock()
	s.schedule()
}
102+
103+
// schedule starts creating buildlets if there's demand.
//
// It acquires s.mu so should run as quickly as possible.
func (s *Scheduler) schedule() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.paused {
		return
	}
	// Walk waiters in priority order (index 0 is highest),
	// skipping any pool already known to be out of capacity.
	//
	// NOTE(review): a pool is marked exhausted after HasCapacity
	// fails for a single host type; other host types in the same
	// pool that still have capacity are then skipped for the rest
	// of this pass — confirm that's intended once this TODO lands.
	poolExhausted := map[BuildletPool]bool{}
	for _, sw := range s.waiting {
		si := sw.si
		if poolExhausted[si.Pool] || !si.Pool.HasCapacity(si.HostType) {
			poolExhausted[si.Pool] = true
			continue
		}
		// ... TODO kick things off, using a goroutine per
		// slow buildlet creation call. If the creation fails,
		// the goroutine can call back into the scheduler to
		// inform it of that.
	}
}
125+
126+
// matchWaiter returns (and removes from the waiting queue) the highest priority SchedWaiter
127+
// that matches the provided ReadyBuildlet.
128+
func (s *Scheduler) matchWaiter(rb ReadyBuildlet) (sw *SchedWaiter, ok bool) {
129+
s.mu.Lock()
130+
defer s.mu.Unlock()
131+
for i, sw := range s.waiting {
132+
si := sw.si
133+
if si.Pool == rb.Pool && si.HostType == rb.HostType {
134+
copy(s.waiting[i:], s.waiting[i+1:])
135+
s.waiting[len(s.waiting)-1] = nil
136+
s.waiting = s.waiting[:len(s.waiting)-1]
137+
return sw, true
138+
}
139+
}
140+
return nil, false
141+
}
142+
143+
func (s *Scheduler) removeWaiter(remove *SchedWaiter) {
144+
s.mu.Lock()
145+
defer s.mu.Unlock()
146+
newWaiting := s.waiting[:0]
147+
for _, sw := range s.waiting {
148+
if sw != remove {
149+
newWaiting = append(newWaiting, sw)
150+
}
151+
}
152+
s.waiting = newWaiting
153+
}
154+
155+
func (s *Scheduler) enqueueWaiter(si *SchedItem) *SchedWaiter {
156+
defer s.schedule()
157+
158+
w := &SchedWaiter{
159+
s: s,
160+
si: si,
161+
Res: make(chan interface{}), // NOT buffered
162+
}
163+
164+
s.mu.Lock()
165+
defer s.mu.Unlock()
166+
s.waiting = append(s.waiting, w)
167+
sort.Slice(s.waiting, func(i, j int) bool {
168+
ia, ib := s.waiting[i].si, s.waiting[j].si
169+
return schedLess(ia, ib)
170+
})
171+
return w
172+
}
173+
174+
// schedLess reports whether scheduled item ia is "less" (more
175+
// important) than scheduled item ib.
176+
func schedLess(ia, ib *SchedItem) bool {
177+
// TryBots are more important.
178+
if ia.IsTry != ib.IsTry {
179+
return ia.IsTry
180+
}
181+
return ia.commitTime.Before(ib.commitTime)
182+
}
183+
184+
// A SchedItem describes one request for a buildlet: which
// builder/revision it's for, which pool and host type to use, and
// whether it's a trybot run (trybots sort first; see schedLess).
type SchedItem struct {
	buildgo.BuilderRev
	Pool     BuildletPool
	HostType string
	IsTry    bool

	// We set in GetBuildlet:
	commitTime time.Time // zero until populated (see GetBuildlet's TODO)
	tryFor     string    // which user. (user with 1 trybot >> user with 50 trybots)
}
194+
195+
// A SchedWaiter tracks one pending GetBuildlet call in
// Scheduler.waiting until a buildlet is assigned to it or the
// caller's context is canceled.
type SchedWaiter struct {
	s  *Scheduler
	si *SchedItem

	// Res is the result channel, containing either a
	// *buildlet.Client or an error. It is read by GetBuildlet and
	// written by assignLoop.
	Res chan interface{}
}
204+
205+
// cancel removes sw from the scheduler's waiting queue. GetBuildlet
// calls it when the caller's context is done.
func (sw *SchedWaiter) cancel() {
	sw.s.removeWaiter(sw)
}
208+
209+
// GetBuildlet requests a buildlet with the parameters described in si.
//
// It blocks until a buildlet is delivered on the waiter's Res channel
// (by assignLoop) or until ctx is done, in which case the waiter is
// removed from the queue and ctx.Err() is returned.
func (s *Scheduler) GetBuildlet(ctx context.Context, lg logger, si *SchedItem) (*buildlet.Client, error) {
	if !useScheduler {
		// Scheduler disabled during development: preserve the old
		// behavior of asking the pool directly.
		return si.Pool.GetBuildlet(ctx, si.HostType, lg)
	}

	// TODO: once we remove the useScheduler const, we can
	// probably remove the "lg" logger parameter. We don't need to
	// log anything during the buildlet creation process anymore
	// because we don't know which build it'll be for. So all we can
	// say in the logs for is "Asking for a buildlet" and "Got
	// one", which the caller already does. I think. Verify that.

	// TODO: populate si unexported fields (commitTime, tryFor)

	sw := s.enqueueWaiter(si)
	select {
	case v := <-sw.Res:
		// Res carries either a *buildlet.Client or an error.
		if bc, ok := v.(*buildlet.Client); ok {
			return bc, nil
		}
		return nil, v.(error)
	case <-ctx.Done():
		sw.cancel()
		return nil, ctx.Err()
	}
}

0 commit comments

Comments
 (0)