Skip to content

HTTP/3: Response drain timeout #35322

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 42 additions & 19 deletions src/Servers/Kestrel/Core/src/Internal/Http3/Http3Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -176,38 +176,62 @@ public void Tick(DateTimeOffset now)
return;
}

UpdateStartingStreams(now);
UpdateStreamTimeouts(now);
}

private void UpdateStartingStreams(DateTimeOffset now)
private void UpdateStreamTimeouts(DateTimeOffset now)
{
// This method checks for timeouts:
// 1. When a stream first starts and waits to receive headers.
// Uses RequestHeadersTimeout.
// 2. When a stream finished and is waiting for underlying transport to drain.
// Uses MinResponseDataRate.

var ticks = now.Ticks;

lock (_streams)
{
foreach (var stream in _streams.Values)
{
if (stream.ReceivedHeader)
if (stream.IsReceivingHeader)
{
continue;
}
if (stream.StreamTimeoutTicks == default)
{
// On expiration overflow, use max value.
var expirationTicks = ticks + _context.ServiceContext.ServerOptions.Limits.RequestHeadersTimeout.Ticks;
stream.StreamTimeoutTicks = expirationTicks >= 0 ? expirationTicks : long.MaxValue;
}

if (stream.HeaderTimeoutTicks == default)
{
// On expiration overflow, use max value.
var expirationTicks = ticks + _context.ServiceContext.ServerOptions.Limits.RequestHeadersTimeout.Ticks;
stream.HeaderTimeoutTicks = expirationTicks >= 0 ? expirationTicks : long.MaxValue;
if (stream.StreamTimeoutTicks < ticks)
{
if (stream.IsRequestStream)
{
stream.Abort(new ConnectionAbortedException(CoreStrings.BadRequest_RequestHeadersTimeout), Http3ErrorCode.RequestRejected);
}
else
{
stream.Abort(new ConnectionAbortedException(CoreStrings.Http3ControlStreamHeaderTimeout), Http3ErrorCode.StreamCreationError);
}
}
}

if (stream.HeaderTimeoutTicks < ticks)
else if (stream.IsDraining)
{
if (stream.IsRequestStream)
var minDataRate = _context.ServiceContext.ServerOptions.Limits.MinResponseDataRate;
if (minDataRate == null)
{
stream.Abort(new ConnectionAbortedException(CoreStrings.BadRequest_RequestHeadersTimeout), Http3ErrorCode.RequestRejected);
continue;
}
else

if (stream.StreamTimeoutTicks == default)
{
stream.StreamTimeoutTicks = _context.TimeoutControl.GetResponseDrainDeadline(ticks, minDataRate);
}

if (stream.StreamTimeoutTicks < ticks)
{
stream.Abort(new ConnectionAbortedException(CoreStrings.Http3ControlStreamHeaderTimeout), Http3ErrorCode.StreamCreationError);
// Cancel connection to be consistent with other data rate limits.
Log.ResponseMinimumDataRateNotSatisfied(_context.ConnectionId, stream.TraceIdentifier);
Abort(new ConnectionAbortedException(CoreStrings.ConnectionTimedBecauseResponseMininumDataRateNotSatisfied), Http3ErrorCode.InternalError);
}
}
}
Expand Down Expand Up @@ -396,7 +420,6 @@ public async Task ProcessRequestsAsync<TContext>(IHttpApplication<TContext> appl
}

_context.TimeoutControl.CancelTimeout();
_context.TimeoutControl.StartDrainTimeout(Limits.MinResponseDataRate, Limits.MaxResponseBufferSize);
}
catch
{
Expand Down Expand Up @@ -424,7 +447,7 @@ private ConnectionAbortedException CreateConnectionAbortError(Exception? error,

if (clientAbort)
{
return new ConnectionAbortedException("The client closed the HTTP/3 connection.", error!);
return new ConnectionAbortedException(CoreStrings.ConnectionAbortedByClient, error!);
}

return new ConnectionAbortedException(CoreStrings.Http3ConnectionFaulted, error!);
Expand Down Expand Up @@ -648,7 +671,7 @@ void IHttp3StreamLifetimeHandler.OnInboundControlStreamSetting(Http3SettingType

void IHttp3StreamLifetimeHandler.OnStreamHeaderReceived(IHttp3Stream stream)
{
Debug.Assert(stream.ReceivedHeader);
Debug.Assert(!stream.IsReceivingHeader);
}

public void HandleRequestHeadersTimeout()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ private void OnStreamClosed()
public PipeReader Input => _context.Transport.Input;
public IKestrelTrace Log => _context.ServiceContext.Log;

public long HeaderTimeoutTicks { get; set; }
public bool ReceivedHeader => _headerType >= 0;

public long StreamTimeoutTicks { get; set; }
public bool IsReceivingHeader => _headerType == -1;
public bool IsDraining => false;
public bool IsRequestStream => false;
public string TraceIdentifier => _context.StreamContext.ConnectionId;

public void Abort(ConnectionAbortedException abortReason, Http3ErrorCode errorCode)
{
Expand Down
35 changes: 21 additions & 14 deletions src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Net.Http;
using System.Net.Http.QPack;
using System.Net.Quic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -73,8 +68,9 @@ internal abstract partial class Http3Stream : HttpProtocol, IHttp3Stream, IHttpH
public KestrelServerLimits Limits => _context.ServiceContext.ServerOptions.Limits;
public long StreamId => _streamIdFeature.StreamId;

public long HeaderTimeoutTicks { get; set; }
public bool ReceivedHeader => _appCompleted != null; // TCS is assigned once headers are received
public long StreamTimeoutTicks { get; set; }
public bool IsReceivingHeader => _appCompleted == null; // TCS is assigned once headers are received
public bool IsDraining => _appCompleted?.Task.IsCompleted ?? false; // Draining starts once app is complete

public bool IsRequestStream => true;

Expand All @@ -97,7 +93,7 @@ public void Initialize(Http3StreamContext context)
_totalParsedHeaderSize = 0;
_isMethodConnect = false;
_completionState = default;
HeaderTimeoutTicks = 0;
StreamTimeoutTicks = 0;

if (_frameWriter == null)
{
Expand Down Expand Up @@ -409,7 +405,6 @@ private bool TryClose()
return true;
}

// TODO make this actually close the Http3Stream by telling quic to close the stream.
return false;
}

Expand Down Expand Up @@ -501,13 +496,26 @@ public async Task ProcessRequestAsync<TContext>(IHttpApplication<TContext> appli
}
finally
{
ApplyCompletionFlag(StreamCompletionFlags.Completed);
// Drain transports and dispose.
await _context.StreamContext.DisposeAsync();

// Tells the connection to remove the stream from its active collection.
ApplyCompletionFlag(StreamCompletionFlags.Completed);
_context.StreamLifetimeHandler.OnStreamCompleted(this);

// Dispose must happen after stream is no longer active.
await _context.StreamContext.DisposeAsync();
// TODO this is a hack for .NET 6 pooling.
//
// Pooling needs to happen after transports have been drained and stream
// has been completed and is no longer active. All of this logic can't
// be placed in ConnectionContext.DisposeAsync. Instead, QuicStreamContext
// has pooling happen in QuicStreamContext.Dispose.
//
// ConnectionContext only implements IDisposableAsync by default. Only
// QuicStreamContext should pass this check.
if (_context.StreamContext is IDisposable disposableStream)
{
disposableStream.Dispose();
}
}
}
}
Expand Down Expand Up @@ -600,8 +608,6 @@ private async Task ProcessHeadersFrameAsync<TContext>(IHttpApplication<TContext>
case RequestHeaderParsingState.Headers:
break;
case RequestHeaderParsingState.Trailers:
// trailers
// TODO figure out if there is anything else to do here.
return;
default:
Debug.Fail("Unexpected header parsing state.");
Expand All @@ -627,6 +633,7 @@ private async Task ProcessHeadersFrameAsync<TContext>(IHttpApplication<TContext>
}

_appCompleted = new TaskCompletionSource();
StreamTimeoutTicks = default;
_context.StreamLifetimeHandler.OnStreamHeaderReceived(this);

ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
Expand Down
20 changes: 15 additions & 5 deletions src/Servers/Kestrel/Core/src/Internal/Http3/IHttp3Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,30 @@ internal interface IHttp3Stream
long StreamId { get; }

/// <summary>
/// Used to track the timeout between when the stream was started by the client, and getting a header.
/// Value is driven by <see cref="KestrelServerLimits.RequestHeadersTimeout"/>.
/// Used to track the timeout in two situations:
/// 1. Between when the stream was started by the client, and getting a header.
/// Value is driven by <see cref="KestrelServerLimits.RequestHeadersTimeout"/>.
/// 2. Between when the request delegate is complete and the transport draining.
/// Value is driven by <see cref="KestrelServerLimits.MinResponseDataRate"/>.
/// </summary>
long HeaderTimeoutTicks { get; set; }
long StreamTimeoutTicks { get; set; }

/// <summary>
/// The stream has received and parsed the header frame.
/// The stream is receiving the header frame.
/// - Request streams = HEADERS frame.
/// - Control streams = unidirectional stream header.
/// </summary>
bool ReceivedHeader { get; }
bool IsReceivingHeader { get; }

/// <summary>
/// The stream request delegate is complete and the transport is draining.
/// </summary>
bool IsDraining { get; }

bool IsRequestStream { get; }

string TraceIdentifier { get; }

void Abort(ConnectionAbortedException abortReason, Http3ErrorCode errorCode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ internal interface ITimeoutControl
void StartTimingWrite();
void StopTimingWrite();
void BytesWrittenToBuffer(MinDataRate minRate, long count);
long GetResponseDrainDeadline(long ticks, MinDataRate minRate);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -330,5 +330,14 @@ void IConnectionTimeoutFeature.ResetTimeout(TimeSpan timeSpan)

ResetTimeout(timeSpan.Ticks, TimeoutReason.TimeoutFeature);
}

public long GetResponseDrainDeadline(long ticks, MinDataRate minRate)
{
// On grace period overflow, use max value.
var gracePeriod = ticks + minRate.GracePeriod.Ticks;
gracePeriod = gracePeriod >= 0 ? gracePeriod : long.MaxValue;

return Math.Max(_writeTimingTimeoutTimestamp, gracePeriod);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Net.Sockets;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;

Expand Down Expand Up @@ -31,15 +30,18 @@ public void AbortRead(long errorCode, ConnectionAbortedException abortReason)
{
lock (_shutdownLock)
{
if (_stream.CanRead)
if (_stream != null)
{
_shutdownReadReason = abortReason;
_log.StreamAbortRead(this, errorCode, abortReason.Message);
_stream.AbortRead(errorCode);
}
else
{
throw new InvalidOperationException("Unable to abort reading from a stream that doesn't support reading.");
if (_stream.CanRead)
{
_shutdownReadReason = abortReason;
_log.StreamAbortRead(this, errorCode, abortReason.Message);
_stream.AbortRead(errorCode);
}
else
{
throw new InvalidOperationException("Unable to abort reading from a stream that doesn't support reading.");
}
}
}
}
Expand All @@ -48,15 +50,18 @@ public void AbortWrite(long errorCode, ConnectionAbortedException abortReason)
{
lock (_shutdownLock)
{
if (_stream.CanWrite)
{
_shutdownWriteReason = abortReason;
_log.StreamAbortWrite(this, errorCode, abortReason.Message);
_stream.AbortWrite(errorCode);
}
else
if (_stream != null)
{
throw new InvalidOperationException("Unable to abort writing to a stream that doesn't support writing.");
if (_stream.CanWrite)
{
_shutdownWriteReason = abortReason;
_log.StreamAbortWrite(this, errorCode, abortReason.Message);
_stream.AbortWrite(errorCode);
}
else
{
throw new InvalidOperationException("Unable to abort writing to a stream that doesn't support writing.");
}
}
}
}
Expand Down
Loading