Skip to content

Commit 52ec611

Browse files
authored
Refactor watchers - Create separate type for namespaced informers (#3238)
* Create separate type for namespaced watchers * Create separate type for namespaced watchers in certmanager controller * Create separate type for namespaced watchers in extdns controller * Fix case where namespace is not watching secrets
1 parent 42dd0d1 commit 52ec611

File tree

12 files changed

+573
-592
lines changed

12 files changed

+573
-592
lines changed

internal/certmanager/cm_controller.go

Lines changed: 60 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,15 @@ const (
5656
// and creates/ updates certificates for VS resources as required,
5757
// and VS resources when certificate objects are created/ updated
5858
type CmController struct {
59-
vsLister []listers_v1.VirtualServerLister
60-
sync SyncFn
61-
ctx context.Context
62-
mustSync []cache.InformerSynced
63-
queue workqueue.RateLimitingInterface
64-
vsSharedInformerFactory []vsinformers.SharedInformerFactory
65-
cmSharedInformerFactory []cm_informers.SharedInformerFactory
66-
kubeSharedInformerFactory []kubeinformers.SharedInformerFactory
67-
recorder record.EventRecorder
68-
cmClient *cm_clientset.Clientset
59+
sync SyncFn
60+
ctx context.Context
61+
mustSync []cache.InformerSynced
62+
queue workqueue.RateLimitingInterface
63+
informerGroup map[string]*namespacedInformer
64+
recorder record.EventRecorder
65+
cmClient *cm_clientset.Clientset
66+
kubeClient kubernetes.Interface
67+
vsClient k8s_nginx.Interface
6968
}
7069

7170
// CmOpts is the options required for building the CmController
@@ -78,27 +77,42 @@ type CmOpts struct {
7877
vsClient k8s_nginx.Interface
7978
}
8079

80+
type namespacedInformer struct {
81+
vsSharedInformerFactory vsinformers.SharedInformerFactory
82+
cmSharedInformerFactory cm_informers.SharedInformerFactory
83+
kubeSharedInformerFactory kubeinformers.SharedInformerFactory
84+
vsLister listers_v1.VirtualServerLister
85+
cmLister cmlisters.CertificateLister
86+
}
87+
8188
func (c *CmController) register() workqueue.RateLimitingInterface {
82-
var cmLister []cmlisters.CertificateLister
83-
for _, sif := range c.vsSharedInformerFactory {
84-
c.vsLister = append(c.vsLister, sif.K8s().V1().VirtualServers().Lister())
85-
sif.K8s().V1().VirtualServers().Informer().AddEventHandler(&controllerpkg.QueuingEventHandler{
86-
Queue: c.queue,
87-
})
88-
c.mustSync = append(c.mustSync, sif.K8s().V1().VirtualServers().Informer().HasSynced)
89-
}
89+
c.sync = SyncFnFor(c.recorder, c.cmClient, c.informerGroup)
90+
return c.queue
91+
}
9092

91-
for _, cif := range c.cmSharedInformerFactory {
92-
cif.Certmanager().V1().Certificates().Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{
93-
WorkFunc: certificateHandler(c.queue),
94-
})
95-
cmLister = append(cmLister, cif.Certmanager().V1().Certificates().Lister())
96-
c.mustSync = append(c.mustSync, cif.Certmanager().V1().Certificates().Informer().HasSynced)
97-
}
93+
func (c *CmController) newNamespacedInformer(ns string) {
94+
nsi := &namespacedInformer{}
95+
nsi.cmSharedInformerFactory = cm_informers.NewSharedInformerFactoryWithOptions(c.cmClient, resyncPeriod, cm_informers.WithNamespace(ns))
96+
nsi.kubeSharedInformerFactory = kubeinformers.NewSharedInformerFactoryWithOptions(c.kubeClient, resyncPeriod, kubeinformers.WithNamespace(ns))
97+
nsi.vsSharedInformerFactory = vsinformers.NewSharedInformerFactoryWithOptions(c.vsClient, resyncPeriod, vsinformers.WithNamespace(ns))
9898

99-
c.sync = SyncFnFor(c.recorder, c.cmClient, cmLister)
99+
c.addHandlers(nsi)
100100

101-
return c.queue
101+
c.informerGroup[ns] = nsi
102+
}
103+
104+
func (c *CmController) addHandlers(nsi *namespacedInformer) {
105+
nsi.vsLister = nsi.vsSharedInformerFactory.K8s().V1().VirtualServers().Lister()
106+
nsi.vsSharedInformerFactory.K8s().V1().VirtualServers().Informer().AddEventHandler(&controllerpkg.QueuingEventHandler{
107+
Queue: c.queue,
108+
})
109+
c.mustSync = append(c.mustSync, nsi.vsSharedInformerFactory.K8s().V1().VirtualServers().Informer().HasSynced)
110+
111+
nsi.cmSharedInformerFactory.Certmanager().V1().Certificates().Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{
112+
WorkFunc: certificateHandler(c.queue),
113+
})
114+
nsi.cmLister = nsi.cmSharedInformerFactory.Certmanager().V1().Certificates().Lister()
115+
c.mustSync = append(c.mustSync, nsi.cmSharedInformerFactory.Certmanager().V1().Certificates().Informer().HasSynced)
102116
}
103117

