@@ -6,32 +6,51 @@ import (
66 "fmt"
77 "time"
88
9+ "github.com/go-logr/logr"
910 pb "github.com/nginx/agent/v3/api/grpc/mpi/v1"
1011 "google.golang.org/grpc"
12+
13+ agentgrpc "github.com/nginx/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc"
14+ grpcContext "github.com/nginx/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/context"
1115)
1216
13- // commandService handles the connection and subscription to the agent.
17+ // commandService handles the connection and subscription to the data plane agent.
1418type commandService struct {
1519 pb.CommandServiceServer
20+ connTracker * agentgrpc.ConnectionsTracker
21+ // TODO(sberman): all logs are at Info level right now. Adjust appropriately.
22+ logger logr.Logger
1623}
1724
18- func newCommandService () * commandService {
19- return & commandService {}
25+ func newCommandService (logger logr.Logger ) * commandService {
26+ return & commandService {
27+ logger : logger ,
28+ connTracker : agentgrpc .NewConnectionsTracker (),
29+ }
2030}
2131
2232func (cs * commandService ) Register (server * grpc.Server ) {
2333 pb .RegisterCommandServiceServer (server , cs )
2434}
2535
36+ // CreateConnection registers a data plane agent with the control plane.
2637func (cs * commandService ) CreateConnection (
27- _ context.Context ,
38+ ctx context.Context ,
2839 req * pb.CreateConnectionRequest ,
2940) (* pb.CreateConnectionResponse , error ) {
3041 if req == nil {
3142 return nil , errors .New ("empty connection request" )
3243 }
3344
34- fmt .Printf ("Creating connection for nginx pod: %s\n " , req .GetResource ().GetContainerInfo ().GetHostname ())
45+ gi , ok := grpcContext .GrpcInfoFromContext (ctx )
46+ if ! ok {
47+ return nil , agentgrpc .ErrStatusInvalidConnection
48+ }
49+
50+ podName := req .GetResource ().GetContainerInfo ().GetHostname ()
51+
52+ cs .logger .Info (fmt .Sprintf ("Creating connection for nginx pod: %s" , podName ))
53+ cs .connTracker .Track (gi .IPAddress , podName )
3554
3655 return & pb.CreateConnectionResponse {
3756 Response : & pb.CommandResponse {
@@ -40,50 +59,99 @@ func (cs *commandService) CreateConnection(
4059 }, nil
4160}
4261
62+ // Subscribe is a decoupled communication mechanism between the data plane agent and control plane.
4363func (cs * commandService ) Subscribe (in pb.CommandService_SubscribeServer ) error {
44- fmt .Println ("Received subscribe request" )
45-
4664 ctx := in .Context ()
4765
66+ gi , ok := grpcContext .GrpcInfoFromContext (ctx )
67+ if ! ok {
68+ return agentgrpc .ErrStatusInvalidConnection
69+ }
70+
71+ cs .logger .Info (fmt .Sprintf ("Received subscribe request from %q" , gi .IPAddress ))
72+
73+ go cs .listenForDataPlaneResponse (ctx , in )
74+
75+ // wait for the agent to report itself
76+ podName , err := cs .waitForConnection (ctx , gi )
77+ if err != nil {
78+ cs .logger .Error (err , "error waiting for connection" )
79+ return err
80+ }
81+
82+ cs .logger .Info (fmt .Sprintf ("Handling subscription for %s/%s" , podName , gi .IPAddress ))
4883 for {
4984 select {
5085 case <- ctx .Done ():
5186 return ctx .Err ()
5287 case <- time .After (1 * time .Minute ):
5388 dummyRequest := & pb.ManagementPlaneRequest {
54- Request : & pb.ManagementPlaneRequest_StatusRequest {
55- StatusRequest : & pb.StatusRequest {},
89+ Request : & pb.ManagementPlaneRequest_HealthRequest {
90+ HealthRequest : & pb.HealthRequest {},
5691 },
5792 }
58- if err := in .Send (dummyRequest ); err != nil { // will likely need retry logic
59- fmt . Printf ( "ERROR: %v \n " , err )
93+ if err := in .Send (dummyRequest ); err != nil { // TODO(sberman): will likely need retry logic
94+ cs . logger . Error ( err , "error sending request to agent" )
6095 }
6196 }
6297 }
6398}
6499
65- func (cs * commandService ) UpdateDataPlaneStatus (
66- _ context.Context ,
67- req * pb.UpdateDataPlaneStatusRequest ,
68- ) (* pb.UpdateDataPlaneStatusResponse , error ) {
69- fmt .Println ("Updating data plane status" )
100+ // TODO(sberman): current issue: when control plane restarts, agent doesn't re-establish a CreateConnection call,
101+ // so this fails.
102+ func (cs * commandService ) waitForConnection (ctx context.Context , gi grpcContext.GrpcInfo ) (string , error ) {
103+ var podName string
104+ ticker := time .NewTicker (time .Second )
105+ defer ticker .Stop ()
70106
71- if req == nil {
72- return nil , errors .New ("empty update data plane status request" )
107+ timer := time .NewTimer (30 * time .Second )
108+ defer timer .Stop ()
109+
110+ for {
111+ select {
112+ case <- ctx .Done ():
113+ return "" , ctx .Err ()
114+ case <- timer .C :
115+ return "" , errors .New ("timed out waiting for agent connection" )
116+ case <- ticker .C :
117+ if podName = cs .connTracker .GetConnection (gi .IPAddress ); podName != "" {
118+ return podName , nil
119+ }
120+ }
73121 }
122+ }
74123
75- return & pb.UpdateDataPlaneStatusResponse {}, nil
124+ func (cs * commandService ) listenForDataPlaneResponse (ctx context.Context , in pb.CommandService_SubscribeServer ) {
125+ for {
126+ select {
127+ case <- ctx .Done ():
128+ return
129+ default :
130+ dataPlaneResponse , err := in .Recv ()
131+ cs .logger .Info (fmt .Sprintf ("Received data plane response: %v" , dataPlaneResponse ))
132+ if err != nil {
133+ cs .logger .Error (err , "failed to receive data plane response" )
134+ return
135+ }
136+ }
137+ }
76138}
77139
140+ // UpdateDataPlaneHealth includes full health information about the data plane as reported by the agent.
141+ // TODO(sberman): Is health monitoring the data planes something useful for us to do?
78142func (cs * commandService ) UpdateDataPlaneHealth (
79143 _ context.Context ,
80- req * pb.UpdateDataPlaneHealthRequest ,
144+ _ * pb.UpdateDataPlaneHealthRequest ,
81145) (* pb.UpdateDataPlaneHealthResponse , error ) {
82- fmt .Println ("Updating data plane health" )
83-
84- if req == nil {
85- return nil , errors .New ("empty update dataplane health request" )
86- }
87-
88146 return & pb.UpdateDataPlaneHealthResponse {}, nil
89147}
148+
149+ // UpdateDataPlaneStatus is called by agent on startup and upon any change in agent metadata,
150+ // instance metadata, or configurations. Since directly changing nginx configuration on the instance
151+ // is not supported, this is a no-op for NGF.
152+ func (cs * commandService ) UpdateDataPlaneStatus (
153+ _ context.Context ,
154+ _ * pb.UpdateDataPlaneStatusRequest ,
155+ ) (* pb.UpdateDataPlaneStatusResponse , error ) {
156+ return & pb.UpdateDataPlaneStatusResponse {}, nil
157+ }
0 commit comments