Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

Lockless socket output, lower allocs #315

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public Connection(ListenerContext context, UvStreamHandle socket) : base(context

_connectionId = Interlocked.Increment(ref _lastConnectionId);

_rawSocketInput = new SocketInput(Memory2);
_rawSocketOutput = new SocketOutput(Thread, _socket, _connectionId, Log);
_rawSocketInput = new SocketInput(InputMemory);
_rawSocketOutput = new SocketOutput(OutputMemory, Thread, _socket, _connectionId, Log);
}

public void Start()
Expand Down Expand Up @@ -100,7 +100,7 @@ public void Start()

private void ApplyConnectionFilter()
{
var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, Memory2, Log);
var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, InputMemory, Log);

SocketInput = filteredStreamAdapter.SocketInput;
SocketOutput = filteredStreamAdapter.SocketOutput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ Task<Stream> IHttpUpgradeFeature.UpgradeAsync()
ResponseHeaders["Upgrade"] = values;
}
}
ProduceStartAndFireOnStarting(immediate: true).GetAwaiter().GetResult();
ProduceStartAndFireOnStarting().GetAwaiter().GetResult();
return Task.FromResult(DuplexStream);
}

Expand Down
141 changes: 96 additions & 45 deletions src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,23 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public partial class Frame : FrameContext, IFrameControl
{
private static readonly Encoding _ascii = Encoding.ASCII;
private static readonly ArraySegment<byte> _endChunkBytes = CreateAsciiByteArraySegment("\r\n");
private static readonly ArraySegment<byte> _endLineBytes = CreateAsciiByteArraySegment("\r\n");
private static readonly ArraySegment<byte> _endChunkBytes = _endLineBytes;
private static readonly ArraySegment<byte> _headerDelimiterBytes = CreateAsciiByteArraySegment(": ");
private static readonly ArraySegment<byte> _spaceBytes = CreateAsciiByteArraySegment(" ");
private static readonly ArraySegment<byte> _endChunkedResponseBytes = CreateAsciiByteArraySegment("0\r\n\r\n");
private static readonly ArraySegment<byte> _continueBytes = CreateAsciiByteArraySegment("HTTP/1.1 100 Continue\r\n\r\n");
private static readonly ArraySegment<byte> _contentLengthZeroBytes = CreateAsciiByteArraySegment("Content-Length: 0\r\n");
private static readonly ArraySegment<byte> _transferEncodingChunkedBytes = CreateAsciiByteArraySegment("Transfer-Encoding: chunked\r\n");
private static readonly ArraySegment<byte> _connectionCloseBytes = CreateAsciiByteArraySegment("Connection: close\r\n\r\n");
private static readonly ArraySegment<byte> _connectionKeepAliveBytes = CreateAsciiByteArraySegment("Connection: keep-alive\r\n\r\n");
private static readonly ArraySegment<byte> _emptyData = new ArraySegment<byte>(new byte[0]);
private static readonly byte[] _hex = Encoding.ASCII.GetBytes("0123456789abcdef");

private readonly object _onStartingSync = new Object();
private readonly object _onCompletedSync = new Object();
private readonly object _onStartingSync = new object();
private readonly object _onCompletedSync = new object();
private readonly FrameRequestHeaders _requestHeaders = new FrameRequestHeaders();
private readonly byte[] _nullBuffer = new byte[4096];
private readonly byte[] _scratchBuffer = new byte[4096];
private readonly FrameResponseHeaders _responseHeaders = new FrameResponseHeaders();

private List<KeyValuePair<Func<object, Task>, object>> _onStarting;
Expand Down Expand Up @@ -189,7 +196,7 @@ public async Task RequestProcessingAsync()
}
}