104118
func (c *CmController) processItem(ctx context.Context, key string) error {
@@ -108,14 +122,11 @@ func (c *CmController) processItem(ctx context.Context, key string) error {
108122
runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
109123
return err
110124
}
125+
nsi := getNamespacedInformer(namespace, c.informerGroup)
111126

112127
var vs *conf_v1.VirtualServer
113-
for _, vl := range c.vsLister {
114-
vs, err = vl.VirtualServers(namespace).Get(name)
115-
if err == nil {
116-
break
117-
}
118-
}
128+
vs, err = nsi.vsLister.VirtualServers(namespace).Get(name)
129+
119130
if err != nil {
120131
return err
121132
}
@@ -168,25 +179,22 @@ func NewCmController(opts *CmOpts) *CmController {
168179
// Create a cert-manager api client
169180
intcl, _ := cm_clientset.NewForConfig(opts.kubeConfig)
170181

171-
var vsSharedInformerFactory []vsinformers.SharedInformerFactory
172-
var cmSharedInformerFactory []cm_informers.SharedInformerFactory
173-
var kubeSharedInformerFactory []kubeinformers.SharedInformerFactory
182+
ig := make(map[string]*namespacedInformer)
174183

175-
for _, ns := range opts.namespace {
176-
cmSharedInformerFactory = append(cmSharedInformerFactory, cm_informers.NewSharedInformerFactoryWithOptions(intcl, resyncPeriod, cm_informers.WithNamespace(ns)))
177-
kubeSharedInformerFactory = append(kubeSharedInformerFactory, kubeinformers.NewSharedInformerFactoryWithOptions(opts.kubeClient, resyncPeriod, kubeinformers.WithNamespace(ns)))
178-
vsSharedInformerFactory = append(vsSharedInformerFactory, vsinformers.NewSharedInformerFactoryWithOptions(opts.vsClient, resyncPeriod, vsinformers.WithNamespace(ns)))
184+
cm := &CmController{
185+
ctx: opts.context,
186+
queue: workqueue.NewNamedRateLimitingQueue(controllerpkg.DefaultItemBasedRateLimiter(), ControllerName),
187+
informerGroup: ig,
188+
recorder: opts.eventRecorder,
189+
cmClient: intcl,
190+
kubeClient: opts.kubeClient,
191+
vsClient: opts.vsClient,
179192
}
180193

181-
cm := &CmController{
182-
ctx: opts.context,
183-
queue: workqueue.NewNamedRateLimitingQueue(controllerpkg.DefaultItemBasedRateLimiter(), ControllerName),
184-
cmSharedInformerFactory: cmSharedInformerFactory,
185-
kubeSharedInformerFactory: kubeSharedInformerFactory,
186-
recorder: opts.eventRecorder,
187-
cmClient: intcl,
188-
vsSharedInformerFactory: vsSharedInformerFactory,
194+
for _, ns := range opts.namespace {
195+
cm.newNamespacedInformer(ns)
189196
}
197+
190198
cm.register()
191199
return cm
192200
}
@@ -201,14 +209,10 @@ func (c *CmController) Run(stopCh <-chan struct{}) {
201209

202210
glog.Infof("Starting cert-manager control loop")
203211

204-
for _, vif := range c.vsSharedInformerFactory {
205-
go vif.Start(c.ctx.Done())
206-
}
207-
for _, cif := range c.cmSharedInformerFactory {
208-
go cif.Start(c.ctx.Done())
209-
}
210-
for _, kif := range c.kubeSharedInformerFactory {
211-
go kif.Start(c.ctx.Done())
212+
for _, ig := range c.informerGroup {
213+
go ig.vsSharedInformerFactory.Start(c.ctx.Done())
214+
go ig.cmSharedInformerFactory.Start(c.ctx.Done())
215+
go ig.kubeSharedInformerFactory.Start(c.ctx.Done())
212216
}
213217
// // wait for all the informer caches we depend on are synced
214218
glog.V(3).Infof("Waiting for %d caches to sync", len(c.mustSync))

internal/certmanager/cm_controller_test.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,16 @@ import (
2323

2424
cmapi "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1"
2525
cmclient "github.com/cert-manager/cert-manager/pkg/client/clientset/versioned"
26-
cm_informers "github.com/cert-manager/cert-manager/pkg/client/informers/externalversions"
2726
controllerpkg "github.com/cert-manager/cert-manager/pkg/controller"
2827
testpkg "github.com/nginxinc/kubernetes-ingress/internal/certmanager/test_files"
2928
"github.com/stretchr/testify/assert"
3029
"github.com/stretchr/testify/require"
3130
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3231
"k8s.io/apimachinery/pkg/runtime"
33-
kubeinformers "k8s.io/client-go/informers"
3432
"k8s.io/client-go/util/workqueue"
3533

3634
vsapi "github.com/nginxinc/kubernetes-ingress/pkg/apis/configuration/v1"
3735
k8s_nginx "github.com/nginxinc/kubernetes-ingress/pkg/client/clientset/versioned"
38-
vsinformers "github.com/nginxinc/kubernetes-ingress/pkg/client/informers/externalversions"
3936
)
4037

4138
func Test_controller_Register(t *testing.T) {
@@ -138,15 +135,27 @@ func Test_controller_Register(t *testing.T) {
138135
// Certificate event is received then HasSynced has not been setup
139136
// properly.
140137

138+
ig := make(map[string]*namespacedInformer)
139+
140+
nsi := &namespacedInformer{
141+
cmSharedInformerFactory: b.Context.SharedInformerFactory,
142+
kubeSharedInformerFactory: b.Context.KubeSharedInformerFactory,
143+
vsSharedInformerFactory: b.VsSharedInformerFactory,
144+
}
145+
146+
ig[""] = nsi
147+
141148
cm := &CmController{
142-
ctx: b.RootContext,
143-
queue: workqueue.NewNamedRateLimitingQueue(controllerpkg.DefaultItemBasedRateLimiter(), ControllerName),
144-
cmSharedInformerFactory: []cm_informers.SharedInformerFactory{b.FakeCMInformerFactory()},
145-
kubeSharedInformerFactory: []kubeinformers.SharedInformerFactory{b.FakeKubeInformerFactory()},
146-
recorder: b.Recorder,
147-
vsSharedInformerFactory: []vsinformers.SharedInformerFactory{b.VsSharedInformerFactory},
149+
ctx: b.RootContext,
150+
queue: workqueue.NewNamedRateLimitingQueue(controllerpkg.DefaultItemBasedRateLimiter(), ControllerName),
151+
informerGroup: ig,
152+
recorder: b.Recorder,
153+
kubeClient: b.Client,
154+
vsClient: b.VSClient,
148155
}
149156

157+
cm.addHandlers(nsi)
158+
150159
queue := cm.register()
151160

152161
b.Start()

internal/certmanager/helper.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,3 +120,21 @@ func translateVsSpec(crt *cmapi.Certificate, vsCmSpec *vsapi.CertManager) error
120120
}
121121
return nil
122122
}
123+
124+
func getNamespacedInformer(ns string, ig map[string]*namespacedInformer) *namespacedInformer {
125+
var nsi *namespacedInformer
126+
var isGlobalNs bool
127+
var exists bool
128+
129+
nsi, isGlobalNs = ig[""]
130+
131+
if !isGlobalNs {
132+
// get the correct namespaced informers
133+
nsi, exists = ig[ns]
134+
if !exists {
135+
// we are not watching this namespace
136+
return nil
137+
}
138+
}
139+
return nsi
140+
}

internal/certmanager/sync.go

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ type SyncFn func(context.Context, *vsapi.VirtualServer) error
6060
func SyncFnFor(
6161
rec record.EventRecorder,
6262
cmClient clientset.Interface,
63-
cmLister []cmlisters.CertificateLister,
63+
ig map[string]*namespacedInformer,
6464
) SyncFn {
6565
return func(ctx context.Context, vs *vsapi.VirtualServer) error {
6666
var err error
@@ -75,7 +75,9 @@ func SyncFnFor(
7575
return err
7676
}
7777

78-
newCrts, updateCrts, err := buildCertificates(cmLister, vs, issuerName, issuerKind, issuerGroup)
78+
nsi := getNamespacedInformer(vs.GetNamespace(), ig)
79+
80+
newCrts, updateCrts, err := buildCertificates(nsi.cmLister, vs, issuerName, issuerKind, issuerGroup)
7981
if err != nil {
8082
glog.Errorf("Incorrect cert-manager configuration for VirtualServer resource: %v", err)
8183
rec.Eventf(vs, corev1.EventTypeWarning, reasonBadConfig, "Incorrect cert-manager configuration for VirtualServer resource: %s",
@@ -106,12 +108,8 @@ func SyncFnFor(
106108
}
107109
var certs []*cmapi.Certificate
108110

109-
for _, cl := range cmLister {
110-
certs, err = cl.Certificates(vs.GetNamespace()).List(labels.Everything())
111-
if len(certs) > 0 {
112-
break
113-
}
114-
}
111+
certs, err = nsi.cmLister.Certificates(vs.GetNamespace()).List(labels.Everything())
112+
115113
if err != nil {
116114
return err
117115
}
@@ -131,7 +129,7 @@ func SyncFnFor(
131129
}
132130

133131
func buildCertificates(
134-
cmLister []cmlisters.CertificateLister,
132+
cmLister cmlisters.CertificateLister,
135133
vs *vsapi.VirtualServer,
136134
issuerName, issuerKind, issuerGroup string,
137135
) (newCert, update []*cmapi.Certificate, _ error) {
@@ -140,12 +138,8 @@ func buildCertificates(
140138
var existingCrt *cmapi.Certificate
141139
var err error
142140

143-
for _, cl := range cmLister {
144-
existingCrt, err = cl.Certificates(vs.Namespace).Get(vs.Spec.TLS.Secret)
145-
if err == nil {
146-
break
147-
}
148-
}
141+
existingCrt, err = cmLister.Certificates(vs.Namespace).Get(vs.Spec.TLS.Secret)
142+
149143
if !apierrors.IsNotFound(err) && err != nil {
150144
return nil, nil, err
151145
}

internal/certmanager/sync_test.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323

2424
cmapi "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1"
2525
cmmeta "github.com/cert-manager/cert-manager/pkg/apis/meta/v1"
26-
cmlisters "github.com/cert-manager/cert-manager/pkg/client/listers/certmanager/v1"
2726
"github.com/cert-manager/cert-manager/test/unit/gen"
2827
"github.com/stretchr/testify/assert"
2928
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -491,7 +490,19 @@ func TestSync(t *testing.T) {
491490
}
492491
b.Init()
493492
defer b.Stop()
494-
sync := SyncFnFor(b.Recorder, b.CMClient, []cmlisters.CertificateLister{b.SharedInformerFactory.Certmanager().V1().Certificates().Lister()})
493+
494+
ig := make(map[string]*namespacedInformer)
495+
496+
nsi := &namespacedInformer{
497+
cmSharedInformerFactory: b.FakeCMInformerFactory(),
498+
kubeSharedInformerFactory: b.FakeKubeInformerFactory(),
499+
vsSharedInformerFactory: b.VsSharedInformerFactory,
500+
cmLister: b.SharedInformerFactory.Certmanager().V1().Certificates().Lister(),
501+
}
502+
503+
ig[""] = nsi
504+
505+
sync := SyncFnFor(b.Recorder, b.CMClient, ig)
495506
b.Start()
496507

497508
err := sync(context.Background(), &test.VirtualServer)

0 commit comments

Comments
 (0)