diff --git a/pkg/controller/operators/catalog/og_source_provider.go b/pkg/controller/operators/catalog/og_source_provider.go new file mode 100644 index 0000000000..4045a279ae --- /dev/null +++ b/pkg/controller/operators/catalog/og_source_provider.go @@ -0,0 +1,82 @@ +package catalog + +import ( + "context" + "fmt" + + v1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1" + + "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache" + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/labels" +) + +type OperatorGroupToggleSourceProvider struct { + sp cache.SourceProvider + logger *logrus.Logger + ogLister v1listers.OperatorGroupLister +} + +func NewOperatorGroupToggleSourceProvider(sp cache.SourceProvider, logger *logrus.Logger, + ogLister v1listers.OperatorGroupLister) *OperatorGroupToggleSourceProvider { + return &OperatorGroupToggleSourceProvider{ + sp: sp, + logger: logger, + ogLister: ogLister, + } +} + +const exclusionAnnotation string = "olm.operatorframework.io/exclude-global-namespace-resolution" + +func (e *OperatorGroupToggleSourceProvider) Sources(namespaces ...string) map[cache.SourceKey]cache.Source { + // Check if annotation is set first + resolutionNamespaces, err := e.CheckForExclusion(namespaces...) + if err != nil { + e.logger.Errorf("error checking namespaces %#v for global resolution exlusion: %s", namespaces, err) + // Fail early with a dummy Source that returns an error + // TODO: Update the Sources interface to return an error + m := make(map[cache.SourceKey]cache.Source) + key := cache.SourceKey{Name: "operatorgroup-unavailable", Namespace: namespaces[0]} + source := errorSource{err} + m[key] = source + return m + } + return e.sp.Sources(resolutionNamespaces...) +} + +type errorSource struct { + error +} + +func (e errorSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) { + return nil, e.error +} + +func (e *OperatorGroupToggleSourceProvider) CheckForExclusion(namespaces ...string) ([]string, error) { + var defaultResult = namespaces + // The first namespace provided is always the current namespace being synced + var ownNamespace = namespaces[0] + var toggledResult = []string{ownNamespace} + + // Check the OG on the NS provided for the exclusion annotation + ogs, err := e.ogLister.OperatorGroups(ownNamespace).List(labels.Everything()) + if err != nil { + return nil, fmt.Errorf("listing operatorgroups in namespace %s: %s", ownNamespace, err) + } + + if len(ogs) != 1 { + // Problem with the operatorgroup configuration in the namespace, or the operatorgroup has not yet been persisted + // Note: a resync will be triggered if/when the operatorgroup becomes available + return nil, fmt.Errorf("found %d operatorgroups in namespace %s: expected 1", len(ogs), ownNamespace) + } + + var og = ogs[0] + if val, ok := og.Annotations[exclusionAnnotation]; ok && val == "true" { + // Exclusion specified + // Ignore the globalNamespace for the purposes of resolution in this namespace + e.logger.Printf("excluding global catalogs from resolution in namespace %s", ownNamespace) + return toggledResult, nil + } + + return defaultResult, nil +} diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index c70582ed16..0432a7adce 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -42,6 +42,7 @@ import ( "k8s.io/client-go/util/workqueue" "github.com/operator-framework/api/pkg/operators/reference" + operatorsv1 "github.com/operator-framework/api/pkg/operators/v1" "github.com/operator-framework/api/pkg/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned" "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions" @@ -118,7 +119,7 @@ type Operator struct { bundleUnpackTimeout time.Duration clientFactory clients.Factory muInstallPlan sync.Mutex - resolverSourceProvider *resolver.RegistrySourceProvider + sourceInvalidator *resolver.RegistrySourceProvider } type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error) @@ -191,9 +192,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo clientFactory: clients.NewFactory(config), } op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState) - op.resolverSourceProvider = resolver.SourceProviderFromRegistryClientProvider(op.sources, logger) + op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, logger) + resolverSourceProvider := NewOperatorGroupToggleSourceProvider(op.sourceInvalidator, logger, op.lister.OperatorsV1().OperatorGroupLister()) op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient) - res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.resolverSourceProvider, logger) + res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, resolverSourceProvider, logger) op.resolver = resolver.NewInstrumentedResolver(res, metrics.RegisterDependencyResolutionSuccess, metrics.RegisterDependencyResolutionFailure) // Wire OLM CR sharedIndexInformers @@ -259,7 +261,19 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo operatorGroupInformer := crInformerFactory.Operators().V1().OperatorGroups() op.lister.OperatorsV1().RegisterOperatorGroupLister(metav1.NamespaceAll, operatorGroupInformer.Lister()) - if err := op.RegisterInformer(operatorGroupInformer.Informer()); err != nil { + ogQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ogs") + op.ogQueueSet.Set(metav1.NamespaceAll, ogQueue) + operatorGroupQueueInformer, err := queueinformer.NewQueueInformer( + ctx, + queueinformer.WithLogger(op.logger), + queueinformer.WithQueue(ogQueue), + queueinformer.WithInformer(operatorGroupInformer.Informer()), + queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncOperatorGroups).ToSyncer()), + ) + if err != nil { + return nil, err + } + if err := op.RegisterQueueInformer(operatorGroupQueueInformer); err != nil { return nil, err } @@ -475,7 +489,7 @@ func (o *Operator) syncSourceState(state grpc.SourceState) { switch state.State { case connectivity.Ready: - o.resolverSourceProvider.Invalidate(resolvercache.SourceKey(state.Key)) + o.sourceInvalidator.Invalidate(resolvercache.SourceKey(state.Key)) if o.namespace == state.Key.Namespace { namespaces, err := index.CatalogSubscriberNamespaces(o.catalogSubscriberIndexer, state.Key.Name, state.Key.Namespace) @@ -1085,6 +1099,20 @@ func (o *Operator) syncSubscriptions(obj interface{}) error { return nil } +// syncOperatorGroups requeues the namespace resolution queue on changes to an operatorgroup +// This is because the operatorgroup is now an input to resolution via the global catalog exclusion annotation +func (o *Operator) syncOperatorGroups(obj interface{}) error { + og, ok := obj.(*operatorsv1.OperatorGroup) + if !ok { + o.logger.Debugf("wrong type: %#v", obj) + return fmt.Errorf("casting OperatorGroup failed") + } + + o.nsResolveQueue.Add(og.GetNamespace()) + + return nil +} + func (o *Operator) nothingToUpdate(logger *logrus.Entry, sub *v1alpha1.Subscription) bool { if sub.Status.InstallPlanRef != nil && sub.Status.State == v1alpha1.SubscriptionStateUpgradePending { logger.Debugf("skipping update: installplan already created") diff --git a/pkg/controller/registry/resolver/cache/cache.go b/pkg/controller/registry/resolver/cache/cache.go index a53b40230d..a42b45246f 100644 --- a/pkg/controller/registry/resolver/cache/cache.go +++ b/pkg/controller/registry/resolver/cache/cache.go @@ -139,7 +139,7 @@ func (c *NamespacedOperatorCache) Error() error { err := snapshot.err snapshot.m.RUnlock() if err != nil { - errs = append(errs, fmt.Errorf("error using catalog %s (in namespace %s): %w", key.Name, key.Namespace, err)) + errs = append(errs, fmt.Errorf("failed to populate resolver cache from source %v: %w", key.String(), err)) } } return errors.NewAggregate(errs) diff --git a/pkg/controller/registry/resolver/cache/cache_test.go b/pkg/controller/registry/resolver/cache/cache_test.go index fc4b630c2d..42800fb5d8 100644 --- a/pkg/controller/registry/resolver/cache/cache_test.go +++ b/pkg/controller/registry/resolver/cache/cache_test.go @@ -238,5 +238,5 @@ func TestNamespaceOperatorCacheError(t *testing.T) { key: ErrorSource{Error: errors.New("testing")}, }) - require.EqualError(t, c.Namespaced("dummynamespace").Error(), "error using catalog dummyname (in namespace dummynamespace): testing") + require.EqualError(t, c.Namespaced("dummynamespace").Error(), "failed to populate resolver cache from source dummyname/dummynamespace: testing") } diff --git a/pkg/controller/registry/resolver/step_resolver_test.go b/pkg/controller/registry/resolver/step_resolver_test.go index f4a8c3a9a2..c75da4affc 100644 --- a/pkg/controller/registry/resolver/step_resolver_test.go +++ b/pkg/controller/registry/resolver/step_resolver_test.go @@ -1182,7 +1182,7 @@ func TestResolver(t *testing.T) { steps: [][]*v1alpha1.Step{}, subs: []*v1alpha1.Subscription{}, errAssert: func(t *testing.T, err error) { - assert.Contains(t, err.Error(), "error using catalog @existing (in namespace catsrc-namespace): csv") + assert.Contains(t, err.Error(), "failed to populate resolver cache from source @existing/catsrc-namespace: csv catsrc-namespace/a.v1") assert.Contains(t, err.Error(), "in phase Failed instead of Replacing") }, }, diff --git a/test/e2e/catalog_exclusion_test.go b/test/e2e/catalog_exclusion_test.go new file mode 100644 index 0000000000..8d0bb1b408 --- /dev/null +++ b/test/e2e/catalog_exclusion_test.go @@ -0,0 +1,133 @@ +package e2e + +import ( + "context" + "path/filepath" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + operatorsv1 "github.com/operator-framework/api/pkg/operators/v1" + "github.com/operator-framework/api/pkg/operators/v1alpha1" + "github.com/operator-framework/operator-lifecycle-manager/test/e2e/ctx" + "github.com/operator-framework/operator-lifecycle-manager/test/e2e/util" + . "github.com/operator-framework/operator-lifecycle-manager/test/e2e/util/gomega" + "google.golang.org/grpc/connectivity" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8scontrollerclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +const magicCatalogDir = "magiccatalog" + +var _ = Describe("Global Catalog Exclusion", func() { + var ( + testNamespace corev1.Namespace + determinedE2eClient *util.DeterminedE2EClient + operatorGroup operatorsv1.OperatorGroup + localCatalog *MagicCatalog + ) + + BeforeEach(func() { + determinedE2eClient = util.NewDeterminedClient(ctx.Ctx().E2EClient()) + + By("creating a namespace with an own namespace operator group without annotations") + e2eTestNamespace := genName("global-catalog-exclusion-e2e-") + operatorGroup = operatorsv1.OperatorGroup{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: e2eTestNamespace, + Name: genName("og-"), + Annotations: nil, + }, + Spec: operatorsv1.OperatorGroupSpec{ + TargetNamespaces: []string{e2eTestNamespace}, + }, + } + testNamespace = SetupGeneratedTestNamespaceWithOperatorGroup(e2eTestNamespace, operatorGroup) + + By("creating a broken catalog in the global namespace") + globalCatalog := &v1alpha1.CatalogSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: genName("bad-global-catalog-"), + Namespace: operatorNamespace, + }, + Spec: v1alpha1.CatalogSourceSpec{ + DisplayName: "Broken Global Catalog Source", + SourceType: v1alpha1.SourceTypeGrpc, + Address: "1.1.1.1:1337", // points to non-existing service + }, + } + _ = determinedE2eClient.Create(context.Background(), globalCatalog) + + By("creating a healthy catalog in the test namespace") + localCatalogName := genName("good-catsrc-") + var err error = nil + + fbcPath := filepath.Join(testdataDir, magicCatalogDir, "fbc_initial.yaml") + localCatalog, err = NewMagicCatalogFromFile(determinedE2eClient, testNamespace.GetName(), localCatalogName, fbcPath) + Expect(err).To(Succeed()) + + // deploy catalog blocks until the catalog has reached a ready state or fails + Expect(localCatalog.DeployCatalog(context.Background())).To(Succeed()) + + By("checking that the global catalog is broken") + // Adding this check here to speed up the test + // the global catalog can fail while we wait for the local catalog to get to a ready state + EventuallyResource(globalCatalog).Should(HaveGrpcConnectionWithLastConnectionState(connectivity.TransientFailure)) + }) + + AfterEach(func() { + TeardownNamespace(testNamespace.GetName()) + }) + + When("a subscription referring to the local catalog is created", func() { + var subscription *v1alpha1.Subscription + + BeforeEach(func() { + subscription = &v1alpha1.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNamespace.GetName(), + Name: genName("local-subscription-"), + }, + Spec: &v1alpha1.SubscriptionSpec{ + CatalogSource: localCatalog.GetName(), + CatalogSourceNamespace: localCatalog.GetNamespace(), + Package: "packageA", + Channel: "stable", + InstallPlanApproval: v1alpha1.ApprovalAutomatic, + }, + } + + By("creating a subscription") + _ = determinedE2eClient.Create(context.Background(), subscription) + }) + + When("the operator group is annotated with olm.operatorframework.io/exclude-global-namespace-resolution=true", func() { + + It("the broken subscription should resolve and have state AtLatest", func() { + By("checking that the subscription is not resolving and has a condition with type ResolutionFailed") + EventuallyResource(subscription).Should(ContainSubscriptionConditionOfType(v1alpha1.SubscriptionResolutionFailed)) + + By("annotating the operator group with olm.operatorframework.io/exclude-global-namespace-resolution=true") + Eventually(func() error { + annotatedOperatorGroup := operatorGroup.DeepCopy() + if err := determinedE2eClient.Get(context.Background(), k8scontrollerclient.ObjectKeyFromObject(annotatedOperatorGroup), annotatedOperatorGroup); err != nil { + return err + } + + if annotatedOperatorGroup.Annotations == nil { + annotatedOperatorGroup.Annotations = map[string]string{} + } + + annotatedOperatorGroup.Annotations["olm.operatorframework.io/exclude-global-namespace-resolution"] = "true" + if err := determinedE2eClient.Update(context.Background(), annotatedOperatorGroup); err != nil { + return err + } + return nil + }).Should(Succeed()) + + By("checking that the subscription resolves and has state AtLatest") + EventuallyResource(subscription).Should(HaveSubscriptionState(v1alpha1.SubscriptionStateAtLatest)) + }) + }) + }) +})