Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

Commit 9c27066

Browse files
committed
Pool UvWriteReqs instead of SocketOutput.WriteContexts
- This allows all connections accepted by the same thread to share a pool
1 parent 63af023 commit 9c27066

File tree

5 files changed

+57
-80
lines changed

5 files changed

+57
-80
lines changed

src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public Connection(ListenerContext context, UvStreamHandle socket) : base(context
4242
_connectionId = Interlocked.Increment(ref _lastConnectionId);
4343

4444
_rawSocketInput = new SocketInput(Memory2, ThreadPool);
45-
_rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool);
45+
_rawSocketOutput = new SocketOutput(Thread, _socket, Memory2, this, _connectionId, Log, ThreadPool, WriteReqPool);
4646
}
4747

4848
public void Start()

src/Microsoft.AspNet.Server.Kestrel/Http/Listener.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,13 @@ public void Dispose()
9494
{
9595
var socket = (Listener)tcs2.Task.AsyncState;
9696
socket.ListenSocket.Dispose();
97+
98+
var writeReqPool = socket.WriteReqPool;
99+
while (writeReqPool.Count > 0)
100+
{
101+
writeReqPool.Dequeue().Dispose();
102+
}
103+
97104
tcs2.SetResult(0);
98105
}
99106
catch (Exception ex)

src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

4-
using Microsoft.AspNet.Http;
4+
using System.Collections.Generic;
55
using Microsoft.AspNet.Server.Kestrel.Infrastructure;
6+
using Microsoft.AspNet.Server.Kestrel.Networking;
67

78
namespace Microsoft.AspNet.Server.Kestrel.Http
89
{
@@ -17,6 +18,7 @@ public ListenerContext(ServiceContext serviceContext)
1718
: base(serviceContext)
1819
{
1920
Memory2 = new MemoryPool2();
21+
WriteReqPool = new Queue<UvWriteReq>(SocketOutput.MaxPooledWriteReqs);
2022
}
2123

2224
public ListenerContext(ListenerContext listenerContext)
@@ -25,6 +27,7 @@ public ListenerContext(ListenerContext listenerContext)
2527
ServerAddress = listenerContext.ServerAddress;
2628
Thread = listenerContext.Thread;
2729
Memory2 = listenerContext.Memory2;
30+
WriteReqPool = listenerContext.WriteReqPool;
2831
Log = listenerContext.Log;
2932
}
3033

@@ -33,5 +36,7 @@ public ListenerContext(ListenerContext listenerContext)
3336
public KestrelThread Thread { get; set; }
3437

3538
public MemoryPool2 Memory2 { get; set; }
39+
40+
public Queue<UvWriteReq> WriteReqPool { get; set; }
3641
}
3742
}

src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs

Lines changed: 38 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
1414
{
1515
public class SocketOutput : ISocketOutput
1616
{
17+
public const int MaxPooledWriteReqs = 1024;
18+
1719
private const int _maxPendingWrites = 3;
1820
private const int _maxBytesPreCompleted = 65536;
1921
private const int _initialTaskQueues = 64;
20-
private const int _maxPooledWriteContexts = 32;
2122

2223
private static WaitCallback _returnBlocks = (state) => ReturnBlocks((MemoryPoolBlock2)state);
2324

@@ -39,7 +40,6 @@ public class SocketOutput : ISocketOutput
3940

4041
// This locks access to to all of the below fields
4142
private readonly object _contextLock = new object();
42-
private bool _isDisposed = false;
4343

4444
// The number of write operations that have been scheduled so far
4545
// but have not completed.
@@ -50,7 +50,7 @@ public class SocketOutput : ISocketOutput
5050
private WriteContext _nextWriteContext;
5151
private readonly Queue<TaskCompletionSource<object>> _tasksPending;
5252
private readonly Queue<TaskCompletionSource<object>> _tasksCompleted;
53-
private readonly Queue<WriteContext> _writeContextPool;
53+
private readonly Queue<UvWriteReq> _writeReqPool;
5454

5555
public SocketOutput(
5656
KestrelThread thread,
@@ -59,7 +59,8 @@ public SocketOutput(
5959
Connection connection,
6060
long connectionId,
6161
IKestrelTrace log,
62-
IThreadPool threadPool)
62+
IThreadPool threadPool,
63+
Queue<UvWriteReq> writeReqPool)
6364
{
6465
_thread = thread;
6566
_socket = socket;
@@ -69,7 +70,7 @@ public SocketOutput(
6970
_threadPool = threadPool;
7071
_tasksPending = new Queue<TaskCompletionSource<object>>(_initialTaskQueues);
7172
_tasksCompleted = new Queue<TaskCompletionSource<object>>(_initialTaskQueues);
72-
_writeContextPool = new Queue<WriteContext>(_maxPooledWriteContexts);
73+
_writeReqPool = writeReqPool;
7374

7475
_head = memory.Lease();
7576
_tail = _head;
@@ -96,14 +97,7 @@ public Task WriteAsync(
9697
{
9798
if (_nextWriteContext == null)
9899
{
99-
if (_writeContextPool.Count > 0)
100-
{
101-
_nextWriteContext = _writeContextPool.Dequeue();
102-
}
103-
else
104-
{
105-
_nextWriteContext = new WriteContext(this);
106-
}
100+
_nextWriteContext = new WriteContext(this);
107101
}
108102

109103
if (socketShutdownSend)
@@ -304,7 +298,6 @@ private void OnWriteCompleted(WriteContext writeContext)
304298

305299
lock (_contextLock)
306300
{
307-
PoolWriteContext(writeContext);
308301
if (_nextWriteContext != null)
309302
{
310303
scheduleWrite = true;
@@ -382,32 +375,6 @@ private void ReturnAllBlocks()
382375
}
383376
}
384377

385-
private void PoolWriteContext(WriteContext writeContext)
386-
{
387-
// called inside _contextLock
388-
if (!_isDisposed && _writeContextPool.Count < _maxPooledWriteContexts)
389-
{
390-
writeContext.Reset();
391-
_writeContextPool.Enqueue(writeContext);
392-
}
393-
else
394-
{
395-
writeContext.Dispose();
396-
}
397-
}
398-
399-
private void Dispose()
400-
{
401-
lock (_contextLock)
402-
{
403-
_isDisposed = true;
404-
while (_writeContextPool.Count > 0)
405-
{
406-
_writeContextPool.Dequeue().Dispose();
407-
}
408-
}
409-
}
410-
411378
void ISocketOutput.Write(ArraySegment<byte> buffer, bool immediate)
412379
{
413380
var task = WriteAsync(buffer, immediate);
@@ -449,7 +416,7 @@ private static void BytesBetween(MemoryPoolIterator2 start, MemoryPoolIterator2
449416
buffers++;
450417
}
451418

452-
private class WriteContext : IDisposable
419+
private class WriteContext
453420
{
454421
private static WaitCallback _returnWrittenBlocks = (state) => ReturnWrittenBlocks((MemoryPoolBlock2)state);
455422

@@ -473,8 +440,6 @@ private class WriteContext : IDisposable
473440
public WriteContext(SocketOutput self)
474441
{
475442
Self = self;
476-
_writeReq = new UvWriteReq(Self._log);
477-
_writeReq.Init(Self._thread.Loop);
478443
}
479444

480445
/// <summary>
@@ -495,13 +460,24 @@ public void DoWriteIfNeeded()
495460
var lockedEndBlock = _lockedEnd.Block;
496461
var lockedEndIndex = _lockedEnd.Index;
497462

463+
if (Self._writeReqPool.Count > 0)
464+
{
465+
_writeReq = Self._writeReqPool.Dequeue();
466+
}
467+
else
468+
{
469+
_writeReq = new UvWriteReq(Self._log);
470+
_writeReq.Init(Self._thread.Loop);
471+
}
472+
498473
_writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) =>
499474
{
500-
var _this = (WriteContext)state;
501-
_this.ScheduleReturnFullyWrittenBlocks();
502-
_this.WriteStatus = status;
503-
_this.WriteError = error;
504-
_this.DoShutdownIfNeeded();
475+
var writeContext = (WriteContext)state;
476+
writeContext.PoolWriteReq(writeContext._writeReq);
477+
writeContext.ScheduleReturnFullyWrittenBlocks();
478+
writeContext.WriteStatus = status;
479+
writeContext.WriteError = error;
480+
writeContext.DoShutdownIfNeeded();
505481
}, this);
506482

507483
Self._head = lockedEndBlock;
@@ -545,14 +521,12 @@ public void DoDisconnectIfNeeded()
545521
}
546522
else if (Self._socket.IsClosed)
547523
{
548-
Self.Dispose();
549524
Complete();
550525
return;
551526
}
552527

553528
Self._socket.Dispose();
554529
Self.ReturnAllBlocks();
555-
Self.Dispose();
556530
Self._log.ConnectionStop(Self._connectionId);
557531
Complete();
558532
}
@@ -561,7 +535,19 @@ public void Complete()
561535
{
562536
Self.OnWriteCompleted(this);
563537
}
564-
538+
539+
private void PoolWriteReq(UvWriteReq writeReq)
540+
{
541+
if (Self._writeReqPool.Count < MaxPooledWriteReqs)
542+
{
543+
Self._writeReqPool.Enqueue(writeReq);
544+
}
545+
else
546+
{
547+
writeReq.Dispose();
548+
}
549+
}
550+
565551
private void ScheduleReturnFullyWrittenBlocks()
566552
{
567553
var block = _lockedStart.Block;
@@ -608,29 +594,8 @@ private void LockWrite()
608594

609595
_lockedStart = new MemoryPoolIterator2(head, head.Start);
610596
_lockedEnd = new MemoryPoolIterator2(tail, tail.End);
611-
612-
BytesBetween(_lockedStart, _lockedEnd, out ByteCount, out _bufferCount);
613-
}
614597

615-
public void Reset()
616-
{
617-
_lockedStart = default(MemoryPoolIterator2);
618-
_lockedEnd = default(MemoryPoolIterator2);
619-
_bufferCount = 0;
620-
ByteCount = 0;
621-
622-
SocketShutdownSend = false;
623-
SocketDisconnect = false;
624-
625-
WriteStatus = 0;
626-
WriteError = null;
627-
628-
ShutdownSendStatus = 0;
629-
}
630-
631-
public void Dispose()
632-
{
633-
_writeReq.Dispose();
598+
BytesBetween(_lockedStart, _lockedEnd, out ByteCount, out _bufferCount);
634599
}
635600
}
636601
}

test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public void CanWrite1MB()
4242
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
4343
var trace = new KestrelTrace(new TestKestrelTrace());
4444
var ltp = new LoggingThreadPool(trace);
45-
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp);
45+
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue<UvWriteReq>());
4646

