Skip to content

Commit 155b88a

Browse files
committed
add: integration with hypershift
1 parent ea5942d commit 155b88a

File tree

5 files changed

+290
-106
lines changed

5 files changed

+290
-106
lines changed

controllers/appwrapper_controller.go

Lines changed: 88 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ package controllers
1818

1919
import (
2020
"context"
21+
"fmt"
2122
"os"
2223
"strconv"
2324
"strings"
2425
"time"
2526

2627
mapiclientset "github.com/openshift/client-go/machine/clientset/versioned"
27-
machineinformersv1beta1 "github.com/openshift/client-go/machine/informers/externalversions"
2828
"github.com/openshift/client-go/machine/listers/machine/v1beta1"
2929

3030
appwrapperClientSet "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/client/clientset/versioned"
@@ -33,10 +33,13 @@ import (
3333
appwrapperlisters "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/client/listers/controller/v1beta1"
3434

3535
arbinformersFactory "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/client/informers/externalversions"
36+
37+
machineinformersv1beta1 "github.com/openshift/client-go/machine/informers/externalversions"
3638
apierrors "k8s.io/apimachinery/pkg/api/errors"
3739
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3840
"k8s.io/apimachinery/pkg/labels"
3941
"k8s.io/apimachinery/pkg/runtime"
42+
"k8s.io/client-go/informers"
4043
"k8s.io/client-go/kubernetes"
4144
"k8s.io/client-go/tools/cache"
4245
"k8s.io/klog"
@@ -51,15 +54,20 @@ type AppWrapperReconciler struct {
5154
Scheme *runtime.Scheme
5255
ConfigsNamespace string
5356
OcmSecretNamespace string
57+
KubeClient *kubernetes.Clientset
58+
InformerFactory informers.SharedInformerFactory
59+
StopCh chan struct{}
5460
}
5561

5662
var (
57-
scaledAppwrapper []string
58-
reuse = true
59-
ocmClusterID string
60-
ocmToken string
61-
useMachineSets bool
62-
63+
scaledAppwrapper []string
64+
scalingType string
65+
reuse = true
66+
ocmClusterID string
67+
ocmToken string
68+
useMachineSets bool
69+
useMachinePool bool
70+
useNodePool bool
6371
maxScaleNodesAllowed int
6472
msLister v1beta1.MachineSetLister
6573
machineLister v1beta1.MachineLister
@@ -92,46 +100,75 @@ const (
92100
//
93101
// For more details, check Reconcile and its Result here:
94102
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile
103+
95104
func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
96105

97106
_ = log.FromContext(ctx)
98107

99108
var appwrapper arbv1.AppWrapper
100109
if err := r.Get(ctx, req.NamespacedName, &appwrapper); err != nil {
101110
if apierrors.IsNotFound(err) {
102-
//ignore not-found errors, since we can get them on delete requests.
111+
// ignore not-found errors, since we can get them on delete requests.
103112
return ctrl.Result{}, nil
104113
}
105114
klog.Error(err, "unable to fetch appwrapper")
106115
}
107116

108-
factory := machineinformersv1beta1.NewSharedInformerFactoryWithOptions(machineClient, resyncPeriod()(), machineinformersv1beta1.WithNamespace(""))
109-
informer := factory.Machine().V1beta1().MachineSets().Informer()
110-
msLister = factory.Machine().V1beta1().MachineSets().Lister()
111-
machineLister = factory.Machine().V1beta1().Machines().Lister()
117+
// This block can be moved outside the if-else condition if both the node and machineset use cases require it
112118

113-
// todo: Move the getOCMClusterID call out of reconcile loop.
114-
// Only reason we are calling it here is that the client is not able to make
115-
// calls until it is started, so SetupWithManager is not working.
116-
if !useMachineSets && ocmClusterID == "" {
117-
getOCMClusterID(r)
119+
if ocmClusterID == "" {
120+
ocmManager, err := NewOCMManager()
121+
if err != nil {
122+
klog.Error("Failed to initialize OCMManager:", err)
123+
return ctrl.Result{}, err
124+
}
125+
defer ocmManager.Close()
126+
127+
err = ocmManager.getOCMClusterID(r)
128+
if err != nil {
129+
klog.Error("Failed to get OCM Cluster ID:", err)
130+
return ctrl.Result{}, err
131+
}
118132
}
119133

120-
stopper := make(chan struct{})
121-
defer close(stopper)
122-
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
123-
AddFunc: onAdd,
124-
UpdateFunc: onUpdate,
125-
DeleteFunc: onDelete,
126-
})
127-
go informer.Run(stopper)
128-
if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
129-
klog.Info("Wait for cache to sync")
134+
if !useNodePool {
135+
factory := machineinformersv1beta1.NewSharedInformerFactoryWithOptions(machineClient, resyncPeriod()(), machineinformersv1beta1.WithNamespace(""))
136+
informer := factory.Machine().V1beta1().MachineSets().Informer()
137+
msLister = factory.Machine().V1beta1().MachineSets().Lister()
138+
machineLister = factory.Machine().V1beta1().Machines().Lister()
139+
140+
stopper := make(chan struct{})
141+
defer close(stopper)
142+
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
143+
AddFunc: onAdd,
144+
UpdateFunc: onUpdate,
145+
DeleteFunc: onDelete,
146+
})
147+
go informer.Run(stopper)
148+
if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
149+
klog.Info("Wait for cache to sync")
150+
}
151+
msInformerHasSynced = informer.HasSynced()
152+
153+
} else {
154+
nodeInformer := r.InformerFactory.Core().V1().Nodes().Informer()
155+
stopper := make(chan struct{})
156+
defer close(stopper)
157+
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
158+
AddFunc: onAdd,
159+
UpdateFunc: onUpdate,
160+
DeleteFunc: onDelete,
161+
})
162+
go nodeInformer.Run(stopper)
163+
if !cache.WaitForCacheSync(stopper, nodeInformer.HasSynced) {
164+
klog.Info("Failed to wait for node informer cache to sync")
165+
return ctrl.Result{}, fmt.Errorf("failed to wait for node informer cache to sync")
166+
}
167+
//TODO: do we need dual sync??
168+
msInformerHasSynced = nodeInformer.HasSynced()
130169
}
131-
//TODO: do we need dual sync??
132-
msInformerHasSynced = informer.HasSynced()
170+
133171
addAppwrappersThatNeedScaling()
134-
<-stopper
135172
return ctrl.Result{}, nil
136173
}
137174

@@ -166,13 +203,21 @@ func (r *AppWrapperReconciler) SetupWithManager(mgr ctrl.Manager) error {
166203
}
167204

168205
useMachineSets = true
206+
useMachinePool = false
207+
useNodePool = false
208+
169209
ocmSecretExists := ocmSecretExists(r.OcmSecretNamespace)
170210
if ocmSecretExists {
171-
machinePoolsExists := machinePoolExists()
172-
173-
if machinePoolsExists {
211+
nodepoolexists := nodePoolExists()
212+
machinepoolexists := machinePoolExists()
213+
if nodepoolexists {
214+
useMachineSets = false
215+
useNodePool = true
216+
klog.Infof("Using node pools %v", nodepoolexists)
217+
} else if machinepoolexists {
174218
useMachineSets = false
175-
klog.Infof("Using machine pools %v", machinePoolsExists)
219+
useMachinePool = true
220+
klog.Infof("Setting useMachinePools to %v", machinepoolexists)
176221
} else {
177222
klog.Infof("Setting useMachineSets to %v", useMachineSets)
178223
}
@@ -245,7 +290,7 @@ func onAdd(obj interface{}) {
245290
demandPerInstanceType := discoverInstanceTypes(aw)
246291

247292
if demandPerInstanceType != nil {
248-
if (useMachineSets && canScaleMachineset(demandPerInstanceType)) || (!useMachineSets && canScaleMachinepool(demandPerInstanceType)) {
293+
if (useMachineSets && canScaleMachineset(demandPerInstanceType)) || (!useMachineSets && (canScaleNodepool(demandPerInstanceType) || canScaleMachinepool(demandPerInstanceType))) {
249294
scaleUp(aw, demandPerInstanceType)
250295
} else {
251296
klog.Infof("Cannot scale up replicas. The maximum allowed replicas is %v", maxScaleNodesAllowed)
@@ -312,6 +357,10 @@ func canScaleMachinepool(demandPerInstanceType map[string]int) bool {
312357
return true
313358
}
314359

360+
func canScaleNodepool(demandPerInstanceType map[string]int) bool {
361+
return true
362+
}
363+
315364
func scaleUp(aw *arbv1.AppWrapper, demandMapPerInstanceType map[string]int) {
316365
if msInformerHasSynced {
317366
//Assumption is made that the cluster has a machineset configured that the AW needs
@@ -321,6 +370,8 @@ func scaleUp(aw *arbv1.AppWrapper, demandMapPerInstanceType map[string]int) {
321370

322371
if useMachineSets {
323372
scaleMachineSet(aw, userRequestedInstanceType, replicas)
373+
} else if useNodePool {
374+
scaleNodePool(aw, userRequestedInstanceType, replicas)
324375
} else {
325376
scaleMachinePool(aw, userRequestedInstanceType, replicas)
326377
}
@@ -401,6 +452,8 @@ func onDelete(obj interface{}) {
401452
klog.Infof("Appwrapper deleted scale-down machineset: %s ", aw.Name)
402453
scaleDown(aw)
403454
}
455+
} else if useNodePool {
456+
deleteNodePool(aw)
404457
} else {
405458
deleteMachinePool(aw)
406459
}

controllers/machinepools.go

Lines changed: 19 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -2,43 +2,20 @@ package controllers
22

33
import (
44
"context"
5-
"fmt"
6-
ocmsdk "github.com/openshift-online/ocm-sdk-go"
75
cmv1 "github.com/openshift-online/ocm-sdk-go/clustersmgmt/v1"
8-
configv1 "github.com/openshift/api/config/v1"
96
arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
10-
"k8s.io/apimachinery/pkg/types"
117
"k8s.io/klog"
12-
"os"
138
"strings"
149
)
1510

16-
func createOCMConnection() (*ocmsdk.Connection, error) {
17-
logger, err := ocmsdk.NewGoLoggerBuilder().
18-
Debug(false).
19-
Build()
20-
if err != nil {
21-
return nil, fmt.Errorf("can't build logger: %v", err)
22-
}
23-
24-
connection, err := ocmsdk.NewConnectionBuilder().
25-
Logger(logger).
26-
Tokens(ocmToken).
27-
Build()
28-
if err != nil {
29-
return nil, fmt.Errorf("can't build connection: %v", err)
30-
}
31-
32-
return connection, nil
33-
}
34-
3511
func scaleMachinePool(aw *arbv1.AppWrapper, userRequestedInstanceType string, replicas int) {
36-
connection, err := createOCMConnection()
12+
ocmManager, err := NewOCMManager()
3713
if err != nil {
38-
fmt.Fprintf(os.Stderr, "Error creating OCM connection: %v", err)
39-
return
14+
klog.Error("Failed to initialize OCMManager:", err)
4015
}
41-
defer connection.Close()
16+
defer ocmManager.Close()
17+
18+
connection := ocmManager.Connection
4219

4320
clusterMachinePools := connection.ClustersMgmt().V1().Clusters().Cluster(ocmClusterID).MachinePools()
4421

@@ -59,12 +36,13 @@ func scaleMachinePool(aw *arbv1.AppWrapper, userRequestedInstanceType string, re
5936
}
6037

6138
func deleteMachinePool(aw *arbv1.AppWrapper) {
62-
connection, err := createOCMConnection()
39+
ocmManager, err := NewOCMManager()
6340
if err != nil {
64-
fmt.Fprintf(os.Stderr, "Error creating OCM connection: %v", err)
65-
return
41+
klog.Error("Failed to initialize OCMManager:", err)
6642
}
67-
defer connection.Close()
43+
defer ocmManager.Close()
44+
45+
connection := ocmManager.Connection
6846

6947
machinePoolsConnection := connection.ClustersMgmt().V1().Clusters().Cluster(ocmClusterID).MachinePools().List()
7048

@@ -84,47 +62,20 @@ func deleteMachinePool(aw *arbv1.AppWrapper) {
8462

8563
// Check if machine pools exist
8664
func machinePoolExists() bool {
87-
connection, err := createOCMConnection()
65+
ocmManager, err := NewOCMManager()
8866
if err != nil {
89-
fmt.Fprintf(os.Stderr, "Error creating OCM connection: %v", err)
67+
klog.Error("Failed to initialize OCMManager:", err)
9068
}
91-
defer connection.Close()
69+
defer ocmManager.Close()
70+
71+
connection := ocmManager.Connection
9272

9373
machinePools := connection.ClustersMgmt().V1().Clusters().Cluster(ocmClusterID).MachinePools()
9474
klog.Infof("Machine pools: %v", machinePools)
95-
return machinePools != nil
96-
}
97-
98-
// getOCMClusterID determines the internal clusterID to be used for OCM API calls
99-
func getOCMClusterID(r *AppWrapperReconciler) error {
100-
cv := &configv1.ClusterVersion{}
101-
err := r.Client.Get(context.TODO(), types.NamespacedName{Name: "version"}, cv)
102-
if err != nil {
103-
return fmt.Errorf("can't get clusterversion: %v", err)
104-
}
105-
106-
internalClusterID := string(cv.Spec.ClusterID)
107-
108-
ctx := context.Background()
109-
110-
connection, err := createOCMConnection()
111-
if err != nil {
112-
fmt.Fprintf(os.Stderr, "Error creating OCM connection: %v", err)
113-
}
114-
defer connection.Close()
11575

116-
// Get the client for the resource that manages the collection of clusters:
117-
collection := connection.ClustersMgmt().V1().Clusters()
118-
119-
response, err := collection.List().Search(fmt.Sprintf("external_id = '%s'", internalClusterID)).Size(1).Page(1).SendContext(ctx)
120-
if err != nil {
121-
klog.Errorf(`Error getting cluster id: %v`, err)
122-
}
123-
124-
response.Items().Each(func(cluster *cmv1.Cluster) bool {
125-
ocmClusterID = cluster.ID()
126-
fmt.Printf("%s - %s - %s\n", cluster.ID(), cluster.Name(), cluster.State())
76+
if machinePools.List() != nil {
77+
klog.Infof("machine pools are present %v", machinePools)
12778
return true
128-
})
129-
return nil
79+
}
80+
return false
13081
}

0 commit comments

Comments
 (0)