@@ -15,6 +15,7 @@ public class SocketOutput : ISocketOutput
15
15
{
16
16
private const int _maxPendingWrites = 3 ;
17
17
private const int _maxBytesPreCompleted = 65536 ;
18
+ private const int _maxPooledWriteContexts = 16 ;
18
19
19
20
private readonly KestrelThread _thread ;
20
21
private readonly UvStreamHandle _socket ;
@@ -32,14 +33,16 @@ public class SocketOutput : ISocketOutput
32
33
private Exception _lastWriteError ;
33
34
private WriteContext _nextWriteContext ;
34
35
private readonly Queue < TaskCompletionSource < object > > _tasksPending ;
36
+ private readonly Queue < WriteContext > _writeContexts ;
35
37
36
38
public SocketOutput ( KestrelThread thread , UvStreamHandle socket , long connectionId , IKestrelTrace log )
37
39
{
38
40
_thread = thread ;
39
41
_socket = socket ;
40
42
_connectionId = connectionId ;
41
43
_log = log ;
42
- _tasksPending = new Queue < TaskCompletionSource < object > > ( ) ;
44
+ _tasksPending = new Queue < TaskCompletionSource < object > > ( 16 ) ;
45
+ _writeContexts = new Queue < WriteContext > ( _maxPooledWriteContexts ) ;
43
46
}
44
47
45
48
public Task WriteAsync (
@@ -63,7 +66,14 @@ public Task WriteAsync(
63
66
{
64
67
if ( _nextWriteContext == null )
65
68
{
66
- _nextWriteContext = new WriteContext ( this ) ;
69
+ if ( _writeContexts . Count > 0 )
70
+ {
71
+ _nextWriteContext = _writeContexts . Dequeue ( ) ;
72
+ }
73
+ else
74
+ {
75
+ _nextWriteContext = new WriteContext ( this ) ;
76
+ }
67
77
}
68
78
69
79
if ( buffer . Array != null )
@@ -172,13 +182,13 @@ private void WriteAllPending()
172
182
}
173
183
174
184
// This is called on the libuv event loop
175
- private void OnWriteCompleted ( Queue < ArraySegment < byte > > writtenBuffers , int status , Exception error )
185
+ private void OnWriteCompleted ( WriteContext write )
176
186
{
177
- _log . ConnectionWriteCallback ( _connectionId , status ) ;
187
+ var status = write . WriteStatus ;
178
188
179
189
lock ( _lockObj )
180
190
{
181
- _lastWriteError = error ;
191
+ _lastWriteError = write . WriteError ;
182
192
183
193
if ( _nextWriteContext != null )
184
194
{
@@ -189,7 +199,7 @@ private void OnWriteCompleted(Queue<ArraySegment<byte>> writtenBuffers, int stat
189
199
_writesPending -- ;
190
200
}
191
201
192
- foreach ( var writeBuffer in writtenBuffers )
202
+ foreach ( var writeBuffer in write . Buffers )
193
203
{
194
204
// _numBytesPreCompleted can temporarily go negative in the event there are
195
205
// completed writes that we haven't triggered callbacks for yet.
@@ -208,24 +218,33 @@ private void OnWriteCompleted(Queue<ArraySegment<byte>> writtenBuffers, int stat
208
218
_numBytesPreCompleted += bytesToWrite ;
209
219
bytesLeftToBuffer -= bytesToWrite ;
210
220
211
- if ( error == null )
221
+ if ( write . WriteError == null )
212
222
{
213
223
ThreadPool . QueueUserWorkItem (
214
224
( o ) => ( ( TaskCompletionSource < object > ) o ) . SetResult ( null ) ,
215
225
tcs ) ;
216
226
}
217
227
else
218
228
{
229
+ var error = write . WriteError ;
219
230
// error is closure captured
220
231
ThreadPool . QueueUserWorkItem (
221
232
( o ) => ( ( TaskCompletionSource < object > ) o ) . SetException ( error ) ,
222
233
tcs ) ;
223
234
}
224
235
}
225
236
237
+ if ( _writeContexts . Count < _maxPooledWriteContexts )
238
+ {
239
+ write . Reset ( ) ;
240
+ _writeContexts . Enqueue ( write ) ;
241
+ }
242
+
226
243
// Now that the while loop has completed the following invariants should hold true:
227
244
Debug . Assert ( _numBytesPreCompleted >= 0 ) ;
228
245
}
246
+
247
+ _log . ConnectionWriteCallback ( _connectionId , status ) ;
229
248
}
230
249
231
250
void ISocketOutput . Write ( ArraySegment < byte > buffer , bool immediate )
@@ -263,7 +282,7 @@ private class WriteContext
263
282
public WriteContext ( SocketOutput self )
264
283
{
265
284
Self = self ;
266
- Buffers = new Queue < ArraySegment < byte > > ( ) ;
285
+ Buffers = new Queue < ArraySegment < byte > > ( 8 ) ;
267
286
}
268
287
269
288
/// <summary>
@@ -340,7 +359,17 @@ public void DoDisconnectIfNeeded()
340
359
341
360
public void Complete ( )
342
361
{
343
- Self . OnWriteCompleted ( Buffers , WriteStatus , WriteError ) ;
362
+ Self . OnWriteCompleted ( this ) ;
363
+ }
364
+
365
+ public void Reset ( )
366
+ {
367
+ Buffers . Clear ( ) ;
368
+ SocketDisconnect = false ;
369
+ SocketShutdownSend = false ;
370
+ WriteStatus = 0 ;
371
+ WriteError = null ;
372
+ ShutdownSendStatus = 0 ;
344
373
}
345
374
}
346
375
}
0 commit comments