Skip to content

✨ Add an internal package that implements a dynamic caching layer for ClusterExtension managed content #1001

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 214 additions & 0 deletions internal/contentmanager/contentmanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package contentmanager

import (
"context"
"errors"
"fmt"
"sync"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/operator-framework/operator-controller/api/v1alpha1"
oclabels "github.com/operator-framework/operator-controller/internal/labels"
)

type Watcher interface {
// Watch will establish watches for resources owned by a ClusterExtension
Watch(context.Context, controller.Controller, *v1alpha1.ClusterExtension, []client.Object) error
// Unwatch will remove watches for a ClusterExtension
Unwatch(*v1alpha1.ClusterExtension)
}

type RestConfigMapper func(context.Context, client.Object, *rest.Config) (*rest.Config, error)

type extensionCacheData struct {
Cache cache.Cache
Cancel context.CancelFunc
}

type instance struct {
rcm RestConfigMapper
baseCfg *rest.Config
extensionCaches map[string]extensionCacheData
mapper meta.RESTMapper
mu *sync.Mutex
}

// New creates a new ContentManager object
func New(rcm RestConfigMapper, cfg *rest.Config, mapper meta.RESTMapper) Watcher {
return &instance{
rcm: rcm,
baseCfg: cfg,
extensionCaches: make(map[string]extensionCacheData),
mapper: mapper,
mu: &sync.Mutex{},
}
}

// buildScheme builds a runtime.Scheme based on the provided client.Objects,
// with all GroupVersionKinds mapping to the unstructured.Unstructured type
// (unstructured.UnstructuredList for list kinds).
//
// If a provided client.Object does not set a Version or Kind field in its
// GroupVersionKind, an error will be returned.
func buildScheme(objs []client.Object) (*runtime.Scheme, error) {
scheme := runtime.NewScheme()
// The ClusterExtension types must be added to the scheme since its
// going to be used to establish watches that trigger reconciliation
// of the owning ClusterExtension
if err := v1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("adding operator controller APIs to scheme: %w", err)
}

for _, obj := range objs {
gvk := obj.GetObjectKind().GroupVersionKind()

// If the Kind or Version is not set in an object's GroupVersionKind
// attempting to add it to the runtime.Scheme will result in a panic.
// To avoid panics, we are doing the validation and returning early
// with an error if any objects are provided with a missing Kind or Version
// field
if gvk.Kind == "" {
return nil, fmt.Errorf(
"adding %s to scheme; object Kind is not defined",
obj.GetName(),
)
}

if gvk.Version == "" {
return nil, fmt.Errorf(
"adding %s to scheme; object Version is not defined",
obj.GetName(),
)
}

listKind := gvk.Kind + "List"

if !scheme.Recognizes(gvk) {
// Since we can't have a mapping to every possible Go type in existence
// based on the GVK we need to use the unstructured types for mapping
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(gvk)
ul := &unstructured.UnstructuredList{}
ul.SetGroupVersionKind(gvk.GroupVersion().WithKind(listKind))

scheme.AddKnownTypeWithName(gvk, u)
scheme.AddKnownTypeWithName(gvk.GroupVersion().WithKind(listKind), ul)
// Adding the common meta schemas to the scheme for the GroupVersion
// is necessary to ensure the scheme is aware of the different operations
// that can be performed against the resources in this GroupVersion
metav1.AddToGroupVersion(scheme, gvk.GroupVersion())
}
}

return scheme, nil
}