while (!terminated && !_requestProcessingStopping && !TakeMessageHeaders(SocketInput, _requestHeaders))
while (!terminated && !_requestProcessingStopping && !TakeMessageHeaders(SocketInput, _requestHeaders, _scratchBuffer))
{
terminated = SocketInput.RemoteIntakeFin;
if (!terminated)
Expand Down Expand Up @@ -217,6 +224,7 @@ public async Task RequestProcessingAsync()
}
finally
{
var FlushTask = RequestBody.ReadAsync(_scratchBuffer, 0, _scratchBuffer.Length);
// Trigger OnStarting if it hasn't been called yet and the app hasn't
// already failed. If an OnStarting callback throws we can go through
// our normal error handling in ProduceEnd.
Expand All @@ -232,9 +240,10 @@ public async Task RequestProcessingAsync()

await ProduceEnd();

while (await RequestBody.ReadAsync(_nullBuffer, 0, _nullBuffer.Length) != 0)
while (await FlushTask != 0)
{
// Finish reading the request body in case the app did not.
FlushTask = RequestBody.ReadAsync(_scratchBuffer, 0, _scratchBuffer.Length);
}
}

Expand Down Expand Up @@ -342,19 +351,19 @@ private async Task FireOnCompleted()

public void Flush()
{
ProduceStartAndFireOnStarting(immediate: false).GetAwaiter().GetResult();
ProduceStartAndFireOnStarting().GetAwaiter().GetResult();
SocketOutput.Write(_emptyData, immediate: true);
}

public async Task FlushAsync(CancellationToken cancellationToken)
{
await ProduceStartAndFireOnStarting(immediate: false);
await ProduceStartAndFireOnStarting();
await SocketOutput.WriteAsync(_emptyData, immediate: true);
}

public void Write(ArraySegment<byte> data)
{
ProduceStartAndFireOnStarting(immediate: false).GetAwaiter().GetResult();
ProduceStartAndFireOnStarting().GetAwaiter().GetResult();

if (_autoChunk)
{
Expand All @@ -372,7 +381,7 @@ public void Write(ArraySegment<byte> data)

public async Task WriteAsync(ArraySegment<byte> data, CancellationToken cancellationToken)
{
await ProduceStartAndFireOnStarting(immediate: false);
await ProduceStartAndFireOnStarting();

if (_autoChunk)
{
Expand Down Expand Up @@ -455,7 +464,7 @@ public void ProduceContinue()
}
}

public async Task ProduceStartAndFireOnStarting(bool immediate = true)
public async Task ProduceStartAndFireOnStarting()
{
if (_responseStarted) return;

Expand All @@ -468,22 +477,17 @@ public async Task ProduceStartAndFireOnStarting(bool immediate = true)
_applicationException);
}

await ProduceStart(immediate, appCompleted: false);
await ProduceStart(appCompleted: false);
}

private async Task ProduceStart(bool immediate, bool appCompleted)
private Task ProduceStart(bool appCompleted)
{
if (_responseStarted) return;
if (_responseStarted) return TaskUtilities.CompletedTask;
_responseStarted = true;

var status = ReasonPhrases.ToStatus(StatusCode, ReasonPhrase);

var responseHeader = CreateResponseHeader(status, appCompleted);

using (responseHeader.Item2)
{
await SocketOutput.WriteAsync(responseHeader.Item1, immediate: immediate);
}
return CreateResponseHeader(status, appCompleted);
}

private async Task ProduceEnd()
Expand All @@ -506,7 +510,7 @@ private async Task ProduceEnd()
}
}

await ProduceStart(immediate: true, appCompleted: true);
await ProduceStart(appCompleted: true);

// _autoChunk should be checked after we are sure ProduceStart() has been called
// since ProduceStart() may set _autoChunk to true.
Expand All @@ -521,16 +525,53 @@ private async Task ProduceEnd()
}
}

