Skip to content

Commit b0ce0eb

Browse files
committed
internal, internal/coordinator/pool: add a shared polling function
This adds a polling function that is tested and can be reused by callers in the x/build repository. This was suggested in the review of CL 247907. The polling function replaces the pollers used in the EC2 buildlet pool. For golang/go#36841 Change-Id: I120ceb83e2740f0bdc5ee2423e0edd3ad727bf4b Reviewed-on: https://go-review.googlesource.com/c/build/+/255358 Trust: Carlos Amedee <[email protected]> Run-TryBot: Carlos Amedee <[email protected]> TryBot-Result: Go Bot <[email protected]> Reviewed-by: Bryan C. Mills <[email protected]> Reviewed-by: Alexander Rakoczy <[email protected]>
1 parent da0cd80 commit b0ce0eb

File tree

4 files changed

+109
-71
lines changed

4 files changed

+109
-71
lines changed

internal/coordinator/pool/ec2.go

Lines changed: 38 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"golang.org/x/build/buildenv"
2121
"golang.org/x/build/buildlet"
2222
"golang.org/x/build/dashboard"
23+
"golang.org/x/build/internal"
2324
"golang.org/x/build/internal/cloud"
2425
"golang.org/x/build/internal/spanlog"
2526
)
@@ -72,10 +73,6 @@ func WithVMDeleteTimeout(timeout time.Duration) EC2Opt {
7273

7374
// EC2Buildlet manages a pool of AWS EC2 buildlets.
7475
type EC2Buildlet struct {
75-
once sync.Once
76-
// done channel closing will signal the pollers to discontinue polling
77-
done chan struct{}
78-
7976
// awsClient is the client used to interact with AWS services.
8077
awsClient awsClient
8178
// buildEnv contains the build environment settings.
@@ -93,8 +90,12 @@ type EC2Buildlet struct {
9390
isRemoteBuildlet IsRemoteBuildletFunc
9491
// ledger tracks instances and their resource allocations.
9592
ledger *ledger
93+
// cancelPoll will signal to the pollers to discontinue polling.
94+
cancelPoll context.CancelFunc
9695
// vmDeleteTimeout contains the timeout used to determine if a VM should be deleted.
9796
vmDeleteTimeout time.Duration
97+
// pollWait waits for all pollers to terminate polling.
98+
pollWait sync.WaitGroup
9899
}
99100

100101
// ec2BuildletClient represents an EC2 buildlet client in the buildlet package.
@@ -111,11 +112,12 @@ func NewEC2Buildlet(client *cloud.AWSClient, buildEnv *buildenv.Environment, hos
111112
if fn == nil {
112113
return nil, errors.New("remote buildlet check function is not set")
113114
}
115+
ctx, cancel := context.WithCancel(context.Background())
114116
b := &EC2Buildlet{
115117
awsClient: client,
116118
buildEnv: buildEnv,
117119
buildletClient: buildlet.NewEC2Client(client),
118-
done: make(chan struct{}),
120+
cancelPoll: cancel,
119121
hosts: hosts,
120122
isRemoteBuildlet: fn,
121123
ledger: newLedger(),
@@ -124,14 +126,34 @@ func NewEC2Buildlet(client *cloud.AWSClient, buildEnv *buildenv.Environment, hos
124126
for _, opt := range opts {
125127
opt(b)
126128
}
127-
if err := b.retrieveAndSetQuota(); err != nil {
129+
if err := b.retrieveAndSetQuota(ctx); err != nil {
128130
return nil, fmt.Errorf("unable to create EC2 pool: %w", err)
129131
}
130132
if err := b.retrieveAndSetInstanceTypes(); err != nil {
131133
return nil, fmt.Errorf("unable to create EC2 pool: %w", err)
132134
}
133-
go b.cleanupUnusedVMs()
134-
go b.pollQuota()
135+
136+
// Poll for the EC2 quota once an hour and record it in the ledger. The
// poller is registered with pollWait so that Close, which cancels the
// context and then waits on pollWait, only returns once polling has stopped.
b.pollWait.Add(1)
go func() {
	// Done must run only after PeriodicallyDo has returned, so the poller
	// is called synchronously here instead of in yet another goroutine.
	defer b.pollWait.Done()
	internal.PeriodicallyDo(ctx, time.Hour, func(ctx context.Context, _ time.Time) {
		log.Printf("retrieveing EC2 quota")
		// The error is deliberately dropped: retrieveAndSetQuota logs
		// the failure itself and the next tick retries.
		_ = b.retrieveAndSetQuota(ctx)
	})
}()
146+
147+
// Every two minutes, look for VMs which are not tracked in the ledger and
// delete them. The poller is registered with pollWait so that Close, which
// cancels the context and then waits on pollWait, only returns once polling
// has stopped.
b.pollWait.Add(1)
go func() {
	// Done must run only after PeriodicallyDo has returned, so the poller
	// is called synchronously here instead of in yet another goroutine.
	defer b.pollWait.Done()
	internal.PeriodicallyDo(ctx, 2*time.Minute, func(ctx context.Context, _ time.Time) {
		log.Printf("cleaning up unused EC2 instances")
		b.destroyUntrackedInstances(ctx)
	})
}()
135157

136158
// TODO(golang.org/issues/38337) remove once a package level variable is no longer
137159
// required by the main package.
@@ -256,45 +278,24 @@ func (eb *EC2Buildlet) buildletDone(instName string) {
256278

257279
// Close stops the pollers used by the EC2Buildlet pool from running.
258280
func (eb *EC2Buildlet) Close() {
259-
eb.once.Do(func() {
260-
close(eb.done)
261-
})
281+
eb.cancelPoll()
282+
eb.pollWait.Wait()
262283
}
263284

264285
// retrieveAndSetQuota queries EC2 for account relevant quotas and sets the quota in the ledger.
265-
func (eb *EC2Buildlet) retrieveAndSetQuota() error {
266-
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
286+
func (eb *EC2Buildlet) retrieveAndSetQuota(ctx context.Context) error {
287+
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
267288
defer cancel()
268289

269290
cpuQuota, err := eb.awsClient.Quota(ctx, cloud.QuotaServiceEC2, cloud.QuotaCodeCPUOnDemand)
270291
if err != nil {
271-
log.Printf("unable to query for cpu quota: %s", err)
292+
log.Printf("unable to query for EC2 cpu quota: %s", err)
272293
return err
273294
}
274295
eb.ledger.SetCPULimit(cpuQuota)
275296
return nil
276297
}
277298

278-
// pollQuota repeatedly polls for the EC2 quota data and sets the quota data in
279-
// the ledger. It stops polling once the done channel has been closed.
280-
func (eb *EC2Buildlet) pollQuota() {
281-
t := time.NewTicker(time.Hour)
282-
defer t.Stop()
283-
for {
284-
select {
285-
case <-t.C:
286-
err := eb.retrieveAndSetQuota()
287-
if err != nil {
288-
log.Printf("polling for EC2 quota failed: %s", err)
289-
}
290-
case <-eb.done:
291-
// closing the done channel signals the end of the polling loop.
292-
log.Printf("stopped polling for EC2 quota")
293-
return
294-
}
295-
}
296-
}
297-
298299
// retrieveAndSetInstanceTypes retrieves the ARM64 instance types from the EC2
299300
// service and sets them in the ledger.
300301
func (eb *EC2Buildlet) retrieveAndSetInstanceTypes() error {
@@ -303,35 +304,17 @@ func (eb *EC2Buildlet) retrieveAndSetInstanceTypes() error {
303304

304305
its, err := eb.awsClient.InstanceTypesARM(ctx)
305306
if err != nil {
306-
return fmt.Errorf("unable to retrieve instance types: %w", err)
307+
return fmt.Errorf("unable to retrieve EC2 instance types: %w", err)
307308
}
308309
eb.ledger.UpdateInstanceTypes(its)
309310
log.Printf("ec2 buildlet pool instance types updated")
310311
return nil
311312
}
312313

313-
// cleanupUnusedVMs periodically queries for VMs which are not tracked in the ledger and
314-
// deletes them. If the done channel has been closed then the polling will exit.
315-
func (eb *EC2Buildlet) cleanupUnusedVMs() {
316-
t := time.NewTicker(2 * time.Minute)
317-
defer t.Stop()
318-
for {
319-
select {
320-
case <-t.C:
321-
log.Printf("cleaning up unused EC2 instances")
322-
eb.destroyUntrackedInstances()
323-
case <-eb.done:
324-
// closing the done channel signals the end of the polling loop.
325-
log.Printf("stopped cleaning up unused EC2 instances")
326-
return
327-
}
328-
}
329-
}
330-
331314
// destroyUntrackedInstances searches for VMs which exist but are not being tracked in the
332315
// ledger and deletes them.
333-
func (eb *EC2Buildlet) destroyUntrackedInstances() {
334-
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
316+
func (eb *EC2Buildlet) destroyUntrackedInstances(ctx context.Context) {
317+
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
335318
defer cancel()
336319

337320
insts, err := eb.awsClient.RunningInstances(ctx)

internal/coordinator/pool/ec2_test.go

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -462,23 +462,13 @@ func TestEC2BuildletbuildletDone(t *testing.T) {
462462
}
463463

464464
func TestEC2BuildletClose(t *testing.T) {
465+
cancelled := false
465466
pool := &EC2Buildlet{
466-
done: make(chan struct{}),
467+
cancelPoll: func() { cancelled = true },
467468
}
468-
defer func() {
469-
if r := recover(); r != nil {
470-
t.Errorf("EC2Buildlet.Close() paniced=%s", r)
471-
}
472-
}()
473-
pool.Close()
474469
pool.Close()
475-
select {
476-
case _, ok := <-pool.done:
477-
if ok {
478-
t.Error("EC2Buildlet.done not closed; read from channel")
479-
}
480-
default:
481-
t.Error("EC2Buildlet.done not closed: waiting for read")
470+
if !cancelled {
471+
t.Error("EC2Buildlet.pollCancel not called")
482472
}
483473
}
484474

@@ -487,7 +477,7 @@ func TestEC2BuildletRetrieveAndSetQuota(t *testing.T) {
487477
awsClient: cloud.NewFakeAWSClient(),
488478
ledger: newLedger(),
489479
}
490-
err := pool.retrieveAndSetQuota()
480+
err := pool.retrieveAndSetQuota(context.Background())
491481
if err != nil {
492482
t.Errorf("EC2Buildlet.retrieveAndSetQuota(ctx) = %s; want nil", err)
493483
}
@@ -556,7 +546,7 @@ func TestEC2BuildeletDestroyUntrackedInstances(t *testing.T) {
556546
},
557547
},
558548
}
559-
pool.destroyUntrackedInstances()
549+
pool.destroyUntrackedInstances(context.Background())
560550
wantInstCount := 3
561551
gotInsts, err := awsC.RunningInstances(context.Background())
562552
if err != nil || len(gotInsts) != wantInstCount {

internal/internal.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright 2020 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package internal
6+
7+
import (
8+
"context"
9+
"time"
10+
)
11+
12+
// PeriodicallyDo calls f every period until the provided context is cancelled.
//
// It blocks the calling goroutine until ctx is done, so callers that want
// background polling should run it in its own goroutine. f is invoked
// synchronously with ctx and the tick time; it is never invoked once ctx has
// been cancelled. period must be greater than zero because it is handed
// directly to time.NewTicker.
func PeriodicallyDo(ctx context.Context, period time.Duration, f func(context.Context, time.Time)) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case now := <-ticker.C:
			// When both channels are ready select chooses one at random,
			// so re-check the context to avoid running f after cancellation.
			if ctx.Err() != nil {
				return
			}
			f(ctx, now)
		}
	}
}

internal/internal_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright 2020 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package internal
6+
7+
import (
8+
"context"
9+
"testing"
10+
"time"
11+
)
12+
13+
func TestPeriodicallyDo(t *testing.T) {
14+
ctx, cancel := context.WithCancel(context.Background())
15+
defer cancel()
16+
17+
didWork := make(chan time.Time, 2)
18+
done := make(chan interface{})
19+
go func() {
20+
PeriodicallyDo(ctx, time.Millisecond, func(ctx context.Context, t time.Time) {
21+
select {
22+
case didWork <- t:
23+
case <-ctx.Done():
24+
}
25+
})
26+
close(done)
27+
}()
28+
select {
29+
case <-time.After(5 * time.Second):
30+
t.Error("PeriodicallyDo() never called f, wanted at least one call")
31+
case <-didWork:
32+
// PeriodicallyDo called f successfully.
33+
}
34+
select {
35+
case <-done:
36+
t.Errorf("PeriodicallyDo() finished early, wanted it to still be looping")
37+
case <-didWork:
38+
cancel()
39+
}
40+
<-done
41+
}

0 commit comments

Comments
 (0)