Skip to content

Commit da19ce0

Browse files
committed
cmd/coordinator, cmd/buildstats: start of using logs to schedule tests
We've been logging event spans to datastore for years, but I'd lost this CL and only just found it again. This does two things: syncs the datastore logs to BigQuery, and starts to use the from-BigQuery timing info in the coordinator for scheduling sharded tests. The plan was to have a job occasionally do a BigQuery query and write out the results to a CSV file on GCS. The code to read that CSV file is in this CL, but that code path is disabled, so this CL should be a no-op. A future change will periodically do the query and write the CSV file, and then we can start using the new code path and remove the static map of expected test durations. Updates golang/go#12669 Change-Id: Ibe5b41d6a3009c2ade8ab728fa1cad646788e621 Reviewed-on: https://go-review.googlesource.com/30716 Reviewed-by: Brad Fitzpatrick <[email protected]>
1 parent 02f4f0b commit da19ce0

File tree

2 files changed

+237
-15
lines changed

2 files changed

+237
-15
lines changed

cmd/buildstats/buildstats.go

Lines changed: 138 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,36 @@ import (
1717

1818
"cloud.google.com/go/bigquery"
1919
"cloud.google.com/go/datastore"
20+
"golang.org/x/build/buildenv"
2021
"golang.org/x/build/types"
22+
"google.golang.org/api/googleapi"
2123
"google.golang.org/api/iterator"
2224
)
2325

2426
var (
2527
doSync = flag.Bool("sync", false, "sync build stats data from Datastore to BigQuery")
2628
)
2729

30+
var env *buildenv.Environment
31+
2832
func main() {
33+
buildenv.RegisterFlags()
2934
flag.Parse()
35+
36+
env = buildenv.FromFlags()
37+
3038
ctx := context.Background()
3139
if *doSync {
32-
sync(ctx)
40+
syncBuilds(ctx)
41+
syncSpans(ctx)
3342
} else {
3443
log.Fatalf("the buildstats command doesn't yet do anything except the --sync mode")
3544
}
3645

3746
}
3847