// Watch configures a controller-runtime cache.Cache and establishes watches for the provided resources.
// It utilizes the provided ClusterExtension to set a DefaultLabelSelector on the cache.Cache
// to ensure it is only caching and reacting to content that belongs to the ClusterExtension.
// For each client.Object provided, a new source.Kind is created and used in a call to the Watch() method
// of the provided controller.Controller to establish new watches for the managed resources.
func (i *instance) Watch(ctx context.Context, ctrl controller.Controller, ce *v1alpha1.ClusterExtension, objs []client.Object) error {
if len(objs) == 0 || ce == nil || ctrl == nil {
return nil
}

cfg, err := i.rcm(ctx, ce, i.baseCfg)
if err != nil {
return fmt.Errorf("getting rest.Config for ClusterExtension %q: %w", ce.Name, err)
}

scheme, err := buildScheme(objs)
if err != nil {
return fmt.Errorf("building scheme for ClusterExtension %q: %w", ce.GetName(), err)
}

tgtLabels := labels.Set{
oclabels.OwnerKindKey: v1alpha1.ClusterExtensionKind,
oclabels.OwnerNameKey: ce.GetName(),
}

c, err := cache.New(cfg, cache.Options{
Scheme: scheme,
DefaultLabelSelector: tgtLabels.AsSelector(),
})
if err != nil {
return fmt.Errorf("creating cache for ClusterExtension %q: %w", ce.Name, err)
}

for _, obj := range objs {
err = ctrl.Watch(
source.Kind(
c,
obj,
handler.TypedEnqueueRequestForOwner[client.Object](
scheme,
i.mapper,
ce,
),
),
)
if err != nil {
return fmt.Errorf("creating watch for ClusterExtension %q managed resource %s: %w", ce.Name, obj.GetObjectKind().GroupVersionKind(), err)
}
}

// TODO: Instead of stopping the existing cache and replacing it every time
// we should stop the informers that are no longer required
// and create any new ones as necessary. To keep the initial pass
// simple, we are going to keep this as is and optimize in a follow-up.
// Doing this in a follow-up gives us the opportunity to verify that this functions
// as expected when wired up in the ClusterExtension reconciler before going too deep
// in optimizations.
Comment on lines +167 to +173
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be done in a follow-up

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we hooked this up as is, what would end up triggering a cache teardown and re-creation? Would it be just when we call actionclient.Upgrade? Or would we do it even after actionclient.Reconcile?

I think if it is just for an upgrade, we can probably get by because that would only happen when the resolved bundle changes or (eventually when we support it) the user-supplied configuration changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hooked up as is, I would assume we only call this whenever the identified state is one of Install or Upgrade

Copy link
Contributor Author

@everettraven everettraven Jul 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify, by identified state I mean the same state that is used in a few other places in the reconcile function of the ClusterExtensionReconciler, so I imagine something like what we have for preflights here:

switch state {
case stateNeedsInstall:
err := preflight.Install(ctx, desiredRel)
if err != nil {
setInstalledStatusConditionFailed(ext, fmt.Sprintf("%s:%v", ocv1alpha1.ReasonInstallationFailed, err))
return ctrl.Result{}, err
}
case stateNeedsUpgrade:
err := preflight.Upgrade(ctx, desiredRel)
if err != nil {
setInstalledStatusConditionFailed(ext, fmt.Sprintf("%s:%v", ocv1alpha1.ReasonInstallationFailed, err))
return ctrl.Result{}, err
}
}

i.mu.Lock()
if extCache, ok := i.extensionCaches[ce.GetName()]; ok {
extCache.Cancel()
}

cacheCtx, cancel := context.WithCancel(context.Background())
i.extensionCaches[ce.Name] = extensionCacheData{
Cache: c,
Cancel: cancel,
}
i.mu.Unlock()

go func() {
err := c.Start(cacheCtx)
if err != nil {
i.Unwatch(ce)
}
}()

if !c.WaitForCacheSync(cacheCtx) {
i.Unwatch(ce)
return errors.New("cache could not sync, it has been stopped and removed")
}

return nil
}

// Unwatch will stop the cache for the provided ClusterExtension
// stopping any watches on managed content
func (i *instance) Unwatch(ce *v1alpha1.ClusterExtension) {
if ce == nil {
return
}

i.mu.Lock()
if extCache, ok := i.extensionCaches[ce.GetName()]; ok {
extCache.Cancel()
delete(i.extensionCaches, ce.GetName())
}
i.mu.Unlock()
}
Loading
Loading