@@ -33,8 +33,10 @@ import (
3333	"github.com/prometheus/client_golang/prometheus" 
3434	githubql "github.com/shurcooL/githubv4" 
3535	"github.com/sirupsen/logrus" 
36+ 	"k8s.io/apimachinery/pkg/runtime" 
3637	"k8s.io/apimachinery/pkg/util/sets" 
3738	ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" 
39+ 	"sigs.k8s.io/controller-runtime/pkg/manager" 
3840
3941	"k8s.io/test-infra/pkg/io" 
4042	prowapi "k8s.io/test-infra/prow/apis/prowjobs/v1" 
@@ -204,14 +206,15 @@ func init() {
204206}
205207
206208// NewController makes a Controller out of the given clients. 
207- func  NewController (ghcSync , ghcStatus  github.Client , prowJobClient  ctrlruntimeclient. Client , cfg  config.Getter , gc  * git.Client , maxRecordsPerPool  int , opener  io.Opener , historyURI , statusURI  string , logger  * logrus.Entry ) (* Controller , error ) {
209+ func  NewController (ghcSync , ghcStatus  github.Client , mgr  manager. Manager , cfg  config.Getter , gc  * git.Client , maxRecordsPerPool  int , opener  io.Opener , historyURI , statusURI  string , logger  * logrus.Entry ) (* Controller , error ) {
208210	if  logger  ==  nil  {
209211		logger  =  logrus .NewEntry (logrus .StandardLogger ())
210212	}
211213	hist , err  :=  history .New (maxRecordsPerPool , opener , historyURI )
212214	if  err  !=  nil  {
213215		return  nil , fmt .Errorf ("error initializing history client from %q: %v" , historyURI , err )
214216	}
217+ 
215218	sc  :=  & statusController {
216219		logger :         logger .WithField ("controller" , "status-update" ),
217220		ghc :            ghcStatus ,
@@ -223,11 +226,30 @@ func NewController(ghcSync, ghcStatus github.Client, prowJobClient ctrlruntimecl
223226		path :           statusURI ,
224227	}
225228	go  sc .run ()
229+ 	return  newSyncController (logger , ghcSync , mgr , cfg , gc , sc , hist )
230+ }
231+ 
232+ func  newSyncController (
233+ 	logger  * logrus.Entry ,
234+ 	ghcSync  githubClient ,
235+ 	mgr  manager.Manager ,
236+ 	cfg  config.Getter ,
237+ 	gc  * git.Client ,
238+ 	sc  * statusController ,
239+ 	hist  * history.History ,
240+ ) (* Controller , error ) {
241+ 	if  err  :=  mgr .GetCache ().IndexField (
242+ 		& prowapi.ProwJob {},
243+ 		cacheIndexName ,
244+ 		cacheIndexFunc ,
245+ 	); err  !=  nil  {
246+ 		return  nil , fmt .Errorf ("failed to add baseSHA index to cache: %v" , err )
247+ 	}
226248	return  & Controller {
227249		ctx :           context .Background (),
228250		logger :        logger .WithField ("controller" , "sync" ),
229251		ghc :           ghcSync ,
230- 		prowJobClient : prowJobClient ,
252+ 		prowJobClient : mgr . GetClient () ,
231253		config :        cfg ,
232254		gc :            gc ,
233255		sc :            sc ,
@@ -308,19 +330,9 @@ func (c *Controller) Sync() error {
308330		"duration" , time .Since (start ).String (),
309331	).Debugf ("Found %d (unfiltered) pool PRs." , len (prs ))
310332
311- 	var  pjs  []prowapi.ProwJob 
312333	var  blocks  blockers.Blockers 
313334	var  err  error 
314335	if  len (prs ) >  0  {
315- 		start  :=  time .Now ()
316- 		pjList  :=  & prowapi.ProwJobList {}
317- 		if  err  :=  c .prowJobClient .List (c .ctx , pjList , ctrlruntimeclient .InNamespace (c .config ().ProwJobNamespace )); err  !=  nil  {
318- 			c .logger .WithField ("duration" , time .Since (start ).String ()).Debug ("Failed to list ProwJobs from the cluster." )
319- 			return  err 
320- 		}
321- 		c .logger .WithField ("duration" , time .Since (start ).String ()).Debug ("Listed ProwJobs from the cluster." )
322- 		pjs  =  pjList .Items 
323- 
324336		if  label  :=  c .config ().Tide .BlockerLabel ; label  !=  ""  {
325337			c .logger .Debugf ("Searching for blocking issues (label %q)." , label )
326338			orgExcepts , repos  :=  c .config ().Tide .Queries .OrgExceptionsAndRepos ()
@@ -336,7 +348,7 @@ func (c *Controller) Sync() error {
336348		}
337349	}
338350	// Partition PRs into subpools and filter out non-pool PRs. 
339- 	rawPools , err  :=  c .dividePool (prs ,  pjs )
351+ 	rawPools , err  :=  c .dividePool (prs )
340352	if  err  !=  nil  {
341353		return  err 
342354	}
@@ -1382,7 +1394,7 @@ func poolKey(org, repo, branch string) string {
13821394
13831395// dividePool splits up the list of pull requests and prow jobs into a group 
13841396// per repo and branch. It only keeps ProwJobs that match the latest branch. 
1385- func  (c  * Controller ) dividePool (pool  map [string ]PullRequest ,  pjs  []prowapi. ProwJob ) (map [string ]* subpool , error ) {
1397+ func  (c  * Controller ) dividePool (pool  map [string ]PullRequest ) (map [string ]* subpool , error ) {
13861398	sps  :=  make (map [string ]* subpool )
13871399	for  _ , pr  :=  range  pool  {
13881400		org  :=  string (pr .Repository .Owner .Login )
@@ -1410,15 +1422,19 @@ func (c *Controller) dividePool(pool map[string]PullRequest, pjs []prowapi.ProwJ
14101422		}
14111423		sps [fn ].prs  =  append (sps [fn ].prs , pr )
14121424	}
1413- 	for  _ , pj  :=  range  pjs  {
1414- 		if  pj .Spec .Type  !=  prowapi .PresubmitJob  &&  pj .Spec .Type  !=  prowapi .BatchJob  {
1415- 			continue 
1416- 		}
1417- 		fn  :=  poolKey (pj .Spec .Refs .Org , pj .Spec .Refs .Repo , pj .Spec .Refs .BaseRef )
1418- 		if  sps [fn ] ==  nil  ||  pj .Spec .Refs .BaseSHA  !=  sps [fn ].sha  {
1419- 			continue 
1425+ 
1426+ 	for  subpoolkey , sp  :=  range  sps  {
1427+ 		pjs  :=  & prowapi.ProwJobList {}
1428+ 		err  :=  c .prowJobClient .List (
1429+ 			c .ctx ,
1430+ 			pjs ,
1431+ 			ctrlruntimeclient .MatchingField (cacheIndexName , cacheIndexKey (sp .org , sp .repo , sp .branch , sp .sha )),
1432+ 			ctrlruntimeclient .InNamespace (c .config ().ProwJobNamespace ))
1433+ 		if  err  !=  nil  {
1434+ 			return  nil , fmt .Errorf ("failed to list jobs for subpool %s: %v" , subpoolkey , err )
14201435		}
1421- 		sps [fn ].pjs  =  append (sps [fn ].pjs , pj )
1436+ 		c .logger .WithField ("subpool" , subpoolkey ).Debugf ("Found %d prowjobs." , len (pjs .Items ))
1437+ 		sps [subpoolkey ].pjs  =  pjs .Items 
14221438	}
14231439	return  sps , nil 
14241440}
@@ -1571,3 +1587,28 @@ func orgRepoQueryString(orgs, repos []string, orgExceptions map[string]sets.Stri
15711587	}
15721588	return  strings .Join (toks , " " )
15731589}
1590+ 
1591+ // cacheIndexName is the name of the index that indexes presubmit+batch ProwJobs by 
1592+ // org+repo+branch+baseSHA. Use the cacheIndexKey func to get the correct key. 
1593+ const  cacheIndexName  =  "tide-global-index" 
1594+ 
1595+ // cacheIndexKey returns the index key for the tideCacheIndex 
1596+ func  cacheIndexKey (org , repo , branch , baseSHA  string ) string  {
1597+ 	return  fmt .Sprintf ("%s/%s:%s@%s" , org , repo , branch , baseSHA )
1598+ }
1599+ 
1600+ func  cacheIndexFunc (obj  runtime.Object ) []string  {
1601+ 	pj , ok  :=  obj .(* prowapi.ProwJob )
1602+ 	// Should never happen 
1603+ 	if  ! ok  {
1604+ 		return  nil 
1605+ 	}
1606+ 	// We do not care about jobs other than presubmit and batch 
1607+ 	if  pj .Spec .Type  !=  prowapi .PresubmitJob  &&  pj .Spec .Type  !=  prowapi .BatchJob  {
1608+ 		return  nil 
1609+ 	}
1610+ 	if  pj .Spec .Refs  ==  nil  {
1611+ 		return  nil 
1612+ 	}
1613+ 	return  []string {cacheIndexKey (pj .Spec .Refs .Org , pj .Spec .Refs .Repo , pj .Spec .Refs .BaseRef , pj .Spec .Refs .BaseSHA )}
1614+ }
0 commit comments