4747
// I doubt _maxBytesPreCompleted will ever be over a MB. If it is, we should change this test.
4848
var bufferSize = 1048576;
@@ -89,7 +89,7 @@ public void WritesDontCompleteImmediatelyWhenTooManyBytesAreAlreadyPreCompleted(
8989
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
9090
var trace = new KestrelTrace(new TestKestrelTrace());
9191
var ltp = new LoggingThreadPool(trace);
92-
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp);
92+
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue<UvWriteReq>());
9393

9494
var bufferSize = maxBytesPreCompleted;
9595
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
@@ -146,7 +146,7 @@ public void WritesDontCompleteImmediatelyWhenTooManyBytesIncludingNonImmediateAr
146146
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
147147
var trace = new KestrelTrace(new TestKestrelTrace());
148148
var ltp = new LoggingThreadPool(trace);
149-
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp);
149+
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue<UvWriteReq>());
150150

151151
var bufferSize = maxBytesPreCompleted;
152152

@@ -227,7 +227,7 @@ public void WritesDontGetCompletedTooQuickly()
227227
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
228228
var trace = new KestrelTrace(new TestKestrelTrace());
229229
var ltp = new LoggingThreadPool(trace);
230-
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp);
230+
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue<UvWriteReq>());
231231

232232
var bufferSize = maxBytesPreCompleted;
233233
var buffer = new ArraySegment<byte>(new byte[bufferSize], 0, bufferSize);
@@ -304,7 +304,7 @@ public void ProducingStartAndProducingCompleteCanBeUsedDirectly()
304304
var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace());
305305
var trace = new KestrelTrace(new TestKestrelTrace());
306306
var ltp = new LoggingThreadPool(trace);
307-
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp);
307+
var socketOutput = new SocketOutput(kestrelThread, socket, memory, null, 0, trace, ltp, new Queue<UvWriteReq>());
308308

309309
// block 1
310310
var start = socketOutput.ProducingStart();

0 commit comments

Comments
 (0)