Skip to content

Commit 0b03e11

Browse files
committed
add a dynamic caching layer for clusterextension managed content
Signed-off-by: everettraven <[email protected]>
1 parent 10ebfdc commit 0b03e11

File tree

2 files changed

+421
-0
lines changed

2 files changed

+421
-0
lines changed
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
package contentmanager
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"sync"
8+
9+
"k8s.io/apimachinery/pkg/api/meta"
10+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
12+
"k8s.io/apimachinery/pkg/labels"
13+
"k8s.io/apimachinery/pkg/runtime"
14+
"k8s.io/client-go/rest"
15+
"sigs.k8s.io/controller-runtime/pkg/cache"
16+
"sigs.k8s.io/controller-runtime/pkg/client"
17+
"sigs.k8s.io/controller-runtime/pkg/controller"
18+
"sigs.k8s.io/controller-runtime/pkg/handler"
19+
"sigs.k8s.io/controller-runtime/pkg/source"
20+
21+
"github.com/operator-framework/operator-controller/api/v1alpha1"
22+
oclabels "github.com/operator-framework/operator-controller/internal/labels"
23+
)
24+
25+
type Watcher interface {
26+
// Watch will establish watches for resources owned by a ClusterExtension
27+
Watch(context.Context, controller.Controller, *v1alpha1.ClusterExtension, []client.Object) error
28+
// Unwatch will remove watches for a ClusterExtension
29+
Unwatch(*v1alpha1.ClusterExtension)
30+
}
31+
32+
type RestConfigMapper func(context.Context, client.Object, *rest.Config) (*rest.Config, error)
33+
34+
type extensionCacheData struct {
35+
Cache cache.Cache
36+
Cancel context.CancelFunc
37+
}
38+
39+
type instance struct {
40+
rcm RestConfigMapper
41+
baseCfg *rest.Config
42+
extensionCaches map[string]extensionCacheData
43+
mapper meta.RESTMapper
44+
mu *sync.Mutex
45+
}
46+
47+
// New creates a new ContentManager object
48+
func New(rcm RestConfigMapper, cfg *rest.Config, mapper meta.RESTMapper) Watcher {
49+
return &instance{
50+
rcm: rcm,
51+
baseCfg: cfg,
52+
extensionCaches: make(map[string]extensionCacheData),
53+
mapper: mapper,
54+
mu: &sync.Mutex{},
55+
}
56+
}
57+
58+
// buildScheme builds a runtime.Scheme based on the provided client.Objects,
59+
// with all GroupVersionKinds mapping to the unstructured.Unstructured type
60+
// (unstructured.UnstructuredList for list kinds).
61+
//
62+
// If a provided client.Object does not set a Version or Kind field in its
63+
// GroupVersionKind, an error will be returned.
64+
func buildScheme(objs []client.Object) (*runtime.Scheme, error) {
65+
scheme := runtime.NewScheme()
66+
// The ClusterExtension types must be added to the scheme since its
67+
// going to be used to establish watches that trigger reconciliation
68+
// of the owning ClusterExtension
69+
if err := v1alpha1.AddToScheme(scheme); err != nil {
70+
return nil, fmt.Errorf("adding operator controller APIs to scheme: %w", err)
71+
}
72+
73+
for _, obj := range objs {
74+
gvk := obj.GetObjectKind().GroupVersionKind()
75+
76+
// If the Kind or Version is not set in an object's GroupVersionKind
77+
// attempting to add it to the runtime.Scheme will result in a panic.
78+
// To avoid panics, we are doing the validation and returning early
79+
// with an error if any objects are provided with a missing Kind or Version
80+
// field
81+
if gvk.Kind == "" {
82+
return nil, fmt.Errorf(
83+
"adding %s to scheme; object Kind is not defined",
84+
obj.GetName(),
85+
)
86+
}
87+
88+
if gvk.Version == "" {
89+
return nil, fmt.Errorf(
90+
"adding %s to scheme; object Version is not defined",
91+
obj.GetName(),
92+
)
93+
}
94+
95+
listKind := gvk.Kind + "List"
96+
97+
if !scheme.Recognizes(gvk) {
98+
// Since we can't have a mapping to every possible Go type in existence
99+
// based on the GVK we need to use the unstructured types for mapping
100+
u := &unstructured.Unstructured{}
101+
u.SetGroupVersionKind(gvk)
102+
ul := &unstructured.UnstructuredList{}
103+
ul.SetGroupVersionKind(gvk.GroupVersion().WithKind(listKind))
104+
105+
scheme.AddKnownTypeWithName(gvk, u)
106+
scheme.AddKnownTypeWithName(gvk.GroupVersion().WithKind(listKind), ul)
107+
// Adding the common meta schemas to the scheme for the GroupVersion
108+
// is necessary to ensure the scheme is aware of the different operations
109+
// that can be performed against the resources in this GroupVersion
110+
metav1.AddToGroupVersion(scheme, gvk.GroupVersion())
111+
}
112+
}
113+
114+
return scheme, nil
115+
}
116+
117+
// Watch configures a controller-runtime cache.Cache and establishes watches for the provided resources.
118+
// It utilizes the provided ClusterExtension to set a DefaultLabelSelector on the cache.Cache
119+
// to ensure it is only caching and reacting to content that belongs to the ClusterExtension.
120+
// For each client.Object provided, a new source.Kind is created and used in a call to the Watch() method
121+
// of the provided controller.Controller to establish new watches for the managed resources.
122+
func (i *instance) Watch(ctx context.Context, ctrl controller.Controller, ce *v1alpha1.ClusterExtension, objs []client.Object) error {
123+
if len(objs) == 0 || ce == nil || ctrl == nil {
124+
return nil
125+
}
126+
127+
cfg, err := i.rcm(ctx, ce, i.baseCfg)
128+
if err != nil {
129+
return fmt.Errorf("getting rest.Config for ClusterExtension %q: %w", ce.Name, err)
130+
}
131+
132+
scheme, err := buildScheme(objs)
133+
if err != nil {
134+
return fmt.Errorf("building scheme for ClusterExtension %q: %w", ce.GetName(), err)
135+
}
136+
137+
tgtLabels := labels.Set{
138+
oclabels.OwnerKindKey: v1alpha1.ClusterExtensionKind,
139+
oclabels.OwnerNameKey: ce.GetName(),
140+
}
141+
142+
c, err := cache.New(cfg, cache.Options{
143+
Scheme: scheme,
144+
DefaultLabelSelector: tgtLabels.AsSelector(),
145+
})
146+
if err != nil {
147+
return fmt.Errorf("creating cache for ClusterExtension %q: %w", ce.Name, err)
148+
}
149+
150+
for _, obj := range objs {
151+
err = ctrl.Watch(
152+
source.Kind(
153+
c,
154+
obj,
155+
handler.TypedEnqueueRequestForOwner[client.Object](
156+
scheme,
157+
i.mapper,
158+
ce,
159+
),
160+
),
161+
)
162+
if err != nil {
163+
return fmt.Errorf("creating watch for ClusterExtension %q managed resource %s: %w", ce.Name, obj.GetObjectKind().GroupVersionKind(), err)
164+
}
165+
}
166+
167+
// TODO: Instead of stopping the existing cache and replacing it every time
168+
// we should stop the informers that are no longer required
169+
// and create any new ones as necessary. To keep the initial pass
170+
// simple, we are going to keep this as is and optimize in a follow-up.
171+
// Doing this in a follow-up gives us the opportunity to verify that this functions
172+
// as expected when wired up in the ClusterExtension reconciler before going too deep
173+
// in optimizations.
174+
i.mu.Lock()
175+
if extCache, ok := i.extensionCaches[ce.GetName()]; ok {
176+
extCache.Cancel()
177+
}
178+
179+
cacheCtx, cancel := context.WithCancel(context.Background())
180+
i.extensionCaches[ce.Name] = extensionCacheData{
181+
Cache: c,
182+
Cancel: cancel,
183+
}
184+
i.mu.Unlock()
185+
186+
go func() {
187+
err := c.Start(cacheCtx)
188+
if err != nil {
189+
i.Unwatch(ce)
190+
}
191+
}()
192+
193+
if !c.WaitForCacheSync(cacheCtx) {
194+
i.Unwatch(ce)
195+
return errors.New("cache could not sync, it has been stopped and removed")
196+
}
197+
198+
return nil
199+
}
200+
201+
// Unwatch will stop the cache for the provided ClusterExtension
202+
// stopping any watches on managed content
203+
func (i *instance) Unwatch(ce *v1alpha1.ClusterExtension) {
204+
if ce == nil {
205+
return
206+
}
207+
208+
i.mu.Lock()
209+
if extCache, ok := i.extensionCaches[ce.GetName()]; ok {
210+
extCache.Cancel()
211+
delete(i.extensionCaches, ce.GetName())
212+
}
213+
i.mu.Unlock()
214+
}

0 commit comments

Comments
 (0)