Skip to content

✨ Include network policy for all configmap and grpc catalogsources #3568

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"google.golang.org/grpc/connectivity"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
Expand All @@ -38,6 +39,7 @@ import (
"k8s.io/apimachinery/pkg/util/yaml"
batchv1applyconfigurations "k8s.io/client-go/applyconfigurations/batch/v1"
corev1applyconfigurations "k8s.io/client-go/applyconfigurations/core/v1"
networkingv1applyconfigurations "k8s.io/client-go/applyconfigurations/networking/v1"
rbacv1applyconfigurations "k8s.io/client-go/applyconfigurations/rbac/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
Expand Down Expand Up @@ -600,6 +602,23 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
}
}

// Wire NetworkPolicies
networkPolicyInformer := k8sInformerFactory.Networking().V1().NetworkPolicies()
op.lister.NetworkingV1().RegisterNetworkPolicyLister(metav1.NamespaceAll, networkPolicyInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, networkPolicyInformer.Informer())

networkPoliciesGVR := networkingv1.SchemeGroupVersion.WithResource("networkpolicies")
if err := labelObjects(networkPoliciesGVR, networkPolicyInformer.Informer(), labeller.ObjectLabeler[*networkingv1.NetworkPolicy, *networkingv1applyconfigurations.NetworkPolicyApplyConfiguration](
ctx, op.logger, labeller.Filter(networkPoliciesGVR),
networkPolicyInformer.Lister().List,
networkingv1applyconfigurations.NetworkPolicy,
func(namespace string, ctx context.Context, cfg *networkingv1applyconfigurations.NetworkPolicyApplyConfiguration, opts metav1.ApplyOptions) (*networkingv1.NetworkPolicy, error) {
return op.opClient.KubernetesInterface().NetworkingV1().NetworkPolicies(namespace).Apply(ctx, cfg, opts)
},
)); err != nil {
return nil, err
}

// Wire Pods for CatalogSource
catsrcReq, err := labels.NewRequirement(reconciler.CatalogSourceLabelKey, selection.Exists, nil)
if err != nil {
Expand Down
22 changes: 21 additions & 1 deletion pkg/controller/operators/catalog/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"gopkg.in/yaml.v2"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
Expand Down Expand Up @@ -1276,6 +1277,10 @@ func TestSyncCatalogSources(t *testing.T) {
pod(t, *grpcCatalog),
service(grpcCatalog.GetName(), grpcCatalog.GetNamespace()),
serviceAccount(grpcCatalog.GetName(), grpcCatalog.GetNamespace(), "", objectReference("init secret")),
networkPolicy(grpcCatalog, map[string]string{
reconciler.CatalogSourceLabelKey: grpcCatalog.GetName(),
install.OLMManagedLabelKey: install.OLMManagedLabelValue,
}),
},
existingSources: []sourceAddress{
{
Expand Down Expand Up @@ -2128,14 +2133,25 @@ func NewFakeOperator(ctx context.Context, namespace string, namespaces []string,
serviceInformer := factory.Core().V1().Services()
podInformer := factory.Core().V1().Pods()
configMapInformer := factory.Core().V1().ConfigMaps()
sharedInformers = append(sharedInformers, roleInformer.Informer(), roleBindingInformer.Informer(), serviceAccountInformer.Informer(), serviceInformer.Informer(), podInformer.Informer(), configMapInformer.Informer())
networkPolicyInformer := factory.Networking().V1().NetworkPolicies()

sharedInformers = append(sharedInformers,
roleInformer.Informer(),
roleBindingInformer.Informer(),
serviceAccountInformer.Informer(),
serviceInformer.Informer(),
podInformer.Informer(),
configMapInformer.Informer(),
networkPolicyInformer.Informer(),
)

lister.RbacV1().RegisterRoleLister(metav1.NamespaceAll, roleInformer.Lister())
lister.RbacV1().RegisterRoleBindingLister(metav1.NamespaceAll, roleBindingInformer.Lister())
lister.CoreV1().RegisterServiceAccountLister(metav1.NamespaceAll, serviceAccountInformer.Lister())
lister.CoreV1().RegisterServiceLister(metav1.NamespaceAll, serviceInformer.Lister())
lister.CoreV1().RegisterPodLister(metav1.NamespaceAll, podInformer.Lister())
lister.CoreV1().RegisterConfigMapLister(metav1.NamespaceAll, configMapInformer.Lister())
lister.NetworkingV1().RegisterNetworkPolicyLister(metav1.NamespaceAll, networkPolicyInformer.Lister())
logger := logrus.New()

// Create the new operator
Expand Down Expand Up @@ -2319,6 +2335,10 @@ func configMap(name, namespace string) *corev1.ConfigMap {
}
}

func networkPolicy(catSrc *v1alpha1.CatalogSource, matchLabels map[string]string) *networkingv1.NetworkPolicy {
return reconciler.DesiredRegistryNetworkPolicy(catSrc, matchLabels)
}

func objectReference(name string) *corev1.ObjectReference {
if name == "" {
return &corev1.ObjectReference{}
Expand Down
43 changes: 43 additions & 0 deletions pkg/controller/registry/reconciler/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
pkgerrors "github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -121,6 +122,9 @@ func (s *configMapCatalogSourceDecorator) Pod(image string, defaultPodSecurityCo
ownerutil.AddOwner(pod, s.CatalogSource, false, true)
return pod, nil
}
func (s *configMapCatalogSourceDecorator) NetworkPolicy() *networkingv1.NetworkPolicy {
return DesiredRegistryNetworkPolicy(s.CatalogSource, s.Labels())
}

func (s *configMapCatalogSourceDecorator) ServiceAccount() *corev1.ServiceAccount {
sa := &corev1.ServiceAccount{
Expand Down Expand Up @@ -210,6 +214,16 @@ func (c *ConfigMapRegistryReconciler) currentService(source configMapCatalogSour
return service, nil
}

func (c *ConfigMapRegistryReconciler) currentNetworkPolicy(source configMapCatalogSourceDecorator) *networkingv1.NetworkPolicy {
npName := source.NetworkPolicy().GetName()
np, err := c.Lister.NetworkingV1().NetworkPolicyLister().NetworkPolicies(source.GetNamespace()).Get(npName)
if err != nil {
logrus.WithField("networkPolicy", npName).WithError(err).Debug("couldn't find network policy in cache")
return nil
}
return np
}

func (c *ConfigMapRegistryReconciler) currentServiceAccount(source configMapCatalogSourceDecorator) *corev1.ServiceAccount {
serviceAccountName := source.ServiceAccount().GetName()
serviceAccount, err := c.Lister.CoreV1().ServiceAccountLister().ServiceAccounts(source.GetNamespace()).Get(serviceAccountName)
Expand Down Expand Up @@ -328,6 +342,9 @@ func (c *ConfigMapRegistryReconciler) EnsureRegistryServer(logger *logrus.Entry,
}

//TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated)
if err := c.ensureNetworkPolicy(source); err != nil {
return pkgerrors.Wrapf(err, "error ensuring network policy: %s", source.GetName())
}
if err := c.ensureServiceAccount(source, overwrite); err != nil {
return pkgerrors.Wrapf(err, "error ensuring service account: %s", source.serviceAccountName())
}
Expand Down Expand Up @@ -365,6 +382,20 @@ func (c *ConfigMapRegistryReconciler) EnsureRegistryServer(logger *logrus.Entry,
return nil
}

func (c *ConfigMapRegistryReconciler) ensureNetworkPolicy(source configMapCatalogSourceDecorator) error {
networkPolicy := source.NetworkPolicy()
if currentNetworkPolicy := c.currentNetworkPolicy(source); currentNetworkPolicy != nil {
if sanitizedDeepEqual(networkPolicy, currentNetworkPolicy) {
return nil
}
if err := c.OpClient.DeleteNetworkPolicy(networkPolicy.GetNamespace(), networkPolicy.GetName(), metav1.NewDeleteOptions(0)); err != nil && !apierrors.IsNotFound(err) {
return err
}
}
_, err := c.OpClient.CreateNetworkPolicy(networkPolicy)
return err
}

func (c *ConfigMapRegistryReconciler) ensureServiceAccount(source configMapCatalogSourceDecorator, overwrite bool) error {
serviceAccount := source.ServiceAccount()
if c.currentServiceAccount(source) != nil {
Expand Down Expand Up @@ -497,6 +528,18 @@ func (c *ConfigMapRegistryReconciler) CheckRegistryServer(logger *logrus.Entry,
// Check on registry resources
// TODO: more complex checks for resources
// TODO: add gRPC health check
np := c.currentNetworkPolicy(source)
if np == nil {
logger.Error("registry service not healthy: could not get network policy")
healthy = false
return
}
if !sanitizedDeepEqual(source.NetworkPolicy(), np) {
logger.Error("registry service not healthy: unexpected network policy")
healthy = false
return
}

service, err := c.currentService(source)
if err != nil {
return false, err
Expand Down
31 changes: 31 additions & 0 deletions pkg/controller/registry/reconciler/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -83,6 +84,7 @@ func fakeReconcilerFactory(t *testing.T, stopc <-chan struct{}, options ...fakeR
serviceInformer := informerFactory.Core().V1().Services()
podInformer := informerFactory.Core().V1().Pods()
configMapInformer := informerFactory.Core().V1().ConfigMaps()
networkPolicyInformer := informerFactory.Networking().V1().NetworkPolicies()

registryInformers := []cache.SharedIndexInformer{
roleInformer.Informer(),
Expand All @@ -91,6 +93,7 @@ func fakeReconcilerFactory(t *testing.T, stopc <-chan struct{}, options ...fakeR
serviceInformer.Informer(),
podInformer.Informer(),
configMapInformer.Informer(),
networkPolicyInformer.Informer(),
}

lister := operatorlister.NewLister()
Expand All @@ -100,6 +103,7 @@ func fakeReconcilerFactory(t *testing.T, stopc <-chan struct{}, options ...fakeR
lister.CoreV1().RegisterServiceLister(testNamespace, serviceInformer.Lister())
lister.CoreV1().RegisterPodLister(testNamespace, podInformer.Lister())
lister.CoreV1().RegisterConfigMapLister(testNamespace, configMapInformer.Lister())
lister.NetworkingV1().RegisterNetworkPolicyLister(testNamespace, networkPolicyInformer.Lister())

rec := &registryReconcilerFactory{
now: config.now,
Expand Down Expand Up @@ -195,6 +199,7 @@ func objectsForCatalogSource(t *testing.T, catsrc *v1alpha1.CatalogSource) []run
switch catsrc.Spec.SourceType {
case v1alpha1.SourceTypeInternal, v1alpha1.SourceTypeConfigmap:
decorated := configMapCatalogSourceDecorator{catsrc, runAsUser}
np := decorated.NetworkPolicy()
service, err := decorated.Service()
if err != nil {
t.Fatal(err)
Expand All @@ -205,13 +210,15 @@ func objectsForCatalogSource(t *testing.T, catsrc *v1alpha1.CatalogSource) []run
t.Fatal(err)
}
objs = append(objs,
np,
pod,
service,
serviceAccount,
)
case v1alpha1.SourceTypeGrpc:
if catsrc.Spec.Image != "" {
decorated := grpcCatalogSourceDecorator{CatalogSource: catsrc, createPodAsUser: runAsUser, opmImage: ""}
np := decorated.NetworkPolicy()
serviceAccount := decorated.ServiceAccount()
service, err := decorated.Service()
if err != nil {
Expand All @@ -222,6 +229,7 @@ func objectsForCatalogSource(t *testing.T, catsrc *v1alpha1.CatalogSource) []run
t.Fatal(err)
}
objs = append(objs,
np,
pod,
service,
serviceAccount,
Expand Down Expand Up @@ -342,6 +350,24 @@ func TestConfigMapRegistryReconciler(t *testing.T) {
},
},
},
{
testName: "ExistingRegistry/BadNetworkPolicy",
in: in{
cluster: cluster{
k8sObjs: append(setLabel(objectsForCatalogSource(t, validCatalogSource), &networkingv1.NetworkPolicy{}, CatalogSourceLabelKey, "wrongValue"), validConfigMap),
},
catsrc: validCatalogSource,
},
out: out{
status: &v1alpha1.RegistryServiceStatus{
CreatedAt: now(),
Protocol: "grpc",
ServiceName: "cool-catalog",
ServiceNamespace: testNamespace,
Port: "50051",
},
},
},
{
testName: "ExistingRegistry/BadServiceAccount",
in: in{
Expand Down Expand Up @@ -504,6 +530,11 @@ func TestConfigMapRegistryReconciler(t *testing.T) {
require.Equal(t, pod.GetLabels(), outPod.GetLabels())
require.Equal(t, pod.Spec, outPod.Spec)

np := decorated.NetworkPolicy()
outNp, err := client.KubernetesInterface().NetworkingV1().NetworkPolicies(np.GetNamespace()).Get(context.TODO(), np.GetName(), metav1.GetOptions{})
require.NoError(t, err)
require.Equal(t, np, outNp)

service, err := decorated.Service()
require.NoError(t, err)
outService, err := client.KubernetesInterface().CoreV1().Services(service.GetNamespace()).Get(context.TODO(), service.GetName(), metav1.GetOptions{})
Expand Down
45 changes: 45 additions & 0 deletions pkg/controller/registry/reconciler/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
pkgerrors "github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -102,6 +103,10 @@ func (s *grpcCatalogSourceDecorator) Service() (*corev1.Service, error) {
return svc, nil
}

func (s *grpcCatalogSourceDecorator) NetworkPolicy() *networkingv1.NetworkPolicy {
return DesiredRegistryNetworkPolicy(s.CatalogSource, s.Labels())
}

func (s *grpcCatalogSourceDecorator) ServiceAccount() *corev1.ServiceAccount {
var secrets []corev1.LocalObjectReference
blockOwnerDeletion := true
Expand Down Expand Up @@ -153,6 +158,16 @@ type GrpcRegistryReconciler struct {

var _ RegistryReconciler = &GrpcRegistryReconciler{}

func (c *GrpcRegistryReconciler) currentNetworkPolicy(source grpcCatalogSourceDecorator) *networkingv1.NetworkPolicy {
npName := source.NetworkPolicy().GetName()
np, err := c.Lister.NetworkingV1().NetworkPolicyLister().NetworkPolicies(source.GetNamespace()).Get(npName)
if err != nil {
logrus.WithField("networkPolicy", npName).WithError(err).Debug("couldn't find network policy in cache")
return nil
}
return np
}

func (c *GrpcRegistryReconciler) currentService(source grpcCatalogSourceDecorator) (*corev1.Service, error) {
protoService, err := source.Service()
if err != nil {
Expand Down Expand Up @@ -261,6 +276,11 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(logger *logrus.Entry, cata
}

//TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated)
if err := c.ensureNetworkPolicy(source); err != nil {
logger.WithError(err).Error("error ensuring registry server: could not ensure registry network policy")
return pkgerrors.Wrapf(err, "error ensuring network policy: %s", source.GetName())
}

sa, err := c.ensureSA(source)
if err != nil && !apierrors.IsAlreadyExists(err) {
logger.WithError(err).Error("error ensuring registry server: could not ensure registry service account")
Expand Down Expand Up @@ -467,6 +487,20 @@ func (c *GrpcRegistryReconciler) ensureService(source grpcCatalogSourceDecorator
return err
}

func (c *GrpcRegistryReconciler) ensureNetworkPolicy(source grpcCatalogSourceDecorator) error {
networkPolicy := source.NetworkPolicy()
if currentNetworkPolicy := c.currentNetworkPolicy(source); currentNetworkPolicy != nil {
if sanitizedDeepEqual(networkPolicy, currentNetworkPolicy) {
return nil
}
if err := c.OpClient.DeleteNetworkPolicy(networkPolicy.GetNamespace(), networkPolicy.GetName(), metav1.NewDeleteOptions(0)); err != nil && !apierrors.IsNotFound(err) {
return err
}
}
_, err := c.OpClient.CreateNetworkPolicy(networkPolicy)
return err
}

func (c *GrpcRegistryReconciler) ensureSA(source grpcCatalogSourceDecorator) (*corev1.ServiceAccount, error) {
sa := source.ServiceAccount()
if _, err := c.OpClient.CreateServiceAccount(sa); err != nil {
Expand Down Expand Up @@ -606,6 +640,17 @@ func (c *GrpcRegistryReconciler) CheckRegistryServer(logger *logrus.Entry, catal

// Check on registry resources
// TODO: add gRPC health check
currentNetworkPolicy := c.currentNetworkPolicy(source)
if currentNetworkPolicy == nil {
logger.Error("registry service not healthy: could not get network policy")
return false, nil
}
expectedNetworkPolicy := source.NetworkPolicy()
if !sanitizedDeepEqual(expectedNetworkPolicy, currentNetworkPolicy) {
logger.Error("registry service not healthy: unexpected network policy")
return false, nil
}

service, err := c.currentService(source)
if err != nil {
logger.WithError(err).Error("registry service not healthy: could not get current service")
Expand Down
Loading