Skip to content

Use examined rather than consumed for content length of body. #8223

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Mar 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
{
public class Http1ContentLengthMessageBody : Http1MessageBody
{
private ReadResult _readResult;
private readonly long _contentLength;
private long _inputLength;
private ReadResult _readResult;
private bool _readCompleted;
private bool _completed;
private bool _isReading;
private int _userCanceled;
private long _totalExaminedInPreviousReadResult;
private bool _finalAdvanceCalled;

public Http1ContentLengthMessageBody(bool keepAlive, long contentLength, Http1Connection context)
: base(context)
Expand All @@ -29,9 +33,14 @@ public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancella
{
ThrowIfCompleted();

if (_inputLength == 0)
if (_isReading)
{
throw new InvalidOperationException("Reading is already in progress.");
}

if (_readCompleted)
{
_readResult = new ReadResult(default, isCanceled: false, isCompleted: true);
_isReading = true;
return _readResult;
}

Expand All @@ -53,6 +62,8 @@ public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancella
try
{
var readAwaitable = _context.Input.ReadAsync(cancellationToken);

_isReading = true;
_readResult = await StartTimingReadAsync(readAwaitable, cancellationToken);
}
catch (ConnectionAbortedException ex)
Expand Down Expand Up @@ -102,9 +113,15 @@ public override bool TryRead(out ReadResult readResult)
{
ThrowIfCompleted();

if (_inputLength == 0)
if (_isReading)
{
readResult = new ReadResult(default, isCanceled: false, isCompleted: true);
throw new InvalidOperationException("Reading is already in progress.");
}

if (_readCompleted)
{
_isReading = true;
readResult = _readResult;
return true;
}

Expand All @@ -126,13 +143,28 @@ public override bool TryRead(out ReadResult readResult)
}
}

// Only set _isReading if we are returing true.
_isReading = true;

CreateReadResultFromConnectionReadResult();

readResult = _readResult;

return true;
}

public override Task ConsumeAsync()
{
TryStart();

if (!_readResult.Buffer.IsEmpty && _inputLength == 0)
{
_context.Input.AdvanceTo(_readResult.Buffer.End);
}

return OnConsumeAsync();
}

private void ThrowIfCompleted()
{
if (_completed)
Expand All @@ -143,13 +175,13 @@ private void ThrowIfCompleted()

private void CreateReadResultFromConnectionReadResult()
{
if (_readResult.Buffer.Length > _inputLength)
{
_readResult = new ReadResult(_readResult.Buffer.Slice(0, _inputLength), _readResult.IsCanceled, isCompleted: true);
}
else if (_readResult.Buffer.Length == _inputLength)
if (_readResult.Buffer.Length >= _inputLength + _totalExaminedInPreviousReadResult)
{
_readResult = new ReadResult(_readResult.Buffer, _readResult.IsCanceled, isCompleted: true);
_readCompleted = true;
_readResult = new ReadResult(
_readResult.Buffer.Slice(0, _inputLength + _totalExaminedInPreviousReadResult),
_readResult.IsCanceled && Interlocked.Exchange(ref _userCanceled, 0) == 1,
_readCompleted);
}

if (_readResult.IsCompleted)
Expand All @@ -165,18 +197,38 @@ public override void AdvanceTo(SequencePosition consumed)

public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
{
if (_inputLength == 0)
if (!_isReading)
{
return;
throw new InvalidOperationException("No reading operation to complete.");
}

var dataLength = _readResult.Buffer.Slice(_readResult.Buffer.Start, consumed).Length;
_isReading = false;

_inputLength -= dataLength;
if (_readCompleted)
{
_readResult = new ReadResult(_readResult.Buffer.Slice(consumed, _readResult.Buffer.End), Interlocked.Exchange(ref _userCanceled, 0) == 1, _readCompleted);

if (_readResult.Buffer.Length == 0 && !_finalAdvanceCalled)
{
_context.Input.AdvanceTo(consumed);
_finalAdvanceCalled = true;
}

return;
}

var consumedLength = _readResult.Buffer.Slice(_readResult.Buffer.Start, consumed).Length;
var examinedLength = consumedLength + _readResult.Buffer.Slice(consumed, examined).Length;

_context.Input.AdvanceTo(consumed, examined);

OnDataRead(dataLength);
var newlyExamined = examinedLength - _totalExaminedInPreviousReadResult;

OnDataRead(newlyExamined);
_totalExaminedInPreviousReadResult += newlyExamined;
_inputLength -= newlyExamined;

_totalExaminedInPreviousReadResult -= consumedLength;
}

protected override void OnReadStarting()
Expand Down
1 change: 0 additions & 1 deletion src/Servers/Kestrel/Core/test/MessageBodyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,6 @@ public async Task CopyToAsyncThrowsOnTimeout()

// Add some input and read it to start PumpAsync
input.Add("a");
Assert.Equal(1, (await body.ReadAsync()).Buffer.Length);

// Time out on the next read
input.Http1Connection.SendTimeoutResponse();
Expand Down
164 changes: 164 additions & 0 deletions src/Servers/Kestrel/test/InMemory.FunctionalTests/RequestTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,170 @@ await connection.ReceiveEnd(
}
}

[Fact]
public async Task ContentLengthReadAsyncPipeReaderBufferRequestBody()
{
var testContext = new TestServiceContext(LoggerFactory);

using (var server = new TestServer(async httpContext =>
{
var readResult = await httpContext.Request.BodyReader.ReadAsync();
// This will hang if 0 content length is not assumed by the server
Assert.Equal(5, readResult.Buffer.Length);
httpContext.Request.BodyReader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
readResult = await httpContext.Request.BodyReader.ReadAsync();
Assert.Equal(5, readResult.Buffer.Length);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have any tests that attempt more than one read after getting a completed read result?


}, testContext))
{
using (var connection = server.CreateConnection())
{
await connection.SendAll(
"POST / HTTP/1.0",
"Host:",
"Content-Length: 5",
"",
"hello");
await connection.ReceiveEnd(
"HTTP/1.1 200 OK",
"Connection: close",
$"Date: {testContext.DateHeaderValue}",
"Content-Length: 0",
"",
"");
}

await server.StopAsync();
}
}

[Fact]
public async Task ContentLengthReadAsyncPipeReaderBufferRequestBodyMultipleTimes()
{
var testContext = new TestServiceContext(LoggerFactory);

using (var server = new TestServer(async httpContext =>
{
var readResult = await httpContext.Request.BodyReader.ReadAsync();
// This will hang if 0 content length is not assumed by the server
Assert.Equal(5, readResult.Buffer.Length);
httpContext.Request.BodyReader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);

for (var i = 0; i < 2; i++)
{
readResult = await httpContext.Request.BodyReader.ReadAsync();
Assert.Equal(5, readResult.Buffer.Length);
httpContext.Request.BodyReader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
}
}, testContext))
{
using (var connection = server.CreateConnection())
{
await connection.SendAll(
"POST / HTTP/1.0",
"Host:",
"Content-Length: 5",
"",
"hello");
await connection.ReceiveEnd(
"HTTP/1.1 200 OK",
"Connection: close",
$"Date: {testContext.DateHeaderValue}",
"Content-Length: 0",
"",
"");
}

await server.StopAsync();
}
}

[Fact]
public async Task ContentLengthReadAsyncSingleBytesAtATime()
{
var testContext = new TestServiceContext(LoggerFactory);
var tcs = new TaskCompletionSource<object>();
var tcs2 = new TaskCompletionSource<object>();
using (var server = new TestServer(async httpContext =>
{
var readResult = await httpContext.Request.BodyReader.ReadAsync();
Assert.Equal(3, readResult.Buffer.Length);
tcs.SetResult(null);

httpContext.Request.BodyReader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);

readResult = await httpContext.Request.BodyReader.ReadAsync();
httpContext.Request.BodyReader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
tcs2.SetResult(null);

readResult = await httpContext.Request.BodyReader.ReadAsync();
Assert.Equal(5, readResult.Buffer.Length);

}, testContext))
{
using (var connection = server.CreateConnection())
{
await connection.Send(
"POST / HTTP/1.0",
"Host:",
"Content-Length: 5",
"",
"fun");
await tcs.Task;
await connection.Send(
"n");
await tcs2.Task;
await connection.Send(
"y");
await connection.ReceiveEnd(
"HTTP/1.1 200 OK",
"Connection: close",
$"Date: {testContext.DateHeaderValue}",
"Content-Length: 0",
"",
"");
}

await server.StopAsync();
}
}

[Fact]
public async Task ContentLengthDoesNotConsumeEntireBufferDoesNotThrow()
{
var testContext = new TestServiceContext(LoggerFactory);
using (var server = new TestServer(async httpContext =>
{
var readResult = await httpContext.Request.BodyReader.ReadAsync();

httpContext.Request.BodyReader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);

readResult = await httpContext.Request.BodyReader.ReadAsync();
httpContext.Request.BodyReader.AdvanceTo(readResult.Buffer.Slice(1).Start, readResult.Buffer.End);
}, testContext))
{
using (var connection = server.CreateConnection())
{
await connection.SendAll(
"POST / HTTP/1.0",
"Host:",
"Content-Length: 5",
"",
"funny");

await connection.ReceiveEnd(
"HTTP/1.1 200 OK",
"Connection: close",
$"Date: {testContext.DateHeaderValue}",
"Content-Length: 0",
"",
"");
}

await server.StopAsync();
}
}

[Fact]
public async Task ConnectionClosesWhenFinReceivedBeforeRequestCompletes()
{
Expand Down