@@ -16,6 +16,9 @@ public class SocketOutput : ISocketOutput
16
16
{
17
17
private const int _maxPendingWrites = 3 ;
18
18
private const int _maxBytesPreCompleted = 65536 ;
19
+ private const int _initialTaskQueues = 64 ;
20
+
21
+ private static WaitCallback _returnBlocks = ( state ) => ReturnBlocks ( ( MemoryPoolBlock2 ) state ) ;
19
22
20
23
private readonly KestrelThread _thread ;
21
24
private readonly UvStreamHandle _socket ;
@@ -44,6 +47,7 @@ public class SocketOutput : ISocketOutput
44
47
private Exception _lastWriteError ;
45
48
private WriteContext _nextWriteContext ;
46
49
private readonly Queue < TaskCompletionSource < object > > _tasksPending ;
50
+ private readonly Queue < TaskCompletionSource < object > > _tasksCompleted ;
47
51
48
52
public SocketOutput (
49
53
KestrelThread thread ,
@@ -58,7 +62,8 @@ public SocketOutput(
58
62
_connection = connection ;
59
63
_connectionId = connectionId ;
60
64
_log = log ;
61
- _tasksPending = new Queue < TaskCompletionSource < object > > ( ) ;
65
+ _tasksPending = new Queue < TaskCompletionSource < object > > ( _initialTaskQueues ) ;
66
+ _tasksCompleted = new Queue < TaskCompletionSource < object > > ( _initialTaskQueues ) ;
62
67
63
68
_head = memory . Lease ( ) ;
64
69
_tail = _head ;
@@ -79,6 +84,8 @@ public Task WriteAsync(
79
84
}
80
85
TaskCompletionSource < object > tcs = null ;
81
86
87
+ var scheduleWrite = false ;
88
+
82
89
lock ( _contextLock )
83
90
{
84
91
if ( _nextWriteContext == null )
@@ -118,11 +125,16 @@ public Task WriteAsync(
118
125
119
126
if ( _writesPending < _maxPendingWrites && immediate )
120
127
{
121
- ScheduleWrite ( ) ;
128
+ scheduleWrite = true ;
122
129
_writesPending ++ ;
123
130
}
124
131
}
125
132
133
+ if ( scheduleWrite )
134
+ {
135
+ ScheduleWrite ( ) ;
136
+ }
137
+
126
138
// Return TaskCompletionSource's Task if set, otherwise completed Task
127
139
return tcs ? . Task ?? TaskUtilities . CompletedTask ;
128
140
}
@@ -164,6 +176,9 @@ public MemoryPoolIterator2 ProducingStart()
164
176
165
177
public void ProducingComplete ( MemoryPoolIterator2 end , int count )
166
178
{
179
+ var decreasePreCompleted = false ;
180
+ MemoryPoolBlock2 blockToReturn = null ;
181
+
167
182
lock ( _returnLock )
168
183
{
169
184
Debug . Assert ( _isProducing ) ;
@@ -176,26 +191,40 @@ public void ProducingComplete(MemoryPoolIterator2 end, int count)
176
191
177
192
if ( count != 0 )
178
193
{
179
- lock ( _contextLock )
180
- {
181
- _numBytesPreCompleted += count ;
182
- }
194
+ decreasePreCompleted = true ;
183
195
}
184
196
}
185
197
else
186
198
{
187
- var block = _returnFromOnProducingComplete ;
188
- while ( block != null )
189
- {
190
- var returnBlock = block ;
191
- block = block . Next ;
192
-
193
- returnBlock . Pool ? . Return ( returnBlock ) ;
194
- }
195
-
199
+ blockToReturn = _returnFromOnProducingComplete ;
196
200
_returnFromOnProducingComplete = null ;
197
201
}
198
202
}
203
+
204
+ if ( decreasePreCompleted )
205
+ {
206
+ lock ( _contextLock )
207
+ {
208
+ _numBytesPreCompleted += count ;
209
+ }
210
+ }
211
+
212
+
213
+ if ( blockToReturn != null )
214
+ {
215
+ ThreadPool . QueueUserWorkItem ( _returnBlocks , blockToReturn ) ;
216
+ }
217
+ }
218
+
219
+ private static void ReturnBlocks ( MemoryPoolBlock2 block )
220
+ {
221
+ while ( block != null )
222
+ {
223
+ var returningBlock = block ;
224
+ block = returningBlock . Next ;
225
+
226
+ returningBlock . Pool ? . Return ( returningBlock ) ;
227
+ }
199
228
}
200
229
201
230
private void ScheduleWrite ( )
@@ -252,11 +281,13 @@ private void OnWriteCompleted(int bytesWritten, int status, Exception error)
252
281
_connection . Abort ( ) ;
253
282
}
254
283
284
+ bool scheduleWrite = false ;
285
+
255
286
lock ( _contextLock )
256
287
{
257
288
if ( _nextWriteContext != null )
258
289
{
259
- ScheduleWrite ( ) ;
290
+ scheduleWrite = true ;
260
291
}
261
292
else
262
293
{
@@ -279,21 +310,36 @@ private void OnWriteCompleted(int bytesWritten, int status, Exception error)
279
310
_numBytesPreCompleted += bytesToWrite ;
280
311
bytesLeftToBuffer -= bytesToWrite ;
281
312
282
- if ( _lastWriteError == null )
283
- {
284
- ThreadPool . QueueUserWorkItem (
285
- ( o ) => ( ( TaskCompletionSource < object > ) o ) . SetResult ( null ) ,
286
- tcs ) ;
287
- }
288
- else
289
- {
290
- // error is closure captured
291
- ThreadPool . QueueUserWorkItem (
292
- ( o ) => ( ( TaskCompletionSource < object > ) o ) . SetException ( _lastWriteError ) ,
293
- tcs ) ;
294
- }
313
+ _tasksCompleted . Enqueue ( tcs ) ;
314
+ }
315
+ }
316
+
317
+ while ( _tasksCompleted . Count > 0 )
318
+ {
319
+ var tcs = _tasksCompleted . Dequeue ( ) ;
320
+ if ( _lastWriteError == null )
321
+ {
322
+ ThreadPool . QueueUserWorkItem (
323
+ ( o ) => ( ( TaskCompletionSource < object > ) o ) . SetResult ( null ) ,
324
+ tcs ) ;
325
+ }
326
+ else
327
+ {
328
+ // error is closure captured
329
+ ThreadPool . QueueUserWorkItem (
330
+ ( o ) => ( ( TaskCompletionSource < object > ) o ) . SetException ( _lastWriteError ) ,
331
+ tcs ) ;
295
332
}
296
333
}
334
+
335
+ if ( scheduleWrite )
336
+ {
337
+ // ScheduleWrite();
338
+ // on right thread, fairness issues?
339
+ WriteAllPending ( ) ;
340
+ }
341
+
342
+ _tasksCompleted . Clear ( ) ;
297
343
}
298
344
299
345
// This is called on the libuv event loop
@@ -345,6 +391,8 @@ Task ISocketOutput.WriteAsync(ArraySegment<byte> buffer, bool immediate, Cancell
345
391
346
392
private class WriteContext
347
393
{
394
+ private static WaitCallback _returnWrittenBlocks = ( state ) => ReturnWrittenBlocks ( ( MemoryPoolBlock2 ) state ) ;
395
+
348
396
private MemoryPoolIterator2 _lockedStart ;
349
397
private MemoryPoolIterator2 _lockedEnd ;
350
398
private int _bufferCount ;
@@ -385,7 +433,7 @@ public void DoWriteIfNeeded()
385
433
{
386
434
_writeReq . Dispose ( ) ;
387
435
var _this = ( WriteContext ) state ;
388
- _this . ReturnFullyWrittenBlocks ( ) ;
436
+ _this . ScheduleReturnFullyWrittenBlocks ( ) ;
389
437
_this . WriteStatus = status ;
390
438
_this . WriteError = error ;
391
439
_this . DoShutdownIfNeeded ( ) ;
@@ -441,20 +489,37 @@ public void Complete()
441
489
{
442
490
Self . OnWriteCompleted ( _byteCount , WriteStatus , WriteError ) ;
443
491
}
444
-
445
- private void ReturnFullyWrittenBlocks ( )
492
+
493
+ private void ScheduleReturnFullyWrittenBlocks ( )
446
494
{
447
495
var block = _lockedStart . Block ;
448
- while ( block != _lockedEnd . Block )
496
+ var end = _lockedEnd . Block ;
497
+ if ( block == end )
498
+ {
499
+ end . Unpin ( ) ;
500
+ return ;
501
+ }
502
+
503
+ while ( block . Next != end )
504
+ {
505
+ block = block . Next ;
506
+ block . Unpin ( ) ;
507
+ }
508
+ block . Next = null ;
509
+
510
+ ThreadPool . QueueUserWorkItem ( _returnWrittenBlocks , _lockedStart . Block ) ;
511
+ }
512
+
513
+ private static void ReturnWrittenBlocks ( MemoryPoolBlock2 block )
514
+ {
515
+ while ( block != null )
449
516
{
450
517
var returnBlock = block ;
451
518
block = block . Next ;
452
519
453
520
returnBlock . Unpin ( ) ;
454
521
returnBlock . Pool ? . Return ( returnBlock ) ;
455
522
}
456
-
457
- _lockedEnd . Block . Unpin ( ) ;
458
523
}
459
524
460
525
private void LockWrite ( )
0 commit comments