private Tuple<ArraySegment<byte>, IDisposable> CreateResponseHeader(
ArraySegment<byte> ShortAsciiToBytes(string input)
{

var scratch = _scratchBuffer;
var len = input.Length;

var i = 0;
for (; i < scratch.Length; i++)
{
if (i >= len)
{
break;
}
scratch[i] = (byte)input[i];
}
var buffer = new ArraySegment<byte>(scratch, 0, i);
return buffer;
}
bool LongAsciiToBytes(string input, int offset, out int newOffset, out ArraySegment<byte> buffer)
{
var scratch = _scratchBuffer;
var len = input.Length;

newOffset = offset;
var i = 0;
for (; i < scratch.Length; i++)
{
if (newOffset >= len)
{
break;
}
scratch[i] = (byte)input[newOffset];
newOffset++;
}

buffer = new ArraySegment<byte>(scratch, 0, i);
return newOffset < len;
}

private Task CreateResponseHeader(
string status,
bool appCompleted)
{
var writer = new MemoryPoolTextWriter(Memory);
writer.Write(HttpVersion);
writer.Write(' ');
writer.Write(status);
writer.Write('\r');
writer.Write('\n');
SocketOutput.Write(ShortAsciiToBytes(HttpVersion), immediate: false);
SocketOutput.Write(_spaceBytes, immediate: false);
SocketOutput.Write(ShortAsciiToBytes(status), immediate: false);
SocketOutput.Write(_endLineBytes, immediate: false);

var hasConnection = false;
var hasTransferEncoding = false;
Expand All @@ -555,21 +596,33 @@ private Tuple<ArraySegment<byte>, IDisposable> CreateResponseHeader(
hasContentLength = true;
}

ArraySegment<byte> buffer;
int inputOffset;
foreach (var value in header.Value)
{
writer.Write(header.Key);
writer.Write(':');
writer.Write(' ');
writer.Write(value);
writer.Write('\r');
writer.Write('\n');
inputOffset = 0;
while (LongAsciiToBytes(header.Key, inputOffset, out inputOffset, out buffer))
{
SocketOutput.Write(buffer, immediate: false);
}
SocketOutput.Write(buffer, immediate: false);

SocketOutput.Write(_headerDelimiterBytes, immediate: false);

inputOffset = 0;
while (LongAsciiToBytes(value, inputOffset, out inputOffset, out buffer))
{
SocketOutput.Write(buffer, immediate: false);
}
SocketOutput.Write(buffer, immediate: false);

SocketOutput.Write(_endLineBytes, immediate: false);

if (isConnection && value.IndexOf("close", StringComparison.OrdinalIgnoreCase) != -1)
{
_keepAlive = false;
}
}

}

if (_keepAlive && !hasTransferEncoding && !hasContentLength)
Expand All @@ -582,15 +635,15 @@ private Tuple<ArraySegment<byte>, IDisposable> CreateResponseHeader(
{
// Since the app has completed and we are only now generating
// the headers we can safely set the Content-Length to 0.
writer.Write("Content-Length: 0\r\n");
SocketOutput.Write(_contentLengthZeroBytes, immediate: false);
}
}
else
{
if (HttpVersion == "HTTP/1.1")
{
_autoChunk = true;
writer.Write("Transfer-Encoding: chunked\r\n");
SocketOutput.Write(_transferEncodingChunkedBytes, immediate: false);
}
else
{
Expand All @@ -601,19 +654,17 @@ private Tuple<ArraySegment<byte>, IDisposable> CreateResponseHeader(

if (_keepAlive == false && hasConnection == false && HttpVersion == "HTTP/1.1")
{
writer.Write("Connection: close\r\n\r\n");
return SocketOutput.WriteAsync(_connectionCloseBytes, immediate: true);
}
else if (_keepAlive && hasConnection == false && HttpVersion == "HTTP/1.0")
{
writer.Write("Connection: keep-alive\r\n\r\n");
return SocketOutput.WriteAsync(_connectionKeepAliveBytes, immediate: true);
}
else
{
writer.Write('\r');
writer.Write('\n');
return SocketOutput.WriteAsync(_endLineBytes, immediate: true);
}
writer.Flush();
return new Tuple<ArraySegment<byte>, IDisposable>(writer.Buffer, writer);

}

private bool TakeStartLine(SocketInput input)
Expand Down Expand Up @@ -694,7 +745,7 @@ static string GetString(ArraySegment<byte> range, int startIndex, int endIndex)
return Encoding.UTF8.GetString(range.Array, range.Offset + startIndex, endIndex - startIndex);
}

public static bool TakeMessageHeaders(SocketInput input, FrameRequestHeaders requestHeaders)
public static bool TakeMessageHeaders(SocketInput input, FrameRequestHeaders requestHeaders, byte[] scratchBuffer)
{
var scan = input.ConsumingStart();
var consumed = scan;
Expand Down Expand Up @@ -784,7 +835,7 @@ public static bool TakeMessageHeaders(SocketInput input, FrameRequestHeaders req
continue;
}

var name = beginName.GetArraySegment(endName);
var name = beginName.GetArraySegment(scratchBuffer, endName);
var value = beginValue.GetString(endValue);
if (wrapping)
{
Expand Down
13 changes: 9 additions & 4 deletions src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ public class ListenerContext : ServiceContext
{
public ListenerContext()
{
Memory2 = new MemoryPool2();
InputMemory = new MemoryPool2();
OutputMemory = new MemoryPool2();
}

public ListenerContext(ServiceContext serviceContext)
: base(serviceContext)
{
Memory2 = new MemoryPool2();
InputMemory = new MemoryPool2();
OutputMemory = new MemoryPool2();
}

public ListenerContext(ListenerContext listenerContext)
Expand All @@ -25,7 +27,8 @@ public ListenerContext(ListenerContext listenerContext)
ServerAddress = listenerContext.ServerAddress;
Thread = listenerContext.Thread;
Application = listenerContext.Application;
Memory2 = listenerContext.Memory2;
InputMemory = listenerContext.InputMemory;
OutputMemory = listenerContext.OutputMemory;
Log = listenerContext.Log;
}

Expand All @@ -35,6 +38,8 @@ public ListenerContext(ListenerContext listenerContext)

public RequestDelegate Application { get; set; }

public MemoryPool2 Memory2 { get; set; }
public MemoryPool2 InputMemory { get; set; }

public MemoryPool2 OutputMemory { get; set; }
}
}
9 changes: 6 additions & 3 deletions src/Microsoft.AspNet.Server.Kestrel/Http/ListenerPrimary.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ abstract public class ListenerPrimary : Listener

// this message is passed to write2 because it must be non-zero-length,
// but it has no other functional significance
private readonly ArraySegment<ArraySegment<byte>> _dummyMessage = new ArraySegment<ArraySegment<byte>>(new[] { new ArraySegment<byte>(new byte[] { 1, 2, 3, 4 }) });
private readonly byte[] _dummyBuffer = { 1, 2, 3, 4 };

protected ListenerPrimary(ServiceContext serviceContext) : base(serviceContext)
{
Expand Down Expand Up @@ -80,14 +80,17 @@ protected override void DispatchConnection(UvStreamHandle socket)
}
else
{
var msg = MemoryPoolBlock2.Create(new ArraySegment<byte>(_dummyBuffer), IntPtr.Zero, null, null);
msg.End = msg.Start + _dummyBuffer.Length;

var dispatchPipe = _dispatchPipes[index];
var write = new UvWriteReq(Log);
write.Init(Thread.Loop);
write.Write2(
dispatchPipe,
_dummyMessage,
new ArraySegment<MemoryPoolBlock2>(new[] { msg }),
socket,
(write2, status, error, state) =>
(write2, status, error, bytesWritten, state) =>
{
write2.Dispose();
((UvStreamHandle)state).Dispose();
Expand Down
Loading