diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index 82242f039..bbff4a31e 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -17,6 +17,7 @@ public class SocketOutput : ISocketOutput private const int _maxPendingWrites = 3; private const int _maxBytesPreCompleted = 65536; private const int _initialTaskQueues = 64; + private const int _maxPooledWriteContexts = 32; private static WaitCallback _returnBlocks = (state) => ReturnBlocks((MemoryPoolBlock2)state); private static MemoryPoolIterator2 _defaultIterator; @@ -39,6 +40,7 @@ public class SocketOutput : ISocketOutput // This locks access to to all of the below fields private readonly object _contextLock = new object(); + private bool _isDisposed = false; // The number of write operations that have been scheduled so far // but have not completed. @@ -49,6 +51,7 @@ public class SocketOutput : ISocketOutput private WriteContext _nextWriteContext; private readonly Queue> _tasksPending; private readonly Queue> _tasksCompleted; + private readonly Queue _writeContextPool; public SocketOutput( KestrelThread thread, @@ -67,6 +70,7 @@ public SocketOutput( _threadPool = threadPool; _tasksPending = new Queue>(_initialTaskQueues); _tasksCompleted = new Queue>(_initialTaskQueues); + _writeContextPool = new Queue(_maxPooledWriteContexts); _head = memory.Lease(); _tail = _head; @@ -93,7 +97,14 @@ public Task WriteAsync( { if (_nextWriteContext == null) { - _nextWriteContext = new WriteContext(this); + if (_writeContextPool.Count > 0) + { + _nextWriteContext = _writeContextPool.Dequeue(); + } + else + { + _nextWriteContext = new WriteContext(this); + } } if (socketShutdownSend) @@ -275,9 +286,12 @@ private void WriteAllPending() } // This is called on the libuv event loop - private void OnWriteCompleted(int bytesWritten, int status, Exception error) + private void OnWriteCompleted(WriteContext writeContext) { - _log.ConnectionWriteCallback(_connectionId, status); + var bytesWritten = writeContext.ByteCount; + var status = writeContext.WriteStatus; + var error = writeContext.WriteError; + if (error != null) { @@ -291,6 +305,7 @@ private void OnWriteCompleted(int bytesWritten, int status, Exception error) lock (_contextLock) { + PoolWriteContext(writeContext); if (_nextWriteContext != null) { scheduleWrite = true; @@ -333,11 +348,11 @@ private void OnWriteCompleted(int bytesWritten, int status, Exception error) } } + _log.ConnectionWriteCallback(_connectionId, status); + if (scheduleWrite) { - // ScheduleWrite(); - // on right thread, fairness issues? - WriteAllPending(); + ScheduleWrite(); } _tasksCompleted.Clear(); @@ -368,6 +383,32 @@ private void ReturnAllBlocks() } } + private void PoolWriteContext(WriteContext writeContext) + { + // called inside _contextLock + if (!_isDisposed && _writeContextPool.Count < _maxPooledWriteContexts) + { + writeContext.Reset(); + _writeContextPool.Enqueue(writeContext); + } + else + { + writeContext.Dispose(); + } + } + + private void Dispose() + { + lock (_contextLock) + { + _isDisposed = true; + while (_writeContextPool.Count > 0) + { + _writeContextPool.Dequeue().Dispose(); + } + } + } + void ISocketOutput.Write(ArraySegment buffer, bool immediate) { var task = WriteAsync(buffer, immediate); @@ -409,14 +450,14 @@ private static void BytesBetween(MemoryPoolIterator2 start, MemoryPoolIterator2 buffers++; } - private class WriteContext + private class WriteContext : IDisposable { private static WaitCallback _returnWrittenBlocks = (state) => ReturnWrittenBlocks((MemoryPoolBlock2)state); private MemoryPoolIterator2 _lockedStart; private MemoryPoolIterator2 _lockedEnd; private int _bufferCount; - private int _byteCount; + public int ByteCount; public SocketOutput Self; @@ -426,11 +467,15 @@ private class WriteContext public int WriteStatus; public Exception WriteError; + private UvWriteReq _writeReq; + public int ShutdownSendStatus; public WriteContext(SocketOutput self) { Self = self; + _writeReq = new UvWriteReq(Self._log); + _writeReq.Init(Self._thread.Loop); } /// @@ -440,18 +485,19 @@ public void DoWriteIfNeeded() { LockWrite(); - if (_byteCount == 0 || Self._socket.IsClosed) + if (ByteCount == 0 || Self._socket.IsClosed) { DoShutdownIfNeeded(); return; } - var writeReq = new UvWriteReq(Self._log); - writeReq.Init(Self._thread.Loop); + // Sample values locally in case write completes inline + // to allow block to be Reset and still complete this function + var lockedEndBlock = _lockedEnd.Block; + var lockedEndIndex = _lockedEnd.Index; - writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) => + _writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) => { - _writeReq.Dispose(); var _this = (WriteContext)state; _this.ScheduleReturnFullyWrittenBlocks(); _this.WriteStatus = status; @@ -459,8 +505,8 @@ public void DoWriteIfNeeded() _this.DoShutdownIfNeeded(); }, this); - Self._head = _lockedEnd.Block; - Self._head.Start = _lockedEnd.Index; + Self._head = lockedEndBlock; + Self._head.Start = lockedEndIndex; } /// @@ -493,21 +539,28 @@ public void DoShutdownIfNeeded() /// public void DoDisconnectIfNeeded() { - if (SocketDisconnect == false || Self._socket.IsClosed) + if (SocketDisconnect == false) { Complete(); return; } + else if (Self._socket.IsClosed) + { + Self.Dispose(); + Complete(); + return; + } Self._socket.Dispose(); Self.ReturnAllBlocks(); + Self.Dispose(); Self._log.ConnectionStop(Self._connectionId); Complete(); } public void Complete() { - Self.OnWriteCompleted(_byteCount, WriteStatus, WriteError); + Self.OnWriteCompleted(this); } private void ScheduleReturnFullyWrittenBlocks() @@ -556,8 +609,29 @@ private void LockWrite() _lockedStart = new MemoryPoolIterator2(head, head.Start); _lockedEnd = new MemoryPoolIterator2(tail, tail.End); + + BytesBetween(_lockedStart, _lockedEnd, out ByteCount, out _bufferCount); + } - BytesBetween(_lockedStart, _lockedEnd, out _byteCount, out _bufferCount); + public void Reset() + { + _lockedStart = default(MemoryPoolIterator2); + _lockedEnd = default(MemoryPoolIterator2); + _bufferCount = 0; + ByteCount = 0; + + SocketShutdownSend = false; + SocketDisconnect = false; + + WriteStatus = 0; + WriteError = null; + + ShutdownSendStatus = 0; + } + + public void Dispose() + { + _writeReq.Dispose(); } } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs index ee43b9f60..de603b445 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs @@ -18,16 +18,21 @@ namespace Microsoft.AspNet.Server.Kestrel /// public class KestrelThread { + // maximum times the work queues swapped and are processed in a single pass + // as completing a task may immediately have write data to put on the network + // otherwise it needs to wait till the next pass of the libuv loop + private const int _maxLoops = 8; + private static Action _threadCallbackAdapter = (callback, state) => ((Action)callback).Invoke((KestrelThread)state); private KestrelEngine _engine; private readonly IApplicationLifetime _appLifetime; private Thread _thread; private UvLoopHandle _loop; private UvAsyncHandle _post; - private Queue _workAdding = new Queue(); - private Queue _workRunning = new Queue(); - private Queue _closeHandleAdding = new Queue(); - private Queue _closeHandleRunning = new Queue(); + private Queue _workAdding = new Queue(1024); + private Queue _workRunning = new Queue(1024); + private Queue _closeHandleAdding = new Queue(256); + private Queue _closeHandleRunning = new Queue(256); private object _workSync = new Object(); private bool _stopImmediate = false; private bool _initCompleted = false; @@ -249,11 +254,17 @@ private void ThreadStart(object parameter) private void OnPost() { - DoPostWork(); - DoPostCloseHandle(); + var loopsRemaining = _maxLoops; + bool wasWork; + do + { + wasWork = DoPostWork(); + wasWork = DoPostCloseHandle() || wasWork; + loopsRemaining--; + } while (wasWork && loopsRemaining > 0); } - private void DoPostWork() + private bool DoPostWork() { Queue queue; lock (_workSync) @@ -262,6 +273,9 @@ private void DoPostWork() _workAdding = _workRunning; _workRunning = queue; } + + bool wasWork = queue.Count > 0; + while (queue.Count != 0) { var work = queue.Dequeue(); @@ -286,8 +300,10 @@ private void DoPostWork() } } } + + return wasWork; } - private void DoPostCloseHandle() + private bool DoPostCloseHandle() { Queue queue; lock (_workSync) @@ -296,6 +312,9 @@ private void DoPostCloseHandle() _closeHandleAdding = _closeHandleRunning; _closeHandleRunning = queue; } + + bool wasWork = queue.Count > 0; + while (queue.Count != 0) { var closeHandle = queue.Dequeue(); @@ -309,6 +328,8 @@ private void DoPostCloseHandle() throw; } } + + return wasWork; } private struct Work diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs index 2695036f3..21a1aaf77 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs @@ -14,7 +14,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking /// public class UvWriteReq : UvRequest { - private readonly static Libuv.uv_write_cb _uv_write_cb = UvWriteCb; + private readonly static Libuv.uv_write_cb _uv_write_cb = (IntPtr ptr, int status) => UvWriteCb(ptr, status); private IntPtr _bufs; @@ -22,7 +22,7 @@ public class UvWriteReq : UvRequest private object _state; private const int BUFFER_COUNT = 4; - private List _pins = new List(); + private List _pins = new List(BUFFER_COUNT + 1); public UvWriteReq(IKestrelTrace logger) : base(logger) {