@@ -15,7 +15,7 @@ import (
15
15
"time"
16
16
)
17
17
18
- func TestLimitListener (t * testing.T ) {
18
+ func TestLimitListenerOverload (t * testing.T ) {
19
19
const (
20
20
max = 5
21
21
attempts = max * 2
@@ -30,6 +30,7 @@ func TestLimitListener(t *testing.T) {
30
30
31
31
var wg sync.WaitGroup
32
32
wg .Add (1 )
33
+ saturated := make (chan struct {})
33
34
go func () {
34
35
defer wg .Done ()
35
36
@@ -40,69 +41,174 @@ func TestLimitListener(t *testing.T) {
40
41
break
41
42
}
42
43
accepted ++
44
+ if accepted == max {
45
+ close (saturated )
46
+ }
43
47
io .WriteString (c , msg )
44
48
45
- defer c .Close () // Leave c open until the listener is closed.
49
+ // Leave c open until the listener is closed.
50
+ defer c .Close ()
46
51
}
47
- if accepted > max {
48
- t .Errorf ("accepted %d simultaneous connections; want at most %d" , accepted , max )
52
+ t .Logf ("with limit %d, accepted %d simultaneous connections" , max , accepted )
53
+ // The listener accounts open connections based on Listener-side Close
54
+ // calls, so even if the client hangs up early (for example, because it
55
+ // was a random dial from another process instead of from this test), we
56
+ // should not end up accepting more connections than expected.
57
+ if accepted != max {
58
+ t .Errorf ("want exactly %d" , max )
49
59
}
50
60
}()
51
61
52
- // connc keeps the client end of the dialed connections alive until the
53
- // test completes.
54
- connc := make (chan []net.Conn , 1 )
55
- connc <- nil
56
-
57
62
dialCtx , cancelDial := context .WithCancel (context .Background ())
58
63
defer cancelDial ()
59
64
dialer := & net.Dialer {}
60
65
61
- var served int32
66
+ var dialed , served int32
67
+ var pendingDials sync.WaitGroup
62
68
for n := attempts ; n > 0 ; n -- {
63
69
wg .Add (1 )
70
+ pendingDials .Add (1 )
64
71
go func () {
65
72
defer wg .Done ()
66
73
67
74
c , err := dialer .DialContext (dialCtx , l .Addr ().Network (), l .Addr ().String ())
75
+ pendingDials .Done ()
68
76
if err != nil {
69
77
t .Log (err )
70
78
return
71
79
}
80
+ atomic .AddInt32 (& dialed , 1 )
72
81
defer c .Close ()
73
82
74
- // Keep this end of the connection alive until after the Listener
75
- // finishes.
76
- conns := append (<- connc , c )
77
- if len (conns ) == max {
78
- go func () {
79
- // Give the server a bit of time to make sure it doesn't exceed its
80
- // limit after serving this connection, then cancel the remaining
81
- // Dials (if any).
82
- time .Sleep (10 * time .Millisecond )
83
- cancelDial ()
84
- l .Close ()
85
- }()
86
- }
87
- connc <- conns
88
-
89
- b := make ([]byte , len (msg ))
90
- if n , err := c .Read (b ); n < len (b ) {
83
+ // The kernel may queue more than max connections (allowing their dials to
84
+ // succeed), but only max of them should actually be accepted by the
85
+ // server. We can distinguish the two based on whether the listener writes
86
+ // anything to the connection — a connection that was queued but not
87
+ // accepted will be closed without transferring any data.
88
+ if b , err := io .ReadAll (c ); len (b ) < len (msg ) {
91
89
t .Log (err )
92
90
return
93
91
}
94
92
atomic .AddInt32 (& served , 1 )
95
93
}()
96
94
}
95
+
96
+ // Give the server a bit of time after it saturates to make sure it doesn't
97
+ // exceed its limit after serving this connection, then cancel the remaining
98
+ // dials (if any).
99
+ <- saturated
100
+ time .Sleep (10 * time .Millisecond )
101
+ cancelDial ()
102
+ // Wait for the dials to complete to ensure that the port isn't reused before
103
+ // the dials are actually attempted.
104
+ pendingDials .Wait ()
105
+ l .Close ()
97
106
wg .Wait ()
98
107
99
- conns := <- connc
100
- for _ , c := range conns {
101
- c .Close ()
108
+ t .Logf ("served %d simultaneous connections (of %d dialed, %d attempted)" , served , dialed , attempts )
109
+
110
+ // If some other process (such as a port scan or another test) happens to dial
111
+ // the listener at the same time, the listener could end up burning its quota
112
+ // on that, resulting in fewer than max test connections being served.
113
+ // But the number served certainly cannot be greater.
114
+ if served > max {
115
+ t .Errorf ("expected at most %d served" , max )
102
116
}
103
- t .Logf ("with limit %d, served %d connections (of %d dialed, %d attempted)" , max , served , len (conns ), attempts )
104
- if served != max {
105
- t .Errorf ("expected exactly %d served" , max )
117
+ }
118
+
119
+ func TestLimitListenerSaturation (t * testing.T ) {
120
+ const (
121
+ max = 5
122
+ attemptsPerWave = max * 2
123
+ waves = 10
124
+ msg = "bye\n "
125
+ )
126
+
127
+ l , err := net .Listen ("tcp" , "127.0.0.1:0" )
128
+ if err != nil {
129
+ t .Fatal (err )
130
+ }
131
+ l = LimitListener (l , max )
132
+
133
+ acceptDone := make (chan struct {})
134
+ defer func () {
135
+ l .Close ()
136
+ <- acceptDone
137
+ }()
138
+ go func () {
139
+ defer close (acceptDone )
140
+
141
+ var open , peakOpen int32
142
+ var (
143
+ saturated = make (chan struct {})
144
+ saturatedOnce sync.Once
145
+ )
146
+ var wg sync.WaitGroup
147
+ for {
148
+ c , err := l .Accept ()
149
+ if err != nil {
150
+ break
151
+ }
152
+ if n := atomic .AddInt32 (& open , 1 ); n > peakOpen {
153
+ peakOpen = n
154
+ if n == max {
155
+ saturatedOnce .Do (func () {
156
+ // Wait a bit to make sure the listener doesn't exceed its limit
157
+ // after accepting this connection, then allow the in-flight
158
+ // connections to write out and close.
159
+ time .AfterFunc (10 * time .Millisecond , func () { close (saturated ) })
160
+ })
161
+ }
162
+ }
163
+ wg .Add (1 )
164
+ go func () {
165
+ <- saturated
166
+ io .WriteString (c , msg )
167
+ atomic .AddInt32 (& open , - 1 )
168
+ c .Close ()
169
+ wg .Done ()
170
+ }()
171
+ }
172
+ wg .Wait ()
173
+
174
+ t .Logf ("with limit %d, accepted a peak of %d simultaneous connections" , max , peakOpen )
175
+ if peakOpen > max {
176
+ t .Errorf ("want at most %d" , max )
177
+ }
178
+ }()
179
+
180
+ for wave := 0 ; wave < waves ; wave ++ {
181
+ var dialed , served int32
182
+ var wg sync.WaitGroup
183
+ for n := attemptsPerWave ; n > 0 ; n -- {
184
+ wg .Add (1 )
185
+ go func () {
186
+ defer wg .Done ()
187
+
188
+ c , err := net .Dial (l .Addr ().Network (), l .Addr ().String ())
189
+ if err != nil {
190
+ t .Log (err )
191
+ return
192
+ }
193
+ atomic .AddInt32 (& dialed , 1 )
194
+ defer c .Close ()
195
+
196
+ if b , err := io .ReadAll (c ); len (b ) < len (msg ) {
197
+ t .Log (err )
198
+ return
199
+ }
200
+ atomic .AddInt32 (& served , 1 )
201
+ }()
202
+ }
203
+ wg .Wait ()
204
+
205
+ t .Logf ("served %d connections (of %d dialed, %d attempted)" , served , dialed , attemptsPerWave )
206
+ // We expect that the kernel can queue at least attemptsPerWave
207
+ // connections at a time (since it's only a small number), so every
208
+ // connection should eventually be served.
209
+ if served != attemptsPerWave {
210
+ t .Errorf ("expected %d served" , attemptsPerWave )
211
+ }
106
212
}
107
213
}
108
214
@@ -160,9 +266,7 @@ func TestLimitListenerClose(t *testing.T) {
160
266
161
267
// Allow the subsequent Accept to block before closing the listener.
162
268
// (Accept should unblock and return.)
163
- timer := time .AfterFunc (10 * time .Millisecond , func () {
164
- ln .Close ()
165
- })
269
+ timer := time .AfterFunc (10 * time .Millisecond , func () { ln .Close () })
166
270
167
271
c , err = ln .Accept ()
168
272
if err == nil {
0 commit comments