Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 126 additions & 33 deletions src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ public partial class Frame : FrameContext, IFrameControl
private Task _requestProcessingTask;
private volatile bool _requestProcessingStopping; // volatile, see: https://msdn.microsoft.com/en-us/library/x13ttww7.aspx
private volatile bool _requestAborted;
private CancellationTokenSource _disconnectCts = new CancellationTokenSource();
private CancellationTokenSource _requestAbortCts;
private CancellationTokenSource _abortedCts;
private CancellationToken? _manuallySetRequestAbortToken;

private FrameRequestStream _requestBody;
private FrameResponseStream _responseBody;
Expand All @@ -70,6 +70,8 @@ public partial class Frame : FrameContext, IFrameControl
private readonly IPEndPoint _remoteEndPoint;
private readonly Action<IFeatureCollection> _prepareRequest;

private readonly StringPool _stringPool = new StringPool();

public Frame(ConnectionContext context)
: this(context, remoteEndPoint: null, localEndPoint: null, prepareRequest: null)
{
Expand Down Expand Up @@ -135,8 +137,47 @@ public string HttpVersion

public Stream DuplexStream { get; set; }

public CancellationToken RequestAborted { get; set; }
public CancellationToken RequestAborted
{
get
{
// If a request abort token was previously explicitly set, return it.
if (_manuallySetRequestAbortToken.HasValue)
return _manuallySetRequestAbortToken.Value;

// Otherwise, get the abort CTS. If we have one, which would mean that someone previously
// asked for the RequestAborted token, simply return its token. If we don't,
// check to see whether we've already aborted, in which case just return an
// already canceled token. Finally, force a source into existence if we still
// don't have one, and return its token.
var cts = _abortedCts;
return
cts != null ? cts.Token :
_requestAborted ? new CancellationToken(true) :
RequestAbortedSource.Token;
}
set
{
// Set an abort token, overriding one we create internally. This setter and associated
// field exist purely to support IHttpRequestLifetimeFeature.set_RequestAborted.
_manuallySetRequestAbortToken = value;
}
}

private CancellationTokenSource RequestAbortedSource
{
get
{
// Get the abort token, lazily-initializing it if necessary.
// Make sure it's canceled if an abort request already came in.
var cts = LazyInitializer.EnsureInitialized(ref _abortedCts, () => new CancellationTokenSource());
if (_requestAborted)
{
cts.Cancel();
}
return cts;
}
}
public bool HasResponseStarted
{
get { return _responseStarted; }
Expand Down Expand Up @@ -188,7 +229,8 @@ public void Reset()

_prepareRequest?.Invoke(this);

_requestAbortCts?.Dispose();
_manuallySetRequestAbortToken = null;
_abortedCts = null;
}

public void ResetResponseHeaders()
Expand Down Expand Up @@ -244,13 +286,16 @@ public void Abort()
{
ConnectionControl.End(ProduceEndType.SocketDisconnect);
SocketInput.AbortAwaiting();

_disconnectCts.Cancel();
RequestAbortedSource.Cancel();
}
catch (Exception ex)
{
Log.LogError("Abort", ex);
}
finally
{
_abortedCts = null;
}
}

