diff --git a/docs/cassandra.rst b/docs/cassandra.rst index 179f605c1..94d26e9ca 100644 --- a/docs/cassandra.rst +++ b/docs/cassandra.rst @@ -193,6 +193,7 @@ Supported Configuration Changes Navigator supports the following changes to a Cassandra cluster: * :ref:`create-cluster-cassandra`: Add all initially configured node pools and nodes. + * :ref:`minor-upgrade-cassandra`: Trigger a rolling upgrade of Cassandra nodes by increasing the minor and / or patch components of ``CassandraCluster.Spec.Version``. * :ref:`scale-out-cassandra`: Increase ``CassandraCluster.Spec.NodePools[0].Replicas`` to add more C* nodes to a ``nodepool``. Navigator does not currently support any other changes to the Cassandra cluster configuration. @@ -202,7 +203,6 @@ Unsupported Configuration Changes The following configuration changes are not currently supported but will be supported in the near future: - * Minor Upgrade: Trigger a rolling Cassandra upgrade by increasing the minor and / or patch components of ``CassandraCluster.Spec.Version``. * Scale In: Decrease ``CassandraCluster.Spec.NodePools[0].Replicas`` to remove C* nodes from a ``nodepool``. The following configuration changes are not currently supported: @@ -222,6 +222,19 @@ in order of ``NodePool`` and according to the process described in :ref:`scale-o The order of node creation is determined by the order of the entries in the ``CassandraCluster.Spec.NodePools`` list. You can look at ``CassandraCluster.Status.NodePools`` to see the current state. +.. _minor-upgrade-cassandra: + +Minor Upgrade +~~~~~~~~~~~~~ + +If you increment the minor or patch number in ``CassandraCluster.Spec.Version``, Navigator will trigger a rolling update of the existing C* nodes. + +C* nodes are upgraded serially, in order of NodePool and Pod ordinal, starting with the pod with the highest ordinal in the first NodePool. + +`StatefulSet Rolling Updates `_ describes the update process in more detail. + +.. note:: Major version upgrades are not yet supported. + .. _scale-out-cassandra: Scale Out diff --git a/hack/e2e.sh b/hack/e2e.sh index 36dc79aef..051619682 100755 --- a/hack/e2e.sh +++ b/hack/e2e.sh @@ -266,12 +266,23 @@ function test_cassandracluster() { stdout_equals "${CASS_VERSION}" \ kubectl --namespace "${namespace}" \ get pilots \ - --output 'jsonpath={.items[0].status.cassandra.version}' + --selector "navigator.jetstack.io/cluster-type=CassandraCluster,navigator.jetstack.io/cluster-name=${CASS_NAME}" \ + --output 'jsonpath={.items[*].status.cassandra.version}' then kubectl --namespace "${namespace}" get pilots -o yaml fail_test "Pilots failed to report the expected version" fi + if ! retry TIMEOUT=300 \ + stdout_equals "${CASS_VERSION}" \ + kubectl --namespace "${namespace}" \ + get cassandracluster "${CASS_NAME}" \ + --output 'jsonpath={.status.nodePools.*.version}' + then + kubectl --namespace "${namespace}" get cassandracluster -o yaml + fail_test "NodePools failed to report the expected version" + fi + # Wait 5 minutes for cassandra to start and listen for CQL queries. if ! retry TIMEOUT=300 cql_connect \ "${namespace}" \ @@ -303,6 +314,43 @@ function test_cassandracluster() { --debug \ --execute="INSERT INTO space1.testtable1(key, value) VALUES('testkey1', 'testvalue1')" + # Upgrade to newer patch version + export CASS_VERSION="3.11.2" + kubectl apply \ + --namespace "${namespace}" \ + --filename \ + <(envsubst \ + '$NAVIGATOR_IMAGE_REPOSITORY:$NAVIGATOR_IMAGE_TAG:$NAVIGATOR_IMAGE_PULLPOLICY:$CASS_NAME:$CASS_NODEPOOL1_DATACENTER:$CASS_NODEPOOL1_RACK:$CASS_NODEPOOL1_NAME:$CASS_REPLICAS:$CASS_VERSION' \ + < "${SCRIPT_DIR}/testdata/cass-cluster-test.template.yaml") + + # The cluster is upgraded + if ! retry TIMEOUT=300 kube_event_exists "${namespace}" \ + "navigator-controller:CassandraCluster:Normal:UpdateVersion" + then + fail_test "An UpdateVersion event was not recorded" + fi + + if ! retry TIMEOUT=300 \ + stdout_equals "${CASS_VERSION}" \ + kubectl --namespace "${namespace}" \ + get pilots \ + --selector "navigator.jetstack.io/cluster-type=CassandraCluster,navigator.jetstack.io/cluster-name=${CASS_NAME}" \ + --output 'jsonpath={.items[*].status.cassandra.version}' + then + kubectl --namespace "${namespace}" get pilots -o yaml + fail_test "Pilots failed to report the expected version" + fi + + if ! retry TIMEOUT=300 \ + stdout_equals "${CASS_VERSION}" \ + kubectl --namespace "${namespace}" \ + get cassandracluster "${CASS_NAME}" \ + --output 'jsonpath={.status.nodePools.*.version}' + then + kubectl --namespace "${namespace}" get cassandracluster -o yaml + fail_test "NodePools failed to report the expected version" + fi + # Delete the Cassandra pod and wait for the CQL service to become # unavailable (readiness probe fails) @@ -344,7 +392,7 @@ function test_cassandracluster() { # Get names of nodepool pods before the scale out (line separated) local original_pods_file="${ARTIFACTS_DIR}/test_cassandra.scale_out_original_pods" kubectl --namespace "${namespace}" get pods \ - --selector="navigator.jetstack.io/cassandra-cluster-name=${CASS_NAME}" \ + --selector "navigator.jetstack.io/cluster-type=CassandraCluster,navigator.jetstack.io/cluster-name=${CASS_NAME}" \ --output='jsonpath={range .items[*]}{.metadata.name}{"\n"}{end}' \ > "${original_pods_file}" diff --git a/internal/test/util/generate/generate.go b/internal/test/util/generate/generate.go index 2723e51ff..80f822b3d 100644 --- a/internal/test/util/generate/generate.go +++ b/internal/test/util/generate/generate.go @@ -22,8 +22,9 @@ func Pilot(c PilotConfig) *v1alpha1.Pilot { c.Namespace = "default" } labels := map[string]string{} - labels[v1alpha1.ElasticsearchClusterNameLabel] = c.Cluster - labels[v1alpha1.ElasticsearchNodePoolNameLabel] = c.NodePool + labels[v1alpha1.ClusterTypeLabel] = "ElasticsearchCluster" + labels[v1alpha1.ClusterNameLabel] = c.Cluster + labels[v1alpha1.NodePoolNameLabel] = c.NodePool var version *semver.Version if c.Version != "" { version = semver.New(c.Version) @@ -122,18 +123,24 @@ func StatefulSet(c StatefulSetConfig) *apps.StatefulSet { type CassandraClusterConfig struct { Name, Namespace string + Version *version.Version } func CassandraCluster(c CassandraClusterConfig) *v1alpha1.CassandraCluster { - return &v1alpha1.CassandraCluster{ + o := &v1alpha1.CassandraCluster{ ObjectMeta: metav1.ObjectMeta{ Name: c.Name, Namespace: c.Namespace, }, - Spec: v1alpha1.CassandraClusterSpec{ - Version: *version.New("3.11.2"), - }, + Spec: v1alpha1.CassandraClusterSpec{}, + } + if c.Version == nil { + o.Spec.Version = *version.New("3.11.2") + } else { + o.Spec.Version = *c.Version } + + return o } type CassandraClusterNodePoolConfig struct { diff --git a/pkg/api/version/version.go b/pkg/api/version/version.go index 563144083..efb9bf00f 100644 --- a/pkg/api/version/version.go +++ b/pkg/api/version/version.go @@ -3,6 +3,7 @@ package version import ( "encoding/json" "strconv" + "strings" semver "github.com/hashicorp/go-version" ) @@ -59,6 +60,11 @@ func (v *Version) Semver() *semver.Version { return v.semver } +// TODO: Add tests for this +func (v *Version) LessThan(versionB *Version) bool { + return v.semver.LessThan(versionB.semver) +} + func (v *Version) UnmarshalJSON(data []byte) error { s, err := strconv.Unquote(string(data)) if err != nil { @@ -84,3 +90,31 @@ func (v Version) DeepCopy() Version { } return *New(v.String()) } + +func (v *Version) bump(i int) *Version { + v2 := v.DeepCopy() + parts := strings.Split(v2.Semver().String(), ".") + part, err := strconv.Atoi(parts[i]) + if err != nil { + panic(err) + } + part++ + parts[i] = strconv.Itoa(part) + return New(strings.Join(parts, ".")) +} + +func (v *Version) BumpMajor() *Version { + return v.bump(0) +} + +func (v *Version) BumpMinor() *Version { + return v.bump(1) +} + +func (v *Version) BumpPatch() *Version { + return v.bump(2) +} + +func (v *Version) Major() int64 { + return v.semver.Segments64()[0] +} diff --git a/pkg/apis/navigator/types.go b/pkg/apis/navigator/types.go index a7c95ee55..73ed6b47e 100644 --- a/pkg/apis/navigator/types.go +++ b/pkg/apis/navigator/types.go @@ -52,6 +52,7 @@ type CassandraClusterStatus struct { type CassandraClusterNodePoolStatus struct { ReadyReplicas int32 + Version *version.Version } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/navigator/v1alpha1/types.go b/pkg/apis/navigator/v1alpha1/types.go index 2b19742cb..bc7fbc944 100644 --- a/pkg/apis/navigator/v1alpha1/types.go +++ b/pkg/apis/navigator/v1alpha1/types.go @@ -10,14 +10,13 @@ import ( ) const ( - ElasticsearchClusterNameLabel = "navigator.jetstack.io/elasticsearch-cluster-name" - ElasticsearchNodePoolNameLabel = "navigator.jetstack.io/elasticsearch-node-pool-name" ElasticsearchNodePoolVersionAnnotation = "navigator.jetstack.io/elasticsearch-version" ElasticsearchRoleLabelPrefix = "navigator.jetstack.io/elasticsearch-role-" - CassandraClusterNameLabel = "navigator.jetstack.io/cassandra-cluster-name" - CassandraNodePoolNameLabel = "navigator.jetstack.io/cassandra-node-pool-name" - PilotLabel = "navigator.jetstack.io/has-pilot" + ClusterTypeLabel = "navigator.jetstack.io/cluster-type" + ClusterNameLabel = "navigator.jetstack.io/cluster-name" + NodePoolNameLabel = "navigator.jetstack.io/node-pool-name" + PilotLabel = "navigator.jetstack.io/has-pilot" ) // +genclient @@ -117,6 +116,12 @@ type CassandraClusterStatus struct { type CassandraClusterNodePoolStatus struct { // The number of replicas in the node pool that are currently 'Ready'. ReadyReplicas int32 `json:"readyReplicas"` + // The lowest version of Cassandra found to be running in this nodepool, + // as reported by the Cassandra process. + // nil or empty if the lowest version can not be determined, + // or if the lowest version has not yet been determined + // +optional + Version *version.Version `json:"version,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go b/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go index 516e6fc0e..766be7734 100644 --- a/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go +++ b/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go @@ -201,6 +201,7 @@ func Convert_navigator_CassandraClusterNodePool_To_v1alpha1_CassandraClusterNode func autoConvert_v1alpha1_CassandraClusterNodePoolStatus_To_navigator_CassandraClusterNodePoolStatus(in *CassandraClusterNodePoolStatus, out *navigator.CassandraClusterNodePoolStatus, s conversion.Scope) error { out.ReadyReplicas = in.ReadyReplicas + out.Version = (*version.Version)(unsafe.Pointer(in.Version)) return nil } @@ -211,6 +212,7 @@ func Convert_v1alpha1_CassandraClusterNodePoolStatus_To_navigator_CassandraClust func autoConvert_navigator_CassandraClusterNodePoolStatus_To_v1alpha1_CassandraClusterNodePoolStatus(in *navigator.CassandraClusterNodePoolStatus, out *CassandraClusterNodePoolStatus, s conversion.Scope) error { out.ReadyReplicas = in.ReadyReplicas + out.Version = (*version.Version)(unsafe.Pointer(in.Version)) return nil } diff --git a/pkg/apis/navigator/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/navigator/v1alpha1/zz_generated.deepcopy.go index 42d49bf70..c074db7e8 100644 --- a/pkg/apis/navigator/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/navigator/v1alpha1/zz_generated.deepcopy.go @@ -152,6 +152,15 @@ func (in *CassandraClusterNodePool) DeepCopy() *CassandraClusterNodePool { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CassandraClusterNodePoolStatus) DeepCopyInto(out *CassandraClusterNodePoolStatus) { *out = *in + if in.Version != nil { + in, out := &in.Version, &out.Version + if *in == nil { + *out = nil + } else { + *out = new(version.Version) + **out = **in + } + } return } @@ -207,7 +216,9 @@ func (in *CassandraClusterStatus) DeepCopyInto(out *CassandraClusterStatus) { in, out := &in.NodePools, &out.NodePools *out = make(map[string]CassandraClusterNodePoolStatus, len(*in)) for key, val := range *in { - (*out)[key] = val + newVal := new(CassandraClusterNodePoolStatus) + val.DeepCopyInto(newVal) + (*out)[key] = *newVal } } return diff --git a/pkg/apis/navigator/validation/cassandra.go b/pkg/apis/navigator/validation/cassandra.go index cf17c800d..08bf6f611 100644 --- a/pkg/apis/navigator/validation/cassandra.go +++ b/pkg/apis/navigator/validation/cassandra.go @@ -44,13 +44,13 @@ func ValidateCassandraClusterUpdate(old, new *navigator.CassandraCluster) field. fldPath := field.NewPath("spec") - if !new.Spec.Version.Equal(&old.Spec.Version) { + if new.Spec.Version.LessThan(&old.Spec.Version) { allErrs = append( allErrs, field.Forbidden( fldPath.Child("version"), fmt.Sprintf( - "cannot change the version of an existing cluster. "+ + "cannot perform version downgrades. "+ "old version: %s, new version: %s", old.Spec.Version, new.Spec.Version, ), @@ -58,6 +58,16 @@ func ValidateCassandraClusterUpdate(old, new *navigator.CassandraCluster) field. ) } + if new.Spec.Version.Major() != old.Spec.Version.Major() { + allErrs = append( + allErrs, + field.Forbidden( + fldPath.Child("version"), + "cannot perform major version upgrades", + ), + ) + } + npPath := fldPath.Child("nodePools") for i, newNp := range new.Spec.NodePools { idxPath := npPath.Index(i) diff --git a/pkg/apis/navigator/validation/cassandra_test.go b/pkg/apis/navigator/validation/cassandra_test.go index 686169b6b..2873d26e2 100644 --- a/pkg/apis/navigator/validation/cassandra_test.go +++ b/pkg/apis/navigator/validation/cassandra_test.go @@ -178,15 +178,23 @@ func TestValidateCassandraClusterUpdate(t *testing.T) { new: validCassCluster, }, "downgrade not allowed": { - old: setVersion(validCassCluster, lowerVersion), - new: validCassCluster, + old: validCassCluster, + new: setVersion(validCassCluster, lowerVersion), errorExpected: true, }, - "upgrade not allowed": { + "major upgrade not allowed": { old: validCassCluster, - new: setVersion(validCassCluster, higherVersion), + new: setVersion(validCassCluster, validCassCluster.Spec.Version.BumpMajor()), errorExpected: true, }, + "minor version upgrade": { + old: validCassCluster, + new: setVersion(validCassCluster, validCassCluster.Spec.Version.BumpMinor()), + }, + "patch version upgrade": { + old: validCassCluster, + new: setVersion(validCassCluster, validCassCluster.Spec.Version.BumpPatch()), + }, } for title, persistence := range persistenceErrorCases { diff --git a/pkg/apis/navigator/zz_generated.deepcopy.go b/pkg/apis/navigator/zz_generated.deepcopy.go index 45b8402ea..319bd4cb3 100644 --- a/pkg/apis/navigator/zz_generated.deepcopy.go +++ b/pkg/apis/navigator/zz_generated.deepcopy.go @@ -152,6 +152,15 @@ func (in *CassandraClusterNodePool) DeepCopy() *CassandraClusterNodePool { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CassandraClusterNodePoolStatus) DeepCopyInto(out *CassandraClusterNodePoolStatus) { *out = *in + if in.Version != nil { + in, out := &in.Version, &out.Version + if *in == nil { + *out = nil + } else { + *out = new(version.Version) + **out = **in + } + } return } @@ -207,7 +216,9 @@ func (in *CassandraClusterStatus) DeepCopyInto(out *CassandraClusterStatus) { in, out := &in.NodePools, &out.NodePools *out = make(map[string]CassandraClusterNodePoolStatus, len(*in)) for key, val := range *in { - (*out)[key] = val + newVal := new(CassandraClusterNodePoolStatus) + val.DeepCopyInto(newVal) + (*out)[key] = *newVal } } return diff --git a/pkg/controllers/cassandra/actions/create_nodepool.go b/pkg/controllers/cassandra/actions/create_nodepool.go index 0f9cd0333..3aeadc409 100644 --- a/pkg/controllers/cassandra/actions/create_nodepool.go +++ b/pkg/controllers/cassandra/actions/create_nodepool.go @@ -4,6 +4,8 @@ import ( corev1 "k8s.io/api/core/v1" k8sErrors "k8s.io/apimachinery/pkg/api/errors" + "github.com/pkg/errors" + "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/controllers" "github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool" @@ -27,7 +29,7 @@ func (a *CreateNodePool) Execute(s *controllers.State) error { return nil } if err != nil { - return err + return errors.Wrap(err, "unable to create statefulset") } s.Recorder.Eventf( a.Cluster, diff --git a/pkg/controllers/cassandra/actions/update_version.go b/pkg/controllers/cassandra/actions/update_version.go new file mode 100644 index 000000000..9d47e6a73 --- /dev/null +++ b/pkg/controllers/cassandra/actions/update_version.go @@ -0,0 +1,57 @@ +package actions + +import ( + "github.com/golang/glog" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + + "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" + "github.com/jetstack/navigator/pkg/controllers" + "github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool" +) + +type UpdateVersion struct { + Cluster *v1alpha1.CassandraCluster + NodePool *v1alpha1.CassandraClusterNodePool +} + +var _ controllers.Action = &UpdateVersion{} + +func (a *UpdateVersion) Name() string { + return "UpdateVersion" +} + +func (a *UpdateVersion) Execute(s *controllers.State) error { + baseSet := nodepool.StatefulSetForCluster(a.Cluster, a.NodePool) + existingSet, err := s.StatefulSetLister.StatefulSets(baseSet.Namespace).Get(baseSet.Name) + if err != nil { + return errors.Wrap(err, "unable to get statefulset") + } + newImage := baseSet.Spec.Template.Spec.Containers[0].Image + oldImage := existingSet.Spec.Template.Spec.Containers[0].Image + if newImage == oldImage { + glog.V(4).Infof( + "StatefulSet %q already has the desired image %q", + existingSet.Name, newImage, + ) + return nil + } + glog.V(4).Infof( + "Replacing StatefulSet %q image %q with %q", + existingSet.Name, oldImage, newImage, + ) + newSet := existingSet.DeepCopy() + newSet.Spec.Template.Spec.Containers[0].Image = newImage + _, err = s.Clientset.AppsV1beta1().StatefulSets(newSet.Namespace).Update(newSet) + if err != nil { + return errors.Wrap(err, "unable to update statefulset") + } + s.Recorder.Eventf( + a.Cluster, + corev1.EventTypeNormal, + a.Name(), + "UpdateVersion: NodePool=%q, Version=%q, Image=%q", + a.NodePool.Name, a.Cluster.Spec.Version, newImage, + ) + return nil +} diff --git a/pkg/controllers/cassandra/actions/update_version_test.go b/pkg/controllers/cassandra/actions/update_version_test.go new file mode 100644 index 000000000..88e0aff06 --- /dev/null +++ b/pkg/controllers/cassandra/actions/update_version_test.go @@ -0,0 +1,162 @@ +package actions_test + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + "github.com/jetstack/navigator/internal/test/unit/framework" + "github.com/jetstack/navigator/internal/test/util/generate" + "github.com/jetstack/navigator/pkg/api/version" + "github.com/jetstack/navigator/pkg/controllers/cassandra/actions" +) + +func TestUpdateVersion(t *testing.T) { + type testT struct { + kubeObjects []runtime.Object + navObjects []runtime.Object + cluster generate.CassandraClusterConfig + nodePool generate.CassandraClusterNodePoolConfig + expectedStatefulSet *generate.StatefulSetConfig + expectedErr bool + mutator func(*framework.StateFixture) + } + tests := map[string]testT{ + "Error if StatefulSet not listed": { + cluster: generate.CassandraClusterConfig{ + Name: "cluster1", + Namespace: "ns1", + }, + nodePool: generate.CassandraClusterNodePoolConfig{ + Name: "pool1", + }, + expectedErr: true, + }, + "Error if clientset.Update fails (e.g. listed but not found)": { + kubeObjects: []runtime.Object{ + generate.StatefulSet( + generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + }, + ), + }, + cluster: generate.CassandraClusterConfig{ + Name: "cluster1", + Namespace: "ns1", + }, + nodePool: generate.CassandraClusterNodePoolConfig{ + Name: "pool1", + }, + expectedErr: true, + mutator: func(f *framework.StateFixture) { + err := f.KubeClient(). + AppsV1beta1(). + StatefulSets("ns1"). + Delete("cass-cluster1-pool1", &metav1.DeleteOptions{}) + if err != nil { + f.T.Fatal(err) + } + }, + }, + "Idempotent: No error if the image already matches the actual image": { + kubeObjects: []runtime.Object{ + generate.StatefulSet( + generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Image: "docker.io/cassandra:3.11.2", + }, + ), + }, + cluster: generate.CassandraClusterConfig{ + Name: "cluster1", + Namespace: "ns1", + Version: version.New("3.11.2"), + }, + nodePool: generate.CassandraClusterNodePoolConfig{ + Name: "pool1", + }, + expectedStatefulSet: &generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Image: "docker.io/cassandra:3.11.2", + }, + expectedErr: false, + }, + "The image is updated": { + kubeObjects: []runtime.Object{ + generate.StatefulSet( + generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Image: "docker.io/cassandra:3.11.1", + }, + ), + }, + cluster: generate.CassandraClusterConfig{ + Name: "cluster1", + Namespace: "ns1", + Version: version.New("3.11.2"), + }, + nodePool: generate.CassandraClusterNodePoolConfig{ + Name: "pool1", + }, + expectedStatefulSet: &generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Image: "docker.io/cassandra:3.11.2", + }, + }, + } + + for name, test := range tests { + t.Run( + name, + func(t *testing.T) { + fixture := &framework.StateFixture{ + T: t, + KubeObjects: test.kubeObjects, + NavigatorObjects: test.navObjects, + } + fixture.Start() + defer fixture.Stop() + state := fixture.State() + if test.mutator != nil { + test.mutator(fixture) + } + a := &actions.UpdateVersion{ + Cluster: generate.CassandraCluster(test.cluster), + NodePool: generate.CassandraClusterNodePool(test.nodePool), + } + err := a.Execute(state) + if err != nil { + t.Logf("The error returned by Execute was: %s", err) + } + if !test.expectedErr && err != nil { + t.Errorf("Unexpected error: %s", err) + } + if test.expectedErr && err == nil { + t.Errorf("Expected an error") + } + if test.expectedStatefulSet != nil { + actualStatefulSet, err := fixture.KubeClient(). + AppsV1beta1(). + StatefulSets(test.expectedStatefulSet.Namespace). + Get(test.expectedStatefulSet.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Unexpected error retrieving statefulset: %v", err) + } + actualImage := actualStatefulSet.Spec.Template.Spec.Containers[0].Image + if test.expectedStatefulSet.Image != actualImage { + t.Errorf( + "Unexpected image. Expected: %s. Actual: %s", + test.expectedStatefulSet.Image, actualImage, + ) + } + } + }, + ) + } +} diff --git a/pkg/controllers/cassandra/cassandra.go b/pkg/controllers/cassandra/cassandra.go index 64421c1a5..f87494e05 100644 --- a/pkg/controllers/cassandra/cassandra.go +++ b/pkg/controllers/cassandra/cassandra.go @@ -26,6 +26,7 @@ import ( listersv1alpha1 "github.com/jetstack/navigator/pkg/client/listers/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/controllers" "github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool" + "github.com/jetstack/navigator/pkg/controllers/cassandra/pilot" "github.com/jetstack/navigator/pkg/controllers/cassandra/role" "github.com/jetstack/navigator/pkg/controllers/cassandra/rolebinding" "github.com/jetstack/navigator/pkg/controllers/cassandra/seedlabeller" @@ -59,6 +60,7 @@ func NewCassandra( naviClient navigatorclientset.Interface, kubeClient kubernetes.Interface, cassClusters navigatorinformers.CassandraClusterInformer, + pilots navigatorinformers.PilotInformer, services coreinformers.ServiceInformer, statefulSets appsinformers.StatefulSetInformer, pods coreinformers.PodInformer, @@ -90,6 +92,12 @@ func NewCassandra( WorkFunc: cc.handleObject, }, ) + // An event handler to trigger status updates when pilots change + pilots.Informer().AddEventHandler( + &controllers.BlockingEventHandler{ + WorkFunc: cc.handleObject, + }, + ) cc.cassLister = cassClusters.Lister() cc.statefulSetLister = statefulSets.Lister() cc.cassListerSynced = cassClusters.Informer().HasSynced @@ -138,6 +146,11 @@ func NewCassandra( pods.Lister(), recorder, ), + pilot.NewControl( + naviClient, + pilots.Lister(), + recorder, + ), recorder, &controllers.State{ Clientset: kubeClient, @@ -312,6 +325,7 @@ func CassandraControllerFromContext(ctx *controllers.Context) *CassandraControll ctx.NavigatorClient, ctx.Client, ctx.SharedInformerFactory.Navigator().V1alpha1().CassandraClusters(), + ctx.SharedInformerFactory.Navigator().V1alpha1().Pilots(), ctx.KubeSharedInformerFactory.Core().V1().Services(), ctx.KubeSharedInformerFactory.Apps().V1beta1().StatefulSets(), ctx.KubeSharedInformerFactory.Core().V1().Pods(), diff --git a/pkg/controllers/cassandra/cluster_control.go b/pkg/controllers/cassandra/cluster_control.go index 21568cde7..8c62c7b26 100644 --- a/pkg/controllers/cassandra/cluster_control.go +++ b/pkg/controllers/cassandra/cluster_control.go @@ -2,13 +2,16 @@ package cassandra import ( "github.com/golang/glog" + "github.com/pkg/errors" apiv1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/record" v1alpha1 "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/controllers" "github.com/jetstack/navigator/pkg/controllers/cassandra/actions" "github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool" + "github.com/jetstack/navigator/pkg/controllers/cassandra/pilot" "github.com/jetstack/navigator/pkg/controllers/cassandra/role" "github.com/jetstack/navigator/pkg/controllers/cassandra/rolebinding" "github.com/jetstack/navigator/pkg/controllers/cassandra/seedlabeller" @@ -27,6 +30,7 @@ const ( MessageErrorSyncService = "Error syncing service: %s" MessageErrorSyncNodePools = "Error syncing node pools: %s" MessageErrorSyncSeedLabels = "Error syncing seed labels: %s" + MessageErrorSyncPilots = "Error syncing pilots: %s" MessageErrorSync = "Error syncing: %s" MessageSuccessSync = "Successfully synced CassandraCluster" ) @@ -45,6 +49,7 @@ type defaultCassandraClusterControl struct { roleControl role.Interface roleBindingControl rolebinding.Interface seedLabellerControl seedlabeller.Interface + pilotControl pilot.Interface recorder record.EventRecorder state *controllers.State } @@ -57,6 +62,7 @@ func NewControl( roleControl role.Interface, roleBindingControl rolebinding.Interface, seedlabellerControl seedlabeller.Interface, + pilotControl pilot.Interface, recorder record.EventRecorder, state *controllers.State, ) ControlInterface { @@ -68,6 +74,7 @@ func NewControl( roleControl: roleControl, roleBindingControl: roleBindingControl, seedLabellerControl: seedlabellerControl, + pilotControl: pilotControl, recorder: recorder, state: state, } @@ -184,8 +191,21 @@ func (e *defaultCassandraClusterControl) Sync(c *v1alpha1.CassandraCluster) erro return err } + err = e.pilotControl.Sync(c) + if err != nil { + e.recorder.Eventf( + c, + apiv1.EventTypeWarning, + ErrorSync, + MessageErrorSyncPilots, + err, + ) + return err + } + a := NextAction(c) if a != nil { + glog.V(4).Infof("Executing action: %#v", a) err = a.Execute(e.state) if err != nil { e.recorder.Eventf( @@ -195,10 +215,9 @@ func (e *defaultCassandraClusterControl) Sync(c *v1alpha1.CassandraCluster) erro MessageErrorSync, err, ) - return err + return errors.Wrap(err, "failure while executing action") } } - e.recorder.Event( c, apiv1.EventTypeNormal, @@ -227,5 +246,25 @@ func NextAction(c *v1alpha1.CassandraCluster) controllers.Action { } } } + for _, np := range c.Spec.NodePools { + nps := c.Status.NodePools[np.Name] + if nps.Version == nil { + return nil + } + if c.Spec.Version.LessThan(nps.Version) { + glog.Error("Version downgrades are not supported") + return nil + } + if nps.Version.Major() != c.Spec.Version.Major() { + glog.Error("Major version upgrades are not supported") + return nil + } + if nps.Version.LessThan(&c.Spec.Version) { + return &actions.UpdateVersion{ + Cluster: c, + NodePool: &np, + } + } + } return nil } diff --git a/pkg/controllers/cassandra/cluster_control_test.go b/pkg/controllers/cassandra/cluster_control_test.go index f44ca542c..72da65671 100644 --- a/pkg/controllers/cassandra/cluster_control_test.go +++ b/pkg/controllers/cassandra/cluster_control_test.go @@ -1,60 +1,27 @@ package cassandra_test import ( - "fmt" "math/rand" "reflect" - "strings" "testing" "testing/quick" + "github.com/kr/pretty" + v1alpha1 "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/controllers/cassandra" "github.com/jetstack/navigator/pkg/controllers/cassandra/actions" casstesting "github.com/jetstack/navigator/pkg/controllers/cassandra/testing" ) -func CassandraClusterSummary(c *v1alpha1.CassandraCluster) string { - return fmt.Sprintf( - "%s/%s {Spec: %s, Status: %s}", - c.Namespace, c.Name, - CassandraClusterSpecSummary(c), - CassandraClusterStatusSummary(c), - ) -} - -func CassandraClusterSpecSummary(c *v1alpha1.CassandraCluster) string { - nodepools := make([]string, len(c.Spec.NodePools)) - for i, np := range c.Spec.NodePools { - nodepools[i] = fmt.Sprintf("%s:%d", np.Name, np.Replicas) - } - return fmt.Sprintf( - "{nodepools: %s}", - strings.Join(nodepools, ", "), - ) -} - -func CassandraClusterStatusSummary(c *v1alpha1.CassandraCluster) string { - nodepools := make([]string, len(c.Status.NodePools)) - i := 0 - for title, nps := range c.Status.NodePools { - nodepools[i] = fmt.Sprintf("%s:%d", title, nps.ReadyReplicas) - i++ - } - return fmt.Sprintf( - "{nodepools: %s}", strings.Join(nodepools, ", "), - ) -} - func TestNextAction(t *testing.T) { - f := func(c *v1alpha1.CassandraCluster) bool { - t.Log(CassandraClusterSummary(c)) + f := func(c *v1alpha1.CassandraCluster) (ret bool) { + defer func() { + if !ret { + t.Logf(pretty.Sprint(c)) + } + }() a := cassandra.NextAction(c) - if a != nil { - t.Log("Action:", a.Name()) - } else { - t.Log("No action") - } switch action := a.(type) { case *actions.CreateNodePool: _, found := c.Status.NodePools[action.NodePool.Name] @@ -72,16 +39,30 @@ func TestNextAction(t *testing.T) { t.Errorf("Unexpected attempt to scale up a nodepool with >= ready replicas") return false } + case *actions.UpdateVersion: + nps, found := c.Status.NodePools[action.NodePool.Name] + if !found { + t.Errorf("Unexpected UpdateVersion before status reported") + return false + } + if nps.Version == nil { + t.Errorf("Unexpected UpdateVersion before version reported") + return false + } + if nps.Version.Major() != c.Spec.Version.Major() { + t.Errorf("Unexpected UpdateVersion for major version change") + return false + } } return true } config := &quick.Config{ - MaxCount: 100, + MaxCount: 1000, Values: func(values []reflect.Value, rnd *rand.Rand) { cluster := &v1alpha1.CassandraCluster{} cluster.SetName("cluster1") cluster.SetNamespace("ns1") - casstesting.FuzzCassandraClusterNodePools(cluster, rnd, 0) + casstesting.FuzzCassandraCluster(cluster, rnd, 0) values[0] = reflect.ValueOf(cluster) }, } diff --git a/pkg/controllers/cassandra/nodepool/nodepool.go b/pkg/controllers/cassandra/nodepool/nodepool.go index 044b088f8..ce7cc3efe 100644 --- a/pkg/controllers/cassandra/nodepool/nodepool.go +++ b/pkg/controllers/cassandra/nodepool/nodepool.go @@ -45,7 +45,7 @@ func (e *defaultCassandraClusterNodepoolControl) updateStatus(cluster *v1alpha1. } // Create a NodePoolStatus for each statefulset that is controlled by this cluster. for _, ss := range sets { - npName := ss.Labels[v1alpha1.CassandraNodePoolNameLabel] + npName := ss.Labels[v1alpha1.NodePoolNameLabel] nps := cluster.Status.NodePools[npName] nps.ReadyReplicas = ss.Status.ReadyReplicas cluster.Status.NodePools[npName] = nps diff --git a/pkg/controllers/cassandra/nodepool/resource.go b/pkg/controllers/cassandra/nodepool/resource.go index a87730dd3..39228bde4 100644 --- a/pkg/controllers/cassandra/nodepool/resource.go +++ b/pkg/controllers/cassandra/nodepool/resource.go @@ -39,7 +39,7 @@ func StatefulSetForCluster( nodePoolLabels := util.NodePoolLabels(cluster, np.Name) podLabels := util.NodePoolLabels(cluster, np.Name) podLabels[v1alpha1.PilotLabel] = "" - image := cassImageToUse(&cluster.Spec) + image := CassImageToUse(&cluster.Spec) set := &apps.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/controllers/cassandra/nodepool/util.go b/pkg/controllers/cassandra/nodepool/util.go index 8a72ae4e2..cf99e5661 100644 --- a/pkg/controllers/cassandra/nodepool/util.go +++ b/pkg/controllers/cassandra/nodepool/util.go @@ -7,7 +7,7 @@ import ( "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" ) -func cassImageToUse(spec *v1alpha1.CassandraClusterSpec) *v1alpha1.ImageSpec { +func CassImageToUse(spec *v1alpha1.CassandraClusterSpec) *v1alpha1.ImageSpec { if spec.Image == nil { return defaultCassandraImageForVersion(spec.Version) } diff --git a/pkg/controllers/cassandra/pilot/pilot.go b/pkg/controllers/cassandra/pilot/pilot.go new file mode 100644 index 000000000..8fbb24fa7 --- /dev/null +++ b/pkg/controllers/cassandra/pilot/pilot.go @@ -0,0 +1,151 @@ +package pilot + +import ( + "github.com/golang/glog" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/record" + + "github.com/jetstack/navigator/pkg/api/version" + "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" + navigator "github.com/jetstack/navigator/pkg/client/clientset/versioned" + navlisters "github.com/jetstack/navigator/pkg/client/listers/navigator/v1alpha1" + "github.com/jetstack/navigator/pkg/controllers/cassandra/util" +) + +type Interface interface { + Sync(*v1alpha1.CassandraCluster) error +} + +type pilotControl struct { + naviClient navigator.Interface + pilots navlisters.PilotLister + recorder record.EventRecorder +} + +var _ Interface = &pilotControl{} + +func NewControl( + naviClient navigator.Interface, + pilots navlisters.PilotLister, + recorder record.EventRecorder, +) *pilotControl { + return &pilotControl{ + naviClient: naviClient, + pilots: pilots, + recorder: recorder, + } + +} + +// Sync +func (c *pilotControl) Sync(cluster *v1alpha1.CassandraCluster) error { + return c.updateDiscoveredVersions(cluster) +} + +func (c *pilotControl) updateDiscoveredVersions(cluster *v1alpha1.CassandraCluster) error { + glog.V(4).Infof("updateDiscoveredVersions for cluster: %s/%s", cluster.Namespace, cluster.Name) + selector, err := util.SelectorForCluster(cluster) + if err != nil { + return err + } + pilots, err := c.pilots.List(selector) + if err != nil { + return err + } + if len(pilots) < 1 { + glog.V(4).Infof("No pilots found matching selector: %s", selector) + } + for _, pilot := range pilots { + nodePoolNameForPilot, nodePoolNameFound := pilot.Labels[v1alpha1.NodePoolNameLabel] + if !nodePoolNameFound { + glog.Warningf("Skipping pilot without NodePoolNameLabelKey: %s", pilot.Name) + continue + } + nodePoolStatus := cluster.Status.NodePools[nodePoolNameForPilot] + switch { + case pilot.Status.Cassandra == nil: + glog.V(4).Infof( + "Pilot %s/%s has no status. Setting nodepool version to nil", + pilot.Namespace, pilot.Name, + ) + nodePoolStatus.Version = nil + case pilot.Status.Cassandra.Version == nil: + glog.V(4).Infof( + "Pilot %s/%s has not reported its version. Setting nodepool version to nil", + pilot.Namespace, pilot.Name, + ) + nodePoolStatus.Version = nil + case nodePoolStatus.Version == nil: + nodePoolStatus.Version = pilot.Status.Cassandra.Version + case pilot.Status.Cassandra.Version.LessThan(nodePoolStatus.Version): + glog.V(4).Infof( + "Found lower pilot version: %s, %s", + nodePoolNameForPilot, pilot.Status.Cassandra.Version, + ) + nodePoolStatus.Version = pilot.Status.Cassandra.Version + } + cluster.Status.NodePools[nodePoolNameForPilot] = nodePoolStatus + } + return nil +} + +func UpdateLabels( + o metav1.Object, + newLabels map[string]string, +) { + labels := o.GetLabels() + if labels == nil { + labels = map[string]string{} + } + for key, val := range newLabels { + labels[key] = val + } + o.SetLabels(labels) +} + +type PilotBuilder struct { + pilot *v1alpha1.Pilot +} + +func NewPilotBuilder() *PilotBuilder { + return &PilotBuilder{ + pilot: &v1alpha1.Pilot{}, + } +} + +func (pb *PilotBuilder) ForCluster(cluster metav1.Object) *PilotBuilder { + UpdateLabels(pb.pilot, util.ClusterLabels(cluster)) + pb.pilot.SetNamespace(cluster.GetNamespace()) + pb.pilot.SetOwnerReferences( + append( + pb.pilot.GetOwnerReferences(), + util.NewControllerRef(cluster), + ), + ) + return pb +} + +func (pb *PilotBuilder) ForNodePool(np *v1alpha1.CassandraClusterNodePool) *PilotBuilder { + UpdateLabels( + pb.pilot, + map[string]string{ + v1alpha1.NodePoolNameLabel: np.Name, + }, + ) + return pb +} + +func (pb *PilotBuilder) WithCassandraStatus() *PilotBuilder { + pb.pilot.Status.Cassandra = &v1alpha1.CassandraPilotStatus{} + return pb +} + +func (pb *PilotBuilder) WithDiscoveredCassandraVersion(v string) *PilotBuilder { + pb.WithCassandraStatus() + pb.pilot.Status.Cassandra.Version = version.New(v) + return pb +} + +func (pb *PilotBuilder) Build() *v1alpha1.Pilot { + return pb.pilot +} diff --git a/pkg/controllers/cassandra/pilot/pilot_test.go b/pkg/controllers/cassandra/pilot/pilot_test.go new file mode 100644 index 000000000..1ab61fd22 --- /dev/null +++ b/pkg/controllers/cassandra/pilot/pilot_test.go @@ -0,0 +1,137 @@ +package pilot_test + +import ( + "reflect" + "testing" + + "github.com/kr/pretty" + "k8s.io/apimachinery/pkg/runtime" + + "github.com/jetstack/navigator/internal/test/unit/framework" + "github.com/jetstack/navigator/pkg/api/version" + "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" + "github.com/jetstack/navigator/pkg/controllers/cassandra/pilot" + casstesting "github.com/jetstack/navigator/pkg/controllers/cassandra/testing" +) + +func AssertClusterEqual(t *testing.T, c1, c2 *v1alpha1.CassandraCluster) { + if !reflect.DeepEqual(c1, c2) { + t.Errorf("Clusters are not equal: %s", pretty.Diff(c1, c2)) + } +} + +func TestStatusUpdate(t *testing.T) { + type testT struct { + kubeObjects []runtime.Object + navObjects []runtime.Object + cluster *v1alpha1.CassandraCluster + assertions func(t *testing.T, original, updated *v1alpha1.CassandraCluster) + expectErr bool + } + cluster := casstesting.ClusterForTest() + tests := map[string]testT{ + "no matching pilots": { + navObjects: []runtime.Object{ + pilot.NewPilotBuilder().Build(), + }, + cluster: cluster, + assertions: AssertClusterEqual, + }, + "nil cassandra status": { + navObjects: []runtime.Object{ + pilot.NewPilotBuilder().ForCluster(cluster).Build(), + }, + cluster: cluster, + assertions: AssertClusterEqual, + }, + "nil cassandra version": { + navObjects: []runtime.Object{ + pilot.NewPilotBuilder(). + ForCluster(cluster). + WithCassandraStatus(). + Build(), + }, + cluster: cluster, + assertions: AssertClusterEqual, + }, + "missing nodepool label": { + navObjects: []runtime.Object{ + pilot.NewPilotBuilder(). + ForCluster(cluster). + WithDiscoveredCassandraVersion("3.11.2"). + Build(), + }, + cluster: cluster, + assertions: AssertClusterEqual, + }, + "set version if missing": { + navObjects: []runtime.Object{ + pilot.NewPilotBuilder(). + ForCluster(cluster). + ForNodePool(&cluster.Spec.NodePools[0]). + WithDiscoveredCassandraVersion("3.11.2"). + Build(), + }, + cluster: cluster, + assertions: func(t *testing.T, inCluster, outCluster *v1alpha1.CassandraCluster) { + expectedVersion := version.New("3.11.2") + actualVersion := outCluster.Status.NodePools["region-1-zone-a"].Version + if actualVersion == nil || !expectedVersion.Equal(actualVersion) { + t.Errorf("Version mismatch. Expected %s != %s", expectedVersion, actualVersion) + } + }, + }, + "set version if lower": { + navObjects: []runtime.Object{ + pilot.NewPilotBuilder(). + ForCluster(cluster). + ForNodePool(&cluster.Spec.NodePools[0]). + WithDiscoveredCassandraVersion("3.11.2"). + Build(), + }, + cluster: cluster, + assertions: func(t *testing.T, inCluster, outCluster *v1alpha1.CassandraCluster) { + expectedVersion := version.New("3.11.2") + actualVersion := outCluster.Status.NodePools["region-1-zone-a"].Version + if actualVersion == nil || !expectedVersion.Equal(actualVersion) { + t.Errorf("Version mismatch. Expected %s != %s", expectedVersion, actualVersion) + } + }, + }, + } + + for title, test := range tests { + t.Run( + title, + func(t *testing.T) { + fixture := &framework.StateFixture{ + T: t, + KubeObjects: test.kubeObjects, + NavigatorObjects: test.navObjects, + } + fixture.Start() + defer fixture.Stop() + state := fixture.State() + c := pilot.NewControl( + state.NavigatorClientset, + state.PilotLister, + state.Recorder, + ) + cluster = test.cluster.DeepCopy() + err := c.Sync(cluster) + if err != nil { + if !test.expectErr { + t.Errorf("Unexpected error: %s", err) + } + } else { + if test.expectErr { + t.Error("Missing error") + } + } + if test.assertions != nil { + test.assertions(t, test.cluster, cluster) + } + }, + ) + } +} diff --git a/pkg/controllers/cassandra/testing/gen.go b/pkg/controllers/cassandra/testing/gen.go index ca0155028..ca1eeb92f 100644 --- a/pkg/controllers/cassandra/testing/gen.go +++ b/pkg/controllers/cassandra/testing/gen.go @@ -4,10 +4,36 @@ import ( "fmt" "math/rand" + "github.com/jetstack/navigator/pkg/api/version" "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/util/ptr" ) +var versions = []*version.Version{ + version.New("2.0.0"), + version.New("3.11"), + version.New("3.11.1"), + version.New("3.11.2"), + version.New("4.0.0"), +} + +func FuzzCassandraCluster(cluster *v1alpha1.CassandraCluster, rand *rand.Rand, size int) { + cluster.Spec.Version = *versions[rand.Intn(len(versions))] + FuzzCassandraClusterNodePools(cluster, rand, size) + // 20% chance of patch upgrade + if rand.Intn(4) == 0 { + cluster.Spec.Version = *cluster.Spec.Version.BumpPatch() + } + // 20% chance of minor upgrade + if rand.Intn(4) == 0 { + cluster.Spec.Version = *cluster.Spec.Version.BumpMinor() + } + // 20% chance of major upgrade + if rand.Intn(4) == 0 { + cluster.Spec.Version = *cluster.Spec.Version.BumpMajor() + } +} + func FuzzCassandraNodePool(np *v1alpha1.CassandraClusterNodePool, rand *rand.Rand, size int) { np.Replicas = ptr.Int32(rand.Int31n(5)) } @@ -26,6 +52,15 @@ func FuzzCassandraClusterNodePools(cluster *v1alpha1.CassandraCluster, rand *ran FuzzCassandraNodePool(&np, rand, size) nps := v1alpha1.CassandraClusterNodePoolStatus{ ReadyReplicas: *np.Replicas, + Version: version.New(cluster.Spec.Version.String()), + } + // 20% chance of too new version + if rand.Intn(4) == 0 { + nps.Version = nps.Version.BumpMajor() + } + // 20% chance of unreported version + if rand.Intn(4) == 0 { + nps.Version = nil } // 20% chance of ScaleOut if rand.Intn(4) == 0 { diff --git a/pkg/controllers/cassandra/testing/testing.go b/pkg/controllers/cassandra/testing/testing.go index 10fa9414d..93eaaedfe 100644 --- a/pkg/controllers/cassandra/testing/testing.go +++ b/pkg/controllers/cassandra/testing/testing.go @@ -17,6 +17,9 @@ func ClusterForTest() *v1alpha1.CassandraCluster { }, }, }, + Status: v1alpha1.CassandraClusterStatus{ + NodePools: map[string]v1alpha1.CassandraClusterNodePoolStatus{}, + }, } c.SetName("cassandra-1") c.SetNamespace("app-1") diff --git a/pkg/controllers/cassandra/util/util.go b/pkg/controllers/cassandra/util/util.go index 3e3436452..b9a759a54 100644 --- a/pkg/controllers/cassandra/util/util.go +++ b/pkg/controllers/cassandra/util/util.go @@ -19,7 +19,7 @@ const ( kindName = "CassandraCluster" ) -func NewControllerRef(c *v1alpha1.CassandraCluster) metav1.OwnerReference { +func NewControllerRef(c metav1.Object) metav1.OwnerReference { return *metav1.NewControllerRef(c, schema.GroupVersionKind{ Group: navigator.GroupName, Version: "v1alpha1", @@ -47,23 +47,32 @@ func PilotRBACRoleName(c *v1alpha1.CassandraCluster) string { return fmt.Sprintf("%s-pilot", ResourceBaseName(c)) } -func ClusterLabels(c *v1alpha1.CassandraCluster) map[string]string { +func ClusterLabels(c metav1.Object) map[string]string { return map[string]string{ "app": "cassandracluster", - v1alpha1.CassandraClusterNameLabel: c.Name, + v1alpha1.ClusterTypeLabel: kindName, + v1alpha1.ClusterNameLabel: c.GetName(), } } func SelectorForCluster(c *v1alpha1.CassandraCluster) (labels.Selector, error) { + clusterTypeReq, err := labels.NewRequirement( + v1alpha1.ClusterTypeLabel, + selection.Equals, + []string{kindName}, + ) + if err != nil { + return nil, err + } clusterNameReq, err := labels.NewRequirement( - v1alpha1.CassandraClusterNameLabel, + v1alpha1.ClusterNameLabel, selection.Equals, []string{c.Name}, ) if err != nil { return nil, err } - return labels.NewSelector().Add(*clusterNameReq), nil + return labels.NewSelector().Add(*clusterTypeReq, *clusterNameReq), nil } func NodePoolLabels( @@ -71,7 +80,7 @@ func NodePoolLabels( poolName string, ) map[string]string { labels := ClusterLabels(c) - labels[v1alpha1.CassandraNodePoolNameLabel] = poolName + labels[v1alpha1.NodePoolNameLabel] = poolName return labels } diff --git a/pkg/controllers/elasticsearch/actions/scale_test.go b/pkg/controllers/elasticsearch/actions/scale_test.go index 8914cf462..b1fe66466 100644 --- a/pkg/controllers/elasticsearch/actions/scale_test.go +++ b/pkg/controllers/elasticsearch/actions/scale_test.go @@ -411,8 +411,9 @@ func pilotWithNameOwner(name, clusterName, nodePoolName string) *v1alpha1.Pilot if p.Labels == nil { p.Labels = map[string]string{} } - p.Labels[v1alpha1.ElasticsearchClusterNameLabel] = clusterName - p.Labels[v1alpha1.ElasticsearchNodePoolNameLabel] = nodePoolName + p.Labels[v1alpha1.ClusterTypeLabel] = "ElasticsearchCluster" + p.Labels[v1alpha1.ClusterNameLabel] = clusterName + p.Labels[v1alpha1.NodePoolNameLabel] = nodePoolName return p } diff --git a/pkg/controllers/elasticsearch/cluster_control.go b/pkg/controllers/elasticsearch/cluster_control.go index 6e1d054bf..328f0befa 100644 --- a/pkg/controllers/elasticsearch/cluster_control.go +++ b/pkg/controllers/elasticsearch/cluster_control.go @@ -298,7 +298,7 @@ func (e *defaultElasticsearchClusterControl) reconcileNodePools(c *v1alpha1.Elas // of a valid node pool for sets for _, np := range c.Spec.NodePools { for i, ss := range sets { - if ss.Labels != nil && ss.Labels[v1alpha1.ElasticsearchNodePoolNameLabel] == np.Name { + if ss.Labels != nil && ss.Labels[v1alpha1.NodePoolNameLabel] == np.Name { sets = append(sets[:i], sets[i+1:]...) break } diff --git a/pkg/controllers/elasticsearch/util/nodepool.go b/pkg/controllers/elasticsearch/util/nodepool.go index 2f0b98c97..bc547af38 100644 --- a/pkg/controllers/elasticsearch/util/nodepool.go +++ b/pkg/controllers/elasticsearch/util/nodepool.go @@ -11,14 +11,15 @@ import ( func ClusterLabels(c *v1alpha1.ElasticsearchCluster) map[string]string { return map[string]string{ - v1alpha1.ElasticsearchClusterNameLabel: c.Name, + v1alpha1.ClusterTypeLabel: kindName, + v1alpha1.ClusterNameLabel: c.Name, } } func NodePoolLabels(c *v1alpha1.ElasticsearchCluster, poolName string, roles ...v1alpha1.ElasticsearchClusterRole) map[string]string { labels := ClusterLabels(c) if poolName != "" { - labels[v1alpha1.ElasticsearchNodePoolNameLabel] = poolName + labels[v1alpha1.NodePoolNameLabel] = poolName } for _, role := range roles { labels[RoleLabel(role)] = "true" @@ -35,7 +36,7 @@ func NodePoolResourceName(c *v1alpha1.ElasticsearchCluster, np *v1alpha1.Elastic } func SelectorForNodePool(clusterName, poolName string) (labels.Selector, error) { - nodePoolNameReq, err := labels.NewRequirement(v1alpha1.ElasticsearchNodePoolNameLabel, selection.Equals, []string{poolName}) + nodePoolNameReq, err := labels.NewRequirement(v1alpha1.NodePoolNameLabel, selection.Equals, []string{poolName}) if err != nil { return nil, err } diff --git a/pkg/controllers/elasticsearch/util/util.go b/pkg/controllers/elasticsearch/util/util.go index 135afd97c..a233e8363 100644 --- a/pkg/controllers/elasticsearch/util/util.go +++ b/pkg/controllers/elasticsearch/util/util.go @@ -28,9 +28,13 @@ func ResourceBaseName(c *v1alpha1.ElasticsearchCluster) string { } func SelectorForCluster(clusterName string) (labels.Selector, error) { - clusterNameReq, err := labels.NewRequirement(v1alpha1.ElasticsearchClusterNameLabel, selection.Equals, []string{clusterName}) + clusterTypeReq, err := labels.NewRequirement(v1alpha1.ClusterTypeLabel, selection.Equals, []string{kindName}) if err != nil { return nil, err } - return labels.NewSelector().Add(*clusterNameReq), nil + clusterNameReq, err := labels.NewRequirement(v1alpha1.ClusterNameLabel, selection.Equals, []string{clusterName}) + if err != nil { + return nil, err + } + return labels.NewSelector().Add(*clusterTypeReq, *clusterNameReq), nil } diff --git a/pkg/controllers/pilotcontroller/pilot.go b/pkg/controllers/pilotcontroller/pilot.go index 988ec6615..5ffcecbcb 100644 --- a/pkg/controllers/pilotcontroller/pilot.go +++ b/pkg/controllers/pilotcontroller/pilot.go @@ -230,6 +230,11 @@ func PilotForPod(pod *v1.Pod) *v1alpha1.Pilot { }, ), }, + Labels: map[string]string{ + v1alpha1.ClusterTypeLabel: pod.Labels[v1alpha1.ClusterTypeLabel], + v1alpha1.ClusterNameLabel: pod.Labels[v1alpha1.ClusterNameLabel], + v1alpha1.NodePoolNameLabel: pod.Labels[v1alpha1.NodePoolNameLabel], + }, }, } } diff --git a/pkg/controllers/pilotcontroller/pilot_test.go b/pkg/controllers/pilotcontroller/pilot_test.go index 875d17267..890454cd3 100644 --- a/pkg/controllers/pilotcontroller/pilot_test.go +++ b/pkg/controllers/pilotcontroller/pilot_test.go @@ -130,7 +130,7 @@ func TestPilotControllerIntegration(t *testing.T) { Kind: "StatefulSet", }, ObjectMeta: metav1.ObjectMeta{ - Name: "ss1", + Name: "np1", Namespace: "ns1", OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(cc1, cc1.GroupVersionKind()), @@ -140,10 +140,13 @@ func TestPilotControllerIntegration(t *testing.T) { pod1 := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: "p1", + Name: "np1-0", Namespace: "ns1", Labels: map[string]string{ - v1alpha1.PilotLabel: "", + v1alpha1.ClusterTypeLabel: "CassandraCluster", + v1alpha1.ClusterNameLabel: "cc1", + v1alpha1.NodePoolNameLabel: "np1", + v1alpha1.PilotLabel: "", }, OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(ss1, ss1.GroupVersionKind()), diff --git a/pkg/pilot/cassandra/v3/pilot.go b/pkg/pilot/cassandra/v3/pilot.go index 817d9c0e9..0f3a70919 100644 --- a/pkg/pilot/cassandra/v3/pilot.go +++ b/pkg/pilot/cassandra/v3/pilot.go @@ -10,6 +10,7 @@ import ( "k8s.io/client-go/tools/cache" "github.com/golang/glog" + "github.com/pkg/errors" "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/cassandra/nodetool" @@ -85,11 +86,12 @@ func (p *Pilot) syncFunc(pilot *v1alpha1.Pilot) error { if pilot.Status.Cassandra == nil { pilot.Status.Cassandra = &v1alpha1.CassandraPilotStatus{} } - version, err := p.nodeTool.Version() - if err != nil { + if err == nil { + glog.V(4).Infof("Got Cassandra version: %s", version) + } else { + glog.Errorf("Error while getting Cassandra version: %s", err) pilot.Status.Cassandra.Version = nil - glog.Errorf("error while getting Cassandra version: %s", err) } pilot.Status.Cassandra.Version = version return nil @@ -98,7 +100,7 @@ func (p *Pilot) syncFunc(pilot *v1alpha1.Pilot) error { func localNodeUpAndNormal(nodeTool nodetool.Interface) error { nodes, err := nodeTool.Status() if err != nil { - return err + return errors.Wrap(err, "unable to get cluster status") } localNode := nodes.LocalNode() if localNode == nil { diff --git a/pkg/pilot/genericpilot/controller.go b/pkg/pilot/genericpilot/controller.go index 8f7fec4e7..8cb5961a8 100644 --- a/pkg/pilot/genericpilot/controller.go +++ b/pkg/pilot/genericpilot/controller.go @@ -2,6 +2,7 @@ package genericpilot import ( "github.com/golang/glog" + "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" @@ -124,7 +125,7 @@ func (g *GenericPilot) updatePilotStatus(pilot *v1alpha1.Pilot) error { // perform update in API _, err := g.client.NavigatorV1alpha1().Pilots(pilot.Namespace).UpdateStatus(pilot) - return err + return errors.Wrap(err, "unable to update pilot status") } func (g *GenericPilot) constructProcess(pilot *v1alpha1.Pilot) error {