Binary file modified docs/proposals/control-data-plane-split/graph-conns.png
28 changes: 16 additions & 12 deletions internal/framework/runnables/runnables.go
@@ -34,29 +34,33 @@ func (r *LeaderOrNonLeader) NeedLeaderElection() bool {
 	return false
 }
 
-// EnableAfterBecameLeader is a Runnable that will call the enable function when the current instance becomes
+// CallFunctionsAfterBecameLeader is a Runnable that will call the given functions when the current instance becomes
 // the leader.
-type EnableAfterBecameLeader struct {
-	enable func(context.Context)
+type CallFunctionsAfterBecameLeader struct {
+	enableFunctions []func(context.Context)
 }
 
 var (
-	_ manager.LeaderElectionRunnable = &EnableAfterBecameLeader{}
-	_ manager.Runnable               = &EnableAfterBecameLeader{}
+	_ manager.LeaderElectionRunnable = &CallFunctionsAfterBecameLeader{}
+	_ manager.Runnable               = &CallFunctionsAfterBecameLeader{}
 )
 
-// NewEnableAfterBecameLeader creates a new EnableAfterBecameLeader Runnable.
-func NewEnableAfterBecameLeader(enable func(context.Context)) *EnableAfterBecameLeader {
-	return &EnableAfterBecameLeader{
-		enable: enable,
+// NewCallFunctionsAfterBecameLeader creates a new CallFunctionsAfterBecameLeader Runnable.
+func NewCallFunctionsAfterBecameLeader(
+	enableFunctions []func(context.Context),
+) *CallFunctionsAfterBecameLeader {
+	return &CallFunctionsAfterBecameLeader{
+		enableFunctions: enableFunctions,
 	}
 }
 
-func (j *EnableAfterBecameLeader) Start(ctx context.Context) error {
-	j.enable(ctx)
+func (j *CallFunctionsAfterBecameLeader) Start(ctx context.Context) error {
+	for _, f := range j.enableFunctions {
+		f(ctx)
+	}
 	return nil
 }
 
-func (j *EnableAfterBecameLeader) NeedLeaderElection() bool {
+func (j *CallFunctionsAfterBecameLeader) NeedLeaderElection() bool {
 	return true
 }
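How this renamed runnable gets consumed is not shown in this diff, so here is a minimal sketch of plausible manager wiring inside the NGF module. The helper name registerLeaderOnlyFunctions and the idea of passing the three enable functions are assumptions based on the tests below; mgr.Add and the LeaderElectionRunnable contract are standard controller-runtime.

package setup

import (
	"context"
	"fmt"

	"sigs.k8s.io/controller-runtime/pkg/manager"

	"github.com/nginx/nginx-gateway-fabric/internal/framework/runnables"
)

// registerLeaderOnlyFunctions wires up functions that must run only on the leader.
// The enable functions (status updater, health checker, event handler) are
// hypothetical stand-ins for whatever NGF registers in its manager setup.
func registerLeaderOnlyFunctions(mgr manager.Manager, enableFns []func(context.Context)) error {
	// NeedLeaderElection() returns true, so the manager defers Start (and with it
	// every enable function) until this instance acquires the leader lease.
	if err := mgr.Add(runnables.NewCallFunctionsAfterBecameLeader(enableFns)); err != nil {
		return fmt.Errorf("cannot register leader-only runnable: %w", err)
	}
	return nil
}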
22 changes: 14 additions & 8 deletions internal/framework/runnables/runnables_test.go
@@ -23,19 +23,25 @@ func TestLeaderOrNonLeader(t *testing.T) {
 	g.Expect(leaderOrNonLeader.NeedLeaderElection()).To(BeFalse())
 }
 
-func TestEnableAfterBecameLeader(t *testing.T) {
+func TestCallFunctionsAfterBecameLeader(t *testing.T) {
 	t.Parallel()
-	enabled := false
-	enableAfterBecameLeader := NewEnableAfterBecameLeader(func(_ context.Context) {
-		enabled = true
+	statusUpdaterEnabled := false
+	healthCheckEnableLeader := false
+	eventHandlerEnabled := false
+
+	callFunctionsAfterBecameLeader := NewCallFunctionsAfterBecameLeader([]func(ctx context.Context){
+		func(_ context.Context) { statusUpdaterEnabled = true },
+		func(_ context.Context) { healthCheckEnableLeader = true },
+		func(_ context.Context) { eventHandlerEnabled = true },
 	})
 
 	g := NewWithT(t)
-	g.Expect(enableAfterBecameLeader.NeedLeaderElection()).To(BeTrue())
-	g.Expect(enabled).To(BeFalse())
+	g.Expect(callFunctionsAfterBecameLeader.NeedLeaderElection()).To(BeTrue())
 
-	err := enableAfterBecameLeader.Start(context.Background())
+	err := callFunctionsAfterBecameLeader.Start(context.Background())
 	g.Expect(err).ToNot(HaveOccurred())
 
-	g.Expect(enabled).To(BeTrue())
+	g.Expect(statusUpdaterEnabled).To(BeTrue())
+	g.Expect(healthCheckEnableLeader).To(BeTrue())
+	g.Expect(eventHandlerEnabled).To(BeTrue())
 }
30 changes: 25 additions & 5 deletions internal/mode/static/handler.go
@@ -167,13 +167,33 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log
 
 	changeType, gr := h.cfg.processor.Process()
 
-	// Once we've processed resources on startup and built our first graph, mark the Pod as ready.
-	if !h.cfg.graphBuiltHealthChecker.ready {
-		h.cfg.graphBuiltHealthChecker.setAsReady()
+	// Once we've processed resources on startup and built our first graph, mark the Pod as having built the graph.
+	if !h.cfg.graphBuiltHealthChecker.graphBuilt {
+		h.cfg.graphBuiltHealthChecker.setGraphBuilt()
 	}
 
-	// TODO(sberman): hardcode this deployment name until we support provisioning data planes
-	// If no deployments exist, we should just return without doing anything.
+	// If this Pod is not the leader or does not yet hold the leader lease,
+	// the nginx conf should not be updated.
+	if !h.cfg.graphBuiltHealthChecker.leader {
+		return
+	}
+
+	h.sendNginxConfig(ctx, logger, gr, changeType)
+}
+
+func (h *eventHandlerImpl) eventHandlerEnable(ctx context.Context) {
+	// The latest graph is guaranteed to not be nil, since the leader election process takes longer than
+	// the initial call to HandleEventBatch when NGF starts up, and a GatewayClass typically exists, which
+	// triggers an event.
+	h.sendNginxConfig(ctx, h.cfg.logger, h.cfg.processor.GetLatestGraph(), state.ClusterStateChange)
+}
+
+func (h *eventHandlerImpl) sendNginxConfig(
+	ctx context.Context,
+	logger logr.Logger,
+	gr *graph.Graph,
+	changeType state.ChangeType,
+) {
 	deploymentName := types.NamespacedName{
 		Name:      "tmp-nginx-deployment",
 		Namespace: h.cfg.gatewayPodConfig.Namespace,
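To make the gating above concrete, here is a self-contained toy model (simplified types and names, not NGF code) of why returning early on non-leaders loses nothing: every Pod keeps processing batches and storing the latest graph, and eventHandlerEnable replays that stored graph once leadership arrives.

package main

import (
	"context"
	"fmt"
)

// handler is a toy stand-in for eventHandlerImpl: it stores the latest graph on
// every batch, but only writes nginx config while it is the leader.
type handler struct {
	leader      bool
	latestGraph string
	nginxConf   string
}

func (h *handler) handleEventBatch(ctx context.Context, graph string) {
	h.latestGraph = graph // analogue of processor.Process(): always runs
	if !h.leader {
		return // non-leader: graph built, but nginx conf untouched
	}
	h.sendNginxConfig(ctx, h.latestGraph)
}

// eventHandlerEnable replays the latest graph; by the time leader election
// completes, at least one batch (e.g., the GatewayClass event) has been handled.
func (h *handler) eventHandlerEnable(ctx context.Context) {
	h.sendNginxConfig(ctx, h.latestGraph)
}

func (h *handler) sendNginxConfig(_ context.Context, graph string) {
	h.nginxConf = "conf for " + graph
}

func main() {
	ctx := context.Background()
	h := &handler{}

	h.handleEventBatch(ctx, "graph-v1") // not leader yet: conf stays empty
	fmt.Printf("before leadership: %q\n", h.nginxConf)

	h.leader = true
	h.eventHandlerEnable(ctx) // the runnable fires this after the lease is won
	fmt.Printf("after leadership:  %q\n", h.nginxConf)
}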
28 changes: 22 additions & 6 deletions internal/mode/static/handler_test.go
@@ -126,7 +126,9 @@ var _ = Describe("eventHandler", func() {
 			metricsCollector:         collectors.NewControllerNoopCollector(),
 			updateGatewayClassStatus: true,
 		})
-		Expect(handler.cfg.graphBuiltHealthChecker.ready).To(BeFalse())
+		Expect(handler.cfg.graphBuiltHealthChecker.graphBuilt).To(BeFalse())
+
+		handler.cfg.graphBuiltHealthChecker.leader = true
 	})
 
 	AfterEach(func() {
@@ -161,7 +163,7 @@
 	})
 
 	AfterEach(func() {
-		Expect(handler.cfg.graphBuiltHealthChecker.ready).To(BeTrue())
+		Expect(handler.cfg.graphBuiltHealthChecker.graphBuilt).To(BeTrue())
 	})
 
 	When("a batch has one event", func() {
@@ -484,22 +486,36 @@ var _ = Describe("eventHandler", func() {
 		Expect(gr.LatestReloadResult.Error.Error()).To(Equal("status error"))
 	})
 
-	It("should set the health checker status properly", func() {
+	It("should update nginx conf only when leader", func() {
+		ctx := context.Background()
+		handler.cfg.graphBuiltHealthChecker.leader = false
+
 		e := &events.UpsertEvent{Resource: &gatewayv1.HTTPRoute{}}
 		batch := []interface{}{e}
 		readyChannel := handler.cfg.graphBuiltHealthChecker.getReadyCh()
 
 		fakeProcessor.ProcessReturns(state.ClusterStateChange, &graph.Graph{})
 
-		Expect(handler.cfg.graphBuiltHealthChecker.readyCheck(nil)).ToNot(Succeed())
 		handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
 
+		// The graph is built, but since graphBuiltHealthChecker.leader is false, the configuration
+		// isn't created and the readyCheck fails.
+		Expect(handler.cfg.graphBuiltHealthChecker.graphBuilt).To(BeTrue())
+		Expect(handler.GetLatestConfiguration()).To(BeNil())
+		Expect(handler.cfg.graphBuiltHealthChecker.readyCheck(nil)).ToNot(Succeed())
+		Expect(readyChannel).ShouldNot(BeClosed())
+
+		// Once the Pod becomes leader, these two functions are called through the runnables we
+		// register with the manager.
+		handler.cfg.graphBuiltHealthChecker.setAsLeader(ctx)
+		handler.eventHandlerEnable(ctx)
+
+		// The nginx conf has now been set.
+		dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, 1)
+		Expect(helpers.Diff(handler.GetLatestConfiguration(), &dcfg)).To(BeEmpty())
+
 		Expect(readyChannel).To(BeClosed())
 
+		// The ready check also succeeds now.
 		Expect(handler.cfg.graphBuiltHealthChecker.readyCheck(nil)).To(Succeed())
-		Expect(handler.cfg.graphBuiltHealthChecker.getReadyCh()).To(BeClosed())
 	})
 
 	It("should panic for an unknown event type", func() {
87 changes: 76 additions & 11 deletions internal/mode/static/health.go
@@ -1,9 +1,17 @@
 package static
 
 import (
+	"context"
 	"errors"
+	"fmt"
+	"net"
 	"net/http"
 	"sync"
+	"time"
+
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+
+	"github.com/nginx/nginx-gateway-fabric/internal/mode/static/config"
 )
 
 // newGraphBuiltHealthChecker creates a new graphBuiltHealthChecker.
@@ -13,37 +21,94 @@ func newGraphBuiltHealthChecker() *graphBuiltHealthChecker {
 	}
 }
 
-// graphBuiltHealthChecker is used to check if the initial graph is built and the NGF Pod is ready.
+// graphBuiltHealthChecker is used to check if the NGF Pod is ready. The NGF Pod is ready if the initial graph has
+// been built and if it is leader.
 type graphBuiltHealthChecker struct {
 	// readyCh is a channel that is initialized in newGraphBuiltHealthChecker and represents if the NGF Pod is ready.
-	readyCh chan struct{}
-	lock    sync.RWMutex
-	ready   bool
+	readyCh    chan struct{}
+	lock       sync.RWMutex
+	graphBuilt bool
+	leader     bool
 }
 
+// createHealthProbe creates a Server runnable to serve as our health and readiness checker.
+func createHealthProbe(cfg config.Config, healthChecker *graphBuiltHealthChecker) (manager.Server, error) {
+	// We created our own health probe server instead of using the controller-runtime one because the
+	// latter's repetitive logging would flood our logs on non-ready, non-leader NGF Pods. This health
+	// probe is otherwise similar to controller-runtime's.
+
+	mux := http.NewServeMux()
+
+	// a copy of controller-runtime's sane defaults for a new http.Server
+	s := &http.Server{
+		Handler:           mux,
+		MaxHeaderBytes:    1 << 20,
+		IdleTimeout:       90 * time.Second, // matches http.DefaultTransport keep-alive timeout
+		ReadHeaderTimeout: 32 * time.Second,
+	}
+
+	mux.HandleFunc(readinessEndpointName, healthChecker.readyHandler)
+
+	ln, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.HealthConfig.Port))
+	if err != nil {
+		return manager.Server{},
+			fmt.Errorf("error listening on %s: %w", fmt.Sprintf(":%d", cfg.HealthConfig.Port), err)
+	}
+
+	return manager.Server{
+		Name:     "health probe",
+		Server:   s,
+		Listener: ln,
+	}, nil
+}
+
+func (h *graphBuiltHealthChecker) readyHandler(resp http.ResponseWriter, req *http.Request) {
+	if err := h.readyCheck(req); err != nil {
+		resp.WriteHeader(http.StatusServiceUnavailable)
+	} else {
+		resp.WriteHeader(http.StatusOK)
+	}
+}
+
 // readyCheck returns the ready-state of the Pod. It satisfies the controller-runtime Checker type.
-// We are considered ready after the first graph is built.
+// We are considered ready after the first graph is built and if the NGF Pod is leader.
 func (h *graphBuiltHealthChecker) readyCheck(_ *http.Request) error {
 	h.lock.RLock()
 	defer h.lock.RUnlock()
 
-	if !h.ready {
-		return errors.New("control plane is not yet ready")
+	if !h.leader {
+		return errors.New("this Pod is not currently leader")
+	}
+
+	if !h.graphBuilt {
+		return errors.New("control plane initial graph has not been built")
 	}
 
 	return nil
 }
 
-// setAsReady marks the health check as ready.
-func (h *graphBuiltHealthChecker) setAsReady() {
+// setGraphBuilt marks the health check as having the initial graph built.
+func (h *graphBuiltHealthChecker) setGraphBuilt() {
 	h.lock.Lock()
 	defer h.lock.Unlock()
 
-	h.ready = true
-	close(h.readyCh)
+	h.graphBuilt = true
+}
+
+// getReadyCh returns a read-only channel that signals when the NGF Pod is ready.
+func (h *graphBuiltHealthChecker) getReadyCh() <-chan struct{} {
+	return h.readyCh
+}
+
+// setAsLeader marks the health check as leader.
+func (h *graphBuiltHealthChecker) setAsLeader(_ context.Context) {
	h.lock.Lock()
+	defer h.lock.Unlock()
+
+	h.leader = true
+
+	// setGraphBuilt should already have been called when processing the resources on startup, because the
+	// leader election process takes longer than the initial call to HandleEventBatch. Thus, the NGF Pod
+	// can now be marked as ready and this channel closed.
 	close(h.readyCh)
 }
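As a sanity check of the readiness semantics above, this standalone sketch re-creates the checker logic in plain Go (it is not the internal NGF package; the /readyz path stands in for the readinessEndpointName constant, whose value is not shown in this diff) and probes it in-process with net/http/httptest: the endpoint returns 503 until the Pod is both leader and has built its initial graph.

package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

// checker re-creates graphBuiltHealthChecker's readiness logic for illustration.
type checker struct {
	lock       sync.RWMutex
	graphBuilt bool
	leader     bool
}

func (c *checker) readyCheck(_ *http.Request) error {
	c.lock.RLock()
	defer c.lock.RUnlock()

	if !c.leader {
		return errors.New("this Pod is not currently leader")
	}
	if !c.graphBuilt {
		return errors.New("control plane initial graph has not been built")
	}
	return nil
}

func (c *checker) readyHandler(resp http.ResponseWriter, req *http.Request) {
	if err := c.readyCheck(req); err != nil {
		resp.WriteHeader(http.StatusServiceUnavailable)
		return
	}
	resp.WriteHeader(http.StatusOK)
}

// probe hits the handler in-process and returns the HTTP status code.
func probe(c *checker) int {
	rec := httptest.NewRecorder()
	c.readyHandler(rec, httptest.NewRequest(http.MethodGet, "/readyz", nil))
	return rec.Code
}

func main() {
	c := &checker{}
	fmt.Println(probe(c)) // 503: neither leader nor graph built
	c.graphBuilt = true
	fmt.Println(probe(c)) // 503: graph built on every Pod, but not leader
	c.leader = true
	fmt.Println(probe(c)) // 200: leader with the initial graph built
}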