/// <summary>
Expand All @@ -263,28 +308,17 @@ public async Task RequestProcessingAsync()
{
try
{
var terminated = false;
while (!terminated && !_requestProcessingStopping)
while (!_requestProcessingStopping)
{
while (!terminated && !_requestProcessingStopping && !TakeStartLine(SocketInput))
{
terminated = SocketInput.RemoteIntakeFin;
if (!terminated)
{
await SocketInput;
}
}
_stringPool.MarkStart();

while (!terminated && !_requestProcessingStopping && !TakeMessageHeaders(SocketInput, _requestHeaders))
if (!await ReadStartLineAsync() ||
!await ReadHeadersAsync())
{
terminated = SocketInput.RemoteIntakeFin;
if (!terminated)
{
await SocketInput;
}
break;
}

if (!terminated && !_requestProcessingStopping)
if (!_requestProcessingStopping)
{
var messageBody = MessageBody.For(HttpVersion, _requestHeaders, this);
_keepAlive = messageBody.RequestKeepAlive;
Expand All @@ -294,8 +328,8 @@ public async Task RequestProcessingAsync()
ResponseBody = _responseBody;
DuplexStream = new FrameDuplexStream(RequestBody, ResponseBody);

_requestAbortCts = CancellationTokenSource.CreateLinkedTokenSource(_disconnectCts.Token);
RequestAborted = _requestAbortCts.Token;
_abortedCts = null;
_manuallySetRequestAbortToken = null;

var httpContext = HttpContextFactory.Create(this);
try
Expand Down Expand Up @@ -337,7 +371,10 @@ public async Task RequestProcessingAsync()
_responseBody.StopAcceptingWrites();
}

terminated = !_keepAlive;
if (!_keepAlive)
{
break;
}
}

Reset();
Expand All @@ -351,7 +388,7 @@ public async Task RequestProcessingAsync()
{
try
{
_disconnectCts.Dispose();
_abortedCts = null;

// If _requestAborted is set, the connection has already been closed.
if (!_requestAborted)
Expand All @@ -372,6 +409,62 @@ public async Task RequestProcessingAsync()
}
}
}
private Task<bool> ReadStartLineAsync()
{
if (!_requestProcessingStopping && !TakeStartLine(SocketInput))
{
if (SocketInput.RemoteIntakeFin)
{
return TaskUtilities.CompletedFalseTask;
};
return ReadStartLineAwaitAsync();
}
return TaskUtilities.CompletedTrueTask;
}

private async Task<bool> ReadStartLineAwaitAsync()
{
await SocketInput;

while (!_requestProcessingStopping && !TakeStartLine(SocketInput))
{
if (SocketInput.RemoteIntakeFin)
{
return false;
};

await SocketInput;
}
return true;
}

private Task<bool> ReadHeadersAsync()
{
if (!_requestProcessingStopping && !TakeMessageHeaders(SocketInput, _requestHeaders, _stringPool))
{
if (SocketInput.RemoteIntakeFin)
{
return TaskUtilities.CompletedFalseTask;
};
return ReadHeadersAwaitAsync();
}
return TaskUtilities.CompletedTrueTask;
}

private async Task<bool> ReadHeadersAwaitAsync()
{
await SocketInput;

while (!_requestProcessingStopping && !TakeMessageHeaders(SocketInput, _requestHeaders, _stringPool))
{
if (SocketInput.RemoteIntakeFin)
{
return false;
};
await SocketInput;
}
return true;
}

public void OnStarting(Func<object, Task> callback, object state)
{
Expand Down Expand Up @@ -704,7 +797,7 @@ private bool TakeStartLine(SocketInput input)
{
return false;
}
var method = begin.GetAsciiString(scan);
var method = begin.GetAsciiString(scan, _stringPool);

scan.Take();
begin = scan;
Expand All @@ -728,7 +821,7 @@ private bool TakeStartLine(SocketInput input)
{
return false;
}
queryString = begin.GetAsciiString(scan);
queryString = begin.GetAsciiString(scan, _stringPool);
}

scan.Take();
Expand All @@ -737,7 +830,7 @@ private bool TakeStartLine(SocketInput input)
{
return false;
}
var httpVersion = begin.GetAsciiString(scan);
var httpVersion = begin.GetAsciiString(scan, _stringPool);

scan.Take();
if (scan.Take() != '\n')
Expand All @@ -758,7 +851,7 @@ private bool TakeStartLine(SocketInput input)
else
{
// URI wasn't encoded, parse as ASCII
requestUrlPath = pathBegin.GetAsciiString(pathEnd);
requestUrlPath = pathBegin.GetAsciiString(pathEnd, _stringPool);
}

consumed = scan;
Expand All @@ -775,7 +868,7 @@ private bool TakeStartLine(SocketInput input)
}
}

public static bool TakeMessageHeaders(SocketInput input, FrameRequestHeaders requestHeaders)
public static bool TakeMessageHeaders(SocketInput input, FrameRequestHeaders requestHeaders, StringPool stringPool)
{
var scan = input.ConsumingStart();
var consumed = scan;
Expand Down Expand Up @@ -866,7 +959,7 @@ public static bool TakeMessageHeaders(SocketInput input, FrameRequestHeaders req
}

var name = beginName.GetArraySegment(endName);
var value = beginValue.GetAsciiString(endValue);
var value = beginValue.GetAsciiString(endValue, stringPool);
if (wrapping)
{
value = value.Replace("\r\n", " ");
Expand Down
Loading