Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 11 additions & 22 deletions controllers/appwrapper_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,20 +243,12 @@ func onAdd(obj interface{}) {
if aw.Status.State == arbv1.AppWrapperStateEnqueued || aw.Status.State == "" && aw.Labels["orderedinstance"] != "" {
//scaledAppwrapper = append(scaledAppwrapper, aw.Name)
demandPerInstanceType := discoverInstanceTypes(aw)
//TODO: simplify the looping

if demandPerInstanceType != nil {
if useMachineSets {
if canScaleMachineset(demandPerInstanceType) {
scaleUp(aw, demandPerInstanceType)
} else {
klog.Infof("Cannot scale up replicas max replicas allowed is %v", maxScaleNodesAllowed)
}
if (useMachineSets && canScaleMachineset(demandPerInstanceType)) || (!useMachineSets && canScaleMachinepool(demandPerInstanceType)) {
scaleUp(aw, demandPerInstanceType)
} else {
if canScaleMachinepool(demandPerInstanceType) {
scaleUp(aw, demandPerInstanceType)
} else {
klog.Infof("Cannot scale up replicas max replicas allowed is %v", maxScaleNodesAllowed)
}
klog.Infof("Cannot scale up replicas. The maximum allowed replicas is %v", maxScaleNodesAllowed)
}
}
}
Expand All @@ -271,9 +263,11 @@ func onUpdate(old, new interface{}) {
klog.Info("Job completed, deleting resources owned")
deleteMachineSet(aw)
}

if contains(scaledAppwrapper, aw.Name) {
return
}

pending, aw := IsAwPending()
if pending {
demandPerInstanceType := discoverInstanceTypes(aw)
Expand All @@ -283,9 +277,7 @@ func onUpdate(old, new interface{}) {
klog.Infof("Cannot scale up replicas max replicas allowed is %v", maxScaleNodesAllowed)
}
}

}

}

