diff --git a/go.mod b/go.mod index b61fbddedcb1..0ba06193af02 100644 --- a/go.mod +++ b/go.mod @@ -40,6 +40,7 @@ require ( github.com/go-logr/zapr v0.1.1 // indirect github.com/go-openapi/spec v0.19.2 github.com/go-sql-driver/mysql v0.0.0-20160411075031-7ebe0a500653 // indirect + github.com/go-test/deep v1.0.4 github.com/gobuffalo/envy v1.6.15 // indirect github.com/gogo/protobuf v1.2.1 // indirect github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b diff --git a/go.sum b/go.sum index b9855efd9426..b807ed2d4617 100644 --- a/go.sum +++ b/go.sum @@ -165,6 +165,8 @@ github.com/go-openapi/validate v0.18.0/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+ github.com/go-sql-driver/mysql v0.0.0-20160411075031-7ebe0a500653 h1:cRYCw9putX9pmpNdGkE/rWrY6gNGqZvdgGT1RPl6K18= github.com/go-sql-driver/mysql v0.0.0-20160411075031-7ebe0a500653/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-test/deep v1.0.4 h1:u2CU3YKy9I2pmu9pX0eq50wCgjfGIt539SqR7FbHiho= +github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/go-yaml/yaml v2.1.0+incompatible h1:RYi2hDdss1u4YE7GwixGzWwVo47T8UQwnTLB6vQiq+o= github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0= github.com/gobuffalo/envy v1.6.5 h1:X3is06x7v0nW2xiy2yFbbIjwHz57CD6z6MkvqULTCm8= diff --git a/prow/cmd/tide/BUILD.bazel b/prow/cmd/tide/BUILD.bazel index ba1e01c21c45..0f4de0d25be4 100644 --- a/prow/cmd/tide/BUILD.bazel +++ b/prow/cmd/tide/BUILD.bazel @@ -15,7 +15,6 @@ go_library( deps = [ "//pkg/flagutil:go_default_library", "//pkg/io:go_default_library", - "//prow/apis/prowjobs/v1:go_default_library", "//prow/config:go_default_library", "//prow/config/secret:go_default_library", "//prow/flagutil:go_default_library", diff --git a/prow/cmd/tide/main.go b/prow/cmd/tide/main.go index 116b5f71cae7..571d22665484 100644 --- a/prow/cmd/tide/main.go +++ b/prow/cmd/tide/main.go @@ -30,7 +30,6 @@ import ( "k8s.io/test-infra/pkg/flagutil" "k8s.io/test-infra/pkg/io" - prowjobsv1 "k8s.io/test-infra/prow/apis/prowjobs/v1" "k8s.io/test-infra/prow/config" "k8s.io/test-infra/prow/config/secret" prowflagutil "k8s.io/test-infra/prow/flagutil" @@ -171,26 +170,21 @@ func main() { if err != nil { logrus.WithError(err).Fatal("Error constructing mgr.") } - // Make sure the manager creates a cache for ProwJobs by requesting an informer - if _, err := mgr.GetCache().GetInformer(&prowjobsv1.ProwJob{}); err != nil { - logrus.WithError(err).Fatal("Error getting ProwJob informer.") + c, err := tide.NewController(githubSync, githubStatus, mgr, cfg, gitClient, o.maxRecordsPerPool, opener, o.historyURI, o.statusURI, nil) + if err != nil { + logrus.WithError(err).Fatal("Error creating Tide controller.") } - interrupts.Run(func(ctx context.Context) { if err := mgr.Start(ctx.Done()); err != nil { logrus.WithError(err).Fatal("Mgr failed.") } + logrus.Info("Mgr finished gracefully.") }) mgrSyncCtx, mgrSyncCtxCancel := context.WithTimeout(context.Background(), 30*time.Second) defer mgrSyncCtxCancel() if synced := mgr.GetCache().WaitForCacheSync(mgrSyncCtx.Done()); !synced { logrus.Fatal("Timed out waiting for cachesync") } - - c, err := tide.NewController(githubSync, githubStatus, mgr.GetClient(), cfg, gitClient, o.maxRecordsPerPool, opener, o.historyURI, o.statusURI, nil) - if err != nil { - logrus.WithError(err).Fatal("Error creating Tide controller.") - } interrupts.OnInterrupt(func() { c.Shutdown() if err := gitClient.Clean(); err != nil { diff --git a/prow/tide/BUILD.bazel b/prow/tide/BUILD.bazel index 159d5a663da0..97d5df5d8ef1 100644 --- a/prow/tide/BUILD.bazel +++ b/prow/tide/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "@com_github_shurcool_githubv4//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", "@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library", + "@io_k8s_apimachinery//pkg/runtime:go_default_library", "@io_k8s_apimachinery//pkg/util/sets:go_default_library", "@io_k8s_sigs_controller_runtime//pkg/client:go_default_library", "@io_k8s_sigs_yaml//:go_default_library", @@ -63,12 +64,15 @@ go_test( "//prow/github:go_default_library", "//prow/tide/blockers:go_default_library", "//prow/tide/history:go_default_library", + "@com_github_go_test_deep//:go_default_library", "@com_github_shurcool_githubv4//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", "@io_k8s_apimachinery//pkg/api/equality:go_default_library", "@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library", + "@io_k8s_apimachinery//pkg/runtime:go_default_library", "@io_k8s_apimachinery//pkg/util/diff:go_default_library", "@io_k8s_apimachinery//pkg/util/sets:go_default_library", + "@io_k8s_sigs_controller_runtime//pkg/client:go_default_library", "@io_k8s_sigs_controller_runtime//pkg/client/fake:go_default_library", ], ) diff --git a/prow/tide/tide.go b/prow/tide/tide.go index 8ce830f82148..23b70d013a43 100644 --- a/prow/tide/tide.go +++ b/prow/tide/tide.go @@ -33,6 +33,7 @@ import ( "github.com/prometheus/client_golang/prometheus" githubql "github.com/shurcooL/githubv4" "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -203,8 +204,13 @@ func init() { prometheus.MustRegister(tideMetrics.poolErrors) } +type manager interface { + GetClient() ctrlruntimeclient.Client + GetFieldIndexer() ctrlruntimeclient.FieldIndexer +} + // NewController makes a Controller out of the given clients. -func NewController(ghcSync, ghcStatus github.Client, prowJobClient ctrlruntimeclient.Client, cfg config.Getter, gc *git.Client, maxRecordsPerPool int, opener io.Opener, historyURI, statusURI string, logger *logrus.Entry) (*Controller, error) { +func NewController(ghcSync, ghcStatus github.Client, mgr manager, cfg config.Getter, gc *git.Client, maxRecordsPerPool int, opener io.Opener, historyURI, statusURI string, logger *logrus.Entry) (*Controller, error) { if logger == nil { logger = logrus.NewEntry(logrus.StandardLogger()) } @@ -212,6 +218,7 @@ func NewController(ghcSync, ghcStatus github.Client, prowJobClient ctrlruntimecl if err != nil { return nil, fmt.Errorf("error initializing history client from %q: %v", historyURI, err) } + sc := &statusController{ logger: logger.WithField("controller", "status-update"), ghc: ghcStatus, @@ -223,11 +230,30 @@ func NewController(ghcSync, ghcStatus github.Client, prowJobClient ctrlruntimecl path: statusURI, } go sc.run() + return newSyncController(logger, ghcSync, mgr, cfg, gc, sc, hist) +} + +func newSyncController( + logger *logrus.Entry, + ghcSync githubClient, + mgr manager, + cfg config.Getter, + gc *git.Client, + sc *statusController, + hist *history.History, +) (*Controller, error) { + if err := mgr.GetFieldIndexer().IndexField( + &prowapi.ProwJob{}, + cacheIndexName, + cacheIndexFunc, + ); err != nil { + return nil, fmt.Errorf("failed to add baseSHA index to cache: %v", err) + } return &Controller{ ctx: context.Background(), logger: logger.WithField("controller", "sync"), ghc: ghcSync, - prowJobClient: prowJobClient, + prowJobClient: mgr.GetClient(), config: cfg, gc: gc, sc: sc, @@ -308,19 +334,9 @@ func (c *Controller) Sync() error { "duration", time.Since(start).String(), ).Debugf("Found %d (unfiltered) pool PRs.", len(prs)) - var pjs []prowapi.ProwJob var blocks blockers.Blockers var err error if len(prs) > 0 { - start := time.Now() - pjList := &prowapi.ProwJobList{} - if err := c.prowJobClient.List(c.ctx, pjList, ctrlruntimeclient.InNamespace(c.config().ProwJobNamespace)); err != nil { - c.logger.WithField("duration", time.Since(start).String()).Debug("Failed to list ProwJobs from the cluster.") - return err - } - c.logger.WithField("duration", time.Since(start).String()).Debug("Listed ProwJobs from the cluster.") - pjs = pjList.Items - if label := c.config().Tide.BlockerLabel; label != "" { c.logger.Debugf("Searching for blocking issues (label %q).", label) orgExcepts, repos := c.config().Tide.Queries.OrgExceptionsAndRepos() @@ -336,7 +352,7 @@ func (c *Controller) Sync() error { } } // Partition PRs into subpools and filter out non-pool PRs. - rawPools, err := c.dividePool(prs, pjs) + rawPools, err := c.dividePool(prs) if err != nil { return err } @@ -1382,7 +1398,7 @@ func poolKey(org, repo, branch string) string { // dividePool splits up the list of pull requests and prow jobs into a group // per repo and branch. It only keeps ProwJobs that match the latest branch. -func (c *Controller) dividePool(pool map[string]PullRequest, pjs []prowapi.ProwJob) (map[string]*subpool, error) { +func (c *Controller) dividePool(pool map[string]PullRequest) (map[string]*subpool, error) { sps := make(map[string]*subpool) for _, pr := range pool { org := string(pr.Repository.Owner.Login) @@ -1410,15 +1426,19 @@ func (c *Controller) dividePool(pool map[string]PullRequest, pjs []prowapi.ProwJ } sps[fn].prs = append(sps[fn].prs, pr) } - for _, pj := range pjs { - if pj.Spec.Type != prowapi.PresubmitJob && pj.Spec.Type != prowapi.BatchJob { - continue - } - fn := poolKey(pj.Spec.Refs.Org, pj.Spec.Refs.Repo, pj.Spec.Refs.BaseRef) - if sps[fn] == nil || pj.Spec.Refs.BaseSHA != sps[fn].sha { - continue + + for subpoolkey, sp := range sps { + pjs := &prowapi.ProwJobList{} + err := c.prowJobClient.List( + c.ctx, + pjs, + ctrlruntimeclient.MatchingField(cacheIndexName, cacheIndexKey(sp.org, sp.repo, sp.branch, sp.sha)), + ctrlruntimeclient.InNamespace(c.config().ProwJobNamespace)) + if err != nil { + return nil, fmt.Errorf("failed to list jobs for subpool %s: %v", subpoolkey, err) } - sps[fn].pjs = append(sps[fn].pjs, pj) + c.logger.WithField("subpool", subpoolkey).Debugf("Found %d prowjobs.", len(pjs.Items)) + sps[subpoolkey].pjs = pjs.Items } return sps, nil } @@ -1571,3 +1591,28 @@ func orgRepoQueryString(orgs, repos []string, orgExceptions map[string]sets.Stri } return strings.Join(toks, " ") } + +// cacheIndexName is the name of the index that indexes presubmit+batch ProwJobs by +// org+repo+branch+baseSHA. Use the cacheIndexKey func to get the correct key. +const cacheIndexName = "tide-global-index" + +// cacheIndexKey returns the index key for the tideCacheIndex +func cacheIndexKey(org, repo, branch, baseSHA string) string { + return fmt.Sprintf("%s/%s:%s@%s", org, repo, branch, baseSHA) +} + +func cacheIndexFunc(obj runtime.Object) []string { + pj, ok := obj.(*prowapi.ProwJob) + // Should never happen + if !ok { + return nil + } + // We do not care about jobs other than presubmit and batch + if pj.Spec.Type != prowapi.PresubmitJob && pj.Spec.Type != prowapi.BatchJob { + return nil + } + if pj.Spec.Refs == nil { + return nil + } + return []string{cacheIndexKey(pj.Spec.Refs.Org, pj.Spec.Refs.Repo, pj.Spec.Refs.BaseRef, pj.Spec.Refs.BaseSHA)} +} diff --git a/prow/tide/tide_test.go b/prow/tide/tide_test.go index c0f34d42d4d4..5a50378ac1bb 100644 --- a/prow/tide/tide_test.go +++ b/prow/tide/tide_test.go @@ -28,12 +28,16 @@ import ( "text/template" "time" + "github.com/go-test/deep" githubql "github.com/shurcooL/githubv4" "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/api/equality" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/diff" + "k8s.io/apimachinery/pkg/util/sets" + ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" fakectrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" prowapi "k8s.io/test-infra/prow/apis/prowjobs/v1" @@ -673,23 +677,37 @@ func TestDividePool(t *testing.T) { "k/k heads/release-1.6": "789", }, } - c := &Controller{ - ghc: fc, - gc: &git.Client{}, - logger: logrus.WithField("component", "tide"), + configGetter := func() *config.Config { + return &config.Config{ + ProwConfig: config.ProwConfig{ + ProwJobNamespace: "default", + }, + } } - pulls := make(map[string]PullRequest) - for _, p := range testPulls { - npr := PullRequest{Number: githubql.Int(p.number)} - npr.BaseRef.Name = githubql.String(p.branch) - npr.BaseRef.Prefix = "refs/heads/" - npr.Repository.Name = githubql.String(p.repo) - npr.Repository.Owner.Login = githubql.String(p.org) - pulls[prKey(&npr)] = npr + + client := &indexingClient{ + Client: fakectrlruntimeclient.NewFakeClient(), + indexFuncs: map[string]ctrlruntimeclient.IndexerFunc{}, + } + mgr := &fakeManager{ + client: client, + fakeFieldIndexer: &fakeFieldIndexer{ + client: client, + }, } - var pjs []prowapi.ProwJob - for _, pj := range testPJs { - pjs = append(pjs, prowapi.ProwJob{ + + c, err := newSyncController( + logrus.NewEntry(logrus.StandardLogger()), fc, mgr, configGetter, &git.Client{}, nil, nil, + ) + if err != nil { + t.Fatalf("failed to construct sync controller: %v", err) + } + for idx, pj := range testPJs { + prowjob := &prowapi.ProwJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pj-%d", idx), + Namespace: "default", + }, Spec: prowapi.ProwJobSpec{ Type: pj.jobType, Refs: &prowapi.Refs{ @@ -699,9 +717,21 @@ func TestDividePool(t *testing.T) { BaseSHA: pj.baseSHA, }, }, - }) + } + if err := mgr.GetClient().Create(context.Background(), prowjob); err != nil { + t.Fatalf("failed to create prowjob: %v", err) + } + } + pulls := make(map[string]PullRequest) + for _, p := range testPulls { + npr := PullRequest{Number: githubql.Int(p.number)} + npr.BaseRef.Name = githubql.String(p.branch) + npr.BaseRef.Prefix = "refs/heads/" + npr.Repository.Name = githubql.String(p.repo) + npr.Repository.Owner.Login = githubql.String(p.org) + pulls[prKey(&npr)] = npr } - sps, err := c.dividePool(pulls, pjs) + sps, err := c.dividePool(pulls) if err != nil { t.Fatalf("Error dividing pool: %v", err) } @@ -726,8 +756,14 @@ func TestDividePool(t *testing.T) { if pj.Spec.Type != prowapi.PresubmitJob && pj.Spec.Type != prowapi.BatchJob { t.Errorf("PJ with bad type in subpool %s: %+v", name, pj) } - if pj.Spec.Refs.Org != sp.org || pj.Spec.Refs.Repo != sp.repo || pj.Spec.Refs.BaseRef != sp.branch || pj.Spec.Refs.BaseSHA != sp.sha { - t.Errorf("PJ in wrong subpool. Got PJ %+v in subpool %s.", pj, name) + referenceRef := &prowapi.Refs{ + Org: sp.org, + Repo: sp.repo, + BaseRef: sp.branch, + BaseSHA: sp.sha, + } + if diff := deep.Equal(pj.Spec.Refs, referenceRef); diff != nil { + t.Errorf("Got PJ with wrong refs, diff: %v", diff) } } } @@ -3060,3 +3096,157 @@ func getPR(org, name string, number int, opts ...func(*PullRequest)) PullRequest } return pr } + +func TestCacheIndexFuncReturnsDifferentResultsForDifferentInputs(t *testing.T) { + type orgRepoBranch struct{ org, repo, branch string } + + results := sets.String{} + inputs := []orgRepoBranch{ + {"org-a", "repo-a", "branch-a"}, + {"org-a", "repo-a", "branch-b"}, + {"org-a", "repo-b", "branch-a"}, + {"org-b", "repo-a", "branch-a"}, + } + for _, input := range inputs { + pj := getProwJob(prowapi.PresubmitJob, input.org, input.repo, input.branch, "123") + idx := cacheIndexFunc(pj) + if n := len(idx); n != 1 { + t.Fatalf("expected to get exactly one index back, got %d", n) + } + if results.Has(idx[0]) { + t.Errorf("got duplicate idx %q", idx) + } + results.Insert(idx[0]) + } +} + +func TestCacheIndexFunc(t *testing.T) { + testCases := []struct { + name string + prowjob *prowapi.ProwJob + expectedResult string + }{ + { + name: "Wrong type, no result", + prowjob: &prowapi.ProwJob{}, + }, + { + name: "No refs, no result", + prowjob: getProwJob(prowapi.PresubmitJob, "", "", "", ""), + }, + { + name: "presubmit job", + prowjob: getProwJob(prowapi.PresubmitJob, "org", "repo", "master", "123"), + expectedResult: "org/repo:master@123", + }, + { + name: "Batch job", + prowjob: getProwJob(prowapi.BatchJob, "org", "repo", "next", "1234"), + expectedResult: "org/repo:next@1234", + }, + } + + for idx := range testCases { + tc := testCases[idx] + t.Run(tc.name, func(t *testing.T) { + result := cacheIndexFunc(tc.prowjob) + if n := len(result); n > 1 { + t.Errorf("expected at most one result, got %d", n) + } + + var resultString string + if len(result) == 1 { + resultString = result[0] + } + + if resultString != tc.expectedResult { + t.Errorf("Expected result %q, got result %q", tc.expectedResult, resultString) + } + }) + } +} + +func getProwJob(pjtype prowapi.ProwJobType, org, repo, branch, sha string) *prowapi.ProwJob { + pj := &prowapi.ProwJob{} + pj.Spec.Type = pjtype + if org != "" || repo != "" || branch != "" || sha != "" { + pj.Spec.Refs = &prowapi.Refs{ + Org: org, + Repo: repo, + BaseRef: branch, + BaseSHA: sha, + } + } + return pj +} + +type fakeManager struct { + client *indexingClient + *fakeFieldIndexer +} + +type fakeFieldIndexer struct { + client *indexingClient +} + +func (fi *fakeFieldIndexer) IndexField(_ runtime.Object, field string, extractValue ctrlruntimeclient.IndexerFunc) error { + fi.client.indexFuncs[field] = extractValue + return nil +} + +func (fm *fakeManager) GetClient() ctrlruntimeclient.Client { + return fm.client +} + +func (fm *fakeManager) GetFieldIndexer() ctrlruntimeclient.FieldIndexer { + return fm.fakeFieldIndexer +} + +type indexingClient struct { + ctrlruntimeclient.Client + indexFuncs map[string]ctrlruntimeclient.IndexerFunc +} + +func (c *indexingClient) List(ctx context.Context, list runtime.Object, opts ...ctrlruntimeclient.ListOption) error { + if err := c.Client.List(ctx, list, opts...); err != nil { + return err + } + + listOpts := &ctrlruntimeclient.ListOptions{} + for _, opt := range opts { + opt.ApplyToList(listOpts) + } + + if n := len(listOpts.FieldSelector.Requirements()); n == 0 { + return nil + } else if n > 1 { + return fmt.Errorf("the indexing client supports at most one field selector requirement, got %d", n) + } + + indexKey := listOpts.FieldSelector.Requirements()[0].Field + if indexKey == "" { + return nil + } + + indexFunc, ok := c.indexFuncs[indexKey] + if !ok { + return fmt.Errorf("no index with key %q found", indexKey) + } + + pjList, ok := list.(*prowapi.ProwJobList) + if !ok { + return errors.New("indexes are only supported for ProwJobLists") + } + + result := prowapi.ProwJobList{} + for _, pj := range pjList.Items { + for _, indexVal := range indexFunc(&pj) { + if indexVal == listOpts.FieldSelector.Requirements()[0].Value { + result.Items = append(result.Items, pj) + } + } + } + + *pjList = result + return nil +} diff --git a/repos.bzl b/repos.bzl index fd028b98835c..dcfea7786702 100644 --- a/repos.bzl +++ b/repos.bzl @@ -2179,3 +2179,11 @@ def go_repositories(): sum = "h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc=", version = "v0.0.0-20190717185122-a985d3407aa7", ) + go_repository( + name = "com_github_go_test_deep", + build_file_generation = "on", + build_file_proto_mode = "disable", + importpath = "github.com/go-test/deep", + sum = "h1:u2CU3YKy9I2pmu9pX0eq50wCgjfGIt539SqR7FbHiho=", + version = "v1.0.4", + )