From f51bff249709143f29193a74f36d25d2d21a28d5 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sun, 1 Nov 2015 11:10:30 +0000 Subject: [PATCH 1/6] Lockless, lower alloc SocketOutput No locks Use MemoryPool2 Reduced libuv thread activation Improved write buffering Improved buffer to IO timing No alloc in callback paths --- .../Http/Connection.cs | 6 +- .../Http/ListenerContext.cs | 13 +- .../Http/ListenerPrimary.cs | 9 +- .../Http/SocketOutput.cs | 534 ++++++++++-------- .../Infrastructure/MemoryPool2.cs | 2 + .../Networking/UvShutdownReq.cs | 6 +- .../Networking/UvWriteReq.cs | 109 ++-- .../MultipleLoopTests.cs | 18 +- .../NetworkingTests.cs | 10 +- .../SocketOutputTests.cs | 97 ++-- 10 files changed, 447 insertions(+), 357 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs index ac0540d18..3c8c05514 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs @@ -39,8 +39,8 @@ public Connection(ListenerContext context, UvStreamHandle socket) : base(context _connectionId = Interlocked.Increment(ref _lastConnectionId); - _rawSocketInput = new SocketInput(Memory2); - _rawSocketOutput = new SocketOutput(Thread, _socket, _connectionId, Log); + _rawSocketInput = new SocketInput(InputMemory); + _rawSocketOutput = new SocketOutput(OutputMemory, Thread, _socket, _connectionId, Log); } public void Start() @@ -100,7 +100,7 @@ public void Start() private void ApplyConnectionFilter() { - var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, Memory2, Log); + var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, InputMemory, Log); SocketInput = filteredStreamAdapter.SocketInput; SocketOutput = filteredStreamAdapter.SocketOutput; diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs index 581720041..abbeddf59 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs @@ -10,13 +10,15 @@ public class ListenerContext : ServiceContext { public ListenerContext() { - Memory2 = new MemoryPool2(); + InputMemory = new MemoryPool2(); + OutputMemory = new MemoryPool2(); } public ListenerContext(ServiceContext serviceContext) : base(serviceContext) { - Memory2 = new MemoryPool2(); + InputMemory = new MemoryPool2(); + OutputMemory = new MemoryPool2(); } public ListenerContext(ListenerContext listenerContext) @@ -25,7 +27,8 @@ public ListenerContext(ListenerContext listenerContext) ServerAddress = listenerContext.ServerAddress; Thread = listenerContext.Thread; Application = listenerContext.Application; - Memory2 = listenerContext.Memory2; + InputMemory = listenerContext.InputMemory; + OutputMemory = listenerContext.OutputMemory; Log = listenerContext.Log; } @@ -35,6 +38,8 @@ public ListenerContext(ListenerContext listenerContext) public RequestDelegate Application { get; set; } - public MemoryPool2 Memory2 { get; set; } + public MemoryPool2 InputMemory { get; set; } + + public MemoryPool2 OutputMemory { get; set; } } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerPrimary.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerPrimary.cs index 21c6c5518..6852f0618 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerPrimary.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/ListenerPrimary.cs @@ -22,7 +22,7 @@ abstract public class ListenerPrimary : Listener // this message is passed to write2 because it must be non-zero-length, // but it has no other functional significance - private readonly ArraySegment> _dummyMessage = new ArraySegment>(new[] { new ArraySegment(new byte[] { 1, 2, 3, 4 }) }); + private readonly byte[] _dummyBuffer = { 1, 2, 3, 4 }; protected ListenerPrimary(ServiceContext serviceContext) : base(serviceContext) { @@ -80,14 +80,17 @@ protected override void DispatchConnection(UvStreamHandle socket) } else { + var msg = MemoryPoolBlock2.Create(new ArraySegment(_dummyBuffer), IntPtr.Zero, null, null); + msg.End = msg.Start + _dummyBuffer.Length; + var dispatchPipe = _dispatchPipes[index]; var write = new UvWriteReq(Log); write.Init(Thread.Loop); write.Write2( dispatchPipe, - _dummyMessage, + new ArraySegment(new[] { msg }), socket, - (write2, status, error, state) => + (write2, status, error, bytesWritten, state) => { write2.Dispose(); ((UvStreamHandle)state).Dispose(); diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index d0d218ac5..5f88e6681 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -2,8 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; -using System.Collections.Generic; -using System.Diagnostics; +using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNet.Server.Kestrel.Infrastructure; @@ -13,104 +12,145 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { public class SocketOutput : ISocketOutput { - private const int _maxPendingWrites = 3; - private const int _maxBytesPreCompleted = 65536; + // ~64k; act=64512 + internal const int MaxBytesPreCompleted = 2 * (MemoryPool2.WritableBlockSize * UvWriteReq.BUFFER_COUNT) - 1; + + private static MemoryPoolBlock2[] _emptyBlocks = new MemoryPoolBlock2[0]; private readonly KestrelThread _thread; private readonly UvStreamHandle _socket; private readonly long _connectionId; private readonly IKestrelTrace _log; - // This locks access to to all of the below fields - private readonly object _lockObj = new object(); + private readonly MemoryPool2 _memory; + + private WriteBlock _currentWriteBlock; + private ConcurrentQueue _memoryBlocks; - // The number of write operations that have been scheduled so far - // but have not completed. - private int _writesPending = 0; + private long _bytesQueued; + private long _bytesWritten; private int _numBytesPreCompleted = 0; private Exception _lastWriteError; - private WriteContext _nextWriteContext; - private readonly Queue _callbacksPending; + private int _lastStatus = 0; + private readonly ConcurrentQueue _callbacksPending; + + public int ShutdownSendStatus; - public SocketOutput(KestrelThread thread, UvStreamHandle socket, long connectionId, IKestrelTrace log) + private volatile int pendingWriteBitFlag = 0; + private bool SocketShutdownSent; + + public SocketOutput(MemoryPool2 memory, KestrelThread thread, UvStreamHandle socket, long connectionId, IKestrelTrace log) { _thread = thread; _socket = socket; _connectionId = connectionId; _log = log; - _callbacksPending = new Queue(); + _callbacksPending = new ConcurrentQueue(); + _memory = memory; + _memoryBlocks = new ConcurrentQueue(); } - public void Write( + internal void Write( ArraySegment buffer, - Action callback, + Action callback, object state, bool immediate = true, bool socketShutdownSend = false, bool socketDisconnect = false) { - //TODO: need buffering that works - if (buffer.Array != null) - { - var copy = new byte[buffer.Count]; - Array.Copy(buffer.Array, buffer.Offset, copy, 0, buffer.Count); - buffer = new ArraySegment(copy); - _log.ConnectionWrite(_connectionId, buffer.Count); - } + bool triggerCallbackNow; + var queuedBytes = _bytesQueued; + bool blockFilled = false; - bool triggerCallbackNow = false; + var inputLength = buffer.Array != null ? buffer.Count : 0; + MemoryPoolBlock2 memoryBlock; - lock (_lockObj) + if (inputLength > 0) { - if (_nextWriteContext == null) - { - _nextWriteContext = new WriteContext(this); - } + memoryBlock = Interlocked.Exchange(ref _currentWriteBlock.Block, null); - if (buffer.Array != null) - { - _nextWriteContext.Buffers.Enqueue(buffer); - } - if (socketShutdownSend) - { - _nextWriteContext.SocketShutdownSend = true; - } - if (socketDisconnect) - { - _nextWriteContext.SocketDisconnect = true; - } - // Complete the write task immediately if all previous write tasks have been completed, - // the buffers haven't grown too large, and the last write to the socket succeeded. - triggerCallbackNow = _lastWriteError == null && - _callbacksPending.Count == 0 && - _numBytesPreCompleted + buffer.Count <= _maxBytesPreCompleted; - if (triggerCallbackNow) - { - _numBytesPreCompleted += buffer.Count; - } - else + _log.ConnectionWrite(_connectionId, inputLength); + + int blockRemaining = memoryBlock != null ? memoryBlock.Data.Count - (memoryBlock.End - memoryBlock.Start) : 0; + + var remaining = inputLength; + var offset = buffer.Offset; + + while (remaining > 0) { - _callbacksPending.Enqueue(new CallbackContext + if (memoryBlock == null) { - Callback = callback, - State = state, - BytesToWrite = buffer.Count - }); + memoryBlock = _memory.Lease(MemoryPool2.WritableBlockSize); + blockRemaining = memoryBlock.Data.Count; + } + + var copyAmount = blockRemaining >= remaining ? remaining : blockRemaining; + Buffer.BlockCopy(buffer.Array, offset, memoryBlock.Array, memoryBlock.End, copyAmount); + + remaining -= copyAmount; + blockRemaining -= copyAmount; + memoryBlock.End += copyAmount; + offset += copyAmount; + + if (blockRemaining == 0) + { + _memoryBlocks.Enqueue(new WriteBlock() { Block = memoryBlock }); + memoryBlock = null; + blockFilled = true; + } } - if (_writesPending < _maxPendingWrites && immediate) + Interlocked.Exchange(ref _currentWriteBlock.Block, memoryBlock); + } + + CallbackContext callbackContext; + var nextPendingBytes = inputLength + _numBytesPreCompleted; + if (_lastWriteError == null && + _callbacksPending.TryPeek(out callbackContext) && + nextPendingBytes <= MaxBytesPreCompleted) + { + triggerCallbackNow = true; + } + else + { + triggerCallbackNow = queuedBytes <= _bytesWritten; + } + + if (!triggerCallbackNow) + { + callbackContext = new CallbackContext { - ScheduleWrite(); - _writesPending++; - } + Callback = callback, + State = state, + BytesWrittenThreshold = queuedBytes + }; + _callbacksPending.Enqueue(callbackContext); + } + + if (immediate) + { + _currentWriteBlock.SocketDisconnect |= socketDisconnect; + _currentWriteBlock.SocketShutdownSend |= socketShutdownSend; + } + if (immediate || blockFilled) + { + SendBufferedData(); } - // Make sure we call user code outside of the lock. if (triggerCallbackNow) { - // callback(error, state, calledInline) - callback(null, state, true); + callback(null, state, 0, true); + } + + Interlocked.Add(ref _numBytesPreCompleted, inputLength); + } + + private void SendBufferedData() + { + if (Interlocked.CompareExchange(ref pendingWriteBitFlag, 1, 0) == 0) + { + _thread.Post(so => so.DoWriteIfNeeded(), this); } } @@ -119,13 +159,13 @@ public void End(ProduceEndType endType) switch (endType) { case ProduceEndType.SocketShutdownSend: - Write(default(ArraySegment), (error, state, calledInline) => { }, null, + Write(default(ArraySegment), (error, state, status, calledInline) => { }, null, immediate: true, socketShutdownSend: true, socketDisconnect: false); break; case ProduceEndType.SocketDisconnect: - Write(default(ArraySegment), (error, state, calledInline) => { }, null, + Write(default(ArraySegment), (error, state, status, calledInline) => { }, null, immediate: true, socketShutdownSend: false, socketDisconnect: true); @@ -133,90 +173,22 @@ public void End(ProduceEndType endType) } } - private void ScheduleWrite() - { - _thread.Post(_this => _this.WriteAllPending(), this); - } - - // This is called on the libuv event loop - private void WriteAllPending() - { - WriteContext writingContext; - - lock (_lockObj) - { - if (_nextWriteContext != null) - { - writingContext = _nextWriteContext; - _nextWriteContext = null; - } - else - { - _writesPending--; - return; - } - } - - try - { - writingContext.DoWriteIfNeeded(); - } - catch - { - lock (_lockObj) - { - // Lock instead of using Interlocked.Decrement so _writesSending - // doesn't change in the middle of executing other synchronized code. - _writesPending--; - } - - throw; - } - } - // This is called on the libuv event loop - private void OnWriteCompleted(Queue> writtenBuffers, int status, Exception error) + private void OnWriteCompleted(int status, Exception error) { - _log.ConnectionWriteCallback(_connectionId, status); + _lastWriteError = error; + _lastStatus = status; - lock (_lockObj) + CallbackContext callbackContext; + while (_callbacksPending.TryPeek(out callbackContext) && callbackContext.BytesWrittenThreshold <= _bytesWritten) { - _lastWriteError = error; - - if (_nextWriteContext != null) - { - ScheduleWrite(); - } - else - { - _writesPending--; - } - - foreach (var writeBuffer in writtenBuffers) - { - // _numBytesPreCompleted can temporarily go negative in the event there are - // completed writes that we haven't triggered callbacks for yet. - _numBytesPreCompleted -= writeBuffer.Count; - } - + _log.ConnectionWriteCallback(_connectionId, status); - // bytesLeftToBuffer can be greater than _maxBytesPreCompleted - // This allows large writes to complete once they've actually finished. - var bytesLeftToBuffer = _maxBytesPreCompleted - _numBytesPreCompleted; - while (_callbacksPending.Count > 0 && - _callbacksPending.Peek().BytesToWrite <= bytesLeftToBuffer) + if (_callbacksPending.TryDequeue(out callbackContext)) { - var callbackContext = _callbacksPending.Dequeue(); - - _numBytesPreCompleted += callbackContext.BytesToWrite; - - // callback(error, state, calledInline) - callbackContext.Callback(_lastWriteError, callbackContext.State, false); + //Callback(error, state, calledInline) + callbackContext.Callback(error, callbackContext.State, status, false); } - - // Now that the while loop has completed the following invariants should hold true: - Debug.Assert(_numBytesPreCompleted >= 0); - Debug.Assert(_numBytesPreCompleted <= _maxBytesPreCompleted); } } @@ -228,9 +200,10 @@ void ISocketOutput.Write(ArraySegment buffer, bool immediate) // to be a subsequent immediate==true call which will go down the following code-path Write( buffer, - (error, state, calledInline) => { }, + (error, state, status, calledInline) => { }, null, immediate: false); + return; } @@ -239,15 +212,16 @@ void ISocketOutput.Write(ArraySegment buffer, bool immediate) Write( buffer, - (error, state, calledInline) => + (error, state, status, calledInline) => { + var cs = (TaskCompletionSource)state; if (error != null) { - tcs.SetException(error); + cs.SetException(error); } else { - tcs.SetResult(0); + cs.SetResult(0); } }, tcs, @@ -267,9 +241,10 @@ Task ISocketOutput.WriteAsync(ArraySegment buffer, bool immediate, Cancell // to be a subsequent immediate==true call which will go down the following code-path Write( buffer, - (error, state, calledInline) => { }, + (error, state, status, calledInline) => { }, null, immediate: false); + return TaskUtilities.CompletedTask; } @@ -278,13 +253,28 @@ Task ISocketOutput.WriteAsync(ArraySegment buffer, bool immediate, Cancell Write( buffer, - (error, state, calledInline) => + (error, state2, status, calledInline) => { - if (!calledInline) + var tcs2 = (TaskCompletionSource)state2; + if (status < 0) { - ThreadPool.QueueUserWorkItem(state2 => + if (!calledInline) + { + ThreadPool.QueueUserWorkItem((state3) => + { + var tcs3 = (TaskCompletionSource)state3; + if (error != null) + { + tcs3.SetException(error); + } + else + { + tcs3.SetResult(0); + } + }, tcs2); + } + else { - var tcs2 = (TaskCompletionSource)state2; if (error != null) { tcs2.SetException(error); @@ -293,14 +283,17 @@ Task ISocketOutput.WriteAsync(ArraySegment buffer, bool immediate, Cancell { tcs2.SetResult(0); } - }, state); + } } else { - var tcs2 = (TaskCompletionSource)state; - if (error != null) + if (!calledInline) { - tcs2.SetException(error); + ThreadPool.QueueUserWorkItem((state3) => + { + var tcs3 = (TaskCompletionSource)state3; + tcs3.SetResult(0); + }, tcs2); } else { @@ -314,118 +307,183 @@ Task ISocketOutput.WriteAsync(ArraySegment buffer, bool immediate, Cancell return tcs.Task; } - private class CallbackContext - { - // callback(error, state, calledInline) - public Action Callback; - public object State; - public int BytesToWrite; - } - - private class WriteContext + private WriteContext GetContext() { - public SocketOutput Self; - - public Queue> Buffers; - public bool SocketShutdownSend; - public bool SocketDisconnect; - - public int WriteStatus; - public Exception WriteError; + MemoryPoolBlock2[] data = null; - public int ShutdownSendStatus; + var count = 0; + var dataLength = 0; - public WriteContext(SocketOutput self) - { - Self = self; - Buffers = new Queue>(); - } - - /// - /// Perform any actions needed by this work item. The individual tasks are non-blocking and - /// will continue through to each other in order. - /// - public void Execute() - { - DoWriteIfNeeded(); - } + bool socketDisconnect = false; + bool socketShutdownSend = false; - /// - /// First step: initiate async write if needed, otherwise go to next step - /// - public void DoWriteIfNeeded() + WriteBlock writeBlock; + while (_memoryBlocks.TryDequeue(out writeBlock)) { - if (Buffers.Count == 0 || Self._socket.IsClosed) + var block = writeBlock.Block; + if (block != null) { - DoShutdownIfNeeded(); - return; + if (data == null) + { + data = new MemoryPoolBlock2[UvWriteReq.BUFFER_COUNT]; + } + var length = block.End - block.Start; + data[count] = block; + dataLength += length; + count++; } - var buffers = new ArraySegment[Buffers.Count]; + socketDisconnect |= writeBlock.SocketDisconnect; + socketShutdownSend |= writeBlock.SocketShutdownSend; - var i = 0; - foreach (var buffer in Buffers) + if (count == UvWriteReq.BUFFER_COUNT) { - buffers[i++] = buffer; + break; } - - var writeReq = new UvWriteReq(Self._log); - writeReq.Init(Self._thread.Loop); - writeReq.Write(Self._socket, new ArraySegment>(buffers), (_writeReq, status, error, state) => - { - _writeReq.Dispose(); - var _this = (WriteContext)state; - _this.WriteStatus = status; - _this.WriteError = error; - DoShutdownIfNeeded(); - }, this); } - /// - /// Second step: initiate async shutdown if needed, otherwise go to next step - /// - public void DoShutdownIfNeeded() + if (count < UvWriteReq.BUFFER_COUNT) { - if (SocketShutdownSend == false || Self._socket.IsClosed) + var block = Interlocked.Exchange(ref _currentWriteBlock.Block, null); + + if (block != null) { - DoDisconnectIfNeeded(); - return; + if (data == null) + { + data = new MemoryPoolBlock2[UvWriteReq.BUFFER_COUNT]; + } + var length = block.End - block.Start; + data[count] = block; + dataLength += length; + count++; } + socketDisconnect |= _currentWriteBlock.SocketDisconnect; + socketShutdownSend |= _currentWriteBlock.SocketShutdownSend; + } - var shutdownReq = new UvShutdownReq(Self._log); - shutdownReq.Init(Self._thread.Loop); - shutdownReq.Shutdown(Self._socket, (_shutdownReq, status, state) => - { - _shutdownReq.Dispose(); - var _this = (WriteContext)state; - _this.ShutdownSendStatus = status; + Interlocked.Add(ref _bytesQueued, dataLength); - Self._log.ConnectionWroteFin(Self._connectionId, status); + return new WriteContext() + { + Data = new ArraySegment(data ?? _emptyBlocks, 0, count), + SocketDisconnect = socketDisconnect, + SocketShutdownSend = socketShutdownSend + }; + } - DoDisconnectIfNeeded(); - }, this); - } + /// + /// First step: initiate async write if needed, otherwise go to next step + /// + public void DoWriteIfNeeded() + { + pendingWriteBitFlag = 0; + Interlocked.MemoryBarrier(); - /// - /// Third step: disconnect socket if needed, otherwise this work item is complete - /// - public void DoDisconnectIfNeeded() + WriteContext context; + while (true) { - if (SocketDisconnect == false || Self._socket.IsClosed) + context = GetContext(); + var data = context.Data; + + if (data.Count == 0 || _socket.IsClosed) { - Complete(); + DoShutdownIfNeeded(context.SocketDisconnect, context.SocketShutdownSend, 0, null); return; } + + var writeReq = new UvWriteReq(_log); + writeReq.Init(_thread.Loop); + + writeReq.SocketDisconnect = context.SocketDisconnect; + writeReq.SocketShutdownSend = context.SocketShutdownSend; + writeReq.Write(_socket, + data, + (_writeReq, status, error, bytesWritten, state) => WriteCallback(_writeReq, status, error, bytesWritten, state), + this); + } + } + + private static void WriteCallback(UvWriteReq writeReq, int status, Exception error, int bytesWritten, object state) + { + var socketOutput = (SocketOutput)state; + writeReq.Dispose(); + + Interlocked.Add(ref socketOutput._bytesWritten, bytesWritten); + Interlocked.Add(ref socketOutput._numBytesPreCompleted, -bytesWritten); + + socketOutput.DoShutdownIfNeeded(writeReq.SocketDisconnect, writeReq.SocketShutdownSend, status, error); + } - Self._socket.Dispose(); - Self._log.ConnectionStop(Self._connectionId); - Complete(); + /// + /// Second step: initiate async shutdown if needed, otherwise go to next step + /// + private void DoShutdownIfNeeded(bool socketDisconnect, bool socketShutdownSend, int status, Exception error) + { + if (socketShutdownSend == false || SocketShutdownSent == true || _socket.IsClosed) + { + DoDisconnectIfNeeded(socketDisconnect, status, error); + return; } + SocketShutdownSent = true; + + var shutdownReq = new UvShutdownReq(_log); + shutdownReq.Init(_thread.Loop); + + shutdownReq.SocketDisconnect = socketDisconnect; + shutdownReq.SocketStatus = status; + shutdownReq.SocketException = error; + + shutdownReq.Shutdown(_socket, + (shutdownReq2, status2, state) => ShutdownCallback(shutdownReq2, status2, state), this); + } + + private static void ShutdownCallback(UvShutdownReq shutdownReq, int status, object state) + { + shutdownReq.Dispose(); + var socketOutput = (SocketOutput)state; + socketOutput.ShutdownSendStatus = status; + + socketOutput._log.ConnectionWroteFin(socketOutput._connectionId, status); - public void Complete() + socketOutput.DoDisconnectIfNeeded(shutdownReq.SocketDisconnect, shutdownReq.SocketStatus, shutdownReq.SocketException); + } + + /// + /// Third step: disconnect socket if needed, otherwise this work item is complete + /// + private void DoDisconnectIfNeeded(bool socketDisconnect, int status, Exception error) + { + if (socketDisconnect == false || _socket.IsClosed) { - Self.OnWriteCompleted(Buffers, WriteStatus, WriteError); + OnWriteCompleted(status, error); + return; } + + _socket.Dispose(); + _log.ConnectionStop(_connectionId); + OnWriteCompleted(status, error); + } + + private struct CallbackContext + { + //Callback(error, state, calledInline) + public Action Callback; + public object State; + public long BytesWrittenThreshold; + } + + private struct WriteContext + { + public ArraySegment Data; + public bool SocketShutdownSend; + public bool SocketDisconnect; + } + + private struct WriteBlock + { + public MemoryPoolBlock2 Block; + public bool SocketShutdownSend; + public bool SocketDisconnect; } } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPool2.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPool2.cs index 2ee01c0d1..941f24f69 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPool2.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPool2.cs @@ -31,6 +31,8 @@ public class MemoryPool2 : IDisposable /// private const int _blockLength = _blockStride - _blockUnused; + public const int WritableBlockSize = _blockLength; + /// /// 4096 * 32 gives you a slabLength of 128k contiguous bytes allocated per slab /// diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvShutdownReq.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvShutdownReq.cs index d6aa2c65a..01dd489cb 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvShutdownReq.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvShutdownReq.cs @@ -11,11 +11,15 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking /// public class UvShutdownReq : UvRequest { - private readonly static Libuv.uv_shutdown_cb _uv_shutdown_cb = UvShutdownCb; + private readonly static Libuv.uv_shutdown_cb _uv_shutdown_cb = (req, status) => UvShutdownCb(req, status); private Action _callback; private object _state; + public bool SocketDisconnect; + public int SocketStatus; + public Exception SocketException; + public UvShutdownReq(IKestrelTrace logger) : base (logger) { } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs index 103ddfc6e..28480683f 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs @@ -14,15 +14,17 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking /// public class UvWriteReq : UvRequest { - private readonly static Libuv.uv_write_cb _uv_write_cb = UvWriteCb; + private readonly static Libuv.uv_write_cb _uv_write_cb = (ptr, status) => UvWriteCb(ptr, status); - private IntPtr _bufs; + private IntPtr _nativePointers; + private ArraySegment _segments; - private Action _callback; + private Action _callback; private object _state; - private const int BUFFER_COUNT = 4; - - private List _pins = new List(); + public const int BUFFER_COUNT = 8; + + public bool SocketShutdownSend; + public bool SocketDisconnect; public UvWriteReq(IKestrelTrace logger) : base(logger) { @@ -32,45 +34,37 @@ public void Init(UvLoopHandle loop) { var requestSize = loop.Libuv.req_size(Libuv.RequestType.WRITE); var bufferSize = Marshal.SizeOf() * BUFFER_COUNT; + CreateMemory( loop.Libuv, loop.ThreadId, requestSize + bufferSize); - _bufs = handle + requestSize; + _nativePointers = handle + requestSize; } public unsafe void Write( UvStreamHandle handle, - ArraySegment> bufs, - Action callback, + ArraySegment segments, + Action callback, object state) { try { + _segments = segments; // add GCHandle to keeps this SafeHandle alive while request processing - _pins.Add(GCHandle.Alloc(this, GCHandleType.Normal)); + Pin(); - var pBuffers = (Libuv.uv_buf_t*)_bufs; - var nBuffers = bufs.Count; - if (nBuffers > BUFFER_COUNT) - { - // create and pin buffer array when it's larger than the pre-allocated one - var bufArray = new Libuv.uv_buf_t[nBuffers]; - var gcHandle = GCHandle.Alloc(bufArray, GCHandleType.Pinned); - _pins.Add(gcHandle); - pBuffers = (Libuv.uv_buf_t*)gcHandle.AddrOfPinnedObject(); - } + var pBuffers = (Libuv.uv_buf_t*)_nativePointers; + var nBuffers = segments.Count; for (var index = 0; index < nBuffers; index++) { + var buf = segments.Array[segments.Offset + index]; + var len = buf.End - buf.Start; // create and pin each segment being written - var buf = bufs.Array[bufs.Offset + index]; - - var gcHandle = GCHandle.Alloc(buf.Array, GCHandleType.Pinned); - _pins.Add(gcHandle); pBuffers[index] = Libuv.buf_init( - gcHandle.AddrOfPinnedObject() + buf.Offset, - buf.Count); + buf.Pin() - len, + buf.End - buf.Start); } _callback = callback; @@ -81,44 +75,37 @@ public unsafe void Write( { _callback = null; _state = null; - Unpin(this); + Unpin(); + ProcessBlocks(); throw; } } public unsafe void Write2( UvStreamHandle handle, - ArraySegment> bufs, + ArraySegment segments, UvStreamHandle sendHandle, - Action callback, + Action callback, object state) { try { + _segments = segments; // add GCHandle to keeps this SafeHandle alive while request processing - _pins.Add(GCHandle.Alloc(this, GCHandleType.Normal)); + Pin(); - var pBuffers = (Libuv.uv_buf_t*)_bufs; - var nBuffers = bufs.Count; - if (nBuffers > BUFFER_COUNT) - { - // create and pin buffer array when it's larger than the pre-allocated one - var bufArray = new Libuv.uv_buf_t[nBuffers]; - var gcHandle = GCHandle.Alloc(bufArray, GCHandleType.Pinned); - _pins.Add(gcHandle); - pBuffers = (Libuv.uv_buf_t*)gcHandle.AddrOfPinnedObject(); - } + var pBuffers = (Libuv.uv_buf_t*)_nativePointers; + var nBuffers = segments.Count; for (var index = 0; index < nBuffers; index++) { - // create and pin each segment being written - var buf = bufs.Array[bufs.Offset + index]; + var buf = segments.Array[segments.Offset + index]; + var len = buf.End - buf.Start; - var gcHandle = GCHandle.Alloc(buf.Array, GCHandleType.Pinned); - _pins.Add(gcHandle); + // create and pin each segment being written pBuffers[index] = Libuv.buf_init( - gcHandle.AddrOfPinnedObject() + buf.Offset, - buf.Count); + buf.Pin() - len, + buf.End - buf.Start); } _callback = callback; @@ -129,24 +116,36 @@ public unsafe void Write2( { _callback = null; _state = null; - Unpin(this); + Unpin(); + ProcessBlocks(); throw; } } - private static void Unpin(UvWriteReq req) + private int ProcessBlocks() { - foreach (var pin in req._pins) + var bytesWritten = 0; + var end = _segments.Offset + _segments.Count; + for (var i = _segments.Offset; i < end; i++) { - pin.Free(); + var block = _segments.Array[i]; + bytesWritten += block.End - block.Start; + + block.Unpin(); + + if (block.Pool != null) + { + block.Pool.Return(block); + } } - req._pins.Clear(); + + return bytesWritten; } private static void UvWriteCb(IntPtr ptr, int status) { var req = FromIntPtr(ptr); - Unpin(req); + var bytesWritten = req.ProcessBlocks(); var callback = req._callback; req._callback = null; @@ -162,13 +161,17 @@ private static void UvWriteCb(IntPtr ptr, int status) try { - callback(req, status, error, state); + callback(req, status, error, bytesWritten, state); } catch (Exception ex) { req._log.LogError("UvWriteCb", ex); throw; } + finally + { + req.Unpin(); + } } } } \ No newline at end of file diff --git a/test/Microsoft.AspNet.Server.KestrelTests/MultipleLoopTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/MultipleLoopTests.cs index 8293ad09b..b57940454 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/MultipleLoopTests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/MultipleLoopTests.cs @@ -62,12 +62,16 @@ public void ServerPipeListenForConnections() return; } + var buffer = new byte[] { 1, 2, 3, 4 }; + var msg = MemoryPoolBlock2.Create(new ArraySegment(buffer), IntPtr.Zero, null, null); + msg.End = msg.Start + buffer.Length; + var writeRequest = new UvWriteReq(new KestrelTrace(new TestKestrelTrace())); writeRequest.Init(loop); writeRequest.Write( serverConnectionPipe, - new ArraySegment>(new ArraySegment[] { new ArraySegment(new byte[] { 1, 2, 3, 4 }) }), - (_3, status2, error2, _4) => + new ArraySegment(new[] { msg }), + (socket2, status2, error2, bytesWritten, state) => { writeRequest.Dispose(); serverConnectionPipe.Dispose(); @@ -156,13 +160,17 @@ public void ServerPipeDispatchConnections() serverConnectionPipeAcceptedEvent.WaitOne(); + var buffer = new byte[] { 1, 2, 3, 4 }; + var msg = MemoryPoolBlock2.Create(new ArraySegment(buffer), IntPtr.Zero, null, null); + msg.End = msg.Start + buffer.Length; + var writeRequest = new UvWriteReq(new KestrelTrace(new TestKestrelTrace())); writeRequest.Init(loop); writeRequest.Write2( serverConnectionPipe, - new ArraySegment>(new ArraySegment[] { new ArraySegment(new byte[] { 1, 2, 3, 4 }) }), + new ArraySegment(new[] { msg }), serverConnectionTcp, - (_3, status2, error2, _4) => + (socket2, status2, error2, bytesWritten, state) => { writeRequest.Dispose(); serverConnectionTcp.Dispose(); @@ -234,7 +242,7 @@ public void ServerPipeDispatchConnections() var cb = socket.Receive(new byte[64]); socket.Dispose(); } - catch(Exception ex) + catch (Exception ex) { Console.WriteLine(ex); } diff --git a/test/Microsoft.AspNet.Server.KestrelTests/NetworkingTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/NetworkingTests.cs index fa8a3ada3..e670abc24 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/NetworkingTests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/NetworkingTests.cs @@ -199,14 +199,16 @@ public async Task SocketCanReadAndWrite() { for (var x = 0; x < 2; x++) { + var buffer = new byte[] { 65, 66, 67, 68, 69 }; + var msg = MemoryPoolBlock2.Create(new ArraySegment(buffer), IntPtr.Zero, null, null); + msg.End = msg.Start + buffer.Length; + var req = new UvWriteReq(new KestrelTrace(new TestKestrelTrace())); req.Init(loop); req.Write( tcp2, - new ArraySegment>( - new[] { new ArraySegment(new byte[] { 65, 66, 67, 68, 69 }) } - ), - (_1, _2, _3, _4) => { }, + new ArraySegment(new[] { msg }), + (_1, _2, _3, _4, _5) => { }, null); } } diff --git a/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs index 8e0425ce5..3e1f2784b 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs @@ -16,7 +16,7 @@ namespace Microsoft.AspNet.Server.KestrelTests public class SocketOutputTests { [Fact] - public void CanWrite1MB() + public void CanWriteOver1MB() { // This test was added because when initially implementing write-behind buffering in // SocketOutput, the write callback would never be invoked for writes larger than @@ -39,24 +39,26 @@ public void CanWrite1MB() var kestrelThread = kestrelEngine.Threads[0]; var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); - var socketOutput = new SocketOutput(kestrelThread, socket, 0, trace); - - // I doubt _maxBytesPreCompleted will ever be over a MB. If it is, we should change this test. - var bufferSize = 1048576; - var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); - var completedWh = new ManualResetEventSlim(); - Action onCompleted = (ex, state, calledInline) => + using (var memory = new MemoryPool2()) { - Assert.Null(ex); - Assert.Null(state); - completedWh.Set(); - }; - - // Act - socketOutput.Write(buffer, onCompleted, null); - - // Assert - Assert.True(completedWh.Wait(1000)); + var socketOutput = new SocketOutput(memory, kestrelThread, socket, 0, trace); + + var bufferSize = 1048576 + SocketOutput.MaxBytesPreCompleted; + var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); + var completedWh = new ManualResetEventSlim(); + Action onCompleted = (ex, state, status, calledInline) => + { + Assert.Null(ex); + Assert.Null(state); + completedWh.Set(); + }; + + // Act + socketOutput.Write(buffer, onCompleted, null); + + // Assert + Assert.True(completedWh.Wait(1000)); + } } } @@ -84,35 +86,38 @@ public void WritesDontCompleteImmediatelyWhenTooManyBytesAreAlreadyPreCompleted( var kestrelThread = kestrelEngine.Threads[0]; var socket = new MockSocket(kestrelThread.Loop.ThreadId, new TestKestrelTrace()); var trace = new KestrelTrace(new TestKestrelTrace()); - var socketOutput = new SocketOutput(kestrelThread, socket, 0, trace); - - var bufferSize = maxBytesPreCompleted; - var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); - var completedWh = new ManualResetEventSlim(); - Action onCompleted = (ex, state, calledInline) => + using (var memory = new MemoryPool2()) { - Assert.Null(ex); - Assert.Null(state); - completedWh.Set(); - }; - - // Act - socketOutput.Write(buffer, onCompleted, null); - // Assert - // The first write should pre-complete since it is <= _maxBytesPreCompleted. - Assert.True(completedWh.Wait(1000)); - // Arrange - completedWh.Reset(); - // Act - socketOutput.Write(buffer, onCompleted, null); - // Assert - // Too many bytes are already pre-completed for the second write to pre-complete. - Assert.False(completedWh.Wait(1000)); - // Act - completeQueue.Dequeue()(0); - // Assert - // Finishing the first write should allow the second write to pre-complete. - Assert.True(completedWh.Wait(1000)); + var socketOutput = new SocketOutput(memory, kestrelThread, socket, 0, trace); + + var bufferSize = maxBytesPreCompleted; + var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); + var completedWh = new ManualResetEventSlim(); + Action onCompleted = (ex, state, status, calledInline) => + { + Assert.Null(ex); + Assert.Null(state); + completedWh.Set(); + }; + + // Act + socketOutput.Write(buffer, onCompleted, null); + // Assert + // The first write should pre-complete since it is <= _maxBytesPreCompleted. + Assert.True(completedWh.Wait(200)); + // Arrange + completedWh.Reset(); + // Act + socketOutput.Write(buffer, onCompleted, null); + // Assert + // Too many bytes are already pre-completed for the second write to pre-complete. + Assert.False(completedWh.Wait(200)); + // Act + completeQueue.Dequeue()(0); + // Assert + // Finishing the first write should allow the second write to pre-complete. + Assert.True(completedWh.Wait(200)); + } } } From 1877ec9a6c27dc0d461e4826e6060d3078a50af9 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sun, 1 Nov 2015 11:18:25 +0000 Subject: [PATCH 2/6] Pool write requests --- .../Http/SocketOutput.cs | 7 ++- .../Infrastructure/KestrelThread.cs | 43 +++++++++++++++++++ .../Networking/UvWriteReq.cs | 7 +++ 3 files changed, 53 insertions(+), 4 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index 5f88e6681..4403fd98f 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -390,9 +390,8 @@ public void DoWriteIfNeeded() DoShutdownIfNeeded(context.SocketDisconnect, context.SocketShutdownSend, 0, null); return; } - - var writeReq = new UvWriteReq(_log); - writeReq.Init(_thread.Loop); + + var writeReq = _thread.LeaseWriteRequest(); writeReq.SocketDisconnect = context.SocketDisconnect; writeReq.SocketShutdownSend = context.SocketShutdownSend; @@ -406,7 +405,7 @@ public void DoWriteIfNeeded() private static void WriteCallback(UvWriteReq writeReq, int status, Exception error, int bytesWritten, object state) { var socketOutput = (SocketOutput)state; - writeReq.Dispose(); + socketOutput._thread.ReturnWriteRequest(writeReq); Interlocked.Add(ref socketOutput._bytesWritten, bytesWritten); Interlocked.Add(ref socketOutput._numBytesPreCompleted, -bytesWritten); diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs index 85a4b5062..87941cfb5 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/KestrelThread.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Runtime.ExceptionServices; using System.Threading; @@ -18,6 +19,8 @@ namespace Microsoft.AspNet.Server.Kestrel /// public class KestrelThread { + private const int _maxPooledWriteRequests = 64; + private static Action _objectCallbackAdapter = (callback, state) => ((Action)callback).Invoke(state); private KestrelEngine _engine; private readonly IApplicationLifetime _appLifetime; @@ -28,6 +31,7 @@ public class KestrelThread private Queue _workRunning = new Queue(); private Queue _closeHandleAdding = new Queue(); private Queue _closeHandleRunning = new Queue(); + private ConcurrentQueue _writeRequestPool = new ConcurrentQueue(); private object _workSync = new Object(); private bool _stopImmediate = false; private bool _initCompleted = false; @@ -87,6 +91,18 @@ public void Stop(TimeSpan timeout) private void OnStop(object obj) { + if (_writeRequestPool != null) + { + var writeRequests = _writeRequestPool; + _writeRequestPool = null; + + UvWriteReq writeReq; + while (writeRequests.TryDequeue(out writeReq)) + { + writeReq.Dispose(); + } + } + _post.Unreference(); } @@ -314,6 +330,33 @@ private void DoPostCloseHandle() } } + public UvWriteReq LeaseWriteRequest() + { + UvWriteReq writeReq; + + var writeRequests = _writeRequestPool; + if (writeRequests == null || !writeRequests.TryDequeue(out writeReq)) + { + writeReq = new UvWriteReq(_log); + writeReq.Init(_loop); + } + + return writeReq; + } + + public void ReturnWriteRequest(UvWriteReq writeReq) + { + if ((_writeRequestPool?.Count ?? _maxPooledWriteRequests) < _maxPooledWriteRequests) + { + writeReq.Reset(); + _writeRequestPool.Enqueue(writeReq); + } + else + { + writeReq.Dispose(); + } + } + private struct Work { public Action CallbackAdapter; diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs index 28480683f..4c16872d2 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs @@ -173,5 +173,12 @@ private static void UvWriteCb(IntPtr ptr, int status) req.Unpin(); } } + + public void Reset() + { + _callback = null; + _state = null; + _segments = default(ArraySegment); + } } } \ No newline at end of file From b0a27b04d5340fd494ed64790e3504ecea05a2f4 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sun, 1 Nov 2015 11:39:23 +0000 Subject: [PATCH 3/6] Precompute bytes + reduce allocs Precompute standard chunk bytes Write Ascii header data to scratch buffer Use SocketOutput's buffering rather than Memory and MemoryTextWriter Remove Memory and MemoryTextWriter --- .../Http/Frame.cs | 128 ++++++++++----- .../Http/MemoryPool.cs | 125 -------------- .../Http/MemoryPoolTextWriter.cs | 155 ------------------ .../ServiceContext.cs | 4 - .../FrameTests.cs | 4 +- .../TestInput.cs | 2 - 6 files changed, 93 insertions(+), 325 deletions(-) delete mode 100644 src/Microsoft.AspNet.Server.Kestrel/Http/MemoryPool.cs delete mode 100644 src/Microsoft.AspNet.Server.Kestrel/Http/MemoryPoolTextWriter.cs diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs index 36f1d9231..24c37c124 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs @@ -22,16 +22,23 @@ namespace Microsoft.AspNet.Server.Kestrel.Http public partial class Frame : FrameContext, IFrameControl { private static readonly Encoding _ascii = Encoding.ASCII; - private static readonly ArraySegment _endChunkBytes = CreateAsciiByteArraySegment("\r\n"); + private static readonly ArraySegment _endLineBytes = CreateAsciiByteArraySegment("\r\n"); + private static readonly ArraySegment _endChunkBytes = _endLineBytes; + private static readonly ArraySegment _headerDelimiterBytes = CreateAsciiByteArraySegment(": "); + private static readonly ArraySegment _spaceBytes = CreateAsciiByteArraySegment(" "); private static readonly ArraySegment _endChunkedResponseBytes = CreateAsciiByteArraySegment("0\r\n\r\n"); private static readonly ArraySegment _continueBytes = CreateAsciiByteArraySegment("HTTP/1.1 100 Continue\r\n\r\n"); + private static readonly ArraySegment _contentLengthZeroBytes = CreateAsciiByteArraySegment("Content-Length: 0\r\n"); + private static readonly ArraySegment _transferEncodingChunkedBytes = CreateAsciiByteArraySegment("Transfer-Encoding: chunked\r\n"); + private static readonly ArraySegment _connectionCloseBytes = CreateAsciiByteArraySegment("Connection: close\r\n\r\n"); + private static readonly ArraySegment _connectionKeepAliveBytes = CreateAsciiByteArraySegment("Connection: keep-alive\r\n\r\n"); private static readonly ArraySegment _emptyData = new ArraySegment(new byte[0]); private static readonly byte[] _hex = Encoding.ASCII.GetBytes("0123456789abcdef"); - private readonly object _onStartingSync = new Object(); - private readonly object _onCompletedSync = new Object(); + private readonly object _onStartingSync = new object(); + private readonly object _onCompletedSync = new object(); private readonly FrameRequestHeaders _requestHeaders = new FrameRequestHeaders(); - private readonly byte[] _nullBuffer = new byte[4096]; + private readonly byte[] _scratchBuffer = new byte[4096]; private readonly FrameResponseHeaders _responseHeaders = new FrameResponseHeaders(); private List, object>> _onStarting; @@ -189,7 +196,7 @@ public async Task RequestProcessingAsync() } } - while (!terminated && !_requestProcessingStopping && !TakeMessageHeaders(SocketInput, _requestHeaders)) + while (!terminated && !_requestProcessingStopping && !TakeMessageHeaders(SocketInput, _requestHeaders, _scratchBuffer)) { terminated = SocketInput.RemoteIntakeFin; if (!terminated) @@ -217,6 +224,7 @@ public async Task RequestProcessingAsync() } finally { + var FlushTask = RequestBody.ReadAsync(_scratchBuffer, 0, _scratchBuffer.Length); // Trigger OnStarting if it hasn't been called yet and the app hasn't // already failed. If an OnStarting callback throws we can go through // our normal error handling in ProduceEnd. @@ -232,9 +240,10 @@ public async Task RequestProcessingAsync() await ProduceEnd(); - while (await RequestBody.ReadAsync(_nullBuffer, 0, _nullBuffer.Length) != 0) + while (await FlushTask != 0) { // Finish reading the request body in case the app did not. + FlushTask = RequestBody.ReadAsync(_scratchBuffer, 0, _scratchBuffer.Length); } } @@ -471,19 +480,14 @@ public async Task ProduceStartAndFireOnStarting(bool immediate = true) await ProduceStart(immediate, appCompleted: false); } - private async Task ProduceStart(bool immediate, bool appCompleted) + private Task ProduceStart(bool immediate, bool appCompleted) { - if (_responseStarted) return; + if (_responseStarted) return TaskUtilities.CompletedTask; _responseStarted = true; var status = ReasonPhrases.ToStatus(StatusCode, ReasonPhrase); - var responseHeader = CreateResponseHeader(status, appCompleted); - - using (responseHeader.Item2) - { - await SocketOutput.WriteAsync(responseHeader.Item1, immediate: immediate); - } + return CreateResponseHeader(status, appCompleted, immediate); } private async Task ProduceEnd() @@ -521,16 +525,54 @@ private async Task ProduceEnd() } } - private Tuple, IDisposable> CreateResponseHeader( + ArraySegment ShortAsciiToBytes(string input) + { + + var scratch = _scratchBuffer; + var len = input.Length; + + var i = 0; + for (; i < scratch.Length; i++) + { + if (i >= len) + { + break; + } + scratch[i] = (byte)input[i]; + } + var buffer = new ArraySegment(scratch, 0, i); + return buffer; + } + bool LongAsciiToBytes(string input, int offset, out int newOffset, out ArraySegment buffer) + { + var scratch = _scratchBuffer; + var len = input.Length; + + newOffset = offset; + var i = 0; + for (; i < scratch.Length; i++) + { + if (newOffset >= len) + { + break; + } + scratch[i] = (byte)input[newOffset]; + newOffset++; + } + + buffer = new ArraySegment(scratch, 0, i); + return newOffset < len; + } + + private Task CreateResponseHeader( string status, - bool appCompleted) + bool appCompleted, + bool immediate) { - var writer = new MemoryPoolTextWriter(Memory); - writer.Write(HttpVersion); - writer.Write(' '); - writer.Write(status); - writer.Write('\r'); - writer.Write('\n'); + SocketOutput.Write(ShortAsciiToBytes(HttpVersion), immediate: false); + SocketOutput.Write(_spaceBytes, immediate: false); + SocketOutput.Write(ShortAsciiToBytes(status), immediate: false); + SocketOutput.Write(_endLineBytes, immediate: false); var hasConnection = false; var hasTransferEncoding = false; @@ -555,21 +597,33 @@ private Tuple, IDisposable> CreateResponseHeader( hasContentLength = true; } + ArraySegment buffer; + int inputOffset; foreach (var value in header.Value) { - writer.Write(header.Key); - writer.Write(':'); - writer.Write(' '); - writer.Write(value); - writer.Write('\r'); - writer.Write('\n'); + inputOffset = 0; + while (LongAsciiToBytes(header.Key, inputOffset, out inputOffset, out buffer)) + { + SocketOutput.Write(buffer, immediate: false); + } + SocketOutput.Write(buffer, immediate: false); + + SocketOutput.Write(_headerDelimiterBytes, immediate: false); + + inputOffset = 0; + while (LongAsciiToBytes(value, inputOffset, out inputOffset, out buffer)) + { + SocketOutput.Write(buffer, immediate: false); + } + SocketOutput.Write(buffer, immediate: false); + + SocketOutput.Write(_endLineBytes, immediate: false); if (isConnection && value.IndexOf("close", StringComparison.OrdinalIgnoreCase) != -1) { _keepAlive = false; } } - } if (_keepAlive && !hasTransferEncoding && !hasContentLength) @@ -582,7 +636,7 @@ private Tuple, IDisposable> CreateResponseHeader( { // Since the app has completed and we are only now generating // the headers we can safely set the Content-Length to 0. - writer.Write("Content-Length: 0\r\n"); + SocketOutput.Write(_contentLengthZeroBytes, immediate: false); } } else @@ -590,7 +644,7 @@ private Tuple, IDisposable> CreateResponseHeader( if (HttpVersion == "HTTP/1.1") { _autoChunk = true; - writer.Write("Transfer-Encoding: chunked\r\n"); + SocketOutput.Write(_transferEncodingChunkedBytes, immediate: false); } else { @@ -601,19 +655,17 @@ private Tuple, IDisposable> CreateResponseHeader( if (_keepAlive == false && hasConnection == false && HttpVersion == "HTTP/1.1") { - writer.Write("Connection: close\r\n\r\n"); + return SocketOutput.WriteAsync(_connectionCloseBytes, immediate: immediate); } else if (_keepAlive && hasConnection == false && HttpVersion == "HTTP/1.0") { - writer.Write("Connection: keep-alive\r\n\r\n"); + return SocketOutput.WriteAsync(_connectionKeepAliveBytes, immediate: immediate); } else { - writer.Write('\r'); - writer.Write('\n'); + return SocketOutput.WriteAsync(_endLineBytes, immediate: immediate); } - writer.Flush(); - return new Tuple, IDisposable>(writer.Buffer, writer); + } private bool TakeStartLine(SocketInput input) @@ -694,7 +746,7 @@ static string GetString(ArraySegment range, int startIndex, int endIndex) return Encoding.UTF8.GetString(range.Array, range.Offset + startIndex, endIndex - startIndex); } - public static bool TakeMessageHeaders(SocketInput input, FrameRequestHeaders requestHeaders) + public static bool TakeMessageHeaders(SocketInput input, FrameRequestHeaders requestHeaders, byte[] scratchBuffer) { var scan = input.ConsumingStart(); var consumed = scan; diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/MemoryPool.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/MemoryPool.cs deleted file mode 100644 index 0fc5c390e..000000000 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/MemoryPool.cs +++ /dev/null @@ -1,125 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.Collections.Generic; - -namespace Microsoft.AspNet.Server.Kestrel.Http -{ - public class MemoryPool : IMemoryPool - { - private static readonly byte[] EmptyArray = new byte[0]; - - private readonly Pool _pool1 = new Pool(); - private readonly Pool _pool2 = new Pool(); - private readonly Pool _pool3 = new Pool(); - - public byte[] Empty - { - get - { - return EmptyArray; - } - } - - public byte[] AllocByte(int minimumSize) - { - if (minimumSize == 0) - { - return EmptyArray; - } - if (minimumSize <= 1024) - { - return _pool1.Alloc(1024); - } - if (minimumSize <= 2048) - { - return _pool2.Alloc(2048); - } - return new byte[minimumSize]; - } - - public void FreeByte(byte[] memory) - { - if (memory == null) - { - return; - } - switch (memory.Length) - { - case 1024: - _pool1.Free(memory, 256); - break; - case 2048: - _pool2.Free(memory, 64); - break; - } - } - - public char[] AllocChar(int minimumSize) - { - if (minimumSize == 0) - { - return new char[0]; - } - if (minimumSize <= 128) - { - return _pool3.Alloc(128); - } - return new char[minimumSize]; - } - - public void FreeChar(char[] memory) - { - if (memory == null) - { - return; - } - switch (memory.Length) - { - case 128: - _pool3.Free(memory, 256); - break; - } - } - - public ArraySegment AllocSegment(int minimumSize) - { - return new ArraySegment(AllocByte(minimumSize)); - } - - public void FreeSegment(ArraySegment segment) - { - FreeByte(segment.Array); - } - - class Pool - { - private readonly Stack _stack = new Stack(); - private readonly object _sync = new object(); - - public T[] Alloc(int size) - { - lock (_sync) - { - if (_stack.Count != 0) - { - return _stack.Pop(); - } - } - return new T[size]; - } - - public void Free(T[] value, int limit) - { - lock (_sync) - { - if (_stack.Count < limit) - { - _stack.Push(value); - } - } - } - } - } -} diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/MemoryPoolTextWriter.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/MemoryPoolTextWriter.cs deleted file mode 100644 index 2f549dc15..000000000 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/MemoryPoolTextWriter.cs +++ /dev/null @@ -1,155 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -using System; -using System.IO; -using System.Text; - -namespace Microsoft.AspNet.Server.Kestrel.Http -{ - public class MemoryPoolTextWriter : TextWriter - { - private readonly IMemoryPool _memory; - - private char[] _textArray; - private int _textBegin; - private int _textEnd; - // ReSharper disable InconsistentNaming - private const int _textLength = 128; - // ReSharper restore InconsistentNaming - - private byte[] _dataArray; - private int _dataEnd; - - private readonly Encoder _encoder; - - public MemoryPoolTextWriter(IMemoryPool memory) - { - _memory = memory; - _textArray = _memory.AllocChar(_textLength); - _dataArray = _memory.Empty; - _encoder = Encoding.UTF8.GetEncoder(); - } - - public ArraySegment Buffer - { - get - { - return new ArraySegment(_dataArray, 0, _dataEnd); - } - } - - public override Encoding Encoding - { - get - { - return Encoding.UTF8; - } - } - - protected override void Dispose(bool disposing) - { - try - { - if (disposing) - { - if (_textArray != null) - { - _memory.FreeChar(_textArray); - _textArray = null; - } - if (_dataArray != null) - { - _memory.FreeByte(_dataArray); - _dataArray = null; - } - } - } - finally - { - base.Dispose(disposing); - } - } - - private void Encode(bool flush) - { - var bytesNeeded = _encoder.GetByteCount( - _textArray, - _textBegin, - _textEnd - _textBegin, - flush); - - Grow(bytesNeeded); - - var bytesUsed = _encoder.GetBytes( - _textArray, - _textBegin, - _textEnd - _textBegin, - _dataArray, - _dataEnd, - flush); - - _textBegin = _textEnd = 0; - _dataEnd += bytesUsed; - } - - private void Grow(int minimumAvailable) - { - if (_dataArray.Length - _dataEnd >= minimumAvailable) - { - return; - } - - var newLength = _dataArray.Length + Math.Max(_dataArray.Length, minimumAvailable); - var newArray = _memory.AllocByte(newLength); - Array.Copy(_dataArray, 0, newArray, 0, _dataEnd); - _memory.FreeByte(_dataArray); - _dataArray = newArray; - } - - public override void Write(char value) - { - if (_textLength == _textEnd) - { - Encode(false); - if (_textLength == _textEnd) - { - throw new InvalidOperationException("Unexplainable failure to encode text"); - } - } - - _textArray[_textEnd++] = value; - } - - public override void Write(string value) - { - var sourceIndex = 0; - var sourceLength = value.Length; - while (sourceIndex < sourceLength) - { - if (_textLength == _textEnd) - { - Encode(false); - } - - var count = sourceLength - sourceIndex; - if (count > _textLength - _textEnd) - { - count = _textLength - _textEnd; - } - - value.CopyTo(sourceIndex, _textArray, _textEnd, count); - sourceIndex += count; - _textEnd += count; - } - } - - public override void Flush() - { - while (_textBegin != _textEnd) - { - Encode(true); - } - } - } -} diff --git a/src/Microsoft.AspNet.Server.Kestrel/ServiceContext.cs b/src/Microsoft.AspNet.Server.Kestrel/ServiceContext.cs index d3d1539a5..095f37f85 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/ServiceContext.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/ServiceContext.cs @@ -13,13 +13,11 @@ public class ServiceContext { public ServiceContext() { - Memory = new MemoryPool(); } public ServiceContext(ServiceContext context) { AppLifetime = context.AppLifetime; - Memory = context.Memory; Log = context.Log; HttpContextFactory = context.HttpContextFactory; DateHeaderValueManager = context.DateHeaderValueManager; @@ -29,8 +27,6 @@ public ServiceContext(ServiceContext context) public IApplicationLifetime AppLifetime { get; set; } - public IMemoryPool Memory { get; set; } - public IKestrelTrace Log { get; set; } public IHttpContextFactory HttpContextFactory { get; set; } diff --git a/test/Microsoft.AspNet.Server.KestrelTests/FrameTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/FrameTests.cs index 8266a1c2e..b874b6499 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/FrameTests.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/FrameTests.cs @@ -56,7 +56,9 @@ public void EmptyHeaderValuesCanBeParsed(string rawHeaders, int numHeaders) Buffer.BlockCopy(headerArray, 0, inputBuffer.Data.Array, inputBuffer.Data.Offset, headerArray.Length); socketInput.IncomingComplete(headerArray.Length, null); - var success = Frame.TakeMessageHeaders(socketInput, headerCollection); + var scratchBuffer = new byte[200]; + + var success = Frame.TakeMessageHeaders(socketInput, headerCollection, scratchBuffer); Assert.True(success); Assert.Equal(numHeaders, headerCollection.Count()); diff --git a/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs b/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs index 3db112560..11d18f7c4 100644 --- a/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs +++ b/test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs @@ -13,12 +13,10 @@ class TestInput : IConnectionControl, IFrameControl { public TestInput() { - var memory = new MemoryPool(); var memory2 = new MemoryPool2(); FrameContext = new FrameContext { SocketInput = new SocketInput(memory2), - Memory = memory, ConnectionControl = this, FrameControl = this }; From 2ca0751223f981bcf4c53de39ff319d2e624cac0 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sun, 1 Nov 2015 11:59:50 +0000 Subject: [PATCH 4/6] Don't allocate for regular header names --- .../Http/Frame.cs | 2 +- .../Infrastructure/MemoryPoolIterator2.cs | 18 ++++++++++++++---- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs index 24c37c124..747e86e5f 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs @@ -836,7 +836,7 @@ public static bool TakeMessageHeaders(SocketInput input, FrameRequestHeaders req continue; } - var name = beginName.GetArraySegment(endName); + var name = beginName.GetArraySegment(scratchBuffer, endName); var value = beginValue.GetString(endValue); if (wrapping) { diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolIterator2.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolIterator2.cs index 7de5c7fb1..616b2a702 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolIterator2.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolIterator2.cs @@ -566,7 +566,7 @@ public string GetString(MemoryPoolIterator2 end) } } - public ArraySegment GetArraySegment(MemoryPoolIterator2 end) + public ArraySegment GetArraySegment(byte[] scratchBuffer, MemoryPoolIterator2 end) { if (IsDefault || end.IsDefault) { @@ -578,9 +578,19 @@ public ArraySegment GetArraySegment(MemoryPoolIterator2 end) } var length = GetLength(end); - var array = new byte[length]; - CopyTo(array, 0, length, out length); - return new ArraySegment(array, 0, length); + byte[] buffer; + + if (length > scratchBuffer.Length) + { + buffer = new byte[length]; + } + else + { + buffer = scratchBuffer; + } + + CopyTo(buffer, 0, length, out length); + return new ArraySegment(buffer, 0, length); } public MemoryPoolIterator2 CopyTo(byte[] array, int offset, int count, out int actual) From 3a7fa323a3fb59122033fe7ccbffae2c618b40cf Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Mon, 2 Nov 2015 12:39:24 +0000 Subject: [PATCH 5/6] Better work stealing, improved test --- .../Http/SocketOutput.cs | 25 ++++++++++++------- .../ResponseTests.cs | 5 +++- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index 4403fd98f..da2bf550b 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -68,15 +68,12 @@ internal void Write( if (inputLength > 0) { - memoryBlock = Interlocked.Exchange(ref _currentWriteBlock.Block, null); - - _log.ConnectionWrite(_connectionId, inputLength); - - int blockRemaining = memoryBlock != null ? memoryBlock.Data.Count - (memoryBlock.End - memoryBlock.Start) : 0; - var remaining = inputLength; var offset = buffer.Offset; + memoryBlock = Interlocked.Exchange(ref _currentWriteBlock.Block, null); + int blockRemaining = memoryBlock != null ? memoryBlock.Data.Count - (memoryBlock.End - memoryBlock.Start) : 0; + while (remaining > 0) { if (memoryBlock == null) @@ -90,6 +87,7 @@ internal void Write( remaining -= copyAmount; blockRemaining -= copyAmount; + memoryBlock.End += copyAmount; offset += copyAmount; @@ -101,7 +99,10 @@ internal void Write( } } - Interlocked.Exchange(ref _currentWriteBlock.Block, memoryBlock); + if (memoryBlock != null) + { + Interlocked.Exchange(ref _currentWriteBlock.Block, memoryBlock); + } } CallbackContext callbackContext; @@ -133,6 +134,7 @@ internal void Write( _currentWriteBlock.SocketDisconnect |= socketDisconnect; _currentWriteBlock.SocketShutdownSend |= socketShutdownSend; } + if (immediate || blockFilled) { SendBufferedData(); @@ -143,7 +145,11 @@ internal void Write( callback(null, state, 0, true); } - Interlocked.Add(ref _numBytesPreCompleted, inputLength); + if (inputLength > 0) + { + _log.ConnectionWrite(_connectionId, inputLength); + Interlocked.Add(ref _numBytesPreCompleted, inputLength); + } } private void SendBufferedData() @@ -327,12 +333,12 @@ private WriteContext GetContext() { data = new MemoryPoolBlock2[UvWriteReq.BUFFER_COUNT]; } + var length = block.End - block.Start; data[count] = block; dataLength += length; count++; } - socketDisconnect |= writeBlock.SocketDisconnect; socketShutdownSend |= writeBlock.SocketShutdownSend; @@ -344,6 +350,7 @@ private WriteContext GetContext() if (count < UvWriteReq.BUFFER_COUNT) { + // Steal incomplete block var block = Interlocked.Exchange(ref _currentWriteBlock.Block, null); if (block != null) diff --git a/test/Microsoft.AspNet.Server.Kestrel.FunctionalTests/ResponseTests.cs b/test/Microsoft.AspNet.Server.Kestrel.FunctionalTests/ResponseTests.cs index 245729d94..7a63509b8 100644 --- a/test/Microsoft.AspNet.Server.Kestrel.FunctionalTests/ResponseTests.cs +++ b/test/Microsoft.AspNet.Server.Kestrel.FunctionalTests/ResponseTests.cs @@ -29,7 +29,8 @@ public async Task LargeDownload() { app.Run(async context => { - var bytes = new byte[1024]; + // Larger than MemoryPoolBlock2.WritableBlockSize, but power of 2 for verify + var bytes = new byte[8192]; for (int i = 0; i < bytes.Length; i++) { bytes[i] = (byte)i; @@ -65,6 +66,8 @@ public async Task LargeDownload() } count = await responseBody.ReadAsync(bytes, 0, bytes.Length); } + + Assert.Equal(8192 * 1024, total); } } } From 1705ae0d322d104a762b1cc436046db969b5db0f Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Mon, 2 Nov 2015 15:32:37 +0000 Subject: [PATCH 6/6] Lower latency response Post work ready at end of headers New buffering and buffer stealing allows more data to be added before libuv wakes up and puts on wire --- .../Http/Frame.FeatureCollection.cs | 2 +- .../Http/Frame.cs | 27 +++++++++---------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.FeatureCollection.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.FeatureCollection.cs index 1a4efb32f..48ecf8868 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.FeatureCollection.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.FeatureCollection.cs @@ -290,7 +290,7 @@ Task IHttpUpgradeFeature.UpgradeAsync() ResponseHeaders["Upgrade"] = values; } } - ProduceStartAndFireOnStarting(immediate: true).GetAwaiter().GetResult(); + ProduceStartAndFireOnStarting().GetAwaiter().GetResult(); return Task.FromResult(DuplexStream); } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs index 747e86e5f..09ea90b4a 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs @@ -351,19 +351,19 @@ private async Task FireOnCompleted() public void Flush() { - ProduceStartAndFireOnStarting(immediate: false).GetAwaiter().GetResult(); + ProduceStartAndFireOnStarting().GetAwaiter().GetResult(); SocketOutput.Write(_emptyData, immediate: true); } public async Task FlushAsync(CancellationToken cancellationToken) { - await ProduceStartAndFireOnStarting(immediate: false); + await ProduceStartAndFireOnStarting(); await SocketOutput.WriteAsync(_emptyData, immediate: true); } public void Write(ArraySegment data) { - ProduceStartAndFireOnStarting(immediate: false).GetAwaiter().GetResult(); + ProduceStartAndFireOnStarting().GetAwaiter().GetResult(); if (_autoChunk) { @@ -381,7 +381,7 @@ public void Write(ArraySegment data) public async Task WriteAsync(ArraySegment data, CancellationToken cancellationToken) { - await ProduceStartAndFireOnStarting(immediate: false); + await ProduceStartAndFireOnStarting(); if (_autoChunk) { @@ -464,7 +464,7 @@ public void ProduceContinue() } } - public async Task ProduceStartAndFireOnStarting(bool immediate = true) + public async Task ProduceStartAndFireOnStarting() { if (_responseStarted) return; @@ -477,17 +477,17 @@ public async Task ProduceStartAndFireOnStarting(bool immediate = true) _applicationException); } - await ProduceStart(immediate, appCompleted: false); + await ProduceStart(appCompleted: false); } - private Task ProduceStart(bool immediate, bool appCompleted) + private Task ProduceStart(bool appCompleted) { if (_responseStarted) return TaskUtilities.CompletedTask; _responseStarted = true; var status = ReasonPhrases.ToStatus(StatusCode, ReasonPhrase); - return CreateResponseHeader(status, appCompleted, immediate); + return CreateResponseHeader(status, appCompleted); } private async Task ProduceEnd() @@ -510,7 +510,7 @@ private async Task ProduceEnd() } } - await ProduceStart(immediate: true, appCompleted: true); + await ProduceStart(appCompleted: true); // _autoChunk should be checked after we are sure ProduceStart() has been called // since ProduceStart() may set _autoChunk to true. @@ -566,8 +566,7 @@ bool LongAsciiToBytes(string input, int offset, out int newOffset, out ArraySegm private Task CreateResponseHeader( string status, - bool appCompleted, - bool immediate) + bool appCompleted) { SocketOutput.Write(ShortAsciiToBytes(HttpVersion), immediate: false); SocketOutput.Write(_spaceBytes, immediate: false); @@ -655,15 +654,15 @@ private Task CreateResponseHeader( if (_keepAlive == false && hasConnection == false && HttpVersion == "HTTP/1.1") { - return SocketOutput.WriteAsync(_connectionCloseBytes, immediate: immediate); + return SocketOutput.WriteAsync(_connectionCloseBytes, immediate: true); } else if (_keepAlive && hasConnection == false && HttpVersion == "HTTP/1.0") { - return SocketOutput.WriteAsync(_connectionKeepAliveBytes, immediate: immediate); + return SocketOutput.WriteAsync(_connectionKeepAliveBytes, immediate: true); } else { - return SocketOutput.WriteAsync(_endLineBytes, immediate: immediate); + return SocketOutput.WriteAsync(_endLineBytes, immediate: true); } }