@@ -33,6 +33,7 @@ import (
3333 "github.com/prometheus/client_golang/prometheus"
3434 githubql "github.com/shurcooL/githubv4"
3535 "github.com/sirupsen/logrus"
36+ "k8s.io/apimachinery/pkg/runtime"
3637 "k8s.io/apimachinery/pkg/util/sets"
3738 ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
3839
@@ -203,15 +204,21 @@ func init() {
203204 prometheus .MustRegister (tideMetrics .poolErrors )
204205}
205206
207+ type manager interface {
208+ GetClient () ctrlruntimeclient.Client
209+ GetFieldIndexer () ctrlruntimeclient.FieldIndexer
210+ }
211+
206212// NewController makes a Controller out of the given clients.
207- func NewController (ghcSync , ghcStatus github.Client , prowJobClient ctrlruntimeclient. Client , cfg config.Getter , gc * git.Client , maxRecordsPerPool int , opener io.Opener , historyURI , statusURI string , logger * logrus.Entry ) (* Controller , error ) {
213+ func NewController (ghcSync , ghcStatus github.Client , mgr manager , cfg config.Getter , gc * git.Client , maxRecordsPerPool int , opener io.Opener , historyURI , statusURI string , logger * logrus.Entry ) (* Controller , error ) {
208214 if logger == nil {
209215 logger = logrus .NewEntry (logrus .StandardLogger ())
210216 }
211217 hist , err := history .New (maxRecordsPerPool , opener , historyURI )
212218 if err != nil {
213219 return nil , fmt .Errorf ("error initializing history client from %q: %v" , historyURI , err )
214220 }
221+
215222 sc := & statusController {
216223 logger : logger .WithField ("controller" , "status-update" ),
217224 ghc : ghcStatus ,
@@ -223,11 +230,30 @@ func NewController(ghcSync, ghcStatus github.Client, prowJobClient ctrlruntimecl
223230 path : statusURI ,
224231 }
225232 go sc .run ()
233+ return newSyncController (logger , ghcSync , mgr , cfg , gc , sc , hist )
234+ }
235+
236+ func newSyncController (
237+ logger * logrus.Entry ,
238+ ghcSync githubClient ,
239+ mgr manager ,
240+ cfg config.Getter ,
241+ gc * git.Client ,
242+ sc * statusController ,
243+ hist * history.History ,
244+ ) (* Controller , error ) {
245+ if err := mgr .GetFieldIndexer ().IndexField (
246+ & prowapi.ProwJob {},
247+ cacheIndexName ,
248+ cacheIndexFunc ,
249+ ); err != nil {
250+ return nil , fmt .Errorf ("failed to add baseSHA index to cache: %v" , err )
251+ }
226252 return & Controller {
227253 ctx : context .Background (),
228254 logger : logger .WithField ("controller" , "sync" ),
229255 ghc : ghcSync ,
230- prowJobClient : prowJobClient ,
256+ prowJobClient : mgr . GetClient () ,
231257 config : cfg ,
232258 gc : gc ,
233259 sc : sc ,
@@ -308,19 +334,9 @@ func (c *Controller) Sync() error {
308334 "duration" , time .Since (start ).String (),
309335 ).Debugf ("Found %d (unfiltered) pool PRs." , len (prs ))
310336
311- var pjs []prowapi.ProwJob
312337 var blocks blockers.Blockers
313338 var err error
314339 if len (prs ) > 0 {
315- start := time .Now ()
316- pjList := & prowapi.ProwJobList {}
317- if err := c .prowJobClient .List (c .ctx , pjList , ctrlruntimeclient .InNamespace (c .config ().ProwJobNamespace )); err != nil {
318- c .logger .WithField ("duration" , time .Since (start ).String ()).Debug ("Failed to list ProwJobs from the cluster." )
319- return err
320- }
321- c .logger .WithField ("duration" , time .Since (start ).String ()).Debug ("Listed ProwJobs from the cluster." )
322- pjs = pjList .Items
323-
324340 if label := c .config ().Tide .BlockerLabel ; label != "" {
325341 c .logger .Debugf ("Searching for blocking issues (label %q)." , label )
326342 orgExcepts , repos := c .config ().Tide .Queries .OrgExceptionsAndRepos ()
@@ -336,7 +352,7 @@ func (c *Controller) Sync() error {
336352 }
337353 }
338354 // Partition PRs into subpools and filter out non-pool PRs.
339- rawPools , err := c .dividePool (prs , pjs )
355+ rawPools , err := c .dividePool (prs )
340356 if err != nil {
341357 return err
342358 }
@@ -1382,7 +1398,7 @@ func poolKey(org, repo, branch string) string {
13821398
13831399// dividePool splits up the list of pull requests and prow jobs into a group
13841400// per repo and branch. It only keeps ProwJobs that match the latest branch.
1385- func (c * Controller ) dividePool (pool map [string ]PullRequest , pjs []prowapi. ProwJob ) (map [string ]* subpool , error ) {
1401+ func (c * Controller ) dividePool (pool map [string ]PullRequest ) (map [string ]* subpool , error ) {
13861402 sps := make (map [string ]* subpool )
13871403 for _ , pr := range pool {
13881404 org := string (pr .Repository .Owner .Login )
@@ -1410,15 +1426,19 @@ func (c *Controller) dividePool(pool map[string]PullRequest, pjs []prowapi.ProwJ
14101426 }
14111427 sps [fn ].prs = append (sps [fn ].prs , pr )
14121428 }
1413- for _ , pj := range pjs {
1414- if pj .Spec .Type != prowapi .PresubmitJob && pj .Spec .Type != prowapi .BatchJob {
1415- continue
1416- }
1417- fn := poolKey (pj .Spec .Refs .Org , pj .Spec .Refs .Repo , pj .Spec .Refs .BaseRef )
1418- if sps [fn ] == nil || pj .Spec .Refs .BaseSHA != sps [fn ].sha {
1419- continue
1429+
1430+ for subpoolkey , sp := range sps {
1431+ pjs := & prowapi.ProwJobList {}
1432+ err := c .prowJobClient .List (
1433+ c .ctx ,
1434+ pjs ,
1435+ ctrlruntimeclient .MatchingField (cacheIndexName , cacheIndexKey (sp .org , sp .repo , sp .branch , sp .sha )),
1436+ ctrlruntimeclient .InNamespace (c .config ().ProwJobNamespace ))
1437+ if err != nil {
1438+ return nil , fmt .Errorf ("failed to list jobs for subpool %s: %v" , subpoolkey , err )
14201439 }
1421- sps [fn ].pjs = append (sps [fn ].pjs , pj )
1440+ c .logger .WithField ("subpool" , subpoolkey ).Debugf ("Found %d prowjobs." , len (pjs .Items ))
1441+ sps [subpoolkey ].pjs = pjs .Items
14221442 }
14231443 return sps , nil
14241444}
@@ -1571,3 +1591,28 @@ func orgRepoQueryString(orgs, repos []string, orgExceptions map[string]sets.Stri
15711591 }
15721592 return strings .Join (toks , " " )
15731593}
1594+
1595+ // cacheIndexName is the name of the index that indexes presubmit+batch ProwJobs by
1596+ // org+repo+branch+baseSHA. Use the cacheIndexKey func to get the correct key.
1597+ const cacheIndexName = "tide-global-index"
1598+
1599+ // cacheIndexKey returns the index key for the tideCacheIndex
1600+ func cacheIndexKey (org , repo , branch , baseSHA string ) string {
1601+ return fmt .Sprintf ("%s/%s:%s@%s" , org , repo , branch , baseSHA )
1602+ }
1603+
1604+ func cacheIndexFunc (obj runtime.Object ) []string {
1605+ pj , ok := obj .(* prowapi.ProwJob )
1606+ // Should never happen
1607+ if ! ok {
1608+ return nil
1609+ }
1610+ // We do not care about jobs other than presubmit and batch
1611+ if pj .Spec .Type != prowapi .PresubmitJob && pj .Spec .Type != prowapi .BatchJob {
1612+ return nil
1613+ }
1614+ if pj .Spec .Refs == nil {
1615+ return nil
1616+ }
1617+ return []string {cacheIndexKey (pj .Spec .Refs .Org , pj .Spec .Refs .Repo , pj .Spec .Refs .BaseRef , pj .Spec .Refs .BaseSHA )}
1618+ }
0 commit comments