@@ -32,14 +32,16 @@ public class SocketOutput : ISocketOutput
32
32
private Exception _lastWriteError ;
33
33
private WriteContext _nextWriteContext ;
34
34
private readonly Queue < TaskCompletionSource < object > > _tasksPending ;
35
+ private readonly Queue < WriteContext > _writeContexts ;
35
36
36
37
public SocketOutput ( KestrelThread thread , UvStreamHandle socket , long connectionId , IKestrelTrace log )
37
38
{
38
39
_thread = thread ;
39
40
_socket = socket ;
40
41
_connectionId = connectionId ;
41
42
_log = log ;
42
- _tasksPending = new Queue < TaskCompletionSource < object > > ( ) ;
43
+ _tasksPending = new Queue < TaskCompletionSource < object > > ( 16 ) ;
44
+ _writeContexts = new Queue < WriteContext > ( 16 ) ;
43
45
}
44
46
45
47
public Task WriteAsync (
@@ -63,7 +65,14 @@ public Task WriteAsync(
63
65
{
64
66
if ( _nextWriteContext == null )
65
67
{
66
- _nextWriteContext = new WriteContext ( this ) ;
68
+ if ( _writeContexts . Count > 0 )
69
+ {
70
+ _nextWriteContext = _writeContexts . Dequeue ( ) ;
71
+ }
72
+ else
73
+ {
74
+ _nextWriteContext = new WriteContext ( this ) ;
75
+ }
67
76
}
68
77
69
78
if ( buffer . Array != null )
@@ -172,13 +181,13 @@ private void WriteAllPending()
172
181
}
173
182
174
183
// This is called on the libuv event loop
175
- private void OnWriteCompleted ( Queue < ArraySegment < byte > > writtenBuffers , int status , Exception error )
184
+ private void OnWriteCompleted ( WriteContext write )
176
185
{
177
- _log . ConnectionWriteCallback ( _connectionId , status ) ;
186
+ var status = write . WriteStatus ;
178
187
179
188
lock ( _lockObj )
180
189
{
181
- _lastWriteError = error ;
190
+ _lastWriteError = write . WriteError ;
182
191
183
192
if ( _nextWriteContext != null )
184
193
{
@@ -189,7 +198,7 @@ private void OnWriteCompleted(Queue<ArraySegment<byte>> writtenBuffers, int stat
189
198
_writesPending -- ;
190
199
}
191
200
192
- foreach ( var writeBuffer in writtenBuffers )
201
+ foreach ( var writeBuffer in write . Buffers )
193
202
{
194
203
// _numBytesPreCompleted can temporarily go negative in the event there are
195
204
// completed writes that we haven't triggered callbacks for yet.
@@ -208,24 +217,30 @@ private void OnWriteCompleted(Queue<ArraySegment<byte>> writtenBuffers, int stat
208
217
_numBytesPreCompleted += bytesToWrite ;
209
218
bytesLeftToBuffer -= bytesToWrite ;
210
219
211
- if ( error == null )
220
+ if ( write . WriteError == null )
212
221
{
213
222
ThreadPool . QueueUserWorkItem (
214
223
( o ) => ( ( TaskCompletionSource < object > ) o ) . SetResult ( null ) ,
215
224
tcs ) ;
216
225
}
217
226
else
218
227
{
228
+ var error = write . WriteError ;
219
229
// error is closure captured
220
230
ThreadPool . QueueUserWorkItem (
221
231
( o ) => ( ( TaskCompletionSource < object > ) o ) . SetException ( error ) ,
222
232
tcs ) ;
223
233
}
224
234
}
225
235
236
+ write . Reset ( ) ;
237
+ _writeContexts . Enqueue ( write ) ;
238
+
226
239
// Now that the while loop has completed the following invariants should hold true:
227
240
Debug . Assert ( _numBytesPreCompleted >= 0 ) ;
228
241
}
242
+
243
+ _log . ConnectionWriteCallback ( _connectionId , status ) ;
229
244
}
230
245
231
246
void ISocketOutput . Write ( ArraySegment < byte > buffer , bool immediate )
@@ -263,7 +278,7 @@ private class WriteContext
263
278
public WriteContext ( SocketOutput self )
264
279
{
265
280
Self = self ;
266
- Buffers = new Queue < ArraySegment < byte > > ( ) ;
281
+ Buffers = new Queue < ArraySegment < byte > > ( 8 ) ;
267
282
}
268
283
269
284
/// <summary>
@@ -340,7 +355,17 @@ public void DoDisconnectIfNeeded()
340
355
341
356
public void Complete ( )
342
357
{
343
- Self . OnWriteCompleted ( Buffers , WriteStatus , WriteError ) ;
358
+ Self . OnWriteCompleted ( this ) ;
359
+ }
360
+
361
+ public void Reset ( )
362
+ {
363
+ Buffers . Clear ( ) ;
364
+ SocketDisconnect = false ;
365
+ SocketShutdownSend = false ;
366
+ WriteStatus = 0 ;
367
+ WriteError = null ;
368
+ ShutdownSendStatus = 0 ;
344
369
}
345
370
}
346
371
}
0 commit comments