Skip to content

Commit c353877

Browse files
Plugin unpacker, add Handler (#757)
Signed-off-by: Varsha Prasad Narsing <[email protected]> Co-authored-by: [email protected] <[email protected]>
1 parent 631c758 commit c353877

File tree

11 files changed

+683
-15
lines changed

11 files changed

+683
-15
lines changed

api/v1alpha1/clusterextension_types.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ import (
2222
"github.com/operator-framework/operator-controller/internal/conditionsets"
2323
)
2424

25+
var (
26+
ClusterExtensionGVK = SchemeBuilder.GroupVersion.WithKind("ClusterExtension")
27+
ClusterExtensionKind = ClusterExtensionGVK.Kind
28+
)
29+
2530
type UpgradeConstraintPolicy string
2631

2732
const (
@@ -75,15 +80,18 @@ type ClusterExtensionSpec struct {
7580

7681
const (
7782
// TODO(user): add more Types, here and into init()
78-
TypeInstalled = "Installed"
79-
TypeResolved = "Resolved"
83+
TypeInstalled = "Installed"
84+
TypeResolved = "Resolved"
85+
TypeHasValidBundle = "HasValidBundle"
8086
// TypeDeprecated is a rollup condition that is present when
8187
// any of the deprecated conditions are present.
8288
TypeDeprecated = "Deprecated"
8389
TypePackageDeprecated = "PackageDeprecated"
8490
TypeChannelDeprecated = "ChannelDeprecated"
8591
TypeBundleDeprecated = "BundleDeprecated"
8692

93+
ReasonErrorGettingClient = "ErrorGettingClient"
94+
ReasonBundleLoadFailed = "BundleLoadFailed"
8795
ReasonBundleLookupFailed = "BundleLookupFailed"
8896
ReasonInstallationFailed = "InstallationFailed"
8997
ReasonInstallationStatusUnknown = "InstallationStatusUnknown"

cmd/manager/main.go

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ package main
1818

1919
import (
2020
"flag"
21+
"fmt"
2122
"net/http"
23+
"net/url"
2224
"os"
2325
"time"
2426

@@ -42,6 +44,10 @@ import (
4244
"github.com/operator-framework/operator-controller/internal/catalogmetadata/cache"
4345
catalogclient "github.com/operator-framework/operator-controller/internal/catalogmetadata/client"
4446
"github.com/operator-framework/operator-controller/internal/controllers"
47+
"github.com/operator-framework/operator-controller/internal/rukpak/handler"
48+
"github.com/operator-framework/operator-controller/internal/rukpak/source"
49+
"github.com/operator-framework/operator-controller/internal/rukpak/storage"
50+
"github.com/operator-framework/operator-controller/internal/rukpak/util"
4551
"github.com/operator-framework/operator-controller/pkg/features"
4652
)
4753

@@ -63,17 +69,25 @@ func init() {
6369

6470
func main() {
6571
var (
66-
metricsAddr string
67-
enableLeaderElection bool
68-
probeAddr string
69-
cachePath string
72+
metricsAddr string
73+
enableLeaderElection bool
74+
probeAddr string
75+
cachePath string
76+
httpExternalAddr string
77+
systemNamespace string
78+
unpackImage string
79+
provisionerStorageDirectory string
7080
)
7181
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
7282
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
83+
flag.StringVar(&httpExternalAddr, "http-external-address", "http://localhost:8080", "The external address at which the http server is reachable.")
7384
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
7485
"Enable leader election for controller manager. "+
7586
"Enabling this will ensure there is only one active controller manager.")
7687
flag.StringVar(&cachePath, "cache-path", "/var/cache", "The local directory path used for filesystem based caching")
88+
flag.StringVar(&systemNamespace, "system-namespace", "", "Configures the namespace that gets used to deploy system resources.")
89+
flag.StringVar(&unpackImage, "unpack-image", util.DefaultUnpackImage, "Configures the container image that gets used to unpack Bundle contents.")
90+
flag.StringVar(&provisionerStorageDirectory, "provisioner-storage-dir", storage.DefaultBundleCacheDir, "The directory that is used to store bundle contents.")
7791
opts := zap.Options{
7892
Development: true,
7993
}
@@ -122,11 +136,35 @@ func main() {
122136
setupLog.Error(err, "unable to create helm client")
123137
}
124138

139+
if systemNamespace == "" {
140+
systemNamespace = util.PodNamespace()
141+
}
142+
143+
unpacker, err := source.NewDefaultUnpacker(mgr, systemNamespace, unpackImage)
144+
if err != nil {
145+
setupLog.Error(err, "unable to create unpacker")
146+
}
147+
148+
storageURL, err := url.Parse(fmt.Sprintf("%s/bundles/", httpExternalAddr))
149+
if err != nil {
150+
setupLog.Error(err, "unable to parse bundle content server URL")
151+
os.Exit(1)
152+
}
153+
154+
localStorage := &storage.LocalDirectory{
155+
RootDirectory: provisionerStorageDirectory,
156+
URL: *storageURL,
157+
}
158+
125159
if err = (&controllers.ClusterExtensionReconciler{
126160
Client: cl,
161+
ReleaseNamespace: systemNamespace,
127162
BundleProvider: catalogClient,
128163
Scheme: mgr.GetScheme(),
129164
ActionClientGetter: acg,
165+
Unpacker: unpacker,
166+
Storage: localStorage,
167+
Handler: handler.HandlerFunc(handler.HandleClusterExtension),
130168
}).SetupWithManager(mgr); err != nil {
131169
setupLog.Error(err, "unable to create controller", "controller", "ClusterExtension")
132170
os.Exit(1)

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@ require (
2222
go.uber.org/zap v1.27.0
2323
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a
2424
gopkg.in/yaml.v2 v2.4.0
25+
helm.sh/helm/v3 v3.14.3
2526
k8s.io/api v0.29.3
2627
k8s.io/apiextensions-apiserver v0.29.3
2728
k8s.io/apimachinery v0.29.3
29+
k8s.io/cli-runtime v0.29.2
2830
k8s.io/client-go v0.29.3
2931
k8s.io/component-base v0.29.3
3032
k8s.io/utils v0.0.0-20240102154912-e7106e64919e
@@ -201,9 +203,7 @@ require (
201203
gopkg.in/inf.v0 v0.9.1 // indirect
202204
gopkg.in/warnings.v0 v0.1.2 // indirect
203205
gopkg.in/yaml.v3 v3.0.1 // indirect
204-
helm.sh/helm/v3 v3.14.3 // indirect
205206
k8s.io/apiserver v0.29.3 // indirect
206-
k8s.io/cli-runtime v0.29.2 // indirect
207207
k8s.io/klog/v2 v2.120.1 // indirect
208208
k8s.io/kube-openapi v0.0.0-20240221221325-2ac9dc51f3f1 // indirect
209209
k8s.io/kubectl v0.29.2 // indirect

internal/controllers/clusterextension_controller.go

Lines changed: 103 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,35 @@ limitations under the License.
1717
package controllers
1818

1919
import (
20+
"bytes"
2021
"context"
2122
"errors"
2223
"fmt"
24+
"io"
2325
"sort"
2426
"strings"
2527

28+
"github.com/operator-framework/operator-controller/internal/rukpak/handler"
29+
"github.com/operator-framework/operator-controller/internal/rukpak/util"
30+
"helm.sh/helm/v3/pkg/postrender"
31+
2632
mmsemver "github.com/Masterminds/semver/v3"
2733
bsemver "github.com/blang/semver/v4"
2834
"github.com/go-logr/logr"
2935
"k8s.io/apimachinery/pkg/api/equality"
3036
apierrors "k8s.io/apimachinery/pkg/api/errors"
37+
"k8s.io/apimachinery/pkg/api/meta"
3138
apimeta "k8s.io/apimachinery/pkg/api/meta"
3239
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3340
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
3441
"k8s.io/apimachinery/pkg/runtime"
3542
"k8s.io/apimachinery/pkg/types"
3643
utilerrors "k8s.io/apimachinery/pkg/util/errors"
44+
apimachyaml "k8s.io/apimachinery/pkg/util/yaml"
3745
"k8s.io/utils/ptr"
3846
ctrl "sigs.k8s.io/controller-runtime"
3947
"sigs.k8s.io/controller-runtime/pkg/client"
40-
"sigs.k8s.io/controller-runtime/pkg/handler"
48+
crhandler "sigs.k8s.io/controller-runtime/pkg/handler"
4149
"sigs.k8s.io/controller-runtime/pkg/log"
4250
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4351

@@ -51,13 +59,19 @@ import (
5159
catalogfilter "github.com/operator-framework/operator-controller/internal/catalogmetadata/filter"
5260
catalogsort "github.com/operator-framework/operator-controller/internal/catalogmetadata/sort"
5361
rukpakapi "github.com/operator-framework/operator-controller/internal/rukpak/api"
62+
"github.com/operator-framework/operator-controller/internal/rukpak/source"
63+
"github.com/operator-framework/operator-controller/internal/rukpak/storage"
5464
)
5565

5666
// ClusterExtensionReconciler reconciles a ClusterExtension object
5767
type ClusterExtensionReconciler struct {
5868
client.Client
69+
ReleaseNamespace string
5970
BundleProvider BundleProvider
71+
Unpacker source.Unpacker
6072
ActionClientGetter helmclient.ActionClientGetter
73+
Storage storage.Storage
74+
Handler handler.Handler
6175
Scheme *runtime.Scheme
6276
}
6377

@@ -137,10 +151,62 @@ func (r *ClusterExtensionReconciler) reconcile(ctx context.Context, ext *ocv1alp
137151
// Considering only image source.
138152

139153
// Generate a BundleSource, and then pass this and the ClusterExtension to Unpack
140-
// TODO:
141-
// bs := r.GenerateExpectedBundleSource(*ext, bundle.Image)
142-
// unpacker := NewDefaultUnpacker(msg, namespace, unpackImage)
143-
// unpacker..Unpack(bs, ext)
154+
bs := r.GenerateExpectedBundleSource(*ext, bundle.Image)
155+
unpackResult, err := r.Unpacker.Unpack(ctx, bs, ext)
156+
if err != nil {
157+
return ctrl.Result{}, updateStatusUnpackFailing(&ext.Status, fmt.Errorf("source bundle content: %v", err))
158+
}
159+
160+
switch unpackResult.State {
161+
case source.StatePending:
162+
updateStatusUnpackPending(&ext.Status, unpackResult)
163+
// There must be a limit to number of entries if status is stuck at
164+
// unpack pending.
165+
return ctrl.Result{}, nil
166+
case source.StateUnpacking:
167+
updateStatusUnpacking(&ext.Status, unpackResult)
168+
return ctrl.Result{}, nil
169+
case source.StateUnpacked:
170+
if err := r.Storage.Store(ctx, ext, unpackResult.Bundle); err != nil {
171+
return ctrl.Result{}, updateStatusUnpackFailing(&ext.Status, fmt.Errorf("persist bundle content: %v", err))
172+
}
173+
contentURL, err := r.Storage.URLFor(ctx, ext)
174+
if err != nil {
175+
return ctrl.Result{}, updateStatusUnpackFailing(&ext.Status, fmt.Errorf("get content URL: %v", err))
176+
}
177+
updateStatusUnpacked(&ext.Status, unpackResult, contentURL)
178+
default:
179+
return ctrl.Result{}, updateStatusUnpackFailing(&ext.Status, fmt.Errorf("unknown unpack state %q: %v", unpackResult.State, err))
180+
}
181+
182+
bundleFS, err := r.Storage.Load(ctx, ext)
183+
if err != nil {
184+
meta.SetStatusCondition(&ext.Status.Conditions, metav1.Condition{
185+
Type: ocv1alpha1.TypeHasValidBundle,
186+
Status: metav1.ConditionFalse,
187+
Reason: ocv1alpha1.ReasonBundleLoadFailed,
188+
Message: err.Error(),
189+
})
190+
return ctrl.Result{}, err
191+
}
192+
193+
_, _, err = r.Handler.Handle(ctx, bundleFS, ext)
194+
if err != nil {
195+
meta.SetStatusCondition(&ext.Status.Conditions, metav1.Condition{
196+
Type: rukpakv1alpha2.TypeInstalled,
197+
Status: metav1.ConditionFalse,
198+
Reason: rukpakv1alpha2.ReasonInstallFailed,
199+
Message: err.Error(),
200+
})
201+
return ctrl.Result{}, err
202+
}
203+
204+
ext.SetNamespace(r.ReleaseNamespace)
205+
_, err = r.ActionClientGetter.ActionClientFor(ext)
206+
if err != nil {
207+
setInstalledStatusConditionFailed(&ext.Status.Conditions, fmt.Sprintf("%s:%v", ocv1alpha1.ReasonErrorGettingClient, err), ext.Generation)
208+
return ctrl.Result{}, err
209+
}
144210

145211
// set the status of the cluster extension based on the respective bundle deployment status conditions.
146212
return ctrl.Result{}, nil
@@ -286,7 +352,7 @@ func (r *ClusterExtensionReconciler) SetupWithManager(mgr ctrl.Manager) error {
286352
err := ctrl.NewControllerManagedBy(mgr).
287353
For(&ocv1alpha1.ClusterExtension{}).
288354
Watches(&catalogd.Catalog{},
289-
handler.EnqueueRequestsFromMapFunc(clusterExtensionRequestsForCatalog(mgr.GetClient(), mgr.GetLogger()))).
355+
crhandler.EnqueueRequestsFromMapFunc(clusterExtensionRequestsForCatalog(mgr.GetClient(), mgr.GetLogger()))).
290356
Owns(&rukpakv1alpha2.BundleDeployment{}).
291357
Complete(r)
292358

@@ -297,7 +363,7 @@ func (r *ClusterExtensionReconciler) SetupWithManager(mgr ctrl.Manager) error {
297363
}
298364

299365
// Generate reconcile requests for all cluster extensions affected by a catalog change
300-
func clusterExtensionRequestsForCatalog(c client.Reader, logger logr.Logger) handler.MapFunc {
366+
func clusterExtensionRequestsForCatalog(c client.Reader, logger logr.Logger) crhandler.MapFunc {
301367
return func(ctx context.Context, _ client.Object) []reconcile.Request {
302368
// no way of associating an extension to a catalog so create reconcile requests for everything
303369
clusterExtensions := ocv1alpha1.ClusterExtensionList{}
@@ -417,3 +483,33 @@ func (r *ClusterExtensionReconciler) getInstalledVersion(ctx context.Context, cl
417483
}
418484
return existingVersionSemver, nil
419485
}
486+
487+
type postrenderer struct {
488+
labels map[string]string
489+
cascade postrender.PostRenderer
490+
}
491+
492+
func (p *postrenderer) Run(renderedManifests *bytes.Buffer) (*bytes.Buffer, error) {
493+
var buf bytes.Buffer
494+
dec := apimachyaml.NewYAMLOrJSONDecoder(renderedManifests, 1024)
495+
for {
496+
obj := unstructured.Unstructured{}
497+
err := dec.Decode(&obj)
498+
if errors.Is(err, io.EOF) {
499+
break
500+
}
501+
if err != nil {
502+
return nil, err
503+
}
504+
obj.SetLabels(util.MergeMaps(obj.GetLabels(), p.labels))
505+
b, err := obj.MarshalJSON()
506+
if err != nil {
507+
return nil, err
508+
}
509+
buf.Write(b)
510+
}
511+
if p.cascade != nil {
512+
return p.cascade.Run(&buf)
513+
}
514+
return &buf, nil
515+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
Copyright 2023.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controllers
18+
19+
import (
20+
ocv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1"
21+
rukpakapi "github.com/operator-framework/operator-controller/internal/rukpak/api"
22+
"github.com/operator-framework/operator-controller/internal/rukpak/source"
23+
"k8s.io/apimachinery/pkg/api/meta"
24+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25+
)
26+
27+
func updateStatusUnpackFailing(status *ocv1alpha1.ClusterExtensionStatus, err error) error {
28+
status.ResolvedBundle = nil
29+
status.InstalledBundle = nil
30+
meta.SetStatusCondition(&status.Conditions, metav1.Condition{
31+
Type: rukpakapi.TypeUnpacked,
32+
Status: metav1.ConditionFalse,
33+
Reason: rukpakapi.ReasonUnpackFailed,
34+
Message: err.Error(),
35+
})
36+
return err
37+
}
38+
39+
// TODO: verify if we need to update the installBundle status or leave it as is.
40+
func updateStatusUnpackPending(status *ocv1alpha1.ClusterExtensionStatus, result *source.Result) {
41+
status.ResolvedBundle = nil
42+
status.InstalledBundle = nil
43+
meta.SetStatusCondition(&status.Conditions, metav1.Condition{
44+
Type: rukpakapi.TypeUnpacked,
45+
Status: metav1.ConditionFalse,
46+
Reason: rukpakapi.ReasonUnpackPending,
47+
Message: result.Message,
48+
})
49+
}
50+
51+
// TODO: verify if we need to update the installBundle status or leave it as is.
52+
func updateStatusUnpacking(status *ocv1alpha1.ClusterExtensionStatus, result *source.Result) {
53+
status.ResolvedBundle = nil
54+
status.InstalledBundle = nil
55+
meta.SetStatusCondition(&status.Conditions, metav1.Condition{
56+
Type: rukpakapi.TypeUnpacked,
57+
Status: metav1.ConditionFalse,
58+
Reason: rukpakapi.ReasonUnpacking,
59+
Message: result.Message,
60+
})
61+
}
62+
63+
func updateStatusUnpacked(status *ocv1alpha1.ClusterExtensionStatus, result *source.Result, contentURL string) {
64+
// TODO: Expose content URL through CE status.
65+
status.ResolvedBundle = &ocv1alpha1.BundleMetadata{
66+
Name: result.ResolvedSource.Image.Ref,
67+
}
68+
meta.SetStatusCondition(&status.Conditions, metav1.Condition{
69+
Type: rukpakapi.TypeUnpacked,
70+
Status: metav1.ConditionTrue,
71+
Reason: rukpakapi.ReasonUnpackSuccessful,
72+
Message: result.Message,
73+
})
74+
}

0 commit comments

Comments
 (0)