Skip to content
This repository was archived by the owner on Apr 4, 2023. It is now read-only.

Commit 490933c

Browse files
committed
Add CreateNodePool and ScaleOut actions for Cassandra.
* These become the only changes supported by the Cassandra controller. * ScaleIn and CassandraUpgrade actions will be implemented in followup branches. Fixes: #253
1 parent d7293b7 commit 490933c

File tree

19 files changed

+782
-130
lines changed

19 files changed

+782
-130
lines changed

contrib/charts/navigator/templates/rbac.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ items:
8787
name: "{{ template "fullname" . }}:controller"
8888
rules:
8989
- apiGroups: ["navigator.jetstack.io"]
90-
resources: ["elasticsearchclusters", "pilots", "elasticsearchclusters/status", "pilots/status", "cassandraclusters"]
90+
resources: ["elasticsearchclusters", "pilots", "elasticsearchclusters/status", "pilots/status", "cassandraclusters", "cassandraclusters/status"]
9191
verbs: ["*"]
9292
- apiGroups: [""]
9393
resources: ["services", "configmaps", "serviceaccounts", "pods"]

hack/e2e.sh

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,12 @@ function test_cassandracluster() {
217217
fail_test "Failed to create cassandracluster"
218218
fi
219219

220-
kubectl get cassandraclusters -n "${namespace}" -o yaml
220+
# A NodePool is created
221+
if ! retry TIMEOUT=300 kube_event_exists "${namespace}" \
222+
"navigator-controller:CassandraCluster:Normal:CreateNodePool"
223+
then
224+
fail_test "A CreateNodePool event was not recorded"
225+
fi
221226

222227
# A Pilot is elected leader
223228
if ! retry TIMEOUT=300 kube_event_exists "${namespace}" \
@@ -330,6 +335,13 @@ function test_cassandracluster() {
330335
'$NAVIGATOR_IMAGE_REPOSITORY:$NAVIGATOR_IMAGE_TAG:$NAVIGATOR_IMAGE_PULLPOLICY:$CASS_NAME:$CASS_REPLICAS:$CASS_CQL_PORT:$CASS_VERSION' \
331336
< "${SCRIPT_DIR}/testdata/cass-cluster-test.template.yaml")
332337

338+
# The NodePool is scaled out
339+
if ! retry TIMEOUT=300 kube_event_exists "${namespace}" \
340+
"navigator-controller:CassandraCluster:Normal:ScaleOut"
341+
then
342+
fail_test "A ScaleOut event was not recorded"
343+
fi
344+
333345
if ! retry TIMEOUT=300 stdout_equals 2 kubectl \
334346
--namespace "${namespace}" \
335347
get statefulsets \

internal/test/util/generate/generate.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package generate
22

33
import (
4+
"testing"
5+
46
"github.com/coreos/go-semver/semver"
57
apps "k8s.io/api/apps/v1beta1"
68
core "k8s.io/api/core/v1"
@@ -118,3 +120,46 @@ func StatefulSet(c StatefulSetConfig) *apps.StatefulSet {
118120
},
119121
}
120122
}
123+
124+
func AssertStatefulSetMatches(t *testing.T, expected StatefulSetConfig, actual *apps.StatefulSet) {
125+
if actual.Name != expected.Name {
126+
t.Errorf("Name %q != %q", expected.Name, actual.Name)
127+
}
128+
if actual.Namespace != expected.Namespace {
129+
t.Errorf("Namespace %q != %q", expected.Namespace, actual.Namespace)
130+
}
131+
if expected.Replicas != nil {
132+
if actual.Spec.Replicas == nil {
133+
t.Errorf("Replicas %d != %v", *expected.Replicas, nil)
134+
} else {
135+
if *actual.Spec.Replicas != *expected.Replicas {
136+
t.Errorf("Replicas %d != %d", *expected.Replicas, *actual.Spec.Replicas)
137+
}
138+
}
139+
}
140+
}
141+
142+
type CassandraClusterConfig struct {
143+
Name, Namespace string
144+
}
145+
146+
func CassandraCluster(c CassandraClusterConfig) *v1alpha1.CassandraCluster {
147+
return &v1alpha1.CassandraCluster{
148+
ObjectMeta: metav1.ObjectMeta{
149+
Name: c.Name,
150+
Namespace: c.Namespace,
151+
},
152+
}
153+
}
154+
155+
type CassandraClusterNodePoolConfig struct {
156+
Name string
157+
Replicas int32
158+
}
159+
160+
func CassandraClusterNodePool(c CassandraClusterNodePoolConfig) *v1alpha1.CassandraClusterNodePool {
161+
return &v1alpha1.CassandraClusterNodePool{
162+
Name: c.Name,
163+
Replicas: c.Replicas,
164+
}
165+
}

pkg/apis/navigator/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ type CassandraClusterStatus struct {
4949
}
5050

5151
type CassandraClusterNodePoolStatus struct {
52-
ReadyReplicas int64
52+
ReadyReplicas int32
5353
}
5454

5555
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

pkg/apis/navigator/v1alpha1/types.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type CassandraClusterSpec struct {
4646
// CassandraClusterNodePool describes a node pool within a CassandraCluster.
4747
type CassandraClusterNodePool struct {
4848
Name string `json:"name"`
49-
Replicas int64 `json:"replicas"`
49+
Replicas int32 `json:"replicas"`
5050

5151
// Persistence specifies the configuration for persistent data for this
5252
// node.
@@ -73,7 +73,7 @@ type CassandraClusterStatus struct {
7373
}
7474

7575
type CassandraClusterNodePoolStatus struct {
76-
ReadyReplicas int64 `json:"readyReplicas"`
76+
ReadyReplicas int32 `json:"readyReplicas"`
7777
}
7878

7979
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

pkg/apis/navigator/v1alpha1/zz_generated.conversion.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ func Convert_navigator_CassandraClusterList_To_v1alpha1_CassandraClusterList(in
164164

165165
func autoConvert_v1alpha1_CassandraClusterNodePool_To_navigator_CassandraClusterNodePool(in *CassandraClusterNodePool, out *navigator.CassandraClusterNodePool, s conversion.Scope) error {
166166
out.Name = in.Name
167-
out.Replicas = in.Replicas
167+
out.Replicas = int64(in.Replicas)
168168
if err := Convert_v1alpha1_PersistenceConfig_To_navigator_PersistenceConfig(&in.Persistence, &out.Persistence, s); err != nil {
169169
return err
170170
}
@@ -181,7 +181,7 @@ func Convert_v1alpha1_CassandraClusterNodePool_To_navigator_CassandraClusterNode
181181

182182
func autoConvert_navigator_CassandraClusterNodePool_To_v1alpha1_CassandraClusterNodePool(in *navigator.CassandraClusterNodePool, out *CassandraClusterNodePool, s conversion.Scope) error {
183183
out.Name = in.Name
184-
out.Replicas = in.Replicas
184+
out.Replicas = int32(in.Replicas)
185185
if err := Convert_navigator_PersistenceConfig_To_v1alpha1_PersistenceConfig(&in.Persistence, &out.Persistence, s); err != nil {
186186
return err
187187
}
@@ -220,7 +220,17 @@ func autoConvert_v1alpha1_CassandraClusterSpec_To_navigator_CassandraClusterSpec
220220
if err := Convert_v1alpha1_NavigatorClusterConfig_To_navigator_NavigatorClusterConfig(&in.NavigatorClusterConfig, &out.NavigatorClusterConfig, s); err != nil {
221221
return err
222222
}
223-
out.NodePools = *(*[]navigator.CassandraClusterNodePool)(unsafe.Pointer(&in.NodePools))
223+
if in.NodePools != nil {
224+
in, out := &in.NodePools, &out.NodePools
225+
*out = make([]navigator.CassandraClusterNodePool, len(*in))
226+
for i := range *in {
227+
if err := Convert_v1alpha1_CassandraClusterNodePool_To_navigator_CassandraClusterNodePool(&(*in)[i], &(*out)[i], s); err != nil {
228+
return err
229+
}
230+
}
231+
} else {
232+
out.NodePools = nil
233+
}
224234
out.Image = (*navigator.ImageSpec)(unsafe.Pointer(in.Image))
225235
out.CqlPort = in.CqlPort
226236
out.Version = in.Version
@@ -236,7 +246,17 @@ func autoConvert_navigator_CassandraClusterSpec_To_v1alpha1_CassandraClusterSpec
236246
if err := Convert_navigator_NavigatorClusterConfig_To_v1alpha1_NavigatorClusterConfig(&in.NavigatorClusterConfig, &out.NavigatorClusterConfig, s); err != nil {
237247
return err
238248
}
239-
out.NodePools = *(*[]CassandraClusterNodePool)(unsafe.Pointer(&in.NodePools))
249+
if in.NodePools != nil {
250+
in, out := &in.NodePools, &out.NodePools
251+
*out = make([]CassandraClusterNodePool, len(*in))
252+
for i := range *in {
253+
if err := Convert_navigator_CassandraClusterNodePool_To_v1alpha1_CassandraClusterNodePool(&(*in)[i], &(*out)[i], s); err != nil {
254+
return err
255+
}
256+
}
257+
} else {
258+
out.NodePools = nil
259+
}
240260
out.Version = in.Version
241261
out.Image = (*ImageSpec)(unsafe.Pointer(in.Image))
242262
out.CqlPort = in.CqlPort
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package actions
2+
3+
import (
4+
corev1 "k8s.io/api/core/v1"
5+
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
6+
7+
"github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1"
8+
"github.com/jetstack/navigator/pkg/controllers"
9+
"github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool"
10+
)
11+
12+
type CreateNodePool struct {
13+
Cluster *v1alpha1.CassandraCluster
14+
NodePool *v1alpha1.CassandraClusterNodePool
15+
}
16+
17+
var _ controllers.Action = &CreateNodePool{}
18+
19+
func (a *CreateNodePool) Name() string {
20+
return "CreateNodePool"
21+
}
22+
23+
func (a *CreateNodePool) Execute(s *controllers.State) error {
24+
ss := nodepool.StatefulSetForCluster(a.Cluster, a.NodePool)
25+
_, err := s.Clientset.AppsV1beta1().StatefulSets(ss.Namespace).Create(ss)
26+
if k8sErrors.IsAlreadyExists(err) {
27+
return nil
28+
}
29+
if err == nil {
30+
s.Recorder.Eventf(
31+
a.Cluster,
32+
corev1.EventTypeNormal,
33+
a.Name(),
34+
"Created node pool %q", a.NodePool.Name,
35+
)
36+
}
37+
return err
38+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package actions_test
2+
3+
import (
4+
"testing"
5+
6+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
7+
"k8s.io/apimachinery/pkg/runtime"
8+
9+
"github.com/jetstack/navigator/internal/test/unit/framework"
10+
"github.com/jetstack/navigator/internal/test/util/generate"
11+
"github.com/jetstack/navigator/pkg/controllers/cassandra/actions"
12+
)
13+
14+
func TestCreateNodePool(t *testing.T) {
15+
type testT struct {
16+
kubeObjects []runtime.Object
17+
navObjects []runtime.Object
18+
cluster generate.CassandraClusterConfig
19+
nodePool generate.CassandraClusterNodePoolConfig
20+
expectedStatefulSet generate.StatefulSetConfig
21+
expectedErr bool
22+
}
23+
tests := map[string]testT{
24+
"A statefulset is created if one does not already exist": {
25+
cluster: generate.CassandraClusterConfig{
26+
Name: "cluster1",
27+
Namespace: "ns1",
28+
},
29+
nodePool: generate.CassandraClusterNodePoolConfig{
30+
Name: "pool1",
31+
},
32+
expectedStatefulSet: generate.StatefulSetConfig{
33+
Name: "cass-cluster1-pool1",
34+
Namespace: "ns1",
35+
Replicas: int32Ptr(0),
36+
},
37+
},
38+
"Idempotent: CreateNodePool can be executed again without error": {
39+
kubeObjects: []runtime.Object{
40+
generate.StatefulSet(
41+
generate.StatefulSetConfig{
42+
Name: "cass-cluster1-pool1",
43+
Namespace: "ns1",
44+
Replicas: int32Ptr(10),
45+
},
46+
),
47+
},
48+
cluster: generate.CassandraClusterConfig{Name: "cluster1", Namespace: "ns1"},
49+
nodePool: generate.CassandraClusterNodePoolConfig{
50+
Name: "pool1",
51+
},
52+
expectedStatefulSet: generate.StatefulSetConfig{
53+
Name: "cass-cluster1-pool1",
54+
Namespace: "ns1",
55+
Replicas: int32Ptr(10),
56+
},
57+
expectedErr: false,
58+
},
59+
}
60+
61+
for name, test := range tests {
62+
t.Run(
63+
name,
64+
func(t *testing.T) {
65+
fixture := &framework.StateFixture{
66+
T: t,
67+
KubeObjects: test.kubeObjects,
68+
NavigatorObjects: test.navObjects,
69+
}
70+
fixture.Start()
71+
defer fixture.Stop()
72+
state := fixture.State()
73+
a := &actions.CreateNodePool{
74+
Cluster: generate.CassandraCluster(test.cluster),
75+
NodePool: generate.CassandraClusterNodePool(test.nodePool),
76+
}
77+
err := a.Execute(state)
78+
if !test.expectedErr && err != nil {
79+
t.Errorf("Unexpected error: %s", err)
80+
}
81+
if test.expectedErr && err == nil {
82+
t.Errorf("Expected an error")
83+
}
84+
actualStatefulSet, err := fixture.KubeClient().
85+
AppsV1beta1().
86+
StatefulSets(test.expectedStatefulSet.Namespace).
87+
Get(test.expectedStatefulSet.Name, metav1.GetOptions{})
88+
if err != nil {
89+
t.Fatalf("Unexpected error retrieving statefulset: %v", err)
90+
}
91+
generate.AssertStatefulSetMatches(t, test.expectedStatefulSet, actualStatefulSet)
92+
},
93+
)
94+
}
95+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package actions
2+
3+
import (
4+
"fmt"
5+
6+
corev1 "k8s.io/api/core/v1"
7+
8+
"github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1"
9+
"github.com/jetstack/navigator/pkg/controllers"
10+
"github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool"
11+
)
12+
13+
type ScaleOut struct {
14+
Cluster *v1alpha1.CassandraCluster
15+
NodePool *v1alpha1.CassandraClusterNodePool
16+
}
17+
18+
var _ controllers.Action = &ScaleOut{}
19+
20+
func (a *ScaleOut) Name() string {
21+
return "ScaleOut"
22+
}
23+
24+
func (a *ScaleOut) Execute(s *controllers.State) error {
25+
ss := nodepool.StatefulSetForCluster(a.Cluster, a.NodePool)
26+
ss, err := s.StatefulSetLister.StatefulSets(ss.Namespace).Get(ss.Name)
27+
if err != nil {
28+
return err
29+
}
30+
ss = ss.DeepCopy()
31+
if ss.Spec.Replicas == nil || *ss.Spec.Replicas < a.NodePool.Replicas {
32+
ss.Spec.Replicas = &a.NodePool.Replicas
33+
_, err = s.Clientset.AppsV1beta1().StatefulSets(ss.Namespace).Update(ss)
34+
if err == nil {
35+
s.Recorder.Eventf(
36+
a.Cluster,
37+
corev1.EventTypeNormal,
38+
a.Name(),
39+
"Scaled node pool %q to %d replicas", a.NodePool.Name, a.NodePool.Replicas,
40+
)
41+
}
42+
return err
43+
}
44+
if *ss.Spec.Replicas > a.NodePool.Replicas {
45+
return fmt.Errorf(
46+
"the NodePool.Replicas value (%d) "+
47+
"is less than the existing StatefulSet.Replicas value (%d)",
48+
a.NodePool.Replicas, *ss.Spec.Replicas,
49+
)
50+
}
51+
return nil
52+
}

0 commit comments

Comments
 (0)