diff --git a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Connection.cs b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Connection.cs index b3847e529636..92dd733318e8 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Connection.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Connection.cs @@ -176,38 +176,62 @@ public void Tick(DateTimeOffset now) return; } - UpdateStartingStreams(now); + UpdateStreamTimeouts(now); } - private void UpdateStartingStreams(DateTimeOffset now) + private void UpdateStreamTimeouts(DateTimeOffset now) { + // This method checks for timeouts: + // 1. When a stream first starts and waits to receive headers. + // Uses RequestHeadersTimeout. + // 2. When a stream finished and is waiting for underlying transport to drain. + // Uses MinResponseDataRate. + var ticks = now.Ticks; lock (_streams) { foreach (var stream in _streams.Values) { - if (stream.ReceivedHeader) + if (stream.IsReceivingHeader) { - continue; - } + if (stream.StreamTimeoutTicks == default) + { + // On expiration overflow, use max value. + var expirationTicks = ticks + _context.ServiceContext.ServerOptions.Limits.RequestHeadersTimeout.Ticks; + stream.StreamTimeoutTicks = expirationTicks >= 0 ? expirationTicks : long.MaxValue; + } - if (stream.HeaderTimeoutTicks == default) - { - // On expiration overflow, use max value. - var expirationTicks = ticks + _context.ServiceContext.ServerOptions.Limits.RequestHeadersTimeout.Ticks; - stream.HeaderTimeoutTicks = expirationTicks >= 0 ? expirationTicks : long.MaxValue; + if (stream.StreamTimeoutTicks < ticks) + { + if (stream.IsRequestStream) + { + stream.Abort(new ConnectionAbortedException(CoreStrings.BadRequest_RequestHeadersTimeout), Http3ErrorCode.RequestRejected); + } + else + { + stream.Abort(new ConnectionAbortedException(CoreStrings.Http3ControlStreamHeaderTimeout), Http3ErrorCode.StreamCreationError); + } + } } - - if (stream.HeaderTimeoutTicks < ticks) + else if (stream.IsDraining) { - if (stream.IsRequestStream) + var minDataRate = _context.ServiceContext.ServerOptions.Limits.MinResponseDataRate; + if (minDataRate == null) { - stream.Abort(new ConnectionAbortedException(CoreStrings.BadRequest_RequestHeadersTimeout), Http3ErrorCode.RequestRejected); + continue; } - else + + if (stream.StreamTimeoutTicks == default) + { + stream.StreamTimeoutTicks = _context.TimeoutControl.GetResponseDrainDeadline(ticks, minDataRate); + } + + if (stream.StreamTimeoutTicks < ticks) { - stream.Abort(new ConnectionAbortedException(CoreStrings.Http3ControlStreamHeaderTimeout), Http3ErrorCode.StreamCreationError); + // Cancel connection to be consistent with other data rate limits. + Log.ResponseMinimumDataRateNotSatisfied(_context.ConnectionId, stream.TraceIdentifier); + Abort(new ConnectionAbortedException(CoreStrings.ConnectionTimedBecauseResponseMininumDataRateNotSatisfied), Http3ErrorCode.InternalError); } } } @@ -396,7 +420,6 @@ public async Task ProcessRequestsAsync(IHttpApplication appl } _context.TimeoutControl.CancelTimeout(); - _context.TimeoutControl.StartDrainTimeout(Limits.MinResponseDataRate, Limits.MaxResponseBufferSize); } catch { @@ -424,7 +447,7 @@ private ConnectionAbortedException CreateConnectionAbortError(Exception? error, if (clientAbort) { - return new ConnectionAbortedException("The client closed the HTTP/3 connection.", error!); + return new ConnectionAbortedException(CoreStrings.ConnectionAbortedByClient, error!); } return new ConnectionAbortedException(CoreStrings.Http3ConnectionFaulted, error!); @@ -648,7 +671,7 @@ void IHttp3StreamLifetimeHandler.OnInboundControlStreamSetting(Http3SettingType void IHttp3StreamLifetimeHandler.OnStreamHeaderReceived(IHttp3Stream stream) { - Debug.Assert(stream.ReceivedHeader); + Debug.Assert(!stream.IsReceivingHeader); } public void HandleRequestHeadersTimeout() diff --git a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3ControlStream.cs b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3ControlStream.cs index 823c07162bf4..418a52a9d989 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3ControlStream.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3ControlStream.cs @@ -67,10 +67,11 @@ private void OnStreamClosed() public PipeReader Input => _context.Transport.Input; public IKestrelTrace Log => _context.ServiceContext.Log; - public long HeaderTimeoutTicks { get; set; } - public bool ReceivedHeader => _headerType >= 0; - + public long StreamTimeoutTicks { get; set; } + public bool IsReceivingHeader => _headerType == -1; + public bool IsDraining => false; public bool IsRequestStream => false; + public string TraceIdentifier => _context.StreamContext.ConnectionId; public void Abort(ConnectionAbortedException abortReason, Http3ErrorCode errorCode) { diff --git a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs index 77183f321502..a03e73226702 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs @@ -1,20 +1,15 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -using System; using System.Buffers; using System.Diagnostics; using System.IO.Pipelines; using System.Net.Http; using System.Net.Http.QPack; -using System.Net.Quic; using System.Runtime.CompilerServices; -using System.Threading; -using System.Threading.Tasks; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Connections.Features; using Microsoft.AspNetCore.Hosting.Server; -using Microsoft.AspNetCore.Http.Features; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; using Microsoft.Extensions.Logging; @@ -73,8 +68,9 @@ internal abstract partial class Http3Stream : HttpProtocol, IHttp3Stream, IHttpH public KestrelServerLimits Limits => _context.ServiceContext.ServerOptions.Limits; public long StreamId => _streamIdFeature.StreamId; - public long HeaderTimeoutTicks { get; set; } - public bool ReceivedHeader => _appCompleted != null; // TCS is assigned once headers are received + public long StreamTimeoutTicks { get; set; } + public bool IsReceivingHeader => _appCompleted == null; // TCS is assigned once headers are received + public bool IsDraining => _appCompleted?.Task.IsCompleted ?? false; // Draining starts once app is complete public bool IsRequestStream => true; @@ -97,7 +93,7 @@ public void Initialize(Http3StreamContext context) _totalParsedHeaderSize = 0; _isMethodConnect = false; _completionState = default; - HeaderTimeoutTicks = 0; + StreamTimeoutTicks = 0; if (_frameWriter == null) { @@ -409,7 +405,6 @@ private bool TryClose() return true; } - // TODO make this actually close the Http3Stream by telling quic to close the stream. return false; } @@ -501,13 +496,26 @@ public async Task ProcessRequestAsync(IHttpApplication appli } finally { - ApplyCompletionFlag(StreamCompletionFlags.Completed); + // Drain transports and dispose. + await _context.StreamContext.DisposeAsync(); // Tells the connection to remove the stream from its active collection. + ApplyCompletionFlag(StreamCompletionFlags.Completed); _context.StreamLifetimeHandler.OnStreamCompleted(this); - // Dispose must happen after stream is no longer active. - await _context.StreamContext.DisposeAsync(); + // TODO this is a hack for .NET 6 pooling. + // + // Pooling needs to happen after transports have been drained and stream + // has been completed and is no longer active. All of this logic can't + // be placed in ConnectionContext.DisposeAsync. Instead, QuicStreamContext + // has pooling happen in QuicStreamContext.Dispose. + // + // ConnectionContext only implements IDisposableAsync by default. Only + // QuicStreamContext should pass this check. + if (_context.StreamContext is IDisposable disposableStream) + { + disposableStream.Dispose(); + } } } } @@ -600,8 +608,6 @@ private async Task ProcessHeadersFrameAsync(IHttpApplication case RequestHeaderParsingState.Headers: break; case RequestHeaderParsingState.Trailers: - // trailers - // TODO figure out if there is anything else to do here. return; default: Debug.Fail("Unexpected header parsing state."); @@ -627,6 +633,7 @@ private async Task ProcessHeadersFrameAsync(IHttpApplication } _appCompleted = new TaskCompletionSource(); + StreamTimeoutTicks = default; _context.StreamLifetimeHandler.OnStreamHeaderReceived(this); ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); diff --git a/src/Servers/Kestrel/Core/src/Internal/Http3/IHttp3Stream.cs b/src/Servers/Kestrel/Core/src/Internal/Http3/IHttp3Stream.cs index 8e7e1b84e820..62e25d8d39e7 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http3/IHttp3Stream.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http3/IHttp3Stream.cs @@ -14,20 +14,30 @@ internal interface IHttp3Stream long StreamId { get; } /// - /// Used to track the timeout between when the stream was started by the client, and getting a header. - /// Value is driven by . + /// Used to track the timeout in two situations: + /// 1. Between when the stream was started by the client, and getting a header. + /// Value is driven by . + /// 2. Between when the request delegate is complete and the transport draining. + /// Value is driven by . /// - long HeaderTimeoutTicks { get; set; } + long StreamTimeoutTicks { get; set; } /// - /// The stream has received and parsed the header frame. + /// The stream is receiving the header frame. /// - Request streams = HEADERS frame. /// - Control streams = unidirectional stream header. /// - bool ReceivedHeader { get; } + bool IsReceivingHeader { get; } + + /// + /// The stream request delegate is complete and the transport is draining. + /// + bool IsDraining { get; } bool IsRequestStream { get; } + string TraceIdentifier { get; } + void Abort(ConnectionAbortedException abortReason, Http3ErrorCode errorCode); } } diff --git a/src/Servers/Kestrel/Core/src/Internal/Infrastructure/ITimeoutControl.cs b/src/Servers/Kestrel/Core/src/Internal/Infrastructure/ITimeoutControl.cs index f956fca57eb7..b393678f383d 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Infrastructure/ITimeoutControl.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Infrastructure/ITimeoutControl.cs @@ -26,5 +26,6 @@ internal interface ITimeoutControl void StartTimingWrite(); void StopTimingWrite(); void BytesWrittenToBuffer(MinDataRate minRate, long count); + long GetResponseDrainDeadline(long ticks, MinDataRate minRate); } } diff --git a/src/Servers/Kestrel/Core/src/Internal/Infrastructure/TimeoutControl.cs b/src/Servers/Kestrel/Core/src/Internal/Infrastructure/TimeoutControl.cs index 5af0630621f7..2372d81a6157 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Infrastructure/TimeoutControl.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Infrastructure/TimeoutControl.cs @@ -330,5 +330,14 @@ void IConnectionTimeoutFeature.ResetTimeout(TimeSpan timeSpan) ResetTimeout(timeSpan.Ticks, TimeoutReason.TimeoutFeature); } + + public long GetResponseDrainDeadline(long ticks, MinDataRate minRate) + { + // On grace period overflow, use max value. + var gracePeriod = ticks + minRate.GracePeriod.Ticks; + gracePeriod = gracePeriod >= 0 ? gracePeriod : long.MaxValue; + + return Math.Max(_writeTimingTimeoutTimestamp, gracePeriod); + } } } diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.FeatureCollection.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.FeatureCollection.cs index 302c975e413f..7be7e6e296db 100644 --- a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.FeatureCollection.cs +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.FeatureCollection.cs @@ -1,7 +1,6 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -using System.Net.Sockets; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Connections.Features; @@ -31,15 +30,18 @@ public void AbortRead(long errorCode, ConnectionAbortedException abortReason) { lock (_shutdownLock) { - if (_stream.CanRead) + if (_stream != null) { - _shutdownReadReason = abortReason; - _log.StreamAbortRead(this, errorCode, abortReason.Message); - _stream.AbortRead(errorCode); - } - else - { - throw new InvalidOperationException("Unable to abort reading from a stream that doesn't support reading."); + if (_stream.CanRead) + { + _shutdownReadReason = abortReason; + _log.StreamAbortRead(this, errorCode, abortReason.Message); + _stream.AbortRead(errorCode); + } + else + { + throw new InvalidOperationException("Unable to abort reading from a stream that doesn't support reading."); + } } } } @@ -48,15 +50,18 @@ public void AbortWrite(long errorCode, ConnectionAbortedException abortReason) { lock (_shutdownLock) { - if (_stream.CanWrite) - { - _shutdownWriteReason = abortReason; - _log.StreamAbortWrite(this, errorCode, abortReason.Message); - _stream.AbortWrite(errorCode); - } - else + if (_stream != null) { - throw new InvalidOperationException("Unable to abort writing to a stream that doesn't support writing."); + if (_stream.CanWrite) + { + _shutdownWriteReason = abortReason; + _log.StreamAbortWrite(this, errorCode, abortReason.Message); + _stream.AbortWrite(errorCode); + } + else + { + throw new InvalidOperationException("Unable to abort writing to a stream that doesn't support writing."); + } } } } diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs index bf8c2ae3747b..e4d2b8a641b9 100644 --- a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs @@ -17,12 +17,12 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Internal { - internal partial class QuicStreamContext : TransportConnection, IPooledStream + internal partial class QuicStreamContext : TransportConnection, IPooledStream, IDisposable { // Internal for testing. internal Task _processingTask = Task.CompletedTask; - private QuicStream _stream = default!; + private QuicStream? _stream; private readonly QuicConnectionContext _connection; private readonly QuicTransportContext _context; private readonly Pipe _inputPipe; @@ -140,6 +140,8 @@ public void Start() private async Task StartAsync() { + Debug.Assert(_stream != null); + try { // Spawn send and receive logic @@ -169,6 +171,8 @@ private async Task StartAsync() private async Task DoReceive() { + Debug.Assert(_stream != null); + Exception? error = null; try @@ -282,6 +286,8 @@ private void CancelConnectionClosedToken() private async Task DoSend() { + Debug.Assert(_stream != null); + Exception? shutdownReason = null; Exception? unexpectedError = null; @@ -379,13 +385,16 @@ public override void Abort(ConnectionAbortedException abortReason) lock (_shutdownLock) { - if (_stream.CanRead) - { - _stream.AbortRead(Error); - } - if (_stream.CanWrite) + if (_stream != null) { - _stream.AbortWrite(Error); + if (_stream.CanRead) + { + _stream.AbortRead(Error); + } + if (_stream.CanWrite) + { + _stream.AbortWrite(Error); + } } } @@ -395,6 +404,8 @@ public override void Abort(ConnectionAbortedException abortReason) private void ShutdownWrite(Exception? shutdownReason) { + Debug.Assert(_stream != null); + try { lock (_shutdownLock) @@ -415,6 +426,10 @@ private void ShutdownWrite(Exception? shutdownReason) public override async ValueTask DisposeAsync() { + if (_stream == null) + { + return; + } // Be conservative about what can be pooled. // Only pool bidirectional streams whose pipes have completed successfully and haven't been aborted. CanReuse = _stream.CanRead && _stream.CanWrite @@ -430,9 +445,20 @@ public override async ValueTask DisposeAsync() await _processingTask; - _stream.Dispose(); - _stream = null!; + lock (_shutdownLock) + { + if (!CanReuse) + { + DisposeCore(); + } + _stream.Dispose(); + _stream = null!; + } + } + + public void Dispose() + { if (!_connection.TryReturnStream(this)) { // Dispose when one of: diff --git a/src/Servers/Kestrel/Transport.Quic/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.csproj b/src/Servers/Kestrel/Transport.Quic/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.csproj index f9a3b6d57c35..a1838ce726b8 100644 --- a/src/Servers/Kestrel/Transport.Quic/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.csproj +++ b/src/Servers/Kestrel/Transport.Quic/src/Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.csproj @@ -17,6 +17,7 @@ + diff --git a/src/Servers/Kestrel/Transport.Quic/test/QuicConnectionContextTests.cs b/src/Servers/Kestrel/Transport.Quic/test/QuicConnectionContextTests.cs index bdc614e1fc02..c43d3fd63fd3 100644 --- a/src/Servers/Kestrel/Transport.Quic/test/QuicConnectionContextTests.cs +++ b/src/Servers/Kestrel/Transport.Quic/test/QuicConnectionContextTests.cs @@ -578,6 +578,7 @@ static async Task SendStream(RequestState requestState) // Both send and receive loops have exited. await quicStreamContext._processingTask.DefaultTimeout(); await quicStreamContext.DisposeAsync(); + quicStreamContext.Dispose(); } } @@ -614,6 +615,7 @@ public async Task PersistentState_StreamsReused_StatePersisted() var quicStreamContext1 = Assert.IsType(serverStream1); await quicStreamContext1._processingTask.DefaultTimeout(); await quicStreamContext1.DisposeAsync(); + quicStreamContext1.Dispose(); var clientStream2 = clientConnection.OpenBidirectionalStream(); await clientStream2.WriteAsync(TestData, endStream: true).DefaultTimeout(); @@ -634,6 +636,7 @@ public async Task PersistentState_StreamsReused_StatePersisted() var quicStreamContext2 = Assert.IsType(serverStream2); await quicStreamContext2._processingTask.DefaultTimeout(); await quicStreamContext2.DisposeAsync(); + quicStreamContext2.Dispose(); Assert.Same(quicStreamContext1, quicStreamContext2); diff --git a/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs b/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs index d417eb274d0c..3f94d5cb802b 100644 --- a/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs +++ b/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs @@ -88,6 +88,7 @@ public async Task BidirectionalStream_ReadAborted_NotPooled() Assert.True(quicStreamContext.CanRead); await quicStreamContext.DisposeAsync(); + quicStreamContext.Dispose(); var quicConnectionContext = Assert.IsType(serverConnection); @@ -147,6 +148,7 @@ public async Task BidirectionalStream_ServerWritesDataAndDisposes_ClientReadsDat Logger.LogInformation("Server disposing stream."); await quicStreamContext.DisposeAsync().DefaultTimeout(); + quicStreamContext.Dispose(); Logger.LogInformation("Client reading until end of stream."); var data = await clientStream.ReadUntilEndAsync().DefaultTimeout(); diff --git a/src/Servers/Kestrel/Transport.Quic/test/QuicTestHelpers.cs b/src/Servers/Kestrel/Transport.Quic/test/QuicTestHelpers.cs index fc5edcfa7004..7a6d989dfcac 100644 --- a/src/Servers/Kestrel/Transport.Quic/test/QuicTestHelpers.cs +++ b/src/Servers/Kestrel/Transport.Quic/test/QuicTestHelpers.cs @@ -130,6 +130,7 @@ public static async Task CreateAndCompleteBidirectionalStream Assert.True(quicStreamContext.CanRead); await quicStreamContext.DisposeAsync(); + quicStreamContext.Dispose(); return quicStreamContext; } diff --git a/src/Servers/Kestrel/perf/Microbenchmarks/Microsoft.AspNetCore.Server.Kestrel.Microbenchmarks.csproj b/src/Servers/Kestrel/perf/Microbenchmarks/Microsoft.AspNetCore.Server.Kestrel.Microbenchmarks.csproj index c55afd7e62e3..f7bfa926cc67 100644 --- a/src/Servers/Kestrel/perf/Microbenchmarks/Microsoft.AspNetCore.Server.Kestrel.Microbenchmarks.csproj +++ b/src/Servers/Kestrel/perf/Microbenchmarks/Microsoft.AspNetCore.Server.Kestrel.Microbenchmarks.csproj @@ -22,6 +22,7 @@ + diff --git a/src/Servers/Kestrel/perf/Microbenchmarks/Mocks/MockTimeoutControl.cs b/src/Servers/Kestrel/perf/Microbenchmarks/Mocks/MockTimeoutControl.cs index ae5d7d1540c2..ae7c68867e50 100644 --- a/src/Servers/Kestrel/perf/Microbenchmarks/Mocks/MockTimeoutControl.cs +++ b/src/Servers/Kestrel/perf/Microbenchmarks/Mocks/MockTimeoutControl.cs @@ -24,6 +24,11 @@ public void CancelTimeout() { } + public long GetResponseDrainDeadline(long ticks, MinDataRate minRate) + { + return 0; + } + public void InitializeHttp2(InputFlowControl connectionInputFlowControl) { } diff --git a/src/Servers/Kestrel/shared/ConnectionCompletion.cs b/src/Servers/Kestrel/shared/ConnectionCompletion.cs new file mode 100644 index 000000000000..f82b67645ebd --- /dev/null +++ b/src/Servers/Kestrel/shared/ConnectionCompletion.cs @@ -0,0 +1,67 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using Microsoft.Extensions.Logging; + +#nullable enable + +namespace Microsoft.AspNetCore.Connections +{ + internal static class ConnectionCompletion + { + public static Task FireOnCompletedAsync(ILogger logger, Stack, object>>? onCompleted) + { + if (onCompleted == null || onCompleted.Count == 0) + { + return Task.CompletedTask; + } + + return CompleteAsyncMayAwait(logger, onCompleted); + } + + private static Task CompleteAsyncMayAwait(ILogger logger, Stack, object>> onCompleted) + { + while (onCompleted.TryPop(out var entry)) + { + try + { + var task = entry.Key.Invoke(entry.Value); + if (!task.IsCompletedSuccessfully) + { + return CompleteAsyncAwaited(task, logger, onCompleted); + } + } + catch (Exception ex) + { + logger.LogError(ex, "An error occurred running an IConnectionCompleteFeature.OnCompleted callback."); + } + } + + return Task.CompletedTask; + } + + private static async Task CompleteAsyncAwaited(Task currentTask, ILogger logger, Stack, object>> onCompleted) + { + try + { + await currentTask; + } + catch (Exception ex) + { + logger.LogError(ex, "An error occurred running an IConnectionCompleteFeature.OnCompleted callback."); + } + + while (onCompleted.TryPop(out var entry)) + { + try + { + await entry.Key.Invoke(entry.Value); + } + catch (Exception ex) + { + logger.LogError(ex, "An error occurred running an IConnectionCompleteFeature.OnCompleted callback."); + } + } + } + } +} diff --git a/src/Servers/Kestrel/shared/test/Http3/Http3InMemory.cs b/src/Servers/Kestrel/shared/test/Http3/Http3InMemory.cs index c94d5aaa727e..e3e84d79f7b7 100644 --- a/src/Servers/Kestrel/shared/test/Http3/Http3InMemory.cs +++ b/src/Servers/Kestrel/shared/test/Http3/Http3InMemory.cs @@ -608,6 +608,13 @@ internal class Http3RequestStream : Http3StreamBase, IHttpHeadersHandler public bool Disposed => _testStreamContext.Disposed; public Task OnDisposedTask => _testStreamContext.OnDisposedTask; + public Task OnDisposingTask => _testStreamContext.OnDisposingTask; + + public TaskCompletionSource StartStreamDisposeTcs + { + get => _testStreamContext.StartStreamDisposeTcs; + internal set => _testStreamContext.StartStreamDisposeTcs = value; + } public Http3RequestStream(Http3InMemory testBase, Http3Connection connection, TestStreamContext testStreamContext, Http3RequestHeaderHandler headerHandler) : base(testStreamContext) @@ -994,7 +1001,7 @@ public void RequestClose() } } - internal class TestStreamContext : ConnectionContext, IStreamDirectionFeature, IStreamIdFeature, IProtocolErrorCodeFeature, IPersistentStateFeature, IStreamAbortFeature + internal class TestStreamContext : ConnectionContext, IStreamDirectionFeature, IStreamIdFeature, IProtocolErrorCodeFeature, IPersistentStateFeature, IStreamAbortFeature, IDisposable { private readonly Http3InMemory _testBase; @@ -1010,6 +1017,7 @@ internal class TestStreamContext : ConnectionContext, IStreamDirectionFeature, I // Persistent state collection is not reset with a stream by design. private IDictionary _persistentState; + private TaskCompletionSource _disposingTcs; private TaskCompletionSource _disposedTcs; public TestStreamContext(bool canRead, bool canWrite, Http3InMemory testBase) @@ -1066,14 +1074,18 @@ public void Initialize(long streamId) AbortWriteException = null; _disposedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _disposingTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); Disposed = false; } + public TaskCompletionSource StartStreamDisposeTcs { get; internal set; } + public ConnectionAbortedException AbortReadException { get; private set; } public ConnectionAbortedException AbortWriteException { get; private set; } public bool Disposed { get; private set; } + public Task OnDisposingTask => _disposingTcs.Task; public Task OnDisposedTask => _disposedTcs.Task; public override string ConnectionId { get; set; } @@ -1108,33 +1120,41 @@ public override void Abort(ConnectionAbortedException abortReason) _pair.Application.Output.Complete(abortReason); } - public override ValueTask DisposeAsync() + public override async ValueTask DisposeAsync() { + _disposingTcs.TrySetResult(); + if (StartStreamDisposeTcs != null) + { + await StartStreamDisposeTcs.Task; + } + _testBase.Logger.LogDebug($"Disposing stream {StreamId}"); var readerCompletedSuccessfully = _transportPipeReader.IsCompletedSuccessfully; var writerCompletedSuccessfully = _transportPipeWriter.IsCompletedSuccessfully; - var canReuse = !_isAborted && + CanReuse = !_isAborted && readerCompletedSuccessfully && writerCompletedSuccessfully; _pair.Transport.Input.Complete(); _pair.Transport.Output.Complete(); + } - if (canReuse) + public void Dispose() + { + if (CanReuse) { _testBase.Logger.LogDebug($"Pooling stream {StreamId} for reuse."); _testBase._streamContextPool.Enqueue(this); } else { - _testBase.Logger.LogDebug($"Can't reuse stream {StreamId}. Aborted: {_isAborted}, Reader completed successfully: {readerCompletedSuccessfully}, Writer completed successfully: {writerCompletedSuccessfully}."); + // Note that completed flags could be out of date at this point. + _testBase.Logger.LogDebug($"Can't reuse stream {StreamId}. Aborted: {_isAborted}, Reader completed successfully: {_transportPipeReader.IsCompletedSuccessfully}, Writer completed successfully: {_transportPipeWriter.IsCompletedSuccessfully}."); } Disposed = true; _disposedTcs.TrySetResult(); - - return ValueTask.CompletedTask; } internal void Complete() @@ -1151,6 +1171,8 @@ IDictionary IPersistentStateFeature.State } } + public bool CanReuse { get; private set; } + void IStreamAbortFeature.AbortRead(long errorCode, ConnectionAbortedException abortReason) { AbortReadException = abortReason; diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TestBase.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TestBase.cs index 5b77a3d19fce..cd313b6b38a4 100644 --- a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TestBase.cs +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TestBase.cs @@ -1384,6 +1384,11 @@ public virtual void Tick(DateTimeOffset now) { _realTimeoutControl.Tick(now); } + + public long GetResponseDrainDeadline(long ticks, MinDataRate minRate) + { + return _realTimeoutControl.GetResponseDrainDeadline(ticks, minRate); + } } } } diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs index fb920856c2e3..e067f6de562e 100644 --- a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs @@ -257,6 +257,8 @@ public async Task ResponseDrain_SlowerThanMinimumDataRate_AbortsConnection() _mockTimeoutHandler.VerifyNoOtherCalls(); _mockConnectionContext.VerifyNoOtherCalls(); + + Assert.Contains(TestSink.Writes, w => w.EventId.Name == "ResponseMinimumDataRateNotSatisfied"); } [Theory] diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3TimeoutTests.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3TimeoutTests.cs index af980db07502..affc353e85e1 100644 --- a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3TimeoutTests.cs +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3TimeoutTests.cs @@ -41,7 +41,7 @@ public async Task HEADERS_IncompleteFrameReceivedWithinRequestHeadersTimeout_Str Http3Api.TriggerTick(now); Http3Api.TriggerTick(now + limits.RequestHeadersTimeout); - Assert.Equal((now + limits.RequestHeadersTimeout).Ticks, serverRequestStream.HeaderTimeoutTicks); + Assert.Equal((now + limits.RequestHeadersTimeout).Ticks, serverRequestStream.StreamTimeoutTicks); Http3Api.TriggerTick(now + limits.RequestHeadersTimeout + TimeSpan.FromTicks(1)); @@ -76,7 +76,7 @@ public async Task HEADERS_HeaderFrameReceivedWithinRequestHeadersTimeout_Success Http3Api.TriggerTick(now); Http3Api.TriggerTick(now + limits.RequestHeadersTimeout); - Assert.Equal((now + limits.RequestHeadersTimeout).Ticks, serverRequestStream.HeaderTimeoutTicks); + Assert.Equal((now + limits.RequestHeadersTimeout).Ticks, serverRequestStream.StreamTimeoutTicks); await requestStream.SendHeadersAsync(headers).DefaultTimeout(); @@ -118,7 +118,7 @@ public async Task ControlStream_HeaderNotReceivedWithinRequestHeadersTimeout_Str Http3Api.TriggerTick(now); Http3Api.TriggerTick(now + limits.RequestHeadersTimeout); - Assert.Equal((now + limits.RequestHeadersTimeout).Ticks, serverInboundControlStream.HeaderTimeoutTicks); + Assert.Equal((now + limits.RequestHeadersTimeout).Ticks, serverInboundControlStream.StreamTimeoutTicks); Http3Api.TriggerTick(now + limits.RequestHeadersTimeout + TimeSpan.FromTicks(1)); @@ -188,7 +188,7 @@ public async Task ControlStream_RequestHeadersTimeoutMaxValue_ExpirationIsMaxVal Http3Api.TriggerTick(now); - Assert.Equal(TimeSpan.MaxValue.Ticks, serverInboundControlStream.HeaderTimeoutTicks); + Assert.Equal(TimeSpan.MaxValue.Ticks, serverInboundControlStream.StreamTimeoutTicks); } [Fact] @@ -233,41 +233,53 @@ await Http3Api.WaitForConnectionErrorAsync( _mockTimeoutHandler.VerifyNoOtherCalls(); } - /* - * Additional work around closing connections is required before response drain can be supported. [Fact] public async Task ResponseDrain_SlowerThanMinimumDataRate_AbortsConnection() { - var mockSystemClock = _serviceContext.MockSystemClock; + var now = _serviceContext.MockSystemClock.UtcNow; var limits = _serviceContext.ServerOptions.Limits; + var mockSystemClock = _serviceContext.MockSystemClock; - _timeoutControl.Initialize(mockSystemClock.UtcNow.Ticks); + // Use non-default value to ensure the min request and response rates aren't mixed up. + limits.MinResponseDataRate = new MinDataRate(480, TimeSpan.FromSeconds(2.5)); - await InitializeConnectionAsync(_noopApplication); + await Http3Api.InitializeConnectionAsync(_noopApplication); - var inboundControlStream = await GetInboundControlStream(); + var inboundControlStream = await Http3Api.GetInboundControlStream(); await inboundControlStream.ExpectSettingsAsync(); - CloseConnectionGracefully(); + var requestStream = await Http3Api.CreateRequestStream(); - await inboundControlStream.ReceiveFrameAsync().DefaultTimeout(); - await inboundControlStream.ReceiveFrameAsync().DefaultTimeout(); - await inboundControlStream.ReceiveEndAsync().DefaultTimeout(); + requestStream.StartStreamDisposeTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - //await WaitForConnectionStopAsync(expectedLastStreamId: VariableLengthIntegerHelper.EightByteLimit, ignoreNonGoAwayFrames: false, expectedErrorCode: Http3ErrorCode.NoError); + await requestStream.SendHeadersAsync(new[] + { + new KeyValuePair(HeaderNames.Path, "/"), + new KeyValuePair(HeaderNames.Scheme, "http"), + new KeyValuePair(HeaderNames.Method, "GET"), + new KeyValuePair(HeaderNames.Authority, "localhost:80"), + }, endStream: true); - AdvanceClock(TimeSpan.FromSeconds(inboundControlStream.BytesReceived / limits.MinResponseDataRate.BytesPerSecond) + - limits.MinResponseDataRate.GracePeriod + Heartbeat.Interval - TimeSpan.FromSeconds(.5)); + await requestStream.OnDisposingTask.DefaultTimeout(); - _mockTimeoutHandler.Verify(h => h.OnTimeout(It.IsAny()), Times.Never); + Http3Api.TriggerTick(now); + Assert.Equal(0, requestStream.Error); - AdvanceClock(TimeSpan.FromSeconds(1)); + Http3Api.TriggerTick(now + TimeSpan.FromTicks(1)); + Assert.Equal(0, requestStream.Error); - _mockTimeoutHandler.Verify(h => h.OnTimeout(TimeoutReason.WriteDataRate), Times.Once); + Http3Api.TriggerTick(now + limits.MinResponseDataRate.GracePeriod + TimeSpan.FromTicks(1)); - _mockTimeoutHandler.VerifyNoOtherCalls(); + requestStream.StartStreamDisposeTcs.TrySetResult(); + + await Http3Api.WaitForConnectionErrorAsync( + ignoreNonGoAwayFrames: false, + expectedLastStreamId: 4, + Http3ErrorCode.InternalError, + expectedErrorMessage: CoreStrings.ConnectionTimedBecauseResponseMininumDataRateNotSatisfied); + + Assert.Contains(TestSink.Writes, w => w.EventId.Name == "ResponseMinimumDataRateNotSatisfied"); } - */ private class EchoAppWithNotification { diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/InMemory.FunctionalTests.csproj b/src/Servers/Kestrel/test/InMemory.FunctionalTests/InMemory.FunctionalTests.csproj index 82be1b56ae04..da4ffb2ae380 100644 --- a/src/Servers/Kestrel/test/InMemory.FunctionalTests/InMemory.FunctionalTests.csproj +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/InMemory.FunctionalTests.csproj @@ -20,6 +20,7 @@ +