@@ -23,12 +23,18 @@ import (
2323	"fmt" 
2424	"net" 
2525	"strconv" 
26+ 	"sync" 
2627	"testing" 
28+ 	"time" 
2729
30+ 	"google.golang.org/grpc" 
2831	"google.golang.org/grpc/connectivity" 
32+ 	"google.golang.org/grpc/credentials/insecure" 
2933	"google.golang.org/grpc/internal" 
3034	"google.golang.org/grpc/internal/testutils" 
3135	"google.golang.org/grpc/internal/testutils/xds/e2e" 
36+ 	testgrpc "google.golang.org/grpc/interop/grpc_testing" 
37+ 	testpb "google.golang.org/grpc/interop/grpc_testing" 
3238	xdsinternal "google.golang.org/grpc/xds/internal" 
3339	"google.golang.org/grpc/xds/internal/xdsclient" 
3440	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource" 
@@ -151,5 +157,104 @@ func (s) TestListenerWrapper(t *testing.T) {
151157			t .Fatalf ("mode change received: %v, want: %v" , mode , connectivity .ServingModeNotServing )
152158		}
153159	}
160+ }
161+ 
162+ type  testService  struct  {
163+ 	testgrpc.TestServiceServer 
164+ }
165+ 
166+ func  (* testService ) EmptyCall (context.Context , * testpb.Empty ) (* testpb.Empty , error ) {
167+ 	return  & testpb.Empty {}, nil 
168+ }
169+ 
170+ // TestConnsCleanup tests that the listener wrapper clears it's connection 
171+ // references when connections close. It sets up a listener wrapper and gRPC 
172+ // Server, and connects to the server 100 times and makes an RPC each time, and 
173+ // then closes the connection. After these 100 connections Close, the listener 
174+ // wrapper should have no more references to any connections. 
175+ func  (s ) TestConnsCleanup (t  * testing.T ) {
176+ 	mgmtServer , nodeID , _ , _ , xdsC  :=  xdsSetupForTests (t )
177+ 	lis , err  :=  testutils .LocalTCPListener ()
178+ 	if  err  !=  nil  {
179+ 		t .Fatalf ("Failed to create a local TCP listener: %v" , err )
180+ 	}
181+ 
182+ 	ctx , cancel  :=  context .WithTimeout (context .Background (), defaultTestTimeout )
183+ 	defer  cancel ()
184+ 
185+ 	modeCh  :=  make (chan  connectivity.ServingMode , 1 )
186+ 	vm  :=  verifyMode {
187+ 		modeCh : modeCh ,
188+ 	}
189+ 
190+ 	host , port  :=  hostPortFromListener (t , lis )
191+ 	lisResourceName  :=  fmt .Sprintf (e2e .ServerListenerResourceNameTemplate , net .JoinHostPort (host , strconv .Itoa (int (port ))))
192+ 	params  :=  ListenerWrapperParams {
193+ 		Listener :             lis ,
194+ 		ListenerResourceName : lisResourceName ,
195+ 		XDSClient :            xdsC ,
196+ 		ModeCallback :         vm .verifyModeCallback ,
197+ 	}
198+ 	lw  :=  NewListenerWrapper (params )
199+ 	if  lw  ==  nil  {
200+ 		t .Fatalf ("NewListenerWrapper(%+v) returned nil" , params )
201+ 	}
202+ 	defer  lw .Close ()
203+ 
204+ 	resources  :=  e2e.UpdateOptions {
205+ 		NodeID :         nodeID ,
206+ 		Listeners :      []* v3listenerpb.Listener {e2e .DefaultServerListener (host , port , e2e .SecurityLevelNone , route1 )},
207+ 		SkipValidation : true ,
208+ 	}
209+ 	if  err  :=  mgmtServer .Update (ctx , resources ); err  !=  nil  {
210+ 		t .Fatal (err )
211+ 	}
212+ 
213+ 	// Wait for Listener Mode to go serving. 
214+ 	select  {
215+ 	case  <- ctx .Done ():
216+ 		t .Fatalf ("timeout waiting for mode change" )
217+ 	case  mode  :=  <- modeCh :
218+ 		if  mode  !=  connectivity .ServingModeServing  {
219+ 			t .Fatalf ("mode change received: %v, want: %v" , mode , connectivity .ServingModeServing )
220+ 		}
221+ 	}
222+ 
223+ 	server  :=  grpc .NewServer (grpc .Creds (insecure .NewCredentials ()))
224+ 	testgrpc .RegisterTestServiceServer (server , & testService {})
225+ 	wg  :=  sync.WaitGroup {}
226+ 	go  func () {
227+ 		if  err  :=  server .Serve (lw ); err  !=  nil  {
228+ 			t .Errorf ("failed to serve: %v" , err )
229+ 		}
230+ 	}()
231+ 
232+ 	// Make 100 connections to the server, and make an RPC on each one. 
233+ 	for  i  :=  0 ; i  <  100 ; i ++  {
234+ 		cc , err  :=  grpc .NewClient (lw .Addr ().String (), grpc .WithTransportCredentials (insecure .NewCredentials ()))
235+ 		if  err  !=  nil  {
236+ 			t .Fatalf ("grpc.NewClient failed with err: %v" , err )
237+ 		}
238+ 		client  :=  testgrpc .NewTestServiceClient (cc )
239+ 		if  _ , err  :=  client .EmptyCall (ctx , & testpb.Empty {}); err  !=  nil  {
240+ 			t .Fatalf ("client.EmptyCall() failed: %v" , err )
241+ 		}
242+ 		cc .Close ()
243+ 	}
244+ 
245+ 	lisWrapper  :=  lw .(* listenerWrapper )
246+ 	// Eventually when the server processes the connection shutdowns, the 
247+ 	// listener wrapper should clear its references to the wrapped connections. 
248+ 	lenConns  :=  1 
249+ 	for  ; ctx .Err () ==  nil  &&  lenConns  >  0 ; <- time .After (time .Millisecond ) {
250+ 		lisWrapper .mu .Lock ()
251+ 		lenConns  =  len (lisWrapper .conns )
252+ 		lisWrapper .mu .Unlock ()
253+ 	}
254+ 	if  lenConns  >  0  {
255+ 		t .Fatalf ("timeout waiting for lis wrapper conns to clear, size: %v" , lenConns )
256+ 	}
154257
258+ 	server .Stop ()
259+ 	wg .Wait ()
155260}
0 commit comments