Skip to content

Commit 7dc552a

Browse files
authored
Merge pull request #1252 from vincepri/move-cluster-controller
Migrate Cluster Controller to Kubebuilder v2
2 parents 65800b3 + fabf8a4 commit 7dc552a

17 files changed

+345
-535
lines changed

cmd/example-provider/BUILD.bazel

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,9 @@ go_library(
77
visibility = ["//visibility:private"],
88
deps = [
99
"//api/v1alpha2:go_default_library",
10-
"//pkg/controller/cluster:go_default_library",
11-
"//pkg/controller/machine:go_default_library",
10+
"//controllers:go_default_library",
1211
"//vendor/k8s.io/klog:go_default_library",
13-
"//vendor/sigs.k8s.io/controller-runtime/pkg/client/config:go_default_library",
14-
"//vendor/sigs.k8s.io/controller-runtime/pkg/manager:go_default_library",
15-
"//vendor/sigs.k8s.io/controller-runtime/pkg/runtime/signals:go_default_library",
12+
"//vendor/sigs.k8s.io/controller-runtime:go_default_library",
1613
],
1714
)
1815

cmd/example-provider/main.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,22 @@ package main
1818

1919
import (
2020
"flag"
21+
"os"
2122

2223
"k8s.io/klog"
2324
"sigs.k8s.io/cluster-api/api/v1alpha2"
24-
capicluster "sigs.k8s.io/cluster-api/pkg/controller/cluster"
25-
capimachine "sigs.k8s.io/cluster-api/pkg/controller/machine"
26-
"sigs.k8s.io/controller-runtime/pkg/client/config"
27-
"sigs.k8s.io/controller-runtime/pkg/manager"
28-
"sigs.k8s.io/controller-runtime/pkg/runtime/signals"
25+
"sigs.k8s.io/cluster-api/controllers"
26+
ctrl "sigs.k8s.io/controller-runtime"
2927
)
3028

3129
func main() {
3230
klog.InitFlags(nil)
3331
flag.Parse()
3432

35-
cfg := config.GetConfigOrDie()
33+
cfg := ctrl.GetConfigOrDie()
3634

3735
// Setup a Manager
38-
mgr, err := manager.New(cfg, manager.Options{})
36+
mgr, err := ctrl.NewManager(cfg, ctrl.Options{})
3937
if err != nil {
4038
klog.Fatalf("Failed to set up controller manager: %v", err)
4139
}
@@ -44,10 +42,20 @@ func main() {
4442
klog.Fatal(err)
4543
}
4644

47-
capimachine.Add(mgr)
48-
capicluster.Add(mgr)
45+
if err = (&controllers.ClusterReconciler{
46+
Client: mgr.GetClient(),
47+
Log: ctrl.Log.WithName("controllers").WithName("Cluster"),
48+
}).SetupWithManager(mgr); err != nil {
49+
os.Exit(1)
50+
}
51+
if err = (&controllers.MachineReconciler{
52+
Client: mgr.GetClient(),
53+
Log: ctrl.Log.WithName("controllers").WithName("Machine"),
54+
}).SetupWithManager(mgr); err != nil {
55+
os.Exit(1)
56+
}
4957

50-
if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
58+
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
5159
klog.Fatalf("Failed to run manager: %v", err)
5260
}
5361
}

