Skip to content

✨ Plug in unpacker, add Handler #757

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions api/v1alpha1/clusterextension_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import (
"github.com/operator-framework/operator-controller/internal/conditionsets"
)

var (
ClusterExtensionGVK = SchemeBuilder.GroupVersion.WithKind("ClusterExtension")
ClusterExtensionKind = ClusterExtensionGVK.Kind
)

type UpgradeConstraintPolicy string

const (
Expand Down Expand Up @@ -75,15 +80,18 @@ type ClusterExtensionSpec struct {

const (
// TODO(user): add more Types, here and into init()
TypeInstalled = "Installed"
TypeResolved = "Resolved"
TypeInstalled = "Installed"
TypeResolved = "Resolved"
TypeHasValidBundle = "HasValidBundle"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I really think there should be a blank line after this.

// TypeDeprecated is a rollup condition that is present when
// any of the deprecated conditions are present.
TypeDeprecated = "Deprecated"
TypePackageDeprecated = "PackageDeprecated"
TypeChannelDeprecated = "ChannelDeprecated"
TypeBundleDeprecated = "BundleDeprecated"

ReasonErrorGettingClient = "ErrorGettingClient"
ReasonBundleLoadFailed = "BundleLoadFailed"
ReasonBundleLookupFailed = "BundleLookupFailed"
ReasonInstallationFailed = "InstallationFailed"
ReasonInstallationStatusUnknown = "InstallationStatusUnknown"
Expand Down
46 changes: 42 additions & 4 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package main

import (
"flag"
"fmt"
"net/http"
"net/url"
"os"
"time"

Expand All @@ -42,6 +44,10 @@ import (
"github.com/operator-framework/operator-controller/internal/catalogmetadata/cache"
catalogclient "github.com/operator-framework/operator-controller/internal/catalogmetadata/client"
"github.com/operator-framework/operator-controller/internal/controllers"
"github.com/operator-framework/operator-controller/internal/rukpak/handler"
"github.com/operator-framework/operator-controller/internal/rukpak/source"
"github.com/operator-framework/operator-controller/internal/rukpak/storage"
"github.com/operator-framework/operator-controller/internal/rukpak/util"
"github.com/operator-framework/operator-controller/pkg/features"
)

Expand All @@ -63,17 +69,25 @@ func init() {

func main() {
var (
metricsAddr string
enableLeaderElection bool
probeAddr string
cachePath string
metricsAddr string
enableLeaderElection bool
probeAddr string
cachePath string
httpExternalAddr string
systemNamespace string
unpackImage string
provisionerStorageDirectory string
)
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.StringVar(&httpExternalAddr, "http-external-address", "http://localhost:8080", "The external address at which the http server is reachable.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
flag.StringVar(&cachePath, "cache-path", "/var/cache", "The local directory path used for filesystem based caching")
flag.StringVar(&systemNamespace, "system-namespace", "", "Configures the namespace that gets used to deploy system resources.")
flag.StringVar(&unpackImage, "unpack-image", util.DefaultUnpackImage, "Configures the container image that gets used to unpack Bundle contents.")
flag.StringVar(&provisionerStorageDirectory, "provisioner-storage-dir", storage.DefaultBundleCacheDir, "The directory that is used to store bundle contents.")
opts := zap.Options{
Development: true,
}
Expand Down Expand Up @@ -122,11 +136,35 @@ func main() {
setupLog.Error(err, "unable to create helm client")
}

if systemNamespace == "" {
systemNamespace = util.PodNamespace()
}

unpacker, err := source.NewDefaultUnpacker(mgr, systemNamespace, unpackImage)
if err != nil {
setupLog.Error(err, "unable to create unpacker")
}

storageURL, err := url.Parse(fmt.Sprintf("%s/bundles/", httpExternalAddr))
if err != nil {
setupLog.Error(err, "unable to parse bundle content server URL")
os.Exit(1)
}

localStorage := &storage.LocalDirectory{
RootDirectory: provisionerStorageDirectory,
URL: *storageURL,
}

if err = (&controllers.ClusterExtensionReconciler{
Client: cl,
ReleaseNamespace: systemNamespace,
BundleProvider: catalogClient,
Scheme: mgr.GetScheme(),
ActionClientGetter: acg,
Unpacker: unpacker,
Storage: localStorage,
Handler: handler.HandlerFunc(handler.HandleClusterExtension),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterExtension")
os.Exit(1)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ require (
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a
gopkg.in/yaml.v2 v2.4.0
helm.sh/helm/v3 v3.14.3
k8s.io/api v0.29.3
k8s.io/apiextensions-apiserver v0.29.3
k8s.io/apimachinery v0.29.3
k8s.io/cli-runtime v0.29.2
k8s.io/client-go v0.29.3
k8s.io/component-base v0.29.3
k8s.io/utils v0.0.0-20240102154912-e7106e64919e
Expand Down Expand Up @@ -201,9 +203,7 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
helm.sh/helm/v3 v3.14.3 // indirect
k8s.io/apiserver v0.29.3 // indirect
k8s.io/cli-runtime v0.29.2 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
k8s.io/kube-openapi v0.0.0-20240221221325-2ac9dc51f3f1 // indirect
k8s.io/kubectl v0.29.2 // indirect
Expand Down
110 changes: 103 additions & 7 deletions internal/controllers/clusterextension_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,35 @@ limitations under the License.
package controllers

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"sort"
"strings"

"github.com/operator-framework/operator-controller/internal/rukpak/handler"
"github.com/operator-framework/operator-controller/internal/rukpak/util"
"helm.sh/helm/v3/pkg/postrender"

mmsemver "github.com/Masterminds/semver/v3"
bsemver "github.com/blang/semver/v4"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
apimachyaml "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
crhandler "sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

Expand All @@ -51,13 +59,19 @@ import (
catalogfilter "github.com/operator-framework/operator-controller/internal/catalogmetadata/filter"
catalogsort "github.com/operator-framework/operator-controller/internal/catalogmetadata/sort"
rukpakapi "github.com/operator-framework/operator-controller/internal/rukpak/api"
"github.com/operator-framework/operator-controller/internal/rukpak/source"
"github.com/operator-framework/operator-controller/internal/rukpak/storage"
)

// ClusterExtensionReconciler reconciles a ClusterExtension object
type ClusterExtensionReconciler struct {
client.Client
ReleaseNamespace string
BundleProvider BundleProvider
Unpacker source.Unpacker
ActionClientGetter helmclient.ActionClientGetter
Storage storage.Storage
Handler handler.Handler
Scheme *runtime.Scheme
}

Expand Down Expand Up @@ -137,10 +151,62 @@ func (r *ClusterExtensionReconciler) reconcile(ctx context.Context, ext *ocv1alp
// Considering only image source.

// Generate a BundleSource, and then pass this and the ClusterExtension to Unpack
// TODO:
// bs := r.GenerateExpectedBundleSource(*ext, bundle.Image)
// unpacker := NewDefaultUnpacker(msg, namespace, unpackImage)
// unpacker..Unpack(bs, ext)
bs := r.GenerateExpectedBundleSource(*ext, bundle.Image)
unpackResult, err := r.Unpacker.Unpack(ctx, bs, ext)
if err != nil {
return ctrl.Result{}, updateStatusUnpackFailing(&ext.Status, fmt.Errorf("source bundle content: %v", err))
}

switch unpackResult.State {
case source.StatePending:
updateStatusUnpackPending(&ext.Status, unpackResult)
// There must be a limit to number of entries if status is stuck at
// unpack pending.
return ctrl.Result{}, nil
case source.StateUnpacking:
updateStatusUnpacking(&ext.Status, unpackResult)
return ctrl.Result{}, nil
case source.StateUnpacked:
if err := r.Storage.Store(ctx, ext, unpackResult.Bundle); err != nil {
return ctrl.Result{}, updateStatusUnpackFailing(&ext.Status, fmt.Errorf("persist bundle content: %v", err))
}
contentURL, err := r.Storage.URLFor(ctx, ext)
if err != nil {
return ctrl.Result{}, updateStatusUnpackFailing(&ext.Status, fmt.Errorf("get content URL: %v", err))
}
updateStatusUnpacked(&ext.Status, unpackResult, contentURL)
default:
return ctrl.Result{}, updateStatusUnpackFailing(&ext.Status, fmt.Errorf("unknown unpack state %q: %v", unpackResult.State, err))
}

bundleFS, err := r.Storage.Load(ctx, ext)
if err != nil {
meta.SetStatusCondition(&ext.Status.Conditions, metav1.Condition{
Type: ocv1alpha1.TypeHasValidBundle,
Status: metav1.ConditionFalse,
Reason: ocv1alpha1.ReasonBundleLoadFailed,
Message: err.Error(),
})
return ctrl.Result{}, err
}

_, _, err = r.Handler.Handle(ctx, bundleFS, ext)
if err != nil {
meta.SetStatusCondition(&ext.Status.Conditions, metav1.Condition{
Type: rukpakv1alpha2.TypeInstalled,
Status: metav1.ConditionFalse,
Reason: rukpakv1alpha2.ReasonInstallFailed,
Message: err.Error(),
})
return ctrl.Result{}, err
}

ext.SetNamespace(r.ReleaseNamespace)
_, err = r.ActionClientGetter.ActionClientFor(ext)
if err != nil {
setInstalledStatusConditionFailed(&ext.Status.Conditions, fmt.Sprintf("%s:%v", ocv1alpha1.ReasonErrorGettingClient, err), ext.Generation)
return ctrl.Result{}, err
}

// set the status of the cluster extension based on the respective bundle deployment status conditions.
return ctrl.Result{}, nil
Expand Down Expand Up @@ -286,7 +352,7 @@ func (r *ClusterExtensionReconciler) SetupWithManager(mgr ctrl.Manager) error {
err := ctrl.NewControllerManagedBy(mgr).
For(&ocv1alpha1.ClusterExtension{}).
Watches(&catalogd.Catalog{},
handler.EnqueueRequestsFromMapFunc(clusterExtensionRequestsForCatalog(mgr.GetClient(), mgr.GetLogger()))).
crhandler.EnqueueRequestsFromMapFunc(clusterExtensionRequestsForCatalog(mgr.GetClient(), mgr.GetLogger()))).
Owns(&rukpakv1alpha2.BundleDeployment{}).
Complete(r)

Expand All @@ -297,7 +363,7 @@ func (r *ClusterExtensionReconciler) SetupWithManager(mgr ctrl.Manager) error {
}

// Generate reconcile requests for all cluster extensions affected by a catalog change
func clusterExtensionRequestsForCatalog(c client.Reader, logger logr.Logger) handler.MapFunc {
func clusterExtensionRequestsForCatalog(c client.Reader, logger logr.Logger) crhandler.MapFunc {
return func(ctx context.Context, _ client.Object) []reconcile.Request {
// no way of associating an extension to a catalog so create reconcile requests for everything
clusterExtensions := ocv1alpha1.ClusterExtensionList{}
Expand Down Expand Up @@ -417,3 +483,33 @@ func (r *ClusterExtensionReconciler) getInstalledVersion(ctx context.Context, cl
}
return existingVersionSemver, nil
}

type postrenderer struct {
labels map[string]string
cascade postrender.PostRenderer
}

func (p *postrenderer) Run(renderedManifests *bytes.Buffer) (*bytes.Buffer, error) {
var buf bytes.Buffer
dec := apimachyaml.NewYAMLOrJSONDecoder(renderedManifests, 1024)
for {
obj := unstructured.Unstructured{}
err := dec.Decode(&obj)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return nil, err
}
obj.SetLabels(util.MergeMaps(obj.GetLabels(), p.labels))
b, err := obj.MarshalJSON()
if err != nil {
return nil, err
}
buf.Write(b)
}
if p.cascade != nil {
return p.cascade.Run(&buf)
}
return &buf, nil
}
Comment on lines +486 to +515
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this for? Something moving forward?

74 changes: 74 additions & 0 deletions internal/controllers/clusterextension_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright 2023.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
ocv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1"
rukpakapi "github.com/operator-framework/operator-controller/internal/rukpak/api"
"github.com/operator-framework/operator-controller/internal/rukpak/source"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func updateStatusUnpackFailing(status *ocv1alpha1.ClusterExtensionStatus, err error) error {
status.ResolvedBundle = nil
status.InstalledBundle = nil
meta.SetStatusCondition(&status.Conditions, metav1.Condition{
Type: rukpakapi.TypeUnpacked,
Status: metav1.ConditionFalse,
Reason: rukpakapi.ReasonUnpackFailed,
Message: err.Error(),
})
return err
}

// TODO: verify if we need to update the installBundle status or leave it as is.
func updateStatusUnpackPending(status *ocv1alpha1.ClusterExtensionStatus, result *source.Result) {
status.ResolvedBundle = nil
status.InstalledBundle = nil
meta.SetStatusCondition(&status.Conditions, metav1.Condition{
Type: rukpakapi.TypeUnpacked,
Status: metav1.ConditionFalse,
Reason: rukpakapi.ReasonUnpackPending,
Message: result.Message,
})
}

// TODO: verify if we need to update the installBundle status or leave it as is.
func updateStatusUnpacking(status *ocv1alpha1.ClusterExtensionStatus, result *source.Result) {
status.ResolvedBundle = nil
status.InstalledBundle = nil
meta.SetStatusCondition(&status.Conditions, metav1.Condition{
Type: rukpakapi.TypeUnpacked,
Status: metav1.ConditionFalse,
Reason: rukpakapi.ReasonUnpacking,
Message: result.Message,
})
}

func updateStatusUnpacked(status *ocv1alpha1.ClusterExtensionStatus, result *source.Result, contentURL string) {
// TODO: Expose content URL through CE status.
status.ResolvedBundle = &ocv1alpha1.BundleMetadata{
Name: result.ResolvedSource.Image.Ref,
}
meta.SetStatusCondition(&status.Conditions, metav1.Condition{
Type: rukpakapi.TypeUnpacked,
Status: metav1.ConditionTrue,
Reason: rukpakapi.ReasonUnpackSuccessful,
Message: result.Message,
})
}
Loading