diff --git a/controllers/appwrapper_controller.go b/controllers/appwrapper_controller.go index 01a1ced..04f2a05 100644 --- a/controllers/appwrapper_controller.go +++ b/controllers/appwrapper_controller.go @@ -243,20 +243,12 @@ func onAdd(obj interface{}) { if aw.Status.State == arbv1.AppWrapperStateEnqueued || aw.Status.State == "" && aw.Labels["orderedinstance"] != "" { //scaledAppwrapper = append(scaledAppwrapper, aw.Name) demandPerInstanceType := discoverInstanceTypes(aw) - //TODO: simplify the looping + if demandPerInstanceType != nil { - if useMachineSets { - if canScaleMachineset(demandPerInstanceType) { - scaleUp(aw, demandPerInstanceType) - } else { - klog.Infof("Cannot scale up replicas max replicas allowed is %v", maxScaleNodesAllowed) - } + if (useMachineSets && canScaleMachineset(demandPerInstanceType)) || (!useMachineSets && canScaleMachinepool(demandPerInstanceType)) { + scaleUp(aw, demandPerInstanceType) } else { - if canScaleMachinepool(demandPerInstanceType) { - scaleUp(aw, demandPerInstanceType) - } else { - klog.Infof("Cannot scale up replicas max replicas allowed is %v", maxScaleNodesAllowed) - } + klog.Infof("Cannot scale up replicas. The maximum allowed replicas is %v", maxScaleNodesAllowed) } } } @@ -271,9 +263,11 @@ func onUpdate(old, new interface{}) { klog.Info("Job completed, deleting resources owned") deleteMachineSet(aw) } + if contains(scaledAppwrapper, aw.Name) { return } + pending, aw := IsAwPending() if pending { demandPerInstanceType := discoverInstanceTypes(aw) @@ -283,9 +277,7 @@ func onUpdate(old, new interface{}) { klog.Infof("Cannot scale up replicas max replicas allowed is %v", maxScaleNodesAllowed) } } - } - } func discoverInstanceTypes(aw *arbv1.AppWrapper) map[string]int { @@ -344,11 +336,13 @@ func IsAwPending() (false bool, aw *arbv1.AppWrapper) { if err != nil { klog.Fatalf("Error listing: %v", err) } + for _, aw := range queuedJobs { //skip if contains(scaledAppwrapper, aw.Name) { continue } + status := aw.Status.State allconditions := aw.Status.Conditions for _, condition := range allconditions { @@ -381,13 +375,9 @@ func findExactMatch(aw *arbv1.AppWrapper) *arbv1.AppWrapper { if eachAw.Status.State != "Pending" { continue } - for k, v := range eachAw.Labels { - if k == "orderedinstance" { - if v == existingAcquiredMachineTypes { - match = eachAw - klog.Infof("Found exact match, %v appwrapper has acquire machinetypes %v", eachAw.Name, existingAcquiredMachineTypes) - } - } + if eachAw.Labels["orderedinstance"] == existingAcquiredMachineTypes { + match = eachAw + klog.Infof("Found exact match, %v appwrapper has acquired machinetypes %v", eachAw.Name, existingAcquiredMachineTypes) } } return match @@ -405,7 +395,6 @@ func onDelete(obj interface{}) { swapNodeLabels(aw, matchedAw) } else { klog.Infof("Appwrapper %s deleted, scaling down machines", aw.Name) - scaleDown(aw) } } else { diff --git a/controllers/machinepools.go b/controllers/machinepools.go index 53de061..edf07b2 100644 --- a/controllers/machinepools.go +++ b/controllers/machinepools.go @@ -5,7 +5,6 @@ import ( "fmt" ocmsdk "github.com/openshift-online/ocm-sdk-go" cmv1 "github.com/openshift-online/ocm-sdk-go/clustersmgmt/v1" - "github.com/openshift-online/ocm-sdk-go/logging" configv1 "github.com/openshift/api/config/v1" arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1" "k8s.io/apimachinery/pkg/types" @@ -14,23 +13,30 @@ import ( "strings" ) -func scaleMachinePool(aw *arbv1.AppWrapper, userRequestedInstanceType string, replicas int) { +func createOCMConnection() (*ocmsdk.Connection, error) { logger, err := ocmsdk.NewGoLoggerBuilder(). Debug(false). Build() if err != nil { - fmt.Fprintf(os.Stderr, "Can't build logger: %v\n", err) - os.Exit(1) + return nil, fmt.Errorf("can't build logger: %v", err) } - // Create the connection, and remember to close it: connection, err := ocmsdk.NewConnectionBuilder(). Logger(logger). Tokens(ocmToken). Build() if err != nil { - fmt.Fprintf(os.Stderr, "Can't build connection: %v\n", err) - os.Exit(1) + return nil, fmt.Errorf("can't build connection: %v", err) + } + + return connection, nil +} + +func scaleMachinePool(aw *arbv1.AppWrapper, userRequestedInstanceType string, replicas int) { + connection, err := createOCMConnection() + if err != nil { + fmt.Fprintf(os.Stderr, "Error creating OCM connection: %v", err) + return } defer connection.Close() @@ -53,23 +59,13 @@ func scaleMachinePool(aw *arbv1.AppWrapper, userRequestedInstanceType string, re } func deleteMachinePool(aw *arbv1.AppWrapper) { - - logger, err := ocmsdk.NewGoLoggerBuilder(). - Debug(false). - Build() + connection, err := createOCMConnection() if err != nil { - fmt.Fprintf(os.Stderr, "Can't build logger: %v\n", err) - os.Exit(1) - } - connection, err := ocmsdk.NewConnectionBuilder(). - Logger(logger). - Tokens(ocmToken). - Build() - if err != nil { - fmt.Fprintf(os.Stderr, "Can't build connection: %v\n", err) - os.Exit(1) + fmt.Fprintf(os.Stderr, "Error creating OCM connection: %v", err) + return } defer connection.Close() + machinePoolsConnection := connection.ClustersMgmt().V1().Clusters().Cluster(ocmClusterID).MachinePools().List() machinePoolsListResponse, _ := machinePoolsConnection.Send() @@ -88,20 +84,9 @@ func deleteMachinePool(aw *arbv1.AppWrapper) { // Check if machine pools exist func machinePoolExists() bool { - logger, err := ocmsdk.NewGoLoggerBuilder(). - Debug(false). - Build() + connection, err := createOCMConnection() if err != nil { - fmt.Fprintf(os.Stderr, "Can't build logger: %v\n", err) - os.Exit(1) - } - connection, err := ocmsdk.NewConnectionBuilder(). - Logger(logger). - Tokens(ocmToken). - Build() - if err != nil { - fmt.Fprintf(os.Stderr, "Can't build connection: %v\n", err) - os.Exit(1) + fmt.Fprintf(os.Stderr, "Error creating OCM connection: %v", err) } defer connection.Close() @@ -112,7 +97,6 @@ func machinePoolExists() bool { // getOCMClusterID determines the internal clusterID to be used for OCM API calls func getOCMClusterID(r *AppWrapperReconciler) error { - cv := &configv1.ClusterVersion{} err := r.Client.Get(context.TODO(), types.NamespacedName{Name: "version"}, cv) if err != nil { @@ -123,22 +107,9 @@ func getOCMClusterID(r *AppWrapperReconciler) error { ctx := context.Background() - // Create a logger that has the debug level enabled: - logger, err := logging.NewGoLoggerBuilder(). - Debug(false). - Build() - if err != nil { - fmt.Fprintf(os.Stderr, "Can't build logger: %v\n", err) - os.Exit(1) - } - - connection, err := ocmsdk.NewConnectionBuilder(). - Logger(logger). - Tokens(ocmToken). - Build() + connection, err := createOCMConnection() if err != nil { - fmt.Fprintf(os.Stderr, "Can't build connection: %v\n", err) - os.Exit(1) + fmt.Fprintf(os.Stderr, "Error creating OCM connection: %v", err) } defer connection.Close()