Skip to content
This repository was archived by the owner on Apr 4, 2023. It is now read-only.

Commit 2ad596b

Browse files
Merge pull request #270 from wallrj/seed-pod-label-controller
Automatic merge from submit-queue. Move seed pod labelling to a separate controller * In #256 I want to refactor nodepool.Sync so that its only responsibility is to update the NodePool status. * And the seed labelling seems like it is a separate concern that can live in its own module. * With its own unit tests. **Release note**: ```release-note NONE ```
2 parents 3ad0b4f + e23ce4e commit 2ad596b

File tree

9 files changed

+277
-47
lines changed

9 files changed

+277
-47
lines changed

hack/e2e.sh

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -223,13 +223,6 @@ function test_cassandracluster() {
223223
fail_test "Cassandra pilots did not elect a leader"
224224
fi
225225

226-
seed_label=$(kubectl get pods --namespace "${namespace}" \
227-
cass-${CASS_NAME}-ringnodes-0 \
228-
-o jsonpath='{.metadata.labels.seed}')
229-
if [ "$seed_label" != "true" ]; then
230-
fail_test "First cassandra node not marked as seed"
231-
fi
232-
233226
# Wait 5 minutes for cassandra to start and listen for CQL queries.
234227
if ! retry TIMEOUT=300 cql_connect \
235228
"${namespace}" \
@@ -326,6 +319,17 @@ function test_cassandracluster() {
326319
fail_test "Second cassandra node did not become ready"
327320
fi
328321

322+
# TODO: A better test would be to query the endpoints and check that only
323+
# the `-0` pods are included. E.g.
324+
# kubectl -n test-cassandra-1519754828-19864 get ep cass-cassandra-1519754828-19864-cassandra-seedprovider -o "jsonpath={.subsets[*].addresses[*].hostname}"
325+
if ! stdout_equals "cass-${CASS_NAME}-ringnodes-0" \
326+
kubectl get pods --namespace "${namespace}" \
327+
--selector=navigator.jetstack.io/cassandra-seed=true \
328+
--output 'jsonpath={.items[*].metadata.name}'
329+
then
330+
fail_test "First cassandra node not marked as seed"
331+
fi
332+
329333
simulate_unresponsive_cassandra_process \
330334
"${namespace}" \
331335
"cass-${CASS_NAME}-ringnodes-0" \

pkg/controllers/cassandra/cassandra.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/jetstack/navigator/pkg/controllers/cassandra/pilot"
2929
"github.com/jetstack/navigator/pkg/controllers/cassandra/role"
3030
"github.com/jetstack/navigator/pkg/controllers/cassandra/rolebinding"
31+
"github.com/jetstack/navigator/pkg/controllers/cassandra/seedlabeller"
3132
servicecql "github.com/jetstack/navigator/pkg/controllers/cassandra/service/cql"
3233
serviceseedprovider "github.com/jetstack/navigator/pkg/controllers/cassandra/service/seedprovider"
3334
"github.com/jetstack/navigator/pkg/controllers/cassandra/serviceaccount"
@@ -110,7 +111,6 @@ func NewCassandra(
110111
nodepool.NewControl(
111112
kubeClient,
112113
statefulSets.Lister(),
113-
pods.Lister(),
114114
recorder,
115115
),
116116
pilot.NewControl(
@@ -135,6 +135,12 @@ func NewCassandra(
135135
roleBindings.Lister(),
136136
recorder,
137137
),
138+
seedlabeller.NewControl(
139+
kubeClient,
140+
statefulSets.Lister(),
141+
pods.Lister(),
142+
recorder,
143+
),
138144
recorder,
139145
)
140146
cc.recorder = recorder

pkg/controllers/cassandra/cluster_control.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/jetstack/navigator/pkg/controllers/cassandra/pilot"
1111
"github.com/jetstack/navigator/pkg/controllers/cassandra/role"
1212
"github.com/jetstack/navigator/pkg/controllers/cassandra/rolebinding"
13+
"github.com/jetstack/navigator/pkg/controllers/cassandra/seedlabeller"
1314
servicecql "github.com/jetstack/navigator/pkg/controllers/cassandra/service/cql"
1415
serviceseedprovider "github.com/jetstack/navigator/pkg/controllers/cassandra/service/seedprovider"
1516
"github.com/jetstack/navigator/pkg/controllers/cassandra/serviceaccount"
@@ -27,6 +28,7 @@ const (
2728
MessageErrorSyncService = "Error syncing service: %s"
2829
MessageErrorSyncNodePools = "Error syncing node pools: %s"
2930
MessageErrorSyncPilots = "Error syncing pilots: %s"
31+
MessageErrorSyncSeedLabels = "Error syncing seed labels: %s"
3032
MessageSuccessSync = "Successfully synced CassandraCluster"
3133
)
3234

@@ -44,6 +46,7 @@ type defaultCassandraClusterControl struct {
4446
serviceAccountControl serviceaccount.Interface
4547
roleControl role.Interface
4648
roleBindingControl rolebinding.Interface
49+
seedLabellerControl seedlabeller.Interface
4750
recorder record.EventRecorder
4851
}
4952

@@ -55,6 +58,7 @@ func NewControl(
5558
serviceAccountControl serviceaccount.Interface,
5659
roleControl role.Interface,
5760
roleBindingControl rolebinding.Interface,
61+
seedlabellerControl seedlabeller.Interface,
5862
recorder record.EventRecorder,
5963
) ControlInterface {
6064
return &defaultCassandraClusterControl{
@@ -65,6 +69,7 @@ func NewControl(
6569
serviceAccountControl: serviceAccountControl,
6670
roleControl: roleControl,
6771
roleBindingControl: roleBindingControl,
72+
seedLabellerControl: seedlabellerControl,
6873
recorder: recorder,
6974
}
7075
}
@@ -148,6 +153,17 @@ func (e *defaultCassandraClusterControl) Sync(c *v1alpha1.CassandraCluster) erro
148153
)
149154
return err
150155
}
156+
err = e.seedLabellerControl.Sync(c)
157+
if err != nil {
158+
e.recorder.Eventf(
159+
c,
160+
apiv1.EventTypeWarning,
161+
ErrorSync,
162+
MessageErrorSyncSeedLabels,
163+
err,
164+
)
165+
return err
166+
}
151167
e.recorder.Event(
152168
c,
153169
apiv1.EventTypeNormal,

pkg/controllers/cassandra/nodepool/nodepool.go

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
11
package nodepool
22

33
import (
4-
"fmt"
5-
6-
"github.com/golang/glog"
7-
appsv1beta1 "k8s.io/api/apps/v1beta1"
84
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
95
"k8s.io/client-go/kubernetes"
106
appslisters "k8s.io/client-go/listers/apps/v1beta1"
11-
corelisters "k8s.io/client-go/listers/core/v1"
127
"k8s.io/client-go/tools/record"
138

149
v1alpha1 "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1"
@@ -22,7 +17,6 @@ type Interface interface {
2217
type defaultCassandraClusterNodepoolControl struct {
2318
kubeClient kubernetes.Interface
2419
statefulSetLister appslisters.StatefulSetLister
25-
pods corelisters.PodLister
2620
recorder record.EventRecorder
2721
}
2822

@@ -31,13 +25,11 @@ var _ Interface = &defaultCassandraClusterNodepoolControl{}
3125
func NewControl(
3226
kubeClient kubernetes.Interface,
3327
statefulSetLister appslisters.StatefulSetLister,
34-
pods corelisters.PodLister,
3528
recorder record.EventRecorder,
3629
) Interface {
3730
return &defaultCassandraClusterNodepoolControl{
3831
kubeClient: kubeClient,
3932
statefulSetLister: statefulSetLister,
40-
pods: pods,
4133
recorder: recorder,
4234
}
4335
}
@@ -76,30 +68,6 @@ func (e *defaultCassandraClusterNodepoolControl) removeUnusedStatefulSets(
7668
return nil
7769
}
7870

79-
func (e *defaultCassandraClusterNodepoolControl) labelSeedNodes(
80-
cluster *v1alpha1.CassandraCluster,
81-
set *appsv1beta1.StatefulSet,
82-
) error {
83-
// TODO: make number of seed nodes configurable
84-
pod, err := e.pods.Pods(cluster.Namespace).Get(fmt.Sprintf("%s-%d", set.Name, 0))
85-
if err != nil {
86-
glog.Warningf("Couldn't get stateful set pod: %v", err)
87-
return nil
88-
}
89-
90-
// only label if the current label is incorrect
91-
if pod.Labels["seed"] != "true" {
92-
podCopy := pod.DeepCopy()
93-
podCopy.Labels["seed"] = "true"
94-
_, err := e.kubeClient.CoreV1().Pods(podCopy.Namespace).Update(podCopy)
95-
if err != nil {
96-
return err
97-
}
98-
}
99-
100-
return nil
101-
}
102-
10371
func (e *defaultCassandraClusterNodepoolControl) createOrUpdateStatefulSet(
10472
cluster *v1alpha1.CassandraCluster,
10573
nodePool *v1alpha1.CassandraClusterNodePool,
@@ -120,11 +88,6 @@ func (e *defaultCassandraClusterNodepoolControl) createOrUpdateStatefulSet(
12088
return err
12189
}
12290

123-
err = e.labelSeedNodes(cluster, existingSet)
124-
if err != nil {
125-
return err
126-
}
127-
12891
_, err = client.Update(desiredSet)
12992
return err
13093
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package seedlabeller
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/golang/glog"
7+
appsv1beta1 "k8s.io/api/apps/v1beta1"
8+
"k8s.io/client-go/kubernetes"
9+
appslisters "k8s.io/client-go/listers/apps/v1beta1"
10+
corelisters "k8s.io/client-go/listers/core/v1"
11+
"k8s.io/client-go/tools/record"
12+
13+
"github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1"
14+
"github.com/jetstack/navigator/pkg/controllers/cassandra/service/seedprovider"
15+
"github.com/jetstack/navigator/pkg/controllers/cassandra/util"
16+
)
17+
18+
type Interface interface {
19+
Sync(*v1alpha1.CassandraCluster) error
20+
}
21+
22+
type defaultSeedLabeller struct {
23+
kubeClient kubernetes.Interface
24+
statefulSetLister appslisters.StatefulSetLister
25+
pods corelisters.PodLister
26+
recorder record.EventRecorder
27+
}
28+
29+
var _ Interface = &defaultSeedLabeller{}
30+
31+
func NewControl(
32+
kubeClient kubernetes.Interface,
33+
statefulSetLister appslisters.StatefulSetLister,
34+
pods corelisters.PodLister,
35+
recorder record.EventRecorder,
36+
) Interface {
37+
return &defaultSeedLabeller{
38+
kubeClient: kubeClient,
39+
statefulSetLister: statefulSetLister,
40+
pods: pods,
41+
recorder: recorder,
42+
}
43+
}
44+
45+
func (c *defaultSeedLabeller) labelSeedNodes(
46+
cluster *v1alpha1.CassandraCluster,
47+
set *appsv1beta1.StatefulSet,
48+
) error {
49+
// TODO: make number of seed nodes configurable
50+
pod, err := c.pods.Pods(cluster.Namespace).Get(fmt.Sprintf("%s-%d", set.Name, 0))
51+
if err != nil {
52+
glog.Warningf("Couldn't get stateful set pod: %v", err)
53+
return nil
54+
}
55+
labels := pod.Labels
56+
value := labels[seedprovider.SeedLabelKey]
57+
if value == seedprovider.SeedLabelValue {
58+
return nil
59+
}
60+
if labels == nil {
61+
labels = map[string]string{}
62+
}
63+
labels[seedprovider.SeedLabelKey] = seedprovider.SeedLabelValue
64+
podCopy := pod.DeepCopy()
65+
podCopy.SetLabels(labels)
66+
_, err = c.kubeClient.CoreV1().Pods(podCopy.Namespace).Update(podCopy)
67+
return err
68+
}
69+
70+
func (c *defaultSeedLabeller) Sync(cluster *v1alpha1.CassandraCluster) error {
71+
sets, err := util.StatefulSetsForCluster(cluster, c.statefulSetLister)
72+
if err != nil {
73+
return err
74+
}
75+
for _, s := range sets {
76+
err = c.labelSeedNodes(cluster, s)
77+
if err != nil {
78+
return err
79+
}
80+
}
81+
return nil
82+
}

0 commit comments

Comments
 (0)