controllers/BUILD.bazel

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go_library(
44
name = "go_default_library",
55
srcs = [
66
"cluster_controller.go",
7+
"cluster_controller_phases.go",
78
"machine_controller.go",
89
"machinedeployment_controller.go",
910
"machineset_controller.go",
@@ -12,25 +13,49 @@ go_library(
1213
visibility = ["//visibility:public"],
1314
deps = [
1415
"//api/v1alpha2:go_default_library",
16+
"//pkg/controller/external:go_default_library",
17+
"//pkg/errors:go_default_library",
18+
"//pkg/util:go_default_library",
1519
"//vendor/github.com/go-logr/logr:go_default_library",
20+
"//vendor/github.com/pkg/errors:go_default_library",
21+
"//vendor/k8s.io/api/core/v1:go_default_library",
22+
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
23+
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
24+
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
25+
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
26+
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
27+
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
28+
"//vendor/k8s.io/client-go/tools/record:go_default_library",
29+
"//vendor/k8s.io/klog:go_default_library",
30+
"//vendor/k8s.io/utils/pointer:go_default_library",
1631
"//vendor/sigs.k8s.io/controller-runtime:go_default_library",
1732
"//vendor/sigs.k8s.io/controller-runtime/pkg/client:go_default_library",
33+
"//vendor/sigs.k8s.io/controller-runtime/pkg/controller:go_default_library",
34+
"//vendor/sigs.k8s.io/controller-runtime/pkg/handler:go_default_library",
35+
"//vendor/sigs.k8s.io/controller-runtime/pkg/reconcile:go_default_library",
36+
"//vendor/sigs.k8s.io/controller-runtime/pkg/source:go_default_library",
1837
],
1938
)
2039

2140
go_test(
2241
name = "go_default_test",
23-
srcs = ["suite_test.go"],
42+
srcs = [
43+
"cluster_controller_test.go",
44+
"suite_test.go",
45+
],
2446
embed = [":go_default_library"],
2547
deps = [
2648
"//api/v1alpha2:go_default_library",
2749
"//vendor/github.com/onsi/ginkgo:go_default_library",
2850
"//vendor/github.com/onsi/gomega:go_default_library",
51+
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
2952
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
3053
"//vendor/k8s.io/client-go/rest:go_default_library",
54+
"//vendor/k8s.io/klog:go_default_library",
55+
"//vendor/k8s.io/klog/klogr:go_default_library",
3156
"//vendor/sigs.k8s.io/controller-runtime/pkg/client:go_default_library",
3257
"//vendor/sigs.k8s.io/controller-runtime/pkg/envtest:go_default_library",
3358
"//vendor/sigs.k8s.io/controller-runtime/pkg/log:go_default_library",
34-
"//vendor/sigs.k8s.io/controller-runtime/pkg/log/zap:go_default_library",
59+
"//vendor/sigs.k8s.io/controller-runtime/pkg/manager:go_default_library",
3560
],
3661
)

controllers/cluster_controller.go

Lines changed: 202 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,34 +18,226 @@ package controllers
1818

1919
import (
2020
"context"
21+
"path"
22+
"sync"
23+
"time"
2124

2225
"github.com/go-logr/logr"
26+
"github.com/pkg/errors"
27+
apierrors "k8s.io/apimachinery/pkg/api/errors"
28+
"k8s.io/apimachinery/pkg/api/meta"
29+
"k8s.io/apimachinery/pkg/runtime"
30+
kerrors "k8s.io/apimachinery/pkg/util/errors"
31+
"k8s.io/client-go/tools/record"
32+
"k8s.io/klog"
33+
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha2"
34+
"sigs.k8s.io/cluster-api/pkg/controller/external"
35+
capierrors "sigs.k8s.io/cluster-api/pkg/errors"
36+
"sigs.k8s.io/cluster-api/pkg/util"
2337
ctrl "sigs.k8s.io/controller-runtime"
2438
"sigs.k8s.io/controller-runtime/pkg/client"
39+
"sigs.k8s.io/controller-runtime/pkg/controller"
40+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
41+
)
2542

26-
clusterv1alpha2 "sigs.k8s.io/cluster-api/api/v1alpha2"
43+
const (
44+
// deleteRequeueAfter is how long to wait before checking again to see if the cluster still has children during
45+
// deletion.
46+
deleteRequeueAfter = 5 * time.Second
2747
)
2848

2949
// ClusterReconciler reconciles a Cluster object
3050
type ClusterReconciler struct {
3151
client.Client
3252
Log logr.Logger
53+
54+
controller controller.Controller
55+
recorder record.EventRecorder
56+
externalWatchers sync.Map
3357
}
3458

35-
// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=clusters,verbs=get;list;watch;create;update;patch;delete
36-
// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=clusters/status,verbs=get;update;patch
59+
func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
60+
c, err := ctrl.NewControllerManagedBy(mgr).
61+
For(&clusterv1.Cluster{}).
62+
Build(r)
63+
64+
r.controller = c
65+
r.recorder = mgr.GetEventRecorderFor("cluster-controller")
66+
return err
67+
}
3768

38-
func (r *ClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
39-
_ = context.Background()
69+
func (r *ClusterReconciler) Reconcile(req ctrl.Request) (_ ctrl.Result, reterr error) {
70+
ctx := context.Background()
4071
_ = r.Log.WithValues("cluster", req.NamespacedName)
4172

42-
// your logic here
73+
// Fetch the Cluster instance.
74+
cluster := &clusterv1.Cluster{}
75+
err := r.Get(ctx, req.NamespacedName, cluster)
76+
if err != nil {
77+
if apierrors.IsNotFound(err) {
78+
// Object not found, return. Created objects are automatically garbage collected.
79+
// For additional cleanup logic use finalizers.
80+
return ctrl.Result{}, nil
81+
}
82+
83+
// Error reading the object - requeue the request.
84+
return ctrl.Result{}, err
85+
}
86+
87+
// Store Cluster early state to allow patching.
88+
patchCluster := client.MergeFrom(cluster.DeepCopy())
89+
90+
// Always issue a Patch for the Cluster object and its status after each reconciliation.
91+
defer func() {
92+
if err := r.Client.Patch(ctx, cluster, patchCluster); err != nil {
93+
klog.Errorf("Error Patching Cluster %q in namespace %q: %v", cluster.Name, cluster.Namespace, err)
94+
if reterr == nil {
95+
reterr = err
96+
}
97+
return
98+
}
99+
if err := r.Client.Status().Patch(ctx, cluster, patchCluster); err != nil {
100+
klog.Errorf("Error Patching Cluster status %q in namespace %q: %v", cluster.Name, cluster.Namespace, err)
101+
if reterr == nil {
102+
reterr = err
103+
}
104+
}
105+
}()
106+
107+
// If object hasn't been deleted and doesn't have a finalizer, add one.
108+
if cluster.ObjectMeta.DeletionTimestamp.IsZero() {
109+
if !util.Contains(cluster.Finalizers, clusterv1.ClusterFinalizer) {
110+
cluster.Finalizers = append(cluster.ObjectMeta.Finalizers, clusterv1.ClusterFinalizer)
111+
}
112+
}
113+
114+
if err := r.reconcile(ctx, cluster); err != nil {
115+
if requeueErr, ok := errors.Cause(err).(capierrors.HasRequeueAfterError); ok {
116+
klog.Infof("Reconciliation for Cluster %q in namespace %q asked to requeue: %v", cluster.Name, cluster.Namespace, err)
117+
return ctrl.Result{Requeue: true, RequeueAfter: requeueErr.GetRequeueAfter()}, nil
118+
}
119+
return ctrl.Result{}, err
120+
}
121+
122+
if !cluster.ObjectMeta.DeletionTimestamp.IsZero() {
123+
return r.reconcileDelete(ctx, cluster)
124+
}
43125

44126
return ctrl.Result{}, nil
45127
}
46128

47-
func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
48-
return ctrl.NewControllerManagedBy(mgr).
49-
For(&clusterv1alpha2.Cluster{}).
50-
Complete(r)
129+
// reconcileDelete handles cluster deletion.
130+
// TODO(ncdc): consolidate all deletion logic in here.
131+
func (r *ClusterReconciler) reconcileDelete(ctx context.Context, cluster *clusterv1.Cluster) (reconcile.Result, error) {
132+
children, err := r.listChildren(ctx, cluster)
133+
if err != nil {
134+
klog.Errorf("Failed to list children of cluster %s/%s: %v", cluster.Namespace, cluster.Name, err)
135+
return reconcile.Result{}, err
136+
}
137+
138+
if len(children) > 0 {
139+
klog.Infof("Cluster %s/%s still has %d children - deleting them first", cluster.Namespace, cluster.Name, len(children))
140+
141+
var errs []error
142+
143+
for _, child := range children {
144+
accessor, err := meta.Accessor(child)
145+
if err != nil {
146+
klog.Errorf("Cluster %s/%s: couldn't create accessor for type %T: %v", cluster.Namespace, cluster.Name, child, err)
147+
continue
148+
}
149+
150+
if !accessor.GetDeletionTimestamp().IsZero() {
151+
// Don't handle deleted child
152+
continue
153+
}
154+
155+
gvk := child.GetObjectKind().GroupVersionKind().String()
156+
157+
klog.Infof("Cluster %s/%s: deleting child %s %s", cluster.Namespace, cluster.Name, gvk, accessor.GetName())
158+
if err := r.Delete(context.Background(), child); err != nil {
159+
err = errors.Wrapf(err, "error deleting cluster %s/%s: failed to delete %s %s", cluster.Namespace, cluster.Name, gvk, accessor.GetName())
160+
klog.Errorf(err.Error())
161+
errs = append(errs, err)
162+
}
163+
}
164+
165+
if len(errs) > 0 {
166+
return ctrl.Result{}, kerrors.NewAggregate(errs)
167+
}
168+
169+
// Requeue so we can check the next time to see if there are still any children left.
170+
return ctrl.Result{RequeueAfter: deleteRequeueAfter}, nil
171+
}
172+
173+
if cluster.Spec.InfrastructureRef != nil {
174+
_, err := external.Get(r.Client, cluster.Spec.InfrastructureRef, cluster.Namespace)
175+
switch {
176+
case apierrors.IsNotFound(err):
177+
// All good - the infra resource has been deleted
178+
case err != nil:
179+
return ctrl.Result{}, errors.Wrapf(err, "failed to get %s %q for Cluster %s/%s",
180+
path.Join(cluster.Spec.InfrastructureRef.APIVersion, cluster.Spec.InfrastructureRef.Kind),
181+
cluster.Spec.InfrastructureRef.Name, cluster.Namespace, cluster.Name)
182+
default:
183+
// The infra resource still exists. Once it's been deleted, the cluster will get processed again.
184+
// Return here so we don't remove the finalizer yet.
185+
return ctrl.Result{}, nil
186+
}
187+
}
188+
189+
cluster.Finalizers = util.Filter(cluster.Finalizers, clusterv1.ClusterFinalizer)
190+
191+
return ctrl.Result{}, nil
192+
}
193+
194+
// listChildren returns a list of MachineDeployments, MachineSets, and Machines than have an owner reference to cluster
195+
func (r *ClusterReconciler) listChildren(ctx context.Context, cluster *clusterv1.Cluster) ([]runtime.Object, error) {
196+
listOptions := []client.ListOption{
197+
client.InNamespace(cluster.Namespace),
198+
client.MatchingLabels(map[string]string{clusterv1.MachineClusterLabelName: cluster.Name}),
199+
}
200+
201+
machineDeployments := &clusterv1.MachineDeploymentList{}
202+
if err := r.Client.List(ctx, machineDeployments, listOptions...); err != nil {
203+
return nil, errors.Wrapf(err, "failed to list MachineDeployments for cluster %s/%s", cluster.Namespace, cluster.Name)
204+
}
205+
206+
machineSets := &clusterv1.MachineSetList{}
207+
if err := r.Client.List(ctx, machineSets, listOptions...); err != nil {
208+
return nil, errors.Wrapf(err, "failed to list MachineSets for cluster %s/%s", cluster.Namespace, cluster.Name)
209+
}
210+
211+
machines := &clusterv1.MachineList{}
212+
if err := r.Client.List(ctx, machines, listOptions...); err != nil {
213+
return nil, errors.Wrapf(err, "failed to list Machines for cluster %s/%s", cluster.Namespace, cluster.Name)
214+
}
215+
216+
var children []runtime.Object
217+
eachFunc := func(o runtime.Object) error {
218+
acc, err := meta.Accessor(o)
219+
if err != nil {
220+
klog.Errorf("Cluster %s/%s: couldn't create accessor for type %T: %v", cluster.Namespace, cluster.Name, o, err)
221+
return nil
222+
}
223+
224+
if util.PointsTo(acc.GetOwnerReferences(), &cluster.ObjectMeta) {
225+
children = append(children, o)
226+
}
227+
228+
return nil
229+
}
230+
231+
lists := map[string]runtime.Object{
232+
"MachineDeployment": machineDeployments,
233+
"MachineSet": machineSets,
234+
"Machine": machines,
235+
}
236+
for name, list := range lists {
237+
if err := meta.EachListItem(list, eachFunc); err != nil {
238+
return nil, errors.Wrapf(err, "error finding %s children of cluster %s/%s", name, cluster.Namespace, cluster.Name)
239+
}
240+
}
241+
242+
return children, nil
51243
}

0 commit comments

Comments
 (0)