39-
func sync(ctx context.Context) {
40-
bq, err := bigquery.NewClient(ctx, "symbolic-datum-552")
48+
func syncBuilds(ctx context.Context) {
49+
bq, err := bigquery.NewClient(ctx, env.ProjectName)
4150
if err != nil {
4251
log.Fatal(err)
4352
}
@@ -73,7 +82,7 @@ func sync(ctx context.Context) {
7382
}
7483
log.Printf("Max is %v (%v)", t, t.Location())
7584

76-
ds, err := datastore.NewClient(ctx, "symbolic-datum-552")
85+
ds, err := datastore.NewClient(ctx, env.ProjectName)
7786
if err != nil {
7887
log.Fatalf("datastore.NewClient: %v", err)
7988
}
@@ -99,7 +108,6 @@ func sync(ctx context.Context) {
99108
if s.EndTime.IsZero() {
100109
log.Fatalf("got zero endtime")
101110
}
102-
//log.Printf("need to add %s: %+v", key.Encode(), s)
103111

104112
var row []bigquery.Value
105113
var putSchema bigquery.Schema
@@ -132,5 +140,130 @@ func sync(ctx context.Context) {
132140
os.Exit(1)
133141
}
134142
}
143+
}
144+
145+
func syncSpans(ctx context.Context) {
146+
bq, err := bigquery.NewClient(ctx, env.ProjectName)
147+
if err != nil {
148+
log.Fatal(err)
149+
}
150+
table := bq.Dataset("builds").Table("Spans")
151+
meta, err := table.Metadata(ctx)
152+
if ae, ok := err.(*googleapi.Error); ok && ae.Code == 404 {
153+
log.Printf("Creating table Spans...")
154+
err = table.Create(ctx)
155+
if err == nil {
156+
meta, err = table.Metadata(ctx)
157+
}
158+
}
159+
if err != nil {
160+
log.Fatalf("Metadata: %#v", err)
161+
}
162+
log.Printf("Metadata: %#v", meta)
163+
schema := meta.Schema
164+
if len(schema) == 0 {
165+
log.Printf("EMPTY SCHEMA")
166+
schema, err = bigquery.InferSchema(types.SpanRecord{})
167+
if err != nil {
168+
log.Fatalf("InferSchema: %v", err)
169+
}
170+
meta, err := table.Update(ctx, bigquery.TableMetadataToUpdate{Schema: schema})
171+
if err != nil {
172+
log.Fatalf("table.Update schema: %v", err)
173+
}
174+
schema = meta.Schema
175+
}
176+
for i, fs := range schema {
177+
log.Printf(" schema[%v]: %+v", i, fs)
178+
for j, fs := range fs.Schema {
179+
log.Printf(" .. schema[%v]: %+v", j, fs)
180+
}
181+
}
182+
183+
q := bq.Query("SELECT MAX(EndTime) FROM [symbolic-datum-552:builds.Spans]")
184+
it, err := q.Read(ctx)
185+
if err != nil {
186+
log.Fatalf("Read: %v", err)
187+
}
188+
189+
var since time.Time
190+
var values []bigquery.Value
191+
if err := it.Next(&values); err != nil {
192+
if err == iterator.Done {
193+
log.Fatalf("Expected at least one row fro MAX(EndTime) query; got none.")
194+
}
195+
log.Fatalf("Next: %v", err)
196+
}
197+
switch t := values[0].(type) {
198+
case nil:
199+
// NULL. No rows.
200+
log.Printf("starting from the beginning...")
201+
case time.Time:
202+
since = values[0].(time.Time)
203+
default:
204+
log.Fatalf("MAX(EndType) = %T: want nil or time.Time", t)
205+
}
206+
if since.IsZero() {
207+
since = time.Unix(1, 0) // arbitrary
208+
}
209+
210+
ds, err := datastore.NewClient(ctx, env.ProjectName)
211+
if err != nil {
212+
log.Fatalf("datastore.NewClient: %v", err)
213+
}
214+
215+
up := table.Uploader()
216+
217+
log.Printf("Max: %v", since)
218+
dsit := ds.Run(ctx, datastore.NewQuery("Span").Filter("EndTime >", since).Order("EndTime"))
219+
var maxPut time.Time
220+
for {
221+
n := 0
222+
var rows []*bigquery.ValuesSaver
223+
for {
224+
var s types.SpanRecord
225+
key, err := dsit.Next(&s)
226+
if err == iterator.Done {
227+
break
228+
}
229+
n++
230+
if err != nil {
231+
log.Fatal(err)
232+
}
233+
if s.EndTime.IsZero() {
234+
log.Fatalf("got zero endtime")
235+
}
236+
//log.Printf("need to add %s: %+v", key.Encode(), s)
135237

238+
var row []bigquery.Value
239+
var putSchema bigquery.Schema
240+
rv := reflect.ValueOf(s)
241+
for _, fs := range meta.Schema {
242+
if fs.Name[0] == '_' {
243+
continue
244+
}
245+
putSchema = append(putSchema, fs)
246+
row = append(row, rv.FieldByName(fs.Name).Interface())
247+
maxPut = s.EndTime
248+
}
249+
250+
rows = append(rows, &bigquery.ValuesSaver{
251+
Schema: putSchema,
252+
InsertID: key.Encode(),
253+
Row: row,
254+
})
255+
if len(rows) == 1000 {
256+
break
257+
}
258+
}
259+
if n == 0 {
260+
log.Printf("Done.")
261+
return
262+
}
263+
err = up.Put(ctx, rows)
264+
log.Printf("Put %d rows, up to %v. error = %v", len(rows), maxPut, err)
265+
if err != nil {
266+
os.Exit(1)
267+
}
268+
}
136269
}

cmd/coordinator/coordinator.go

Lines changed: 99 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"crypto/rand"
2020
"crypto/sha1"
2121
"crypto/tls"
22+
"encoding/csv"
2223
"errors"
2324
"flag"
2425
"fmt"
@@ -36,6 +37,7 @@ import (
3637
"path"
3738
"runtime"
3839
"sort"
40+
"strconv"
3941
"strings"
4042
"sync"
4143
"sync/atomic"
@@ -2028,7 +2030,7 @@ func (st *buildStatus) newTestSet(names []string, benchmarks []*benchmarkItem) *
20282030
set.items = append(set.items, &testItem{
20292031
set: set,
20302032
name: name,
2031-
duration: testDuration(name),
2033+
duration: testDuration(st.builderRev.name, name),
20322034
take: make(chan token, 1),
20332035
done: make(chan token),
20342036
})
@@ -2039,15 +2041,15 @@ func (st *buildStatus) newTestSet(names []string, benchmarks []*benchmarkItem) *
20392041
set: set,
20402042
name: name,
20412043
bench: bench,
2042-
duration: testDuration(name),
2044+
duration: testDuration(st.builderRev.name, name),
20432045
take: make(chan token, 1),
20442046
done: make(chan token),
20452047
})
20462048
}
20472049
return set
20482050
}
20492051

2050-
func partitionGoTests(tests []string) (sets [][]string) {
2052+
func partitionGoTests(builderName string, tests []string) (sets [][]string) {
20512053
var srcTests []string
20522054
var cmdTests []string
20532055
for _, name := range tests {
@@ -2073,19 +2075,99 @@ func partitionGoTests(tests []string) (sets [][]string) {
20732075
curDur = 0
20742076
}
20752077
}
2076-
for _, name := range goTests {
2077-
d := testDuration(name) - minGoTestSpeed // subtract 'go' tool overhead
2078+
for _, testName := range goTests {
2079+
d := testDuration(builderName, testName)
20782080
if curDur+d > sizeThres {
20792081
flush() // no-op if empty
20802082
}
2081-
curSet = append(curSet, name)
2083+
curSet = append(curSet, testName)
20822084
curDur += d
20832085
}
20842086

20852087
flush()
20862088
return
20872089
}
20882090

2091+
// secondsToDuration converts a possibly fractional number of seconds into a
// time.Duration. The fractional part below one nanosecond is truncated by
// the float-to-integer conversion.
func secondsToDuration(sec float64) time.Duration {
	return time.Duration(sec * float64(time.Second))
}
2094+
2095+
// testDurationMap holds per-builder test timings:
// builder name => test name => avg.
type testDurationMap map[string]map[string]time.Duration

// testDurations holds the current testDurationMap.
var testDurations atomic.Value

// testDurationsMu is held while updating testDurations.
var testDurationsMu sync.Mutex
2101+
2102+
func getTestDurations() testDurationMap {
2103+
if m, ok := testDurations.Load().(testDurationMap); ok {
2104+
return m
2105+
}
2106+
testDurationsMu.Lock()
2107+
defer testDurationsMu.Unlock()
2108+
if m, ok := testDurations.Load().(testDurationMap); ok {
2109+
return m
2110+
}
2111+
updateTestDurationsLocked()
2112+
return testDurations.Load().(testDurationMap)
2113+
}
2114+
2115+
func updateTestDurations() {
2116+
testDurationsMu.Lock()
2117+
defer testDurationsMu.Unlock()
2118+
updateTestDurationsLocked()
2119+
}
2120+
2121+
func updateTestDurationsLocked() {
2122+
defer time.AfterFunc(1*time.Hour, updateTestDurations)
2123+
m := loadTestDurations()
2124+
testDurations.Store(m)
2125+
}
2126+
2127+
// The csv file on cloud storage looks like:
2128+
// Builder,Event,MedianSeconds,count
2129+
// linux-arm-arm5,run_test:runtime:cpu124,334.49922194,10
2130+
// linux-arm,run_test:runtime:cpu124,284.609130993,26
2131+
// linux-arm-arm5,run_test:go_test:cmd/compile/internal/gc,260.0241916,12
2132+
// linux-arm,run_test:go_test:cmd/compile/internal/gc,224.425924681,26
2133+
// solaris-amd64-smartosbuildlet,run_test:test:2_5,199.653975717,9
2134+
// solaris-amd64-smartosbuildlet,run_test:test:1_5,169.89733442,9
2135+
// solaris-amd64-smartosbuildlet,run_test:test:3_5,163.770453839,9
2136+
// solaris-amd64-smartosbuildlet,run_test:test:0_5,158.250119402,9
2137+
// openbsd-386-gce58,run_test:runtime:cpu124,146.494229388,12
2138+
func loadTestDurations() (m testDurationMap) {
2139+
m = make(testDurationMap)
2140+
r, err := storageClient.Bucket(buildEnv.BuildletBucket).Object("test-durations.csv").NewReader(context.Background())
2141+
if err != nil {
2142+
log.Printf("loading test durations object from GCS: %v", err)
2143+
return
2144+
}
2145+
defer r.Close()
2146+
recs, err := csv.NewReader(r).ReadAll()
2147+
if err != nil {
2148+
log.Printf("reading test durations CSV: %v", err)
2149+
return
2150+
}
2151+
for _, rec := range recs {
2152+
if len(rec) < 3 || rec[0] == "Builder" {
2153+
continue
2154+
}
2155+
builder, testName, secondsStr := rec[0], rec[1], rec[2]
2156+
secs, err := strconv.ParseFloat(secondsStr, 64)
2157+
if err != nil {
2158+
log.Printf("unexpected seconds value in test durations CSV: %v", err)
2159+
continue
2160+
}
2161+
mm := m[builder]
2162+
if mm == nil {
2163+
mm = make(map[string]time.Duration)
2164+
m[builder] = mm
2165+
}
2166+
mm[testName] = secondsToDuration(secs)
2167+
}
2168+
return
2169+
}
2170+
20892171
var minGoTestSpeed = (func() time.Duration {
20902172
var min Seconds
20912173
for name, secs := range fixedTestDuration {
@@ -2325,11 +2407,18 @@ var fixedTestDuration = map[string]Seconds{
23252407

23262408
// testDuration predicts how long the dist test testName will take.
23272409
// It's only a scheduling guess.
2328-
func testDuration(name string) time.Duration {
2329-
if secs, ok := fixedTestDuration[name]; ok {
2410+
func testDuration(builderName, testName string) time.Duration {
2411+
if false { // disabled for now. never tested. TODO: test, enable.
2412+
durs := getTestDurations()
2413+
bdur := durs[builderName]
2414+
if d, ok := bdur[testName]; ok {
2415+
return d
2416+
}
2417+
}
2418+
if secs, ok := fixedTestDuration[testName]; ok {
23302419
return secs.Duration()
23312420
}
2332-
if strings.HasPrefix(name, "bench:") {
2421+
if strings.HasPrefix(testName, "bench:") {
23332422
// Assume benchmarks are roughly 20 seconds per run.
23342423
return 2 * benchRuns * 20 * time.Second
23352424
}
@@ -2845,7 +2934,7 @@ func (s *testSet) initInOrder() {
28452934

28462935
// First do the go_test:* ones. partitionGoTests
28472936
// only returns those, which are the ones we merge together.
2848-
stdSets := partitionGoTests(names)
2937+
stdSets := partitionGoTests(s.st.builderRev.name, names)
28492938
for _, set := range stdSets {
28502939
tis := make([]*testItem, len(set))
28512940
for i, name := range set {

0 commit comments

Comments
 (0)