func discoverInstanceTypes(aw *arbv1.AppWrapper) map[string]int {
Expand Down Expand Up @@ -344,11 +336,13 @@ func IsAwPending() (false bool, aw *arbv1.AppWrapper) {
if err != nil {
klog.Fatalf("Error listing: %v", err)
}

for _, aw := range queuedJobs {
//skip
if contains(scaledAppwrapper, aw.Name) {
continue
}

status := aw.Status.State
allconditions := aw.Status.Conditions
for _, condition := range allconditions {
Expand Down Expand Up @@ -381,13 +375,9 @@ func findExactMatch(aw *arbv1.AppWrapper) *arbv1.AppWrapper {
if eachAw.Status.State != "Pending" {
continue
}
for k, v := range eachAw.Labels {
if k == "orderedinstance" {
if v == existingAcquiredMachineTypes {
match = eachAw
klog.Infof("Found exact match, %v appwrapper has acquire machinetypes %v", eachAw.Name, existingAcquiredMachineTypes)
}
}
if eachAw.Labels["orderedinstance"] == existingAcquiredMachineTypes {
match = eachAw
klog.Infof("Found exact match, %v appwrapper has acquired machinetypes %v", eachAw.Name, existingAcquiredMachineTypes)
}
}
return match
Expand All @@ -405,7 +395,6 @@ func onDelete(obj interface{}) {
swapNodeLabels(aw, matchedAw)
} else {
klog.Infof("Appwrapper %s deleted, scaling down machines", aw.Name)

scaleDown(aw)
}
} else {
Expand Down
71 changes: 21 additions & 50 deletions controllers/machinepools.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
ocmsdk "github.com/openshift-online/ocm-sdk-go"
cmv1 "github.com/openshift-online/ocm-sdk-go/clustersmgmt/v1"
"github.com/openshift-online/ocm-sdk-go/logging"
configv1 "github.com/openshift/api/config/v1"
arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -14,23 +13,30 @@ import (
"strings"
)

func scaleMachinePool(aw *arbv1.AppWrapper, userRequestedInstanceType string, replicas int) {
func createOCMConnection() (*ocmsdk.Connection, error) {
logger, err := ocmsdk.NewGoLoggerBuilder().
Debug(false).
Build()
if err != nil {
fmt.Fprintf(os.Stderr, "Can't build logger: %v\n", err)
os.Exit(1)
return nil, fmt.Errorf("can't build logger: %v", err)
}

// Create the connection, and remember to close it:
connection, err := ocmsdk.NewConnectionBuilder().
Logger(logger).
Tokens(ocmToken).
Build()
if err != nil {
fmt.Fprintf(os.Stderr, "Can't build connection: %v\n", err)
os.Exit(1)
return nil, fmt.Errorf("can't build connection: %v", err)
}

return connection, nil
}

func scaleMachinePool(aw *arbv1.AppWrapper, userRequestedInstanceType string, replicas int) {
connection, err := createOCMConnection()
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating OCM connection: %v", err)
return
}
defer connection.Close()

Expand All @@ -53,23 +59,13 @@ func scaleMachinePool(aw *arbv1.AppWrapper, userRequestedInstanceType string, re
}

func deleteMachinePool(aw *arbv1.AppWrapper) {

logger, err := ocmsdk.NewGoLoggerBuilder().
Debug(false).
Build()
connection, err := createOCMConnection()
if err != nil {
fmt.Fprintf(os.Stderr, "Can't build logger: %v\n", err)
os.Exit(1)
}
connection, err := ocmsdk.NewConnectionBuilder().
Logger(logger).
Tokens(ocmToken).
Build()
if err != nil {
fmt.Fprintf(os.Stderr, "Can't build connection: %v\n", err)
os.Exit(1)
fmt.Fprintf(os.Stderr, "Error creating OCM connection: %v", err)
return
}
defer connection.Close()

machinePoolsConnection := connection.ClustersMgmt().V1().Clusters().Cluster(ocmClusterID).MachinePools().List()

machinePoolsListResponse, _ := machinePoolsConnection.Send()
Expand All @@ -88,20 +84,9 @@ func deleteMachinePool(aw *arbv1.AppWrapper) {

// Check if machine pools exist
func machinePoolExists() bool {
logger, err := ocmsdk.NewGoLoggerBuilder().
Debug(false).
Build()
connection, err := createOCMConnection()
if err != nil {
fmt.Fprintf(os.Stderr, "Can't build logger: %v\n", err)
os.Exit(1)
}
connection, err := ocmsdk.NewConnectionBuilder().
Logger(logger).
Tokens(ocmToken).
Build()
if err != nil {
fmt.Fprintf(os.Stderr, "Can't build connection: %v\n", err)
os.Exit(1)
fmt.Fprintf(os.Stderr, "Error creating OCM connection: %v", err)
}
defer connection.Close()

Expand All @@ -112,7 +97,6 @@ func machinePoolExists() bool {

// getOCMClusterID determines the internal clusterID to be used for OCM API calls
func getOCMClusterID(r *AppWrapperReconciler) error {

cv := &configv1.ClusterVersion{}
err := r.Client.Get(context.TODO(), types.NamespacedName{Name: "version"}, cv)
if err != nil {
Expand All @@ -123,22 +107,9 @@ func getOCMClusterID(r *AppWrapperReconciler) error {

ctx := context.Background()

// Create a logger that has the debug level enabled:
logger, err := logging.NewGoLoggerBuilder().
Debug(false).
Build()
if err != nil {
fmt.Fprintf(os.Stderr, "Can't build logger: %v\n", err)
os.Exit(1)
}

connection, err := ocmsdk.NewConnectionBuilder().
Logger(logger).
Tokens(ocmToken).
Build()
connection, err := createOCMConnection()
if err != nil {
fmt.Fprintf(os.Stderr, "Can't build connection: %v\n", err)
os.Exit(1)
fmt.Fprintf(os.Stderr, "Error creating OCM connection: %v", err)
}
defer connection.Close()

Expand Down