@@ -24,26 +24,36 @@ import (
24
24
"io"
25
25
"sort"
26
26
"strings"
27
+ "sync"
27
28
28
29
mmsemver "github.com/Masterminds/semver/v3"
29
30
bsemver "github.com/blang/semver/v4"
30
31
"github.com/go-logr/logr"
32
+ "helm.sh/helm/v3/pkg/action"
33
+ "helm.sh/helm/v3/pkg/chart"
34
+ "helm.sh/helm/v3/pkg/chartutil"
31
35
"helm.sh/helm/v3/pkg/postrender"
36
+ "helm.sh/helm/v3/pkg/release"
37
+ "helm.sh/helm/v3/pkg/storage/driver"
32
38
"k8s.io/apimachinery/pkg/api/equality"
33
39
apierrors "k8s.io/apimachinery/pkg/api/errors"
34
40
apimeta "k8s.io/apimachinery/pkg/api/meta"
35
41
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36
42
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
37
43
"k8s.io/apimachinery/pkg/runtime"
44
+ "k8s.io/apimachinery/pkg/runtime/schema"
38
45
"k8s.io/apimachinery/pkg/types"
39
46
utilerrors "k8s.io/apimachinery/pkg/util/errors"
40
47
apimachyaml "k8s.io/apimachinery/pkg/util/yaml"
41
48
"k8s.io/utils/ptr"
42
49
ctrl "sigs.k8s.io/controller-runtime"
50
+ "sigs.k8s.io/controller-runtime/pkg/cache"
43
51
"sigs.k8s.io/controller-runtime/pkg/client"
52
+ crcontroller "sigs.k8s.io/controller-runtime/pkg/controller"
44
53
crhandler "sigs.k8s.io/controller-runtime/pkg/handler"
45
54
"sigs.k8s.io/controller-runtime/pkg/log"
46
55
"sigs.k8s.io/controller-runtime/pkg/reconcile"
56
+ "sigs.k8s.io/controller-runtime/pkg/source"
47
57
48
58
catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1"
49
59
helmclient "github.com/operator-framework/helm-operator-plugins/pkg/client"
@@ -56,7 +66,8 @@ import (
56
66
catalogsort "github.com/operator-framework/operator-controller/internal/catalogmetadata/sort"
57
67
rukpakapi "github.com/operator-framework/operator-controller/internal/rukpak/api"
58
68
"github.com/operator-framework/operator-controller/internal/rukpak/handler"
59
- "github.com/operator-framework/operator-controller/internal/rukpak/source"
69
+ helmpredicate "github.com/operator-framework/operator-controller/internal/rukpak/helm-operator-plugins/predicate"
70
+ rukpaksource "github.com/operator-framework/operator-controller/internal/rukpak/source"
60
71
"github.com/operator-framework/operator-controller/internal/rukpak/storage"
61
72
"github.com/operator-framework/operator-controller/internal/rukpak/util"
62
73
)
@@ -66,11 +77,15 @@ type ClusterExtensionReconciler struct {
66
77
client.Client
67
78
ReleaseNamespace string
68
79
BundleProvider BundleProvider
69
- Unpacker source .Unpacker
80
+ Unpacker rukpaksource .Unpacker
70
81
ActionClientGetter helmclient.ActionClientGetter
71
82
Storage storage.Storage
72
83
Handler handler.Handler
73
84
Scheme * runtime.Scheme
85
+ dynamicWatchMutex sync.RWMutex
86
+ dynamicWatchGVKs map [schema.GroupVersionKind ]struct {}
87
+ controller crcontroller.Controller
88
+ cache cache.Cache
74
89
}
75
90
76
91
//+kubebuilder:rbac:groups=olm.operatorframework.io,resources=clusterextensions,verbs=get;list;watch
@@ -156,15 +171,15 @@ func (r *ClusterExtensionReconciler) reconcile(ctx context.Context, ext *ocv1alp
156
171
}
157
172
158
173
switch unpackResult .State {
159
- case source .StatePending :
174
+ case rukpaksource .StatePending :
160
175
updateStatusUnpackPending (& ext .Status , unpackResult )
161
176
// There must be a limit to number of entries if status is stuck at
162
177
// unpack pending.
163
178
return ctrl.Result {}, nil
164
- case source .StateUnpacking :
179
+ case rukpaksource .StateUnpacking :
165
180
updateStatusUnpacking (& ext .Status , unpackResult )
166
181
return ctrl.Result {}, nil
167
- case source .StateUnpacked :
182
+ case rukpaksource .StateUnpacked :
168
183
if err := r .Storage .Store (ctx , ext , unpackResult .Bundle ); err != nil {
169
184
return ctrl.Result {}, updateStatusUnpackFailing (& ext .Status , fmt .Errorf ("persist bundle content: %v" , err ))
170
185
}
@@ -188,7 +203,7 @@ func (r *ClusterExtensionReconciler) reconcile(ctx context.Context, ext *ocv1alp
188
203
return ctrl.Result {}, err
189
204
}
190
205
191
- _ , _ , err = r .Handler .Handle (ctx , bundleFS , ext )
206
+ chrt , values , err : = r .Handler .Handle (ctx , bundleFS , ext )
192
207
if err != nil {
193
208
apimeta .SetStatusCondition (& ext .Status .Conditions , metav1.Condition {
194
209
Type : rukpakv1alpha2 .TypeInstalled ,
@@ -200,12 +215,95 @@ func (r *ClusterExtensionReconciler) reconcile(ctx context.Context, ext *ocv1alp
200
215
}
201
216
202
217
ext .SetNamespace (r .ReleaseNamespace )
203
- _ , err = r .ActionClientGetter .ActionClientFor (ext )
218
+ ac , err : = r .ActionClientGetter .ActionClientFor (ext )
204
219
if err != nil {
205
220
setInstalledStatusConditionFailed (& ext .Status .Conditions , fmt .Sprintf ("%s:%v" , ocv1alpha1 .ReasonErrorGettingClient , err ), ext .Generation )
206
221
return ctrl.Result {}, err
207
222
}
208
223
224
+ post := & postrenderer {
225
+ labels : map [string ]string {
226
+ util .CoreOwnerKindKey : ocv1alpha1 .ClusterExtensionKind ,
227
+ util .CoreOwnerNameKey : ext .GetName (),
228
+ },
229
+ }
230
+
231
+ rel , state , err := r .getReleaseState (ac , ext , chrt , values , post )
232
+ if err != nil {
233
+ setInstalledAndHealthyFalse (& ext .Status .Conditions , fmt .Sprintf ("%s:%v" , ocv1alpha1 .ReasonErrorGettingReleaseState , err ), ext .Generation )
234
+ return ctrl.Result {}, err
235
+ }
236
+
237
+ switch state {
238
+ case stateNeedsInstall :
239
+ rel , err = ac .Install (ext .GetName (), r .ReleaseNamespace , chrt , values , func (install * action.Install ) error {
240
+ install .CreateNamespace = false
241
+ return nil
242
+ }, helmclient .AppendInstallPostRenderer (post ))
243
+ if err != nil {
244
+ if isResourceNotFoundErr (err ) {
245
+ err = errRequiredResourceNotFound {err }
246
+ }
247
+ setInstalledAndHealthyFalse (& ext .Status .Conditions , fmt .Sprintf ("%s:%v" , ocv1alpha1 .ReasonInstallationFailed , err ), ext .Generation )
248
+ return ctrl.Result {}, err
249
+ }
250
+ case stateNeedsUpgrade :
251
+ rel , err = ac .Upgrade (ext .GetName (), r .ReleaseNamespace , chrt , values , helmclient .AppendUpgradePostRenderer (post ))
252
+ if err != nil {
253
+ if isResourceNotFoundErr (err ) {
254
+ err = errRequiredResourceNotFound {err }
255
+ }
256
+ setInstalledAndHealthyFalse (& ext .Status .Conditions , fmt .Sprintf ("%s:%v" , ocv1alpha1 .ReasonUpgradeFailed , err ), ext .Generation )
257
+ return ctrl.Result {}, err
258
+ }
259
+ case stateUnchanged :
260
+ if err := ac .Reconcile (rel ); err != nil {
261
+ if isResourceNotFoundErr (err ) {
262
+ err = errRequiredResourceNotFound {err }
263
+ }
264
+ setInstalledAndHealthyFalse (& ext .Status .Conditions , fmt .Sprintf ("%s:%v" , ocv1alpha1 .ReasonResolutionFailed , err ), ext .Generation )
265
+ return ctrl.Result {}, err
266
+ }
267
+ default :
268
+ return ctrl.Result {}, fmt .Errorf ("unexpected release state %q" , state )
269
+ }
270
+
271
+ relObjects , err := util .ManifestObjects (strings .NewReader (rel .Manifest ), fmt .Sprintf ("%s-release-manifest" , rel .Name ))
272
+ if err != nil {
273
+ setInstalledAndHealthyFalse (& ext .Status .Conditions , fmt .Sprintf ("%s:%v" , ocv1alpha1 .ReasonCreateDynamicWatchFailed , err ), ext .Generation )
274
+ return ctrl.Result {}, err
275
+ }
276
+
277
+ for _ , obj := range relObjects {
278
+ uMap , err := runtime .DefaultUnstructuredConverter .ToUnstructured (obj )
279
+ if err != nil {
280
+ setInstalledAndHealthyFalse (& ext .Status .Conditions , fmt .Sprintf ("%s:%v" , ocv1alpha1 .ReasonCreateDynamicWatchFailed , err ), ext .Generation )
281
+ return ctrl.Result {}, err
282
+ }
283
+
284
+ unstructuredObj := & unstructured.Unstructured {Object : uMap }
285
+ if err := func () error {
286
+ r .dynamicWatchMutex .Lock ()
287
+ defer r .dynamicWatchMutex .Unlock ()
288
+
289
+ _ , isWatched := r .dynamicWatchGVKs [unstructuredObj .GroupVersionKind ()]
290
+ if ! isWatched {
291
+ if err := r .controller .Watch (
292
+ source .Kind (r .cache , unstructuredObj ),
293
+ crhandler .EnqueueRequestForOwner (r .Scheme , r .RESTMapper (), ext , crhandler .OnlyControllerOwner ()),
294
+ helmpredicate .DependentPredicateFuncs ()); err != nil {
295
+ return err
296
+ }
297
+ r .dynamicWatchGVKs [unstructuredObj .GroupVersionKind ()] = struct {}{}
298
+ }
299
+ return nil
300
+ }(); err != nil {
301
+ setInstalledAndHealthyFalse (& ext .Status .Conditions , fmt .Sprintf ("%s:%v" , ocv1alpha1 .ReasonCreateDynamicWatchFailed , err ), ext .Generation )
302
+ return ctrl.Result {}, err
303
+ }
304
+ }
305
+ setInstalledStatusConditionSuccess (& ext .Status .Conditions , fmt .Sprintf ("Instantiated bundle %s successfully" , ext .GetName ()), ext .Generation )
306
+
209
307
// set the status of the cluster extension based on the respective bundle deployment status conditions.
210
308
return ctrl.Result {}, nil
211
309
}
@@ -347,16 +445,18 @@ func (r *ClusterExtensionReconciler) GenerateExpectedBundleDeployment(o ocv1alph
347
445
348
446
// SetupWithManager sets up the controller with the Manager.
349
447
func (r * ClusterExtensionReconciler ) SetupWithManager (mgr ctrl.Manager ) error {
350
- err := ctrl .NewControllerManagedBy (mgr ).
448
+ controller , err := ctrl .NewControllerManagedBy (mgr ).
351
449
For (& ocv1alpha1.ClusterExtension {}).
352
450
Watches (& catalogd.Catalog {},
353
451
crhandler .EnqueueRequestsFromMapFunc (clusterExtensionRequestsForCatalog (mgr .GetClient (), mgr .GetLogger ()))).
354
452
Owns (& rukpakv1alpha2.BundleDeployment {}).
355
- Complete (r )
453
+ Build (r )
356
454
357
455
if err != nil {
358
456
return err
359
457
}
458
+ r .controller = controller
459
+ r .cache = mgr .GetCache ()
360
460
return nil
361
461
}
362
462
@@ -490,6 +590,69 @@ func (r *ClusterExtensionReconciler) getInstalledVersion(ctx context.Context, cl
490
590
return existingVersionSemver , nil
491
591
}
492
592
593
+ type releaseState string
594
+
595
+ const (
596
+ stateNeedsInstall releaseState = "NeedsInstall"
597
+ stateNeedsUpgrade releaseState = "NeedsUpgrade"
598
+ stateUnchanged releaseState = "Unchanged"
599
+ stateError releaseState = "Error"
600
+ )
601
+
602
+ func (r * ClusterExtensionReconciler ) getReleaseState (cl helmclient.ActionInterface , obj metav1.Object , chrt * chart.Chart , values chartutil.Values , post * postrenderer ) (* release.Release , releaseState , error ) {
603
+ currentRelease , err := cl .Get (obj .GetName ())
604
+ if err != nil && ! errors .Is (err , driver .ErrReleaseNotFound ) {
605
+ return nil , stateError , err
606
+ }
607
+ if errors .Is (err , driver .ErrReleaseNotFound ) {
608
+ return nil , stateNeedsInstall , nil
609
+ }
610
+ desiredRelease , err := cl .Upgrade (obj .GetName (), r .ReleaseNamespace , chrt , values , func (upgrade * action.Upgrade ) error {
611
+ upgrade .DryRun = true
612
+ return nil
613
+ }, helmclient .AppendUpgradePostRenderer (post ))
614
+ if err != nil {
615
+ return currentRelease , stateError , err
616
+ }
617
+ if desiredRelease .Manifest != currentRelease .Manifest ||
618
+ currentRelease .Info .Status == release .StatusFailed ||
619
+ currentRelease .Info .Status == release .StatusSuperseded {
620
+ return currentRelease , stateNeedsUpgrade , nil
621
+ }
622
+ return currentRelease , stateUnchanged , nil
623
+ }
624
+
625
+ type errRequiredResourceNotFound struct {
626
+ error
627
+ }
628
+
629
+ func (err errRequiredResourceNotFound ) Error () string {
630
+ return fmt .Sprintf ("required resource not found: %v" , err .error )
631
+ }
632
+
633
+ func isResourceNotFoundErr (err error ) bool {
634
+ var agg utilerrors.Aggregate
635
+ if errors .As (err , & agg ) {
636
+ for _ , err := range agg .Errors () {
637
+ return isResourceNotFoundErr (err )
638
+ }
639
+ }
640
+
641
+ nkme := & apimeta.NoKindMatchError {}
642
+ if errors .As (err , & nkme ) {
643
+ return true
644
+ }
645
+ if apierrors .IsNotFound (err ) {
646
+ return true
647
+ }
648
+
649
+ // TODO: improve NoKindMatchError matching
650
+ // An error that is bubbled up from the k8s.io/cli-runtime library
651
+ // does not wrap meta.NoKindMatchError, so we need to fallback to
652
+ // the use of string comparisons for now.
653
+ return strings .Contains (err .Error (), "no matches for kind" )
654
+ }
655
+
493
656
type postrenderer struct {
494
657
labels map [string ]string
495
658
cascade postrender.PostRenderer
0 commit comments