@@ -15,6 +15,8 @@ public class SocketOutput : ISocketOutput
15
15
{
16
16
private const int _maxPendingWrites = 3 ;
17
17
private const int _maxBytesPreCompleted = 65536 ;
18
+ private const int _maxPooledWriteContexts = 16 ;
19
+ private const int _maxPooledBufferQueues = 16 ;
18
20
19
21
private readonly KestrelThread _thread ;
20
22
private readonly UvStreamHandle _socket ;
@@ -32,14 +34,16 @@ public class SocketOutput : ISocketOutput
32
34
private Exception _lastWriteError ;
33
35
private WriteContext _nextWriteContext ;
34
36
private readonly Queue < TaskCompletionSource < object > > _tasksPending ;
37
+ private readonly Queue < WriteContext > _writeContexts ;
35
38
36
39
public SocketOutput ( KestrelThread thread , UvStreamHandle socket , long connectionId , IKestrelTrace log )
37
40
{
38
41
_thread = thread ;
39
42
_socket = socket ;
40
43
_connectionId = connectionId ;
41
44
_log = log ;
42
- _tasksPending = new Queue < TaskCompletionSource < object > > ( ) ;
45
+ _tasksPending = new Queue < TaskCompletionSource < object > > ( 16 ) ;
46
+ _writeContexts = new Queue < WriteContext > ( _maxPooledWriteContexts ) ;
43
47
}
44
48
45
49
public Task WriteAsync (
@@ -63,7 +67,14 @@ public Task WriteAsync(
63
67
{
64
68
if ( _nextWriteContext == null )
65
69
{
66
- _nextWriteContext = new WriteContext ( this ) ;
70
+ if ( _writeContexts . Count > 0 )
71
+ {
72
+ _nextWriteContext = _writeContexts . Dequeue ( ) ;
73
+ }
74
+ else
75
+ {
76
+ _nextWriteContext = new WriteContext ( this ) ;
77
+ }
67
78
}
68
79
69
80
if ( buffer . Array != null )
@@ -172,13 +183,13 @@ private void WriteAllPending()
172
183
}
173
184
174
185
// This is called on the libuv event loop
175
- private void OnWriteCompleted ( Queue < ArraySegment < byte > > writtenBuffers , int status , Exception error )
186
+ private void OnWriteCompleted ( WriteContext write )
176
187
{
177
- _log . ConnectionWriteCallback ( _connectionId , status ) ;
188
+ var status = write . WriteStatus ;
178
189
179
190
lock ( _lockObj )
180
191
{
181
- _lastWriteError = error ;
192
+ _lastWriteError = write . WriteError ;
182
193
183
194
if ( _nextWriteContext != null )
184
195
{
@@ -189,7 +200,7 @@ private void OnWriteCompleted(Queue<ArraySegment<byte>> writtenBuffers, int stat
189
200
_writesPending -- ;
190
201
}
191
202
192
- foreach ( var writeBuffer in writtenBuffers )
203
+ foreach ( var writeBuffer in write . Buffers )
193
204
{
194
205
// _numBytesPreCompleted can temporarily go negative in the event there are
195
206
// completed writes that we haven't triggered callbacks for yet.
@@ -208,24 +219,34 @@ private void OnWriteCompleted(Queue<ArraySegment<byte>> writtenBuffers, int stat
208
219
_numBytesPreCompleted += bytesToWrite ;
209
220
bytesLeftToBuffer -= bytesToWrite ;
210
221
211
- if ( error == null )
222
+ if ( write . WriteError == null )
212
223
{
213
224
ThreadPool . QueueUserWorkItem (
214
225
( o ) => ( ( TaskCompletionSource < object > ) o ) . SetResult ( null ) ,
215
226
tcs ) ;
216
227
}
217
228
else
218
229
{
230
+ var error = write . WriteError ;
219
231
// error is closure captured
220
232
ThreadPool . QueueUserWorkItem (
221
233
( o ) => ( ( TaskCompletionSource < object > ) o ) . SetException ( error ) ,
222
234
tcs ) ;
223
235
}
224
236
}
225
237
238
+ if ( _writeContexts . Count < _maxPooledWriteContexts
239
+ && write . Buffers . Count <= _maxPooledBufferQueues )
240
+ {
241
+ write . Reset ( ) ;
242
+ _writeContexts . Enqueue ( write ) ;
243
+ }
244
+
226
245
// Now that the while loop has completed the following invariants should hold true:
227
246
Debug . Assert ( _numBytesPreCompleted >= 0 ) ;
228
247
}
248
+
249
+ _log . ConnectionWriteCallback ( _connectionId , status ) ;
229
250
}
230
251
231
252
void ISocketOutput . Write ( ArraySegment < byte > buffer , bool immediate )
@@ -263,7 +284,7 @@ private class WriteContext
263
284
public WriteContext ( SocketOutput self )
264
285
{
265
286
Self = self ;
266
- Buffers = new Queue < ArraySegment < byte > > ( ) ;
287
+ Buffers = new Queue < ArraySegment < byte > > ( _maxPooledBufferQueues ) ;
267
288
}
268
289
269
290
/// <summary>
@@ -340,7 +361,17 @@ public void DoDisconnectIfNeeded()
340
361
341
362
public void Complete ( )
342
363
{
343
- Self . OnWriteCompleted ( Buffers , WriteStatus , WriteError ) ;
364
+ Self . OnWriteCompleted ( this ) ;
365
+ }
366
+
367
+ public void Reset ( )
368
+ {
369
+ Buffers . Clear ( ) ;
370
+ SocketDisconnect = false ;
371
+ SocketShutdownSend = false ;
372
+ WriteStatus = 0 ;
373
+ WriteError = null ;
374
+ ShutdownSendStatus = 0 ;
344
375
}
345
376
}
346
377
}
0 commit comments