diff --git a/api/v1alpha1/clusterextension_types.go b/api/v1alpha1/clusterextension_types.go index 0d7f0b83d..2e00e7485 100644 --- a/api/v1alpha1/clusterextension_types.go +++ b/api/v1alpha1/clusterextension_types.go @@ -22,6 +22,11 @@ import ( "github.com/operator-framework/operator-controller/internal/conditionsets" ) +var ( + ClusterExtensionGVK = SchemeBuilder.GroupVersion.WithKind("ClusterExtension") + ClusterExtensionKind = ClusterExtensionGVK.Kind +) + type UpgradeConstraintPolicy string const ( @@ -75,8 +80,9 @@ type ClusterExtensionSpec struct { const ( // TODO(user): add more Types, here and into init() - TypeInstalled = "Installed" - TypeResolved = "Resolved" + TypeInstalled = "Installed" + TypeResolved = "Resolved" + TypeHasValidBundle = "HasValidBundle" // TypeDeprecated is a rollup condition that is present when // any of the deprecated conditions are present. TypeDeprecated = "Deprecated" @@ -84,6 +90,8 @@ const ( TypeChannelDeprecated = "ChannelDeprecated" TypeBundleDeprecated = "BundleDeprecated" + ReasonErrorGettingClient = "ErrorGettingClient" + ReasonBundleLoadFailed = "BundleLoadFailed" ReasonBundleLookupFailed = "BundleLookupFailed" ReasonInstallationFailed = "InstallationFailed" ReasonInstallationStatusUnknown = "InstallationStatusUnknown" diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 7d230b4be..17107866c 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -18,7 +18,9 @@ package main import ( "flag" + "fmt" "net/http" + "net/url" "os" "time" @@ -42,6 +44,10 @@ import ( "github.com/operator-framework/operator-controller/internal/catalogmetadata/cache" catalogclient "github.com/operator-framework/operator-controller/internal/catalogmetadata/client" "github.com/operator-framework/operator-controller/internal/controllers" + "github.com/operator-framework/operator-controller/internal/rukpak/handler" + "github.com/operator-framework/operator-controller/internal/rukpak/source" + "github.com/operator-framework/operator-controller/internal/rukpak/storage" + "github.com/operator-framework/operator-controller/internal/rukpak/util" "github.com/operator-framework/operator-controller/pkg/features" ) @@ -63,17 +69,25 @@ func init() { func main() { var ( - metricsAddr string - enableLeaderElection bool - probeAddr string - cachePath string + metricsAddr string + enableLeaderElection bool + probeAddr string + cachePath string + httpExternalAddr string + systemNamespace string + unpackImage string + provisionerStorageDirectory string ) flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") + flag.StringVar(&httpExternalAddr, "http-external-address", "http://localhost:8080", "The external address at which the http server is reachable.") flag.BoolVar(&enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. "+ "Enabling this will ensure there is only one active controller manager.") flag.StringVar(&cachePath, "cache-path", "/var/cache", "The local directory path used for filesystem based caching") + flag.StringVar(&systemNamespace, "system-namespace", "", "Configures the namespace that gets used to deploy system resources.") + flag.StringVar(&unpackImage, "unpack-image", util.DefaultUnpackImage, "Configures the container image that gets used to unpack Bundle contents.") + flag.StringVar(&provisionerStorageDirectory, "provisioner-storage-dir", storage.DefaultBundleCacheDir, "The directory that is used to store bundle contents.") opts := zap.Options{ Development: true, } @@ -122,11 +136,35 @@ func main() { setupLog.Error(err, "unable to create helm client") } + if systemNamespace == "" { + systemNamespace = util.PodNamespace() + } + + unpacker, err := source.NewDefaultUnpacker(mgr, systemNamespace, unpackImage) + if err != nil { + setupLog.Error(err, "unable to create unpacker") + } + + storageURL, err := url.Parse(fmt.Sprintf("%s/bundles/", httpExternalAddr)) + if err != nil { + setupLog.Error(err, "unable to parse bundle content server URL") + os.Exit(1) + } + + localStorage := &storage.LocalDirectory{ + RootDirectory: provisionerStorageDirectory, + URL: *storageURL, + } + if err = (&controllers.ClusterExtensionReconciler{ Client: cl, + ReleaseNamespace: systemNamespace, BundleProvider: catalogClient, Scheme: mgr.GetScheme(), ActionClientGetter: acg, + Unpacker: unpacker, + Storage: localStorage, + Handler: handler.HandlerFunc(handler.HandleClusterExtension), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ClusterExtension") os.Exit(1) diff --git a/go.mod b/go.mod index 1ab679785..78cf9321d 100644 --- a/go.mod +++ b/go.mod @@ -22,9 +22,11 @@ require ( go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20240213143201-ec583247a57a gopkg.in/yaml.v2 v2.4.0 + helm.sh/helm/v3 v3.14.3 k8s.io/api v0.29.3 k8s.io/apiextensions-apiserver v0.29.3 k8s.io/apimachinery v0.29.3 + k8s.io/cli-runtime v0.29.2 k8s.io/client-go v0.29.3 k8s.io/component-base v0.29.3 k8s.io/utils v0.0.0-20240102154912-e7106e64919e @@ -201,9 +203,7 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - helm.sh/helm/v3 v3.14.3 // indirect k8s.io/apiserver v0.29.3 // indirect - k8s.io/cli-runtime v0.29.2 // indirect k8s.io/klog/v2 v2.120.1 // indirect k8s.io/kube-openapi v0.0.0-20240221221325-2ac9dc51f3f1 // indirect k8s.io/kubectl v0.29.2 // indirect diff --git a/internal/controllers/clusterextension_controller.go b/internal/controllers/clusterextension_controller.go index cc61206a6..2b86afe56 100644 --- a/internal/controllers/clusterextension_controller.go +++ b/internal/controllers/clusterextension_controller.go @@ -17,27 +17,35 @@ limitations under the License. package controllers import ( + "bytes" "context" "errors" "fmt" + "io" "sort" "strings" + "github.com/operator-framework/operator-controller/internal/rukpak/handler" + "github.com/operator-framework/operator-controller/internal/rukpak/util" + "helm.sh/helm/v3/pkg/postrender" + mmsemver "github.com/Masterminds/semver/v3" bsemver "github.com/blang/semver/v4" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" + apimachyaml "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/handler" + crhandler "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -51,13 +59,19 @@ import ( catalogfilter "github.com/operator-framework/operator-controller/internal/catalogmetadata/filter" catalogsort "github.com/operator-framework/operator-controller/internal/catalogmetadata/sort" rukpakapi "github.com/operator-framework/operator-controller/internal/rukpak/api" + "github.com/operator-framework/operator-controller/internal/rukpak/source" + "github.com/operator-framework/operator-controller/internal/rukpak/storage" ) // ClusterExtensionReconciler reconciles a ClusterExtension object type ClusterExtensionReconciler struct { client.Client + ReleaseNamespace string BundleProvider BundleProvider + Unpacker source.Unpacker ActionClientGetter helmclient.ActionClientGetter + Storage storage.Storage + Handler handler.Handler Scheme *runtime.Scheme } @@ -137,10 +151,62 @@ func (r *ClusterExtensionReconciler) reconcile(ctx context.Context, ext *ocv1alp // Considering only image source. // Generate a BundleSource, and then pass this and the ClusterExtension to Unpack - // TODO: - // bs := r.GenerateExpectedBundleSource(*ext, bundle.Image) - // unpacker := NewDefaultUnpacker(msg, namespace, unpackImage) - // unpacker..Unpack(bs, ext) + bs := r.GenerateExpectedBundleSource(*ext, bundle.Image) + unpackResult, err := r.Unpacker.Unpack(ctx, bs, ext) + if err != nil { + return ctrl.Result{}, updateStatusUnpackFailing(&ext.Status, fmt.Errorf("source bundle content: %v", err)) + } + + switch unpackResult.State { + case source.StatePending: + updateStatusUnpackPending(&ext.Status, unpackResult) + // There must be a limit to number of entries if status is stuck at + // unpack pending. + return ctrl.Result{}, nil + case source.StateUnpacking: + updateStatusUnpacking(&ext.Status, unpackResult) + return ctrl.Result{}, nil + case source.StateUnpacked: + if err := r.Storage.Store(ctx, ext, unpackResult.Bundle); err != nil { + return ctrl.Result{}, updateStatusUnpackFailing(&ext.Status, fmt.Errorf("persist bundle content: %v", err)) + } + contentURL, err := r.Storage.URLFor(ctx, ext) + if err != nil { + return ctrl.Result{}, updateStatusUnpackFailing(&ext.Status, fmt.Errorf("get content URL: %v", err)) + } + updateStatusUnpacked(&ext.Status, unpackResult, contentURL) + default: + return ctrl.Result{}, updateStatusUnpackFailing(&ext.Status, fmt.Errorf("unknown unpack state %q: %v", unpackResult.State, err)) + } + + bundleFS, err := r.Storage.Load(ctx, ext) + if err != nil { + meta.SetStatusCondition(&ext.Status.Conditions, metav1.Condition{ + Type: ocv1alpha1.TypeHasValidBundle, + Status: metav1.ConditionFalse, + Reason: ocv1alpha1.ReasonBundleLoadFailed, + Message: err.Error(), + }) + return ctrl.Result{}, err + } + + _, _, err = r.Handler.Handle(ctx, bundleFS, ext) + if err != nil { + meta.SetStatusCondition(&ext.Status.Conditions, metav1.Condition{ + Type: rukpakv1alpha2.TypeInstalled, + Status: metav1.ConditionFalse, + Reason: rukpakv1alpha2.ReasonInstallFailed, + Message: err.Error(), + }) + return ctrl.Result{}, err + } + + ext.SetNamespace(r.ReleaseNamespace) + _, err = r.ActionClientGetter.ActionClientFor(ext) + if err != nil { + setInstalledStatusConditionFailed(&ext.Status.Conditions, fmt.Sprintf("%s:%v", ocv1alpha1.ReasonErrorGettingClient, err), ext.Generation) + return ctrl.Result{}, err + } // set the status of the cluster extension based on the respective bundle deployment status conditions. return ctrl.Result{}, nil @@ -286,7 +352,7 @@ func (r *ClusterExtensionReconciler) SetupWithManager(mgr ctrl.Manager) error { err := ctrl.NewControllerManagedBy(mgr). For(&ocv1alpha1.ClusterExtension{}). Watches(&catalogd.Catalog{}, - handler.EnqueueRequestsFromMapFunc(clusterExtensionRequestsForCatalog(mgr.GetClient(), mgr.GetLogger()))). + crhandler.EnqueueRequestsFromMapFunc(clusterExtensionRequestsForCatalog(mgr.GetClient(), mgr.GetLogger()))). Owns(&rukpakv1alpha2.BundleDeployment{}). Complete(r) @@ -297,7 +363,7 @@ func (r *ClusterExtensionReconciler) SetupWithManager(mgr ctrl.Manager) error { } // Generate reconcile requests for all cluster extensions affected by a catalog change -func clusterExtensionRequestsForCatalog(c client.Reader, logger logr.Logger) handler.MapFunc { +func clusterExtensionRequestsForCatalog(c client.Reader, logger logr.Logger) crhandler.MapFunc { return func(ctx context.Context, _ client.Object) []reconcile.Request { // no way of associating an extension to a catalog so create reconcile requests for everything clusterExtensions := ocv1alpha1.ClusterExtensionList{} @@ -417,3 +483,33 @@ func (r *ClusterExtensionReconciler) getInstalledVersion(ctx context.Context, cl } return existingVersionSemver, nil } + +type postrenderer struct { + labels map[string]string + cascade postrender.PostRenderer +} + +func (p *postrenderer) Run(renderedManifests *bytes.Buffer) (*bytes.Buffer, error) { + var buf bytes.Buffer + dec := apimachyaml.NewYAMLOrJSONDecoder(renderedManifests, 1024) + for { + obj := unstructured.Unstructured{} + err := dec.Decode(&obj) + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return nil, err + } + obj.SetLabels(util.MergeMaps(obj.GetLabels(), p.labels)) + b, err := obj.MarshalJSON() + if err != nil { + return nil, err + } + buf.Write(b) + } + if p.cascade != nil { + return p.cascade.Run(&buf) + } + return &buf, nil +} diff --git a/internal/controllers/clusterextension_status.go b/internal/controllers/clusterextension_status.go new file mode 100644 index 000000000..92b5ac5d1 --- /dev/null +++ b/internal/controllers/clusterextension_status.go @@ -0,0 +1,74 @@ +/* +Copyright 2023. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + ocv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1" + rukpakapi "github.com/operator-framework/operator-controller/internal/rukpak/api" + "github.com/operator-framework/operator-controller/internal/rukpak/source" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func updateStatusUnpackFailing(status *ocv1alpha1.ClusterExtensionStatus, err error) error { + status.ResolvedBundle = nil + status.InstalledBundle = nil + meta.SetStatusCondition(&status.Conditions, metav1.Condition{ + Type: rukpakapi.TypeUnpacked, + Status: metav1.ConditionFalse, + Reason: rukpakapi.ReasonUnpackFailed, + Message: err.Error(), + }) + return err +} + +// TODO: verify if we need to update the installBundle status or leave it as is. +func updateStatusUnpackPending(status *ocv1alpha1.ClusterExtensionStatus, result *source.Result) { + status.ResolvedBundle = nil + status.InstalledBundle = nil + meta.SetStatusCondition(&status.Conditions, metav1.Condition{ + Type: rukpakapi.TypeUnpacked, + Status: metav1.ConditionFalse, + Reason: rukpakapi.ReasonUnpackPending, + Message: result.Message, + }) +} + +// TODO: verify if we need to update the installBundle status or leave it as is. +func updateStatusUnpacking(status *ocv1alpha1.ClusterExtensionStatus, result *source.Result) { + status.ResolvedBundle = nil + status.InstalledBundle = nil + meta.SetStatusCondition(&status.Conditions, metav1.Condition{ + Type: rukpakapi.TypeUnpacked, + Status: metav1.ConditionFalse, + Reason: rukpakapi.ReasonUnpacking, + Message: result.Message, + }) +} + +func updateStatusUnpacked(status *ocv1alpha1.ClusterExtensionStatus, result *source.Result, contentURL string) { + // TODO: Expose content URL through CE status. + status.ResolvedBundle = &ocv1alpha1.BundleMetadata{ + Name: result.ResolvedSource.Image.Ref, + } + meta.SetStatusCondition(&status.Conditions, metav1.Condition{ + Type: rukpakapi.TypeUnpacked, + Status: metav1.ConditionTrue, + Reason: rukpakapi.ReasonUnpackSuccessful, + Message: result.Message, + }) +} diff --git a/internal/rukpak/handler/interfaces.go b/internal/rukpak/handler/interfaces.go new file mode 100644 index 000000000..2bdd502af --- /dev/null +++ b/internal/rukpak/handler/interfaces.go @@ -0,0 +1,21 @@ +package handler + +import ( + "context" + "io/fs" + + "helm.sh/helm/v3/pkg/chart" + "helm.sh/helm/v3/pkg/chartutil" + + ocv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1" +) + +type Handler interface { + Handle(context.Context, fs.FS, *ocv1alpha1.ClusterExtension) (*chart.Chart, chartutil.Values, error) +} + +type HandlerFunc func(context.Context, fs.FS, *ocv1alpha1.ClusterExtension) (*chart.Chart, chartutil.Values, error) + +func (f HandlerFunc) Handle(ctx context.Context, fsys fs.FS, bd *ocv1alpha1.ClusterExtension) (*chart.Chart, chartutil.Values, error) { + return f(ctx, fsys, bd) +} diff --git a/internal/rukpak/handler/registry.go b/internal/rukpak/handler/registry.go new file mode 100644 index 000000000..823b628cf --- /dev/null +++ b/internal/rukpak/handler/registry.go @@ -0,0 +1,104 @@ +package handler + +import ( + "context" + "crypto/sha256" + "errors" + "fmt" + "io/fs" + "path/filepath" + + "gopkg.in/yaml.v2" + "helm.sh/helm/v3/pkg/chart" + "helm.sh/helm/v3/pkg/chartutil" + "sigs.k8s.io/controller-runtime/pkg/client" + + ocv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1" + "github.com/operator-framework/operator-controller/internal/rukpak/convert" + "github.com/operator-framework/operator-controller/internal/rukpak/util" +) + +const ( + manifestsDir = "manifests" +) + +func HandleClusterExtension(ctx context.Context, fsys fs.FS, ext *ocv1alpha1.ClusterExtension) (*chart.Chart, chartutil.Values, error) { + plainFS, err := convert.RegistryV1ToPlain(fsys, ext.Spec.WatchNamespaces) + if err != nil { + return nil, nil, fmt.Errorf("convert registry+v1 bundle to plain+v0 bundle: %v", err) + } + if err := ValidateBundle(plainFS); err != nil { + return nil, nil, err + } + + chrt, err := chartFromBundle(plainFS) + if err != nil { + return nil, nil, err + } + return chrt, nil, nil +} + +func ValidateBundle(fsys fs.FS) error { + objects, err := getBundleObjects(fsys) + if err != nil { + return fmt.Errorf("get objects from bundle manifests: %v", err) + } + if len(objects) == 0 { + return errors.New("invalid bundle: found zero objects: plain+v0 bundles are required to contain at least one object") + } + return nil +} + +func getBundleObjects(bundleFS fs.FS) ([]client.Object, error) { + entries, err := fs.ReadDir(bundleFS, manifestsDir) + if err != nil { + return nil, err + } + + var bundleObjects []client.Object + for _, e := range entries { + if e.IsDir() { + return nil, fmt.Errorf("subdirectories are not allowed within the %q directory of the bundle image filesystem: found %q", manifestsDir, filepath.Join(manifestsDir, e.Name())) + } + + manifestObjects, err := getObjects(bundleFS, e) + if err != nil { + return nil, err + } + bundleObjects = append(bundleObjects, manifestObjects...) + } + return bundleObjects, nil +} + +func getObjects(bundle fs.FS, manifest fs.DirEntry) ([]client.Object, error) { + manifestPath := filepath.Join(manifestsDir, manifest.Name()) + manifestReader, err := bundle.Open(manifestPath) + if err != nil { + return nil, err + } + defer manifestReader.Close() + return util.ManifestObjects(manifestReader, manifestPath) +} + +func chartFromBundle(fsys fs.FS) (*chart.Chart, error) { + objects, err := getBundleObjects(fsys) + if err != nil { + return nil, fmt.Errorf("read bundle objects from bundle: %v", err) + } + + chrt := &chart.Chart{ + Metadata: &chart.Metadata{}, + } + for _, obj := range objects { + yamlData, err := yaml.Marshal(obj) + if err != nil { + return nil, err + } + hash := sha256.Sum256(yamlData) + chrt.Templates = append(chrt.Templates, &chart.File{ + Name: fmt.Sprintf("object-%x.yaml", hash[0:8]), + Data: yamlData, + }) + } + return chrt, nil +} diff --git a/internal/rukpak/storage/localdir.go b/internal/rukpak/storage/localdir.go new file mode 100644 index 000000000..e5c38086f --- /dev/null +++ b/internal/rukpak/storage/localdir.go @@ -0,0 +1,87 @@ +package storage + +import ( + "bytes" + "compress/gzip" + "context" + "errors" + "fmt" + "io" + "io/fs" + "net/http" + "net/url" + "os" + "path/filepath" + + "github.com/nlepage/go-tarfs" + "github.com/operator-framework/operator-controller/internal/rukpak/util" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +var _ Storage = &LocalDirectory{} + +const DefaultBundleCacheDir = "/var/cache/bundles" + +type LocalDirectory struct { + RootDirectory string + URL url.URL +} + +func (s *LocalDirectory) Load(_ context.Context, owner client.Object) (fs.FS, error) { + bundleFile, err := os.Open(s.bundlePath(owner.GetName())) + if err != nil { + return nil, err + } + defer bundleFile.Close() + tarReader, err := gzip.NewReader(bundleFile) + if err != nil { + return nil, err + } + return tarfs.New(tarReader) +} + +func (s *LocalDirectory) Store(_ context.Context, owner client.Object, bundle fs.FS) error { + buf := &bytes.Buffer{} + if err := util.FSToTarGZ(buf, bundle); err != nil { + return fmt.Errorf("convert bundle %q to tar.gz: %v", owner.GetName(), err) + } + + bundleFile, err := os.Create(s.bundlePath(owner.GetName())) + if err != nil { + return err + } + defer bundleFile.Close() + + if _, err := io.Copy(bundleFile, buf); err != nil { + return err + } + return nil +} + +func (s *LocalDirectory) Delete(_ context.Context, owner client.Object) error { + return ignoreNotExist(os.Remove(s.bundlePath(owner.GetName()))) +} + +func (s *LocalDirectory) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + fsys := &util.FilesOnlyFilesystem{FS: os.DirFS(s.RootDirectory)} + http.StripPrefix(s.URL.Path, http.FileServer(http.FS(fsys))).ServeHTTP(resp, req) +} + +func (s *LocalDirectory) URLFor(_ context.Context, owner client.Object) (string, error) { + return fmt.Sprintf("%s%s", s.URL.String(), localDirectoryBundleFile(owner.GetName())), nil +} + +func (s *LocalDirectory) bundlePath(bundleName string) string { + return filepath.Join(s.RootDirectory, localDirectoryBundleFile(bundleName)) +} + +func localDirectoryBundleFile(bundleName string) string { + return fmt.Sprintf("%s.tgz", bundleName) +} + +func ignoreNotExist(err error) error { + if errors.Is(err, os.ErrNotExist) { + return nil + } + return err +} diff --git a/internal/rukpak/storage/storage.go b/internal/rukpak/storage/storage.go new file mode 100644 index 000000000..cedc99a11 --- /dev/null +++ b/internal/rukpak/storage/storage.go @@ -0,0 +1,46 @@ +package storage + +import ( + "context" + "io/fs" + "net/http" + + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type Storage interface { + Loader + Storer +} + +type Loader interface { + Load(ctx context.Context, owner client.Object) (fs.FS, error) +} + +type Storer interface { + Store(ctx context.Context, owner client.Object, bundle fs.FS) error + Delete(ctx context.Context, owner client.Object) error + + http.Handler + URLFor(ctx context.Context, owner client.Object) (string, error) +} + +type fallbackLoaderStorage struct { + Storage + fallbackLoader Loader +} + +func WithFallbackLoader(s Storage, fallback Loader) Storage { + return &fallbackLoaderStorage{ + Storage: s, + fallbackLoader: fallback, + } +} + +func (s *fallbackLoaderStorage) Load(ctx context.Context, owner client.Object) (fs.FS, error) { + fsys, err := s.Storage.Load(ctx, owner) + if err != nil { + return s.fallbackLoader.Load(ctx, owner) + } + return fsys, nil +} diff --git a/internal/rukpak/util/fs.go b/internal/rukpak/util/fs.go new file mode 100644 index 000000000..b036ebe3f --- /dev/null +++ b/internal/rukpak/util/fs.go @@ -0,0 +1,90 @@ +package util + +import ( + "fmt" + "io/fs" + "os" + "path/filepath" + "strings" + "testing/fstest" +) + +// FilesOnlyFilesystem is an fs.FS implementation that treats non-regular files +// (e.g. directories, symlinks, devices, etc.) as non-existent. The reason for +// this is so that we only serve bundle files. +// +// This treats directories as not found so that the http server does not serve +// HTML directory index responses. +// +// This treats other symlink files as not found so that we prevent HTTP requests +// from escaping the filesystem root. +// +// Lastly, this treats other non-regular files as not found because they are +// out of scope for serving bundle contents. +type FilesOnlyFilesystem struct { + FS fs.FS +} + +func (f *FilesOnlyFilesystem) Open(name string) (fs.File, error) { + file, err := f.FS.Open(name) + if err != nil { + return nil, err + } + stat, err := file.Stat() + if err != nil { + return nil, err + } + if !stat.Mode().IsRegular() { + return nil, os.ErrNotExist + } + return file, nil +} + +// EnsureBaseDirFS ensures that an fs.FS contains a single directory in its root +// This is useful for bundle formats that require a base directory in the root of +// the bundle. +// +// For example, an unpacked Helm chart might have /Chart.yaml, and we'd +// typically assume as the bundle root. However, when helm archives +// contain at the root of the archive: //Chart.yaml. +// +// If the fs.FS already has this structure, EnsureBaseDirFS just returns fsys +// directly. Otherwise, it returns a new fs.FS where the defaultBaseDir is inserted +// at the root, such that fsys appears within defaultBaseDir. +func EnsureBaseDirFS(fsys fs.FS, defaultBaseDir string) (fs.FS, error) { + cleanDefaultBaseDir := filepath.Clean(defaultBaseDir) + if dir, _ := filepath.Split(cleanDefaultBaseDir); dir != "" { + return nil, fmt.Errorf("default base directory %q contains multiple path segments: must be exactly one", defaultBaseDir) + } + rootFSEntries, err := fs.ReadDir(fsys, ".") + if err != nil { + return nil, err + } + if len(rootFSEntries) == 1 && rootFSEntries[0].IsDir() { + return fsys, nil + } + return &baseDirFS{fsys, defaultBaseDir}, nil +} + +type baseDirFS struct { + fsys fs.FS + baseDir string +} + +func (f baseDirFS) Open(name string) (fs.File, error) { + if name == "." { + return fstest.MapFS{f.baseDir: &fstest.MapFile{Mode: fs.ModeDir}}.Open(name) + } + if name == f.baseDir { + return f.fsys.Open(".") + } + basePrefix := f.baseDir + string(os.PathSeparator) + if strings.HasPrefix(name, basePrefix) { + subName := strings.TrimPrefix(name, basePrefix) + if subName == "" { + subName = "." + } + return f.fsys.Open(subName) + } + return nil, fs.ErrNotExist +} diff --git a/internal/rukpak/util/util.go b/internal/rukpak/util/util.go index edfc06c4f..132071d05 100644 --- a/internal/rukpak/util/util.go +++ b/internal/rukpak/util/util.go @@ -1,5 +1,23 @@ package util +import ( + "archive/tar" + "compress/gzip" + "fmt" + "io" + "io/fs" + "os" + + "k8s.io/cli-runtime/pkg/resource" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// TODO verify these +const ( + DefaultSystemNamespace = "operator-controller-system" + DefaultUnpackImage = "quay.io/operator-framework/operator-controller:main" +) + func MergeMaps(maps ...map[string]string) map[string]string { out := map[string]string{} for _, m := range maps { @@ -9,3 +27,89 @@ func MergeMaps(maps ...map[string]string) map[string]string { } return out } + +// PodNamespace checks whether the controller is running in a Pod vs. +// being run locally by inspecting the namespace file that gets mounted +// automatically for Pods at runtime. If that file doesn't exist, then +// return DefaultSystemNamespace. +func PodNamespace() string { + namespace, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace") + if err != nil { + return DefaultSystemNamespace + } + return string(namespace) +} + +// FSToTarGZ writes the filesystem represented by fsys to w as a gzipped tar archive. +// This function unsets user and group information in the tar archive so that readers +// of archives produced by this function do not need to account for differences in +// permissions between source and destination filesystems. +func FSToTarGZ(w io.Writer, fsys fs.FS) error { + gzw := gzip.NewWriter(w) + tw := tar.NewWriter(gzw) + if err := fs.WalkDir(fsys, ".", func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + + if d.Type()&os.ModeSymlink != 0 { + return nil + } + info, err := d.Info() + if err != nil { + return fmt.Errorf("get file info for %q: %v", path, err) + } + + h, err := tar.FileInfoHeader(info, "") + if err != nil { + return fmt.Errorf("build tar file info header for %q: %v", path, err) + } + h.Uid = 0 + h.Gid = 0 + h.Uname = "" + h.Gname = "" + h.Name = path + + if err := tw.WriteHeader(h); err != nil { + return fmt.Errorf("write tar header for %q: %v", path, err) + } + if d.IsDir() { + return nil + } + f, err := fsys.Open(path) + if err != nil { + return fmt.Errorf("open file %q: %v", path, err) + } + if _, err := io.Copy(tw, f); err != nil { + return fmt.Errorf("write tar data for %q: %v", path, err) + } + return nil + }); err != nil { + return fmt.Errorf("generate tar.gz from FS: %v", err) + } + if err := tw.Close(); err != nil { + return err + } + return gzw.Close() +} + +func ManifestObjects(r io.Reader, name string) ([]client.Object, error) { + result := resource.NewLocalBuilder().Flatten().Unstructured().Stream(r, name).Do() + if err := result.Err(); err != nil { + return nil, err + } + infos, err := result.Infos() + if err != nil { + return nil, err + } + return infosToObjects(infos), nil +} + +func infosToObjects(infos []*resource.Info) []client.Object { + objects := make([]client.Object, 0, len(infos)) + for _, info := range infos { + clientObject := info.Object.(client.Object) + objects = append(objects, clientObject) + } + return objects +}