@@ -23,12 +23,18 @@ import (
2323 "fmt"
2424 "net"
2525 "strconv"
26+ "sync"
2627 "testing"
28+ "time"
2729
30+ "google.golang.org/grpc"
2831 "google.golang.org/grpc/connectivity"
32+ "google.golang.org/grpc/credentials/insecure"
2933 "google.golang.org/grpc/internal"
3034 "google.golang.org/grpc/internal/testutils"
3135 "google.golang.org/grpc/internal/testutils/xds/e2e"
36+ testgrpc "google.golang.org/grpc/interop/grpc_testing"
37+ testpb "google.golang.org/grpc/interop/grpc_testing"
3238 xdsinternal "google.golang.org/grpc/xds/internal"
3339 "google.golang.org/grpc/xds/internal/xdsclient"
3440 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
@@ -151,5 +157,104 @@ func (s) TestListenerWrapper(t *testing.T) {
151157 t .Fatalf ("mode change received: %v, want: %v" , mode , connectivity .ServingModeNotServing )
152158 }
153159 }
160+ }
161+
162+ type testService struct {
163+ testgrpc.TestServiceServer
164+ }
165+
166+ func (* testService ) EmptyCall (context.Context , * testpb.Empty ) (* testpb.Empty , error ) {
167+ return & testpb.Empty {}, nil
168+ }
169+
170+ // TestConnsCleanup tests that the listener wrapper clears it's connection
171+ // references when connections close. It sets up a listener wrapper and gRPC
172+ // Server, and connects to the server 100 times and makes an RPC each time, and
173+ // then closes the connection. After these 100 connections Close, the listener
174+ // wrapper should have no more references to any connections.
175+ func (s ) TestConnsCleanup (t * testing.T ) {
176+ mgmtServer , nodeID , _ , _ , xdsC := xdsSetupForTests (t )
177+ lis , err := testutils .LocalTCPListener ()
178+ if err != nil {
179+ t .Fatalf ("Failed to create a local TCP listener: %v" , err )
180+ }
181+
182+ ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
183+ defer cancel ()
184+
185+ modeCh := make (chan connectivity.ServingMode , 1 )
186+ vm := verifyMode {
187+ modeCh : modeCh ,
188+ }
189+
190+ host , port := hostPortFromListener (t , lis )
191+ lisResourceName := fmt .Sprintf (e2e .ServerListenerResourceNameTemplate , net .JoinHostPort (host , strconv .Itoa (int (port ))))
192+ params := ListenerWrapperParams {
193+ Listener : lis ,
194+ ListenerResourceName : lisResourceName ,
195+ XDSClient : xdsC ,
196+ ModeCallback : vm .verifyModeCallback ,
197+ }
198+ lw := NewListenerWrapper (params )
199+ if lw == nil {
200+ t .Fatalf ("NewListenerWrapper(%+v) returned nil" , params )
201+ }
202+ defer lw .Close ()
203+
204+ resources := e2e.UpdateOptions {
205+ NodeID : nodeID ,
206+ Listeners : []* v3listenerpb.Listener {e2e .DefaultServerListener (host , port , e2e .SecurityLevelNone , route1 )},
207+ SkipValidation : true ,
208+ }
209+ if err := mgmtServer .Update (ctx , resources ); err != nil {
210+ t .Fatal (err )
211+ }
212+
213+ // Wait for Listener Mode to go serving.
214+ select {
215+ case <- ctx .Done ():
216+ t .Fatalf ("timeout waiting for mode change" )
217+ case mode := <- modeCh :
218+ if mode != connectivity .ServingModeServing {
219+ t .Fatalf ("mode change received: %v, want: %v" , mode , connectivity .ServingModeServing )
220+ }
221+ }
222+
223+ server := grpc .NewServer (grpc .Creds (insecure .NewCredentials ()))
224+ testgrpc .RegisterTestServiceServer (server , & testService {})
225+ wg := sync.WaitGroup {}
226+ go func () {
227+ if err := server .Serve (lw ); err != nil {
228+ t .Errorf ("failed to serve: %v" , err )
229+ }
230+ }()
231+
232+ // Make 100 connections to the server, and make an RPC on each one.
233+ for i := 0 ; i < 100 ; i ++ {
234+ cc , err := grpc .NewClient (lw .Addr ().String (), grpc .WithTransportCredentials (insecure .NewCredentials ()))
235+ if err != nil {
236+ t .Fatalf ("grpc.NewClient failed with err: %v" , err )
237+ }
238+ client := testgrpc .NewTestServiceClient (cc )
239+ if _ , err := client .EmptyCall (ctx , & testpb.Empty {}); err != nil {
240+ t .Fatalf ("client.EmptyCall() failed: %v" , err )
241+ }
242+ cc .Close ()
243+ }
244+
245+ lisWrapper := lw .(* listenerWrapper )
246+ // Eventually when the server processes the connection shutdowns, the
247+ // listener wrapper should clear its references to the wrapped connections.
248+ lenConns := 1
249+ for ; ctx .Err () == nil && lenConns > 0 ; <- time .After (time .Millisecond ) {
250+ lisWrapper .mu .Lock ()
251+ lenConns = len (lisWrapper .conns )
252+ lisWrapper .mu .Unlock ()
253+ }
254+ if lenConns > 0 {
255+ t .Fatalf ("timeout waiting for lis wrapper conns to clear, size: %v" , lenConns )
256+ }
154257
258+ server .Stop ()
259+ wg .Wait ()
155260}
0 commit comments