Tide: Use an index on the lister to filter prowjobs for subpool #14830
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,6 +33,7 @@ import ( | |
| "github.com/prometheus/client_golang/prometheus" | ||
| githubql "github.com/shurcooL/githubv4" | ||
| "github.com/sirupsen/logrus" | ||
| "k8s.io/apimachinery/pkg/runtime" | ||
| "k8s.io/apimachinery/pkg/util/sets" | ||
| ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" | ||
|
|
||
|
|
@@ -203,15 +204,21 @@ func init() { | |
| prometheus.MustRegister(tideMetrics.poolErrors) | ||
| } | ||
|
|
||
| type manager interface { | ||
| GetClient() ctrlruntimeclient.Client | ||
| GetFieldIndexer() ctrlruntimeclient.FieldIndexer | ||
| } | ||
|
|
||
| // NewController makes a Controller out of the given clients. | ||
| func NewController(ghcSync, ghcStatus github.Client, prowJobClient ctrlruntimeclient.Client, cfg config.Getter, gc *git.Client, maxRecordsPerPool int, opener io.Opener, historyURI, statusURI string, logger *logrus.Entry) (*Controller, error) { | ||
| func NewController(ghcSync, ghcStatus github.Client, mgr manager, cfg config.Getter, gc *git.Client, maxRecordsPerPool int, opener io.Opener, historyURI, statusURI string, logger *logrus.Entry) (*Controller, error) { | ||
| if logger == nil { | ||
| logger = logrus.NewEntry(logrus.StandardLogger()) | ||
| } | ||
| hist, err := history.New(maxRecordsPerPool, opener, historyURI) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("error initializing history client from %q: %v", historyURI, err) | ||
| } | ||
|
|
||
| sc := &statusController{ | ||
| logger: logger.WithField("controller", "status-update"), | ||
| ghc: ghcStatus, | ||
|
|
@@ -223,11 +230,30 @@ func NewController(ghcSync, ghcStatus github.Client, prowJobClient ctrlruntimecl | |
| path: statusURI, | ||
| } | ||
| go sc.run() | ||
| return newSyncController(logger, ghcSync, mgr, cfg, gc, sc, hist) | ||
| } | ||
|
|
||
| func newSyncController( | ||
| logger *logrus.Entry, | ||
| ghcSync githubClient, | ||
| mgr manager, | ||
| cfg config.Getter, | ||
| gc *git.Client, | ||
| sc *statusController, | ||
| hist *history.History, | ||
| ) (*Controller, error) { | ||
| if err := mgr.GetFieldIndexer().IndexField( | ||
| &prowapi.ProwJob{}, | ||
| cacheIndexName, | ||
| cacheIndexFunc, | ||
| ); err != nil { | ||
| return nil, fmt.Errorf("failed to add baseSHA index to cache: %v", err) | ||
| } | ||
| return &Controller{ | ||
| ctx: context.Background(), | ||
| logger: logger.WithField("controller", "sync"), | ||
| ghc: ghcSync, | ||
| prowJobClient: prowJobClient, | ||
| prowJobClient: mgr.GetClient(), | ||
| config: cfg, | ||
| gc: gc, | ||
| sc: sc, | ||
|
|
@@ -308,19 +334,9 @@ func (c *Controller) Sync() error { | |
| "duration", time.Since(start).String(), | ||
| ).Debugf("Found %d (unfiltered) pool PRs.", len(prs)) | ||
|
|
||
| var pjs []prowapi.ProwJob | ||
| var blocks blockers.Blockers | ||
| var err error | ||
| if len(prs) > 0 { | ||
| start := time.Now() | ||
| pjList := &prowapi.ProwJobList{} | ||
| if err := c.prowJobClient.List(c.ctx, pjList, ctrlruntimeclient.InNamespace(c.config().ProwJobNamespace)); err != nil { | ||
| c.logger.WithField("duration", time.Since(start).String()).Debug("Failed to list ProwJobs from the cluster.") | ||
| return err | ||
| } | ||
| c.logger.WithField("duration", time.Since(start).String()).Debug("Listed ProwJobs from the cluster.") | ||
| pjs = pjList.Items | ||
|
|
||
| if label := c.config().Tide.BlockerLabel; label != "" { | ||
| c.logger.Debugf("Searching for blocking issues (label %q).", label) | ||
| orgExcepts, repos := c.config().Tide.Queries.OrgExceptionsAndRepos() | ||
|
|
@@ -336,7 +352,7 @@ func (c *Controller) Sync() error { | |
| } | ||
| } | ||
| // Partition PRs into subpools and filter out non-pool PRs. | ||
| rawPools, err := c.dividePool(prs, pjs) | ||
| rawPools, err := c.dividePool(prs) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
@@ -1382,7 +1398,7 @@ func poolKey(org, repo, branch string) string { | |
|
|
||
| // dividePool splits up the list of pull requests and prow jobs into a group | ||
| // per repo and branch. It only keeps ProwJobs that match the latest branch. | ||
| func (c *Controller) dividePool(pool map[string]PullRequest, pjs []prowapi.ProwJob) (map[string]*subpool, error) { | ||
| func (c *Controller) dividePool(pool map[string]PullRequest) (map[string]*subpool, error) { | ||
| sps := make(map[string]*subpool) | ||
| for _, pr := range pool { | ||
| org := string(pr.Repository.Owner.Login) | ||
|
|
@@ -1410,15 +1426,19 @@ func (c *Controller) dividePool(pool map[string]PullRequest, pjs []prowapi.ProwJ | |
| } | ||
| sps[fn].prs = append(sps[fn].prs, pr) | ||
| } | ||
| for _, pj := range pjs { | ||
| if pj.Spec.Type != prowapi.PresubmitJob && pj.Spec.Type != prowapi.BatchJob { | ||
| continue | ||
| } | ||
| fn := poolKey(pj.Spec.Refs.Org, pj.Spec.Refs.Repo, pj.Spec.Refs.BaseRef) | ||
| if sps[fn] == nil || pj.Spec.Refs.BaseSHA != sps[fn].sha { | ||
| continue | ||
|
|
||
| for subpoolkey, sp := range sps { | ||
| pjs := &prowapi.ProwJobList{} | ||
| err := c.prowJobClient.List( | ||
| c.ctx, | ||
| pjs, | ||
| ctrlruntimeclient.MatchingField(cacheIndexName, cacheIndexKey(sp.org, sp.repo, sp.branch, sp.sha)), | ||
| ctrlruntimeclient.InNamespace(c.config().ProwJobNamespace)) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to list jobs for subpool %s: %v", subpoolkey, err) | ||
| } | ||
| sps[fn].pjs = append(sps[fn].pjs, pj) | ||
| c.logger.WithField("subpool", subpoolkey).Debugf("Found %d prowjobs.", len(pjs.Items)) | ||
| sps[subpoolkey].pjs = pjs.Items | ||
|
The fact that the base SHA matches is not enough to know that this PJ matches the tide pool. It is possible for the same base SHA to exist in a different org/repo. More realistically, it is also possible for two different branches to point at the same base SHA.

We don't necessarily need to make the index more complicated, but if we keep it as is we need to filter out PJs for other subpools like we did here before.

Do we have unit tests for these scenarios? That's the best way to ensure it remains invariant.

Alvaro added unit tests, but they require the etcd binary (envtest). (Also, if this is what we want to do, do you have any recommendations on the best way to do this with bazel?)

Can we add actual unit tests? I don't see any reason why we would need etcd in order to validate the behavior of the index function, AKA ensuring it returns the expected keys for a given ProwJob. This seems more useful/important than validating the internal behavior of controller-runtime.

Added a unit test for the index function. Generally, using a cache with an index is probably the most efficient solution for filtering kube API objects repeatedly, so it would be great if we can find a way to test that that is agreeable for everyone :) How do you feel about using envtest for that @fejta?

Faking the interface seems like the most appropriate strategy here. Making everything depend on making real calls to the apiserver is an inefficient test strategy. We're just consumers of all this code. Why can't we just assume Kubernetes works correctly? We do not, for example, validate that the Google Cloud Storage client transfers files correctly. We just assume it works and fake the return values. This is efficient and hasn't led to any regressions. I would recommend this faking strategy here over replacing unit testing with integration testing.

Okay. I've also opened kubernetes-sigs/controller-runtime#657 for this, because ideally we don't have to build fakes downstream. While a test for "do we actually add the index to the cache" definitely makes sense, adding a fake client that uses an index pretty much only tests our dependencies (which do have tests for this) and our fake implementation. For the sake of getting this done, I'll add it nonetheless.

I agree that is probably more work than is justified. This is the call to test:

```go
err := c.prowJobClient.List(
	c.ctx,
	pjs,
	ctrlruntimeclient.MatchingField(cacheIndexName, cacheIndexKey(sp.org, sp.repo, sp.branch, sp.sha)),
	ctrlruntimeclient.InNamespace(c.config().ProwJobNamespace))
```

So wrap it in a function:

```go
type lister interface {
	List(context.Context, *prowapi.ProwJobList, matchingFieldRetValue, namespaceRetValue) error
}

func listMatchingJobs(ctx context.Context, prowJobClient lister, sp subpool, namespace string) (*prowapi.ProwJobList, error) {
	var pjs prowapi.ProwJobList
	if err := prowJobClient.List(ctx, &pjs, MatchingField(cacheIndexName, cacheIndexKey(sp.org, sp.repo, sp.branch, sp.sha)), InNamespace(namespace)); err != nil {
		return nil, err
	}
	return &pjs, nil
}
```

Now write a fake lister. Change all the list calls to this helper function. Done. I don't think creating a fake client that works correctly is super necessary here.

Well, I ended up writing a small implementation of the subset of the client interface that we use. I think this should be good now, PTAL.
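The approach the thread converges on, a small fake implementing just the listing subset of the client, can re-use cacheIndexFunc so that the fake filters the way the real cache index would. The sketch below is hypothetical rather than the code added in this PR: fakeIndexLister is an invented name, and the ListOption/ListOptions handling assumes the controller-runtime version vendored here.

```go
package tide

import (
	"context"

	prowapi "k8s.io/test-infra/prow/apis/prowjobs/v1"
	ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// fakeIndexLister is a hypothetical test double: it holds a fixed set of
// ProwJobs and answers List calls by applying the same index keys the real
// cache would use. It only supports the MatchingField(cacheIndexName, ...)
// plus InNamespace query that dividePool issues.
type fakeIndexLister struct {
	pjs []prowapi.ProwJob
}

func (f *fakeIndexLister) List(_ context.Context, list *prowapi.ProwJobList, opts ...ctrlruntimeclient.ListOption) error {
	// Resolve the functional options (MatchingField, InNamespace) that the
	// production code passes in.
	resolved := &ctrlruntimeclient.ListOptions{}
	for _, opt := range opts {
		opt.ApplyToList(resolved)
	}
	wantKey := ""
	if resolved.FieldSelector != nil {
		wantKey, _ = resolved.FieldSelector.RequiresExactMatch(cacheIndexName)
	}
	for i := range f.pjs {
		pj := f.pjs[i]
		if resolved.Namespace != "" && pj.Namespace != resolved.Namespace {
			continue
		}
		// Keep only jobs whose index keys contain the requested key, which is
		// what the cache-backed client returns for a MatchingField query.
		for _, key := range cacheIndexFunc(&pj) {
			if key == wantKey {
				list.Items = append(list.Items, pj)
				break
			}
		}
	}
	return nil
}
```

A test could seed the fake with jobs for several org/repo/branch/SHA combinations and assert that only the jobs for the queried subpool come back, exercising cacheIndexKey and cacheIndexFunc together without an apiserver.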
||
| } | ||
| return sps, nil | ||
| } | ||
|
|
@@ -1571,3 +1591,28 @@ func orgRepoQueryString(orgs, repos []string, orgExceptions map[string]sets.Stri | |
| } | ||
| return strings.Join(toks, " ") | ||
| } | ||
|
|
||
| // cacheIndexName is the name of the index that indexes presubmit+batch ProwJobs by | ||
| // org+repo+branch+baseSHA. Use the cacheIndexKey func to get the correct key. | ||
| const cacheIndexName = "tide-global-index" | ||
|
|
||
| // cacheIndexKey returns the index key for the tideCacheIndex | ||
| func cacheIndexKey(org, repo, branch, baseSHA string) string { | ||
| return fmt.Sprintf("%s/%s:%s@%s", org, repo, branch, baseSHA) | ||
| } | ||
|
|
||
| func cacheIndexFunc(obj runtime.Object) []string { | ||
| pj, ok := obj.(*prowapi.ProwJob) | ||
| // Should never happen | ||
| if !ok { | ||
| return nil | ||
| } | ||
| // We do not care about jobs other than presubmit and batch | ||
| if pj.Spec.Type != prowapi.PresubmitJob && pj.Spec.Type != prowapi.BatchJob { | ||
| return nil | ||
| } | ||
| if pj.Spec.Refs == nil { | ||
| return nil | ||
| } | ||
| return []string{cacheIndexKey(pj.Spec.Refs.Org, pj.Spec.Refs.Repo, pj.Spec.Refs.BaseRef, pj.Spec.Refs.BaseSHA)} | ||
| } | ||