Skip to content

Commit 0f7b96c

Browse files
committed
Add leader election
1 parent c57a28c commit 0f7b96c

File tree

8 files changed

+116
-22
lines changed

8 files changed

+116
-22
lines changed

internal/framework/events/event.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,7 @@ type DeleteEvent struct {
2323
// NamespacedName is the namespace & name of the deleted resource.
2424
NamespacedName types.NamespacedName
2525
}
26+
27+
// NewLeaderEvent represents an NGF Pod becoming leader. This is used to trigger the event handler to batch process
28+
// events and update nginx conf when no resource has been changed.
29+
type NewLeaderEvent struct{}

internal/framework/runnables/runnables.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,29 +34,32 @@ func (r *LeaderOrNonLeader) NeedLeaderElection() bool {
3434
return false
3535
}
3636

37-
// EnableAfterBecameLeader is a Runnable that will call the enable function when the current instance becomes
37+
// CallFunctionsAfterBecameLeader is a Runnable that will call the given functions when the current instance becomes
3838
// the leader.
39-
type EnableAfterBecameLeader struct {
39+
type CallFunctionsAfterBecameLeader struct {
4040
enable func(context.Context)
41+
leader func()
4142
}
4243

4344
var (
44-
_ manager.LeaderElectionRunnable = &EnableAfterBecameLeader{}
45-
_ manager.Runnable = &EnableAfterBecameLeader{}
45+
_ manager.LeaderElectionRunnable = &CallFunctionsAfterBecameLeader{}
46+
_ manager.Runnable = &CallFunctionsAfterBecameLeader{}
4647
)
4748

48-
// NewEnableAfterBecameLeader creates a new EnableAfterBecameLeader Runnable.
49-
func NewEnableAfterBecameLeader(enable func(context.Context)) *EnableAfterBecameLeader {
50-
return &EnableAfterBecameLeader{
49+
// NewCallFunctionsAfterBecameLeader creates a new CallFunctionsAfterBecameLeader Runnable.
50+
func NewCallFunctionsAfterBecameLeader(enable func(context.Context), leader func()) *CallFunctionsAfterBecameLeader {
51+
return &CallFunctionsAfterBecameLeader{
5152
enable: enable,
53+
leader: leader,
5254
}
5355
}
5456

55-
func (j *EnableAfterBecameLeader) Start(ctx context.Context) error {
57+
func (j *CallFunctionsAfterBecameLeader) Start(ctx context.Context) error {
5658
j.enable(ctx)
59+
j.leader()
5760
return nil
5861
}
5962

60-
func (j *EnableAfterBecameLeader) NeedLeaderElection() bool {
63+
func (j *CallFunctionsAfterBecameLeader) NeedLeaderElection() bool {
6164
return true
6265
}

internal/framework/runnables/runnables_test.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,23 @@ func TestLeaderOrNonLeader(t *testing.T) {
2323
g.Expect(leaderOrNonLeader.NeedLeaderElection()).To(BeFalse())
2424
}
2525

26-
func TestEnableAfterBecameLeader(t *testing.T) {
26+
func TestCallFunctionsAfterBecameLeader(t *testing.T) {
2727
t.Parallel()
2828
enabled := false
29-
enableAfterBecameLeader := NewEnableAfterBecameLeader(func(_ context.Context) {
30-
enabled = true
31-
})
29+
leader := false
30+
31+
callFunctionsAfterBecameLeader := NewCallFunctionsAfterBecameLeader(
32+
func(_ context.Context) { enabled = true },
33+
func() { leader = true },
34+
)
3235

3336
g := NewWithT(t)
34-
g.Expect(enableAfterBecameLeader.NeedLeaderElection()).To(BeTrue())
37+
g.Expect(callFunctionsAfterBecameLeader.NeedLeaderElection()).To(BeTrue())
3538
g.Expect(enabled).To(BeFalse())
3639

37-
err := enableAfterBecameLeader.Start(context.Background())
40+
err := callFunctionsAfterBecameLeader.Start(context.Background())
3841
g.Expect(err).ToNot(HaveOccurred())
3942

4043
g.Expect(enabled).To(BeTrue())
44+
g.Expect(leader).To(BeTrue())
4145
}

internal/mode/static/handler.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,12 +161,31 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log
161161
h.cfg.metricsCollector.ObserveLastEventBatchProcessTime(duration)
162162
}()
163163

164+
var newLeader bool
164165
for _, event := range batch {
165-
h.parseAndCaptureEvent(ctx, logger, event)
166+
switch event.(type) {
167+
case *events.NewLeaderEvent:
168+
newLeader = true
169+
default:
170+
h.parseAndCaptureEvent(ctx, logger, event)
171+
}
166172
}
167173

168174
changeType, gr := h.cfg.processor.Process()
169175

176+
// if there is a newLeader event in the EventBatch, we want to generate and update nginx conf,
177+
// so regardless of what came back from Process(), we want to update the nginx conf with the latest graph
178+
if newLeader {
179+
changeType = state.ClusterStateChange
180+
gr = h.cfg.processor.GetLatestGraph()
181+
}
182+
183+
// if this Pod is not the leader or does not have the leader lease yet,
184+
// the nginx conf should not be updated.
185+
if !h.cfg.graphBuiltHealthChecker.leader {
186+
return
187+
}
188+
170189
// Once we've processed resources on startup and built our first graph, mark the Pod as ready.
171190
if !h.cfg.graphBuiltHealthChecker.ready {
172191
h.cfg.graphBuiltHealthChecker.setAsReady()

internal/mode/static/handler_test.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ var _ = Describe("eventHandler", func() {
127127
updateGatewayClassStatus: true,
128128
})
129129
Expect(handler.cfg.graphBuiltHealthChecker.ready).To(BeFalse())
130+
131+
handler.cfg.graphBuiltHealthChecker.leader = true
130132
})
131133

132134
AfterEach(func() {
@@ -193,6 +195,17 @@ var _ = Describe("eventHandler", func() {
193195
expectReconfig(dcfg, fakeCfgFiles)
194196
Expect(helpers.Diff(handler.GetLatestConfiguration(), &dcfg)).To(BeEmpty())
195197
})
198+
199+
It("should process a NewLeaderEvent", func() {
200+
e := &events.NewLeaderEvent{}
201+
202+
batch := []interface{}{e}
203+
204+
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
205+
206+
dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, 1)
207+
Expect(helpers.Diff(handler.GetLatestConfiguration(), &dcfg)).To(BeEmpty())
208+
})
196209
})
197210

198211
When("a batch has multiple events", func() {
@@ -202,7 +215,8 @@ var _ = Describe("eventHandler", func() {
202215
Type: &gatewayv1.HTTPRoute{},
203216
NamespacedName: types.NamespacedName{Namespace: "test", Name: "route"},
204217
}
205-
batch := []interface{}{upsertEvent, deleteEvent}
218+
newLeaderEvent := &events.NewLeaderEvent{}
219+
batch := []interface{}{upsertEvent, deleteEvent, newLeaderEvent}
206220

207221
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
208222

@@ -502,6 +516,25 @@ var _ = Describe("eventHandler", func() {
502516
Expect(handler.cfg.graphBuiltHealthChecker.readyCheck(nil)).To(Succeed())
503517
})
504518

519+
It("should not update nginx conf if NGF is not leader", func() {
520+
e := &events.UpsertEvent{Resource: &gatewayv1.HTTPRoute{}}
521+
batch := []interface{}{e}
522+
readyChannel := handler.cfg.graphBuiltHealthChecker.getReadyCh()
523+
524+
fakeProcessor.ProcessReturns(state.ClusterStateChange, &graph.Graph{})
525+
526+
handler.cfg.graphBuiltHealthChecker.leader = false
527+
528+
Expect(handler.cfg.graphBuiltHealthChecker.readyCheck(nil)).ToNot(Succeed())
529+
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
530+
531+
Expect(handler.GetLatestConfiguration()).To(BeNil())
532+
533+
Expect(readyChannel).ShouldNot(BeClosed())
534+
535+
Expect(handler.cfg.graphBuiltHealthChecker.readyCheck(nil)).ToNot(Succeed())
536+
})
537+
505538
It("should panic for an unknown event type", func() {
506539
e := &struct{}{}
507540

internal/mode/static/health.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"errors"
55
"net/http"
66
"sync"
7+
8+
"github.com/nginx/nginx-gateway-fabric/internal/framework/events"
79
)
810

911
// newGraphBuiltHealthChecker creates a new graphBuiltHealthChecker.
@@ -13,16 +15,18 @@ func newGraphBuiltHealthChecker() *graphBuiltHealthChecker {
1315
}
1416
}
1517

16-
// graphBuiltHealthChecker is used to check if the initial graph is built and the NGF Pod is ready.
18+
// graphBuiltHealthChecker is used to check if the initial graph is built, if the NGF Pod is leader, and if the
19+
// NGF Pod is ready.
1720
type graphBuiltHealthChecker struct {
18-
// readyCh is a channel that is initialized in newGraphBuiltHealthChecker and represents if the NGF Pod is ready.
1921
readyCh chan struct{}
22+
eventCh chan interface{}
2023
lock sync.RWMutex
2124
ready bool
25+
leader bool
2226
}
2327

2428
// readyCheck returns the ready-state of the Pod. It satisfies the controller-runtime Checker type.
25-
// We are considered ready after the first graph is built.
29+
// We are considered ready after the first graph is built and if the NGF Pod is leader.
2630
func (h *graphBuiltHealthChecker) readyCheck(_ *http.Request) error {
2731
h.lock.RLock()
2832
defer h.lock.RUnlock()
@@ -31,6 +35,10 @@ func (h *graphBuiltHealthChecker) readyCheck(_ *http.Request) error {
3135
return errors.New("control plane is not yet ready")
3236
}
3337

38+
if !h.leader {
39+
return errors.New("this NGF Pod is not currently leader")
40+
}
41+
3442
return nil
3543
}
3644

@@ -47,3 +55,12 @@ func (h *graphBuiltHealthChecker) setAsReady() {
4755
func (h *graphBuiltHealthChecker) getReadyCh() <-chan struct{} {
4856
return h.readyCh
4957
}
58+
59+
// setAsLeader marks the health check as leader and sends an empty event to the event channel.
60+
func (h *graphBuiltHealthChecker) setAsLeader() {
61+
h.lock.Lock()
62+
defer h.lock.Unlock()
63+
64+
h.leader = true
65+
h.eventCh <- &events.NewLeaderEvent{}
66+
}

internal/mode/static/health_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package static
22

33
import (
4+
"errors"
45
"testing"
56

67
. "github.com/onsi/gomega"
@@ -10,7 +11,14 @@ func TestReadyCheck(t *testing.T) {
1011
t.Parallel()
1112
g := NewWithT(t)
1213
healthChecker := newGraphBuiltHealthChecker()
13-
g.Expect(healthChecker.readyCheck(nil)).ToNot(Succeed())
14+
g.Expect(healthChecker.readyCheck(nil)).To(MatchError(errors.New("control plane is not yet ready")))
15+
16+
healthChecker.ready = true
17+
g.Expect(healthChecker.readyCheck(nil)).To(MatchError(errors.New("this NGF Pod is not currently leader")))
18+
19+
healthChecker.ready = false
20+
healthChecker.leader = true
21+
g.Expect(healthChecker.readyCheck(nil)).To(MatchError(errors.New("control plane is not yet ready")))
1422

1523
healthChecker.ready = true
1624
g.Expect(healthChecker.readyCheck(nil)).To(Succeed())

internal/mode/static/manager.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,13 @@ func StartManager(cfg config.Config) error {
245245
return fmt.Errorf("cannot register event loop: %w", err)
246246
}
247247

248-
if err = mgr.Add(runnables.NewEnableAfterBecameLeader(groupStatusUpdater.Enable)); err != nil {
248+
// the healthChecker needs the same eventCh as the event handler so it can send a NewLeaderEvent when
249+
// the pod becomes leader, triggering HandleEventBatch to be called.
250+
healthChecker.eventCh = eventCh
251+
if err = mgr.Add(runnables.NewCallFunctionsAfterBecameLeader(
252+
groupStatusUpdater.Enable,
253+
healthChecker.setAsLeader,
254+
)); err != nil {
249255
return fmt.Errorf("cannot register status updater: %w", err)
250256
}
251257

0 commit comments

Comments
 (0)