From 48c63d47fa9e85b8f2eedd780ccc24136274c579 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Fri, 16 Jul 2021 13:13:03 +1200 Subject: [PATCH 1/7] HTTP/3: Add IStreamAbortFeature --- .../src/Features/IStreamAbortFeature.cs | 25 +++ .../src/PublicAPI.Unshipped.txt | 3 + .../Core/src/Internal/Http3/Http3Stream.cs | 14 +- .../Transport.Quic/src/Internal/IQuicTrace.cs | 2 + .../src/Internal/QuicStreamContext.cs | 176 ++++++++++-------- .../Transport.Quic/src/Internal/QuicTrace.cs | 22 +++ .../test/QuicStreamContextTests.cs | 49 +++++ .../shared/test/Http3/Http3InMemory.cs | 83 +++++---- .../Http3/Http3StreamTests.cs | 4 +- 9 files changed, 260 insertions(+), 118 deletions(-) create mode 100644 src/Servers/Connections.Abstractions/src/Features/IStreamAbortFeature.cs diff --git a/src/Servers/Connections.Abstractions/src/Features/IStreamAbortFeature.cs b/src/Servers/Connections.Abstractions/src/Features/IStreamAbortFeature.cs new file mode 100644 index 000000000000..a30265a9ed30 --- /dev/null +++ b/src/Servers/Connections.Abstractions/src/Features/IStreamAbortFeature.cs @@ -0,0 +1,25 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +namespace Microsoft.AspNetCore.Connections.Features +{ + /// + /// Supports aborting one side of a connection stream. + /// + public interface IStreamAbortFeature + { + /// + /// Abort the read side of the connection stream. + /// + /// The error code to send with the abort. + /// An optional describing the reason to abort the read side of the connection stream. + void AbortRead(long errorCode, ConnectionAbortedException abortReason); + + /// + /// Abort the write side of the connection stream. + /// + /// The error code to send with the abort. + /// An optional describing the reason to abort the write side of the connection stream. + void AbortWrite(long errorCode, ConnectionAbortedException abortReason); + } +} diff --git a/src/Servers/Connections.Abstractions/src/PublicAPI.Unshipped.txt b/src/Servers/Connections.Abstractions/src/PublicAPI.Unshipped.txt index fefd6f14e6df..5020eab47253 100644 --- a/src/Servers/Connections.Abstractions/src/PublicAPI.Unshipped.txt +++ b/src/Servers/Connections.Abstractions/src/PublicAPI.Unshipped.txt @@ -4,6 +4,9 @@ Microsoft.AspNetCore.Connections.Features.IConnectionSocketFeature Microsoft.AspNetCore.Connections.Features.IConnectionSocketFeature.Socket.get -> System.Net.Sockets.Socket! Microsoft.AspNetCore.Connections.Features.IPersistentStateFeature Microsoft.AspNetCore.Connections.Features.IPersistentStateFeature.State.get -> System.Collections.Generic.IDictionary! +Microsoft.AspNetCore.Connections.Features.IStreamAbortFeature +Microsoft.AspNetCore.Connections.Features.IStreamAbortFeature.AbortRead(long errorCode, Microsoft.AspNetCore.Connections.ConnectionAbortedException! abortReason) -> void +Microsoft.AspNetCore.Connections.Features.IStreamAbortFeature.AbortWrite(long errorCode, Microsoft.AspNetCore.Connections.ConnectionAbortedException! abortReason) -> void Microsoft.AspNetCore.Connections.IConnectionListener.AcceptAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask Microsoft.AspNetCore.Connections.IMultiplexedConnectionBuilder Microsoft.AspNetCore.Connections.IMultiplexedConnectionBuilder.ApplicationServices.get -> System.IServiceProvider! diff --git a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs index d051d3d0e69a..3600f8288fd1 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs @@ -43,6 +43,7 @@ internal abstract partial class Http3Stream : HttpProtocol, IHttp3Stream, IHttpH private Http3StreamContext _context = default!; private IProtocolErrorCodeFeature _errorCodeFeature = default!; private IStreamIdFeature _streamIdFeature = default!; + private IStreamAbortFeature _streamAbortFeature = default!; private int _isClosed; private readonly Http3RawFrame _incomingFrame = new Http3RawFrame(); protected RequestHeaderParsingState _requestHeaderParsingState; @@ -58,7 +59,6 @@ internal abstract partial class Http3Stream : HttpProtocol, IHttp3Stream, IHttpH public bool EndStreamReceived => (_completionState & StreamCompletionFlags.EndStreamReceived) == StreamCompletionFlags.EndStreamReceived; private bool IsAborted => (_completionState & StreamCompletionFlags.Aborted) == StreamCompletionFlags.Aborted; - internal bool RstStreamReceived => (_completionState & StreamCompletionFlags.RstStreamReceived) == StreamCompletionFlags.RstStreamReceived; public Pipe RequestBodyPipe { get; private set; } = default!; @@ -87,6 +87,7 @@ public void Initialize(Http3StreamContext context) _errorCodeFeature = _context.ConnectionFeatures.Get()!; _streamIdFeature = _context.ConnectionFeatures.Get()!; + _streamAbortFeature = _context.ConnectionFeatures.Get()!; _appCompleted = null; _isClosed = 0; @@ -371,7 +372,7 @@ private void CompleteStream(bool errored) Log.RequestBodyNotEntirelyRead(ConnectionIdFeature, TraceIdentifier); } - var (oldState, newState) = ApplyCompletionFlag(StreamCompletionFlags.Aborted); + var (oldState, newState) = ApplyCompletionFlag(StreamCompletionFlags.AbortedRead); if (oldState != newState) { // https://quicwg.org/base-drafts/draft-ietf-quic-http.html#section-4.1-15 @@ -379,10 +380,7 @@ private void CompleteStream(bool errored) // the request stream, send a complete response, and cleanly close the sending part of the stream. // The error code H3_NO_ERROR SHOULD be used when requesting that the client stop sending on the // request stream. - - // TODO(JamesNK): Abort the read half of the stream with H3_NO_ERROR - // https://github.com/dotnet/aspnetcore/issues/33575 - + _streamAbortFeature.AbortRead((long)Http3ErrorCode.NoError, new ConnectionAbortedException("The application completed without reading the entire request body.")); RequestBodyPipe.Writer.Complete(); } @@ -940,8 +938,8 @@ private enum PseudoHeaderFields private enum StreamCompletionFlags { None = 0, - RstStreamReceived = 1, - EndStreamReceived = 2, + EndStreamReceived = 1, + AbortedRead = 2, Aborted = 4, } diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/IQuicTrace.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/IQuicTrace.cs index 1c9356b00a9b..6a5f2de2b36a 100644 --- a/src/Servers/Kestrel/Transport.Quic/src/Internal/IQuicTrace.cs +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/IQuicTrace.cs @@ -21,5 +21,7 @@ internal interface IQuicTrace : ILogger void StreamShutdownWrite(QuicStreamContext streamContext, string reason); void StreamAborted(QuicStreamContext streamContext, Exception ex); void StreamAbort(QuicStreamContext streamContext, string reason); + void StreamAbortRead(QuicStreamContext streamContext, string reason); + void StreamAbortWrite(QuicStreamContext streamContext, string reason); } } diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs index bc20dc7a014b..bcef4becd27d 100644 --- a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs @@ -35,6 +35,8 @@ internal partial class QuicStreamContext : TransportConnection, IPooledStream private CancellationTokenSource _streamClosedTokenSource = default!; private string? _connectionId; private const int MinAllocBufferSize = 4096; + private volatile Exception? _shutdownReadReason; + private volatile Exception? _shutdownWriteReason; private volatile Exception? _shutdownReason; private bool _streamClosed; private bool _serverAborted; @@ -175,7 +177,42 @@ private async Task DoReceive() try { - await ProcessReceives(); + var input = Input; + while (true) + { + var buffer = Input.GetMemory(MinAllocBufferSize); + var bytesReceived = await _stream.ReadAsync(buffer); + + if (bytesReceived == 0) + { + // Read completed. + break; + } + + input.Advance(bytesReceived); + + var flushTask = input.FlushAsync(); + + var paused = !flushTask.IsCompleted; + + if (paused) + { + _log.StreamPause(this); + } + + var result = await flushTask; + + if (paused) + { + _log.StreamResume(this); + } + + if (result.IsCompleted || result.IsCanceled) + { + // Pipe consumer is shut down, do we stop writing + break; + } + } } catch (QuicStreamAbortedException ex) { @@ -204,7 +241,7 @@ private async Task DoReceive() finally { // If Shutdown() has already bee called, assume that was the reason ProcessReceives() exited. - Input.Complete(_shutdownReason ?? error); + Input.Complete(_shutdownReadReason ?? _shutdownReason ?? error); FireStreamClosed(); @@ -212,46 +249,6 @@ private async Task DoReceive() } } - private async Task ProcessReceives() - { - var input = Input; - while (true) - { - var buffer = Input.GetMemory(MinAllocBufferSize); - var bytesReceived = await _stream.ReadAsync(buffer); - - if (bytesReceived == 0) - { - // Read completed. - break; - } - - input.Advance(bytesReceived); - - var flushTask = input.FlushAsync(); - - var paused = !flushTask.IsCompleted; - - if (paused) - { - _log.StreamPause(this); - } - - var result = await flushTask; - - if (paused) - { - _log.StreamResume(this); - } - - if (result.IsCompleted || result.IsCanceled) - { - // Pipe consumer is shut down, do we stop writing - break; - } - } - } - private void FireStreamClosed() { // Guard against scheduling this multiple times @@ -291,7 +288,34 @@ private async Task DoSend() try { - await ProcessSends(); + // Resolve `output` PipeReader via the IDuplexPipe interface prior to loop start for performance. + var output = Output; + while (true) + { + var result = await output.ReadAsync(); + + if (result.IsCanceled) + { + break; + } + + var buffer = result.Buffer; + + var end = buffer.End; + var isCompleted = result.IsCompleted; + if (!buffer.IsEmpty) + { + await _stream.WriteAsync(buffer, endStream: isCompleted); + } + + output.AdvanceTo(end); + + if (isCompleted) + { + // Once the stream pipe is closed, shutdown the stream. + break; + } + } } catch (QuicStreamAbortedException ex) { @@ -329,38 +353,6 @@ private async Task DoSend() } } - private async Task ProcessSends() - { - // Resolve `output` PipeReader via the IDuplexPipe interface prior to loop start for performance. - var output = Output; - while (true) - { - var result = await output.ReadAsync(); - - if (result.IsCanceled) - { - break; - } - - var buffer = result.Buffer; - - var end = buffer.End; - var isCompleted = result.IsCompleted; - if (!buffer.IsEmpty) - { - await _stream.WriteAsync(buffer, endStream: isCompleted); - } - - output.AdvanceTo(end); - - if (isCompleted) - { - // Once the stream pipe is closed, shutdown the stream. - break; - } - } - } - public override void Abort(ConnectionAbortedException abortReason) { // This abort is called twice, make sure that doesn't happen. @@ -390,6 +382,40 @@ public override void Abort(ConnectionAbortedException abortReason) Output.CancelPendingRead(); } + public void AbortRead(long errorCode, ConnectionAbortedException abortReason) + { + lock (_shutdownLock) + { + if (_stream.CanRead) + { + _shutdownReadReason = abortReason; + _log.StreamAbortRead(this, abortReason.Message); + _stream.AbortRead(errorCode); + } + else + { + throw new InvalidOperationException("Unable to abort reading from a stream that doesn't support reading."); + } + } + } + + public void AbortWrite(long errorCode, ConnectionAbortedException abortReason) + { + lock (_shutdownLock) + { + if (_stream.CanWrite) + { + _shutdownWriteReason = abortReason; + _log.StreamAbortWrite(this, abortReason.Message); + _stream.AbortWrite(errorCode); + } + else + { + throw new InvalidOperationException("Unable to abort writing to a stream that doesn't support writing."); + } + } + } + private async ValueTask ShutdownWrite(Exception? shutdownReason) { try diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicTrace.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicTrace.cs index 0fa7a12c144d..14889a38994b 100644 --- a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicTrace.cs +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicTrace.cs @@ -154,6 +154,28 @@ public void StreamAbort(QuicStreamContext streamContext, string reason) } } + [LoggerMessage(12, LogLevel.Debug, @"Stream id ""{ConnectionId}"" read side aborted by application because: ""{Reason}"".", SkipEnabledCheck = true)] + private static partial void StreamAbortRead(ILogger logger, string connectionId, string reason); + + public void StreamAbortRead(QuicStreamContext streamContext, string reason) + { + if (_logger.IsEnabled(LogLevel.Debug)) + { + _streamAbortRead(_logger, streamContext.ConnectionId, reason, null); + } + } + + [LoggerMessage(13, LogLevel.Debug, @"Stream id ""{ConnectionId}"" write side aborted by application because: ""{Reason}"".", SkipEnabledCheck = true)] + private static partial void StreamAbortWrite(ILogger logger, string connectionId, string reason); + + public void StreamAbortWrite(QuicStreamContext streamContext, string reason) + { + if (_logger.IsEnabled(LogLevel.Debug)) + { + _streamAbortWrite(_logger, streamContext.ConnectionId, reason, null); + } + } + private static StreamType GetStreamType(QuicStreamContext streamContext) => streamContext.CanRead && streamContext.CanWrite ? StreamType.Bidirectional diff --git a/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs b/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs index b3af0cb478c2..3f53182c8570 100644 --- a/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs +++ b/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs @@ -269,5 +269,54 @@ public async Task ServerToClientUnidirectionalStream_ServerAborts_ClientGetsAbor // Both send and receive loops have exited. await quicStreamContext._processingTask.DefaultTimeout(); } + + [ConditionalFact] + [MsQuicSupported] + public async Task StreamAbortFeature_AbortWrite_ClientReceivesAbort() + { + // Arrange + await using var connectionListener = await QuicTestHelpers.CreateConnectionListenerFactory(LoggerFactory); + + var options = QuicTestHelpers.CreateClientConnectionOptions(connectionListener.EndPoint); + using var quicConnection = new QuicConnection(QuicImplementationProviders.MsQuic, options); + await quicConnection.ConnectAsync().DefaultTimeout(); + + await using var serverConnection = await connectionListener.AcceptAndAddFeatureAsync().DefaultTimeout(); + + // Act + await using var clientStream = quicConnection.OpenBidirectionalStream(); + await clientStream.WriteAsync(TestData).DefaultTimeout(); + + await using var serverStream = await serverConnection.AcceptAsync().DefaultTimeout(); + + var readResult = await serverStream.Transport.Input.ReadAtLeastAsync(TestData.Length).DefaultTimeout(); + serverStream.Transport.Input.AdvanceTo(readResult.Buffer.End); + + var serverReadTask = serverStream.Transport.Input.ReadAtLeastAsync(TestData.Length).AsTask(); + + var streamAbortFeature = serverStream.Features.Get(); + + streamAbortFeature.AbortRead((long)Http3ErrorCode.InternalError, new ConnectionAbortedException("Test reason")); + + // Assert + + // Server writes data + await serverStream.Transport.Output.WriteAsync(TestData).DefaultTimeout(); + // Server completes its output. + await serverStream.Transport.Output.CompleteAsync().DefaultTimeout(); + + // Client successfully reads data to end + var buffer = new byte[1024]; + var readCount = await clientStream.ReadUntilEndAsync(buffer).DefaultTimeout(); + Assert.Equal(TestData.Length, readCount); + + // Client errors when writing + var clientEx = await Assert.ThrowsAsync(() => clientStream.WriteAsync(buffer).AsTask()).DefaultTimeout(); + Assert.Equal((long)Http3ErrorCode.InternalError, clientEx.ErrorCode); + + // Server errors when reading + var serverEx = await Assert.ThrowsAsync(() => serverReadTask).DefaultTimeout(); + Assert.Equal("Test reason", serverEx.Message); + } } } diff --git a/src/Servers/Kestrel/shared/test/Http3/Http3InMemory.cs b/src/Servers/Kestrel/shared/test/Http3/Http3InMemory.cs index 6e510708dec6..ba4c5bb1f6bd 100644 --- a/src/Servers/Kestrel/shared/test/Http3/Http3InMemory.cs +++ b/src/Servers/Kestrel/shared/test/Http3/Http3InMemory.cs @@ -297,7 +297,7 @@ public void OnStreamCompleted(IHttp3Stream stream) if (_http3TestBase._runningStreams.TryRemove(stream.StreamId, out var testStream)) { - testStream._onStreamCompletedTcs.TrySetResult(); + testStream.OnStreamCompletedTcs.TrySetResult(); } } @@ -312,7 +312,7 @@ public void OnStreamCreated(IHttp3Stream stream) if (_http3TestBase._runningStreams.TryGetValue(stream.StreamId, out var testStream)) { - testStream._onStreamCreatedTcs.TrySetResult(); + testStream.OnStreamCreatedTcs.TrySetResult(); } } @@ -322,7 +322,7 @@ public void OnStreamHeaderReceived(IHttp3Stream stream) if (_http3TestBase._runningStreams.TryGetValue(stream.StreamId, out var testStream)) { - testStream._onHeaderReceivedTcs.TrySetResult(); + testStream.OnHeaderReceivedTcs.TrySetResult(); } } } @@ -404,38 +404,39 @@ internal ValueTask CreateRequestStream(Http3RequestHeaderHan } } - internal class Http3StreamBase : IProtocolErrorCodeFeature + internal class Http3StreamBase { - internal TaskCompletionSource _onStreamCreatedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - internal TaskCompletionSource _onStreamCompletedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - internal TaskCompletionSource _onHeaderReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - - internal ConnectionContext StreamContext { get; } - internal IProtocolErrorCodeFeature _protocolErrorCodeFeature; - internal DuplexPipe.DuplexPipePair _pair; - internal Http3InMemory _testBase; - internal Http3Connection _connection; + internal TaskCompletionSource OnStreamCreatedTcs { get; } = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + internal TaskCompletionSource OnStreamCompletedTcs { get; } = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + internal TaskCompletionSource OnHeaderReceivedTcs { get; } = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + internal TestStreamContext StreamContext { get; } + internal DuplexPipe.DuplexPipePair Pair { get; } + internal Http3InMemory TestBase { get; private protected set; } + internal Http3Connection Connection { get; private protected set; } public long BytesReceived { get; private set; } public long Error { - get => _protocolErrorCodeFeature.Error; - set => _protocolErrorCodeFeature.Error = value; + get => StreamContext.Error; + set => StreamContext.Error = value; } - public Task OnStreamCreatedTask => _onStreamCreatedTcs.Task; - public Task OnStreamCompletedTask => _onStreamCompletedTcs.Task; - public Task OnHeaderReceivedTask => _onHeaderReceivedTcs.Task; + public Task OnStreamCreatedTask => OnStreamCreatedTcs.Task; + public Task OnStreamCompletedTask => OnStreamCompletedTcs.Task; + public Task OnHeaderReceivedTask => OnHeaderReceivedTcs.Task; + + public ConnectionAbortedException AbortReadException => StreamContext.AbortReadException; + public ConnectionAbortedException AbortWriteException => StreamContext.AbortWriteException; public Http3StreamBase(TestStreamContext testStreamContext) { StreamContext = testStreamContext; - _protocolErrorCodeFeature = testStreamContext; - _pair = testStreamContext._pair; + Pair = testStreamContext._pair; } protected Task SendAsync(ReadOnlySpan span) { - var writableBuffer = _pair.Application.Output; + var writableBuffer = Pair.Application.Output; writableBuffer.Write(span); return FlushAsync(writableBuffer); } @@ -462,12 +463,12 @@ internal async Task ReceiveEndAsync() #if IS_FUNCTIONAL_TESTS protected Task ReadApplicationInputAsync() { - return _pair.Application.Input.ReadAsync().AsTask().DefaultTimeout(); + return Pair.Application.Input.ReadAsync().AsTask().DefaultTimeout(); } #else protected ValueTask ReadApplicationInputAsync() { - return _pair.Application.Input.ReadAsync(); + return Pair.Application.Input.ReadAsync(); } #endif @@ -523,14 +524,14 @@ internal async ValueTask ReceiveFrameAsync(bool expectEnd finally { BytesReceived += copyBuffer.Slice(copyBuffer.Start, consumed).Length; - _pair.Application.Input.AdvanceTo(consumed, examined); + Pair.Application.Input.AdvanceTo(consumed, examined); } } } internal async Task SendFrameAsync(Http3FrameType frameType, Memory data, bool endStream = false) { - var outputWriter = _pair.Application.Output; + var outputWriter = Pair.Application.Output; Http3FrameWriter.WriteHeader(frameType, data.Length, outputWriter); if (!endStream) @@ -547,7 +548,7 @@ internal async Task SendFrameAsync(Http3FrameType frameType, Memory data, internal Task EndStreamAsync(ReadOnlySpan span = default) { - var writableBuffer = _pair.Application.Output; + var writableBuffer = Pair.Application.Output; if (span.Length > 0) { writableBuffer.Write(span); @@ -595,8 +596,8 @@ internal class Http3RequestStream : Http3StreamBase, IHttpHeadersHandler public Http3RequestStream(Http3InMemory testBase, Http3Connection connection, TestStreamContext testStreamContext, Http3RequestHeaderHandler headerHandler) : base(testStreamContext) { - _testBase = testBase; - _connection = connection; + TestBase = testBase; + Connection = connection; _streamId = testStreamContext.StreamId; _testStreamContext = testStreamContext; this._headerHandler = headerHandler; @@ -635,7 +636,7 @@ internal Http3HeadersEnumerator GetHeadersEnumerator(IEnumerable.Empty); } @@ -727,7 +728,7 @@ internal class Http3ControlStream : Http3StreamBase public Http3ControlStream(Http3InMemory testBase, TestStreamContext testStreamContext) : base(testStreamContext) { - _testBase = testBase; + TestBase = testBase; _streamId = testStreamContext.StreamId; } @@ -764,7 +765,7 @@ internal async ValueTask> ExpectSettingsAsync() public async Task WriteStreamIdAsync(int id) { - var writableBuffer = _pair.Application.Output; + var writableBuffer = Pair.Application.Output; void WriteSpan(PipeWriter pw) { @@ -845,7 +846,7 @@ public async ValueTask TryReadStreamIdAsync() } finally { - _pair.Application.Input.AdvanceTo(consumed, examined); + Pair.Application.Input.AdvanceTo(consumed, examined); } } } @@ -938,7 +939,7 @@ public void RequestClose() } } - internal class TestStreamContext : ConnectionContext, IStreamDirectionFeature, IStreamIdFeature, IProtocolErrorCodeFeature, IPersistentStateFeature + internal class TestStreamContext : ConnectionContext, IStreamDirectionFeature, IStreamIdFeature, IProtocolErrorCodeFeature, IPersistentStateFeature, IStreamAbortFeature { private readonly Http3InMemory _testBase; @@ -999,17 +1000,23 @@ public void Initialize(long streamId) Features.Set(this); Features.Set(this); + Features.Set(this); Features.Set(this); Features.Set(this); StreamId = streamId; _testBase.Logger.LogInformation($"Initializing stream {streamId}"); ConnectionId = "TEST:" + streamId.ToString(); + AbortReadException = null; + AbortWriteException = null; _disposedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); Disposed = false; } + public ConnectionAbortedException AbortReadException { get; private set; } + public ConnectionAbortedException AbortWriteException { get; private set; } + public bool Disposed { get; private set; } public Task OnDisposedTask => _disposedTcs.Task; @@ -1076,5 +1083,15 @@ IDictionary IPersistentStateFeature.State return _persistentState ?? (_persistentState = new ConnectionItems()); } } + + void IStreamAbortFeature.AbortRead(long errorCode, ConnectionAbortedException abortReason) + { + AbortReadException = abortReason; + } + + void IStreamAbortFeature.AbortWrite(long errorCode, ConnectionAbortedException abortReason) + { + AbortWriteException = abortReason; + } } } diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3StreamTests.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3StreamTests.cs index f58b953b7918..d50a8fe1da5a 100644 --- a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3StreamTests.cs +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3StreamTests.cs @@ -2488,8 +2488,8 @@ public async Task MaxRequestBodySize_ContentLengthOver_413() await requestStream.OnStreamCompletedTask.DefaultTimeout(); - // TODO(JamesNK): Check for abort of request side of stream after https://github.com/dotnet/aspnetcore/issues/31970 Assert.Contains(LogMessages, m => m.Message.Contains("the application completed without reading the entire request body.")); + Assert.Equal("The application completed without reading the entire request body.", requestStream.AbortReadException.Message); Assert.Equal(3, receivedHeaders.Count); Assert.Contains("date", receivedHeaders.Keys, StringComparer.OrdinalIgnoreCase); @@ -2817,8 +2817,8 @@ await requestStream.SendHeadersAsync(new[] await requestStream.OnStreamCompletedTask.DefaultTimeout(); - // TODO(JamesNK): Check for abort of request side of stream after https://github.com/dotnet/aspnetcore/issues/31970 Assert.Contains(LogMessages, m => m.Message.Contains("the application completed without reading the entire request body.")); + Assert.Equal("The application completed without reading the entire request body.", requestStream.AbortReadException.Message); } [Fact] From 87c7d49ac5371262668d4a4dfb6780f37c82afef Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Wed, 28 Jul 2021 17:35:53 +1200 Subject: [PATCH 2/7] Update --- .../src/Features/IStreamAbortFeature.cs | 2 +- .../QuicStreamContext.FeatureCollection.cs | 37 ++++++++++++++++++- .../src/Internal/QuicStreamContext.cs | 34 ----------------- .../shared/TransportConnection.Generated.cs | 22 +++++++++++ .../TransportConnectionFeatureCollection.cs | 1 + 5 files changed, 60 insertions(+), 36 deletions(-) diff --git a/src/Servers/Connections.Abstractions/src/Features/IStreamAbortFeature.cs b/src/Servers/Connections.Abstractions/src/Features/IStreamAbortFeature.cs index a30265a9ed30..58c37a5a4eef 100644 --- a/src/Servers/Connections.Abstractions/src/Features/IStreamAbortFeature.cs +++ b/src/Servers/Connections.Abstractions/src/Features/IStreamAbortFeature.cs @@ -4,7 +4,7 @@ namespace Microsoft.AspNetCore.Connections.Features { /// - /// Supports aborting one side of a connection stream. + /// Supports aborting individual sides of a connection stream. /// public interface IStreamAbortFeature { diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.FeatureCollection.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.FeatureCollection.cs index 8c2cfbb31a36..493290113a49 100644 --- a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.FeatureCollection.cs +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.FeatureCollection.cs @@ -7,7 +7,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Internal { - internal sealed partial class QuicStreamContext : IPersistentStateFeature, IStreamDirectionFeature, IProtocolErrorCodeFeature, IStreamIdFeature + internal sealed partial class QuicStreamContext : IPersistentStateFeature, IStreamDirectionFeature, IProtocolErrorCodeFeature, IStreamIdFeature, IStreamAbortFeature { private IDictionary? _persistentState; @@ -27,12 +27,47 @@ internal sealed partial class QuicStreamContext : IPersistentStateFeature, IStre } } + public void AbortRead(long errorCode, ConnectionAbortedException abortReason) + { + lock (_shutdownLock) + { + if (_stream.CanRead) + { + _shutdownReadReason = abortReason; + _log.StreamAbortRead(this, abortReason.Message); + _stream.AbortRead(errorCode); + } + else + { + throw new InvalidOperationException("Unable to abort reading from a stream that doesn't support reading."); + } + } + } + + public void AbortWrite(long errorCode, ConnectionAbortedException abortReason) + { + lock (_shutdownLock) + { + if (_stream.CanWrite) + { + _shutdownWriteReason = abortReason; + _log.StreamAbortWrite(this, abortReason.Message); + _stream.AbortWrite(errorCode); + } + else + { + throw new InvalidOperationException("Unable to abort writing to a stream that doesn't support writing."); + } + } + } + private void InitializeFeatures() { _currentIPersistentStateFeature = this; _currentIStreamDirectionFeature = this; _currentIProtocolErrorCodeFeature = this; _currentIStreamIdFeature = this; + _currentIStreamAbortFeature = this; _currentITlsConnectionFeature = _connection._currentITlsConnectionFeature; } } diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs index bcef4becd27d..f54b908da1cf 100644 --- a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs @@ -382,40 +382,6 @@ public override void Abort(ConnectionAbortedException abortReason) Output.CancelPendingRead(); } - public void AbortRead(long errorCode, ConnectionAbortedException abortReason) - { - lock (_shutdownLock) - { - if (_stream.CanRead) - { - _shutdownReadReason = abortReason; - _log.StreamAbortRead(this, abortReason.Message); - _stream.AbortRead(errorCode); - } - else - { - throw new InvalidOperationException("Unable to abort reading from a stream that doesn't support reading."); - } - } - } - - public void AbortWrite(long errorCode, ConnectionAbortedException abortReason) - { - lock (_shutdownLock) - { - if (_stream.CanWrite) - { - _shutdownWriteReason = abortReason; - _log.StreamAbortWrite(this, abortReason.Message); - _stream.AbortWrite(errorCode); - } - else - { - throw new InvalidOperationException("Unable to abort writing to a stream that doesn't support writing."); - } - } - } - private async ValueTask ShutdownWrite(Exception? shutdownReason) { try diff --git a/src/Servers/Kestrel/shared/TransportConnection.Generated.cs b/src/Servers/Kestrel/shared/TransportConnection.Generated.cs index b7e68ec93661..534fc31d687a 100644 --- a/src/Servers/Kestrel/shared/TransportConnection.Generated.cs +++ b/src/Servers/Kestrel/shared/TransportConnection.Generated.cs @@ -33,6 +33,7 @@ internal partial class TransportConnection : IFeatureCollection, internal protected IProtocolErrorCodeFeature? _currentIProtocolErrorCodeFeature; internal protected IStreamDirectionFeature? _currentIStreamDirectionFeature; internal protected IStreamIdFeature? _currentIStreamIdFeature; + internal protected IStreamAbortFeature? _currentIStreamAbortFeature; internal protected ITlsConnectionFeature? _currentITlsConnectionFeature; private int _featureRevision; @@ -52,6 +53,7 @@ private void FastReset() _currentIProtocolErrorCodeFeature = null; _currentIStreamDirectionFeature = null; _currentIStreamIdFeature = null; + _currentIStreamAbortFeature = null; _currentITlsConnectionFeature = null; } @@ -164,6 +166,10 @@ private void ExtraFeatureSet(Type key, object? value) { feature = _currentIStreamIdFeature; } + else if (key == typeof(IStreamAbortFeature)) + { + feature = _currentIStreamAbortFeature; + } else if (key == typeof(ITlsConnectionFeature)) { feature = _currentITlsConnectionFeature; @@ -220,6 +226,10 @@ private void ExtraFeatureSet(Type key, object? value) { _currentIStreamIdFeature = (IStreamIdFeature?)value; } + else if (key == typeof(IStreamAbortFeature)) + { + _currentIStreamAbortFeature = (IStreamAbortFeature?)value; + } else if (key == typeof(ITlsConnectionFeature)) { _currentITlsConnectionFeature = (ITlsConnectionFeature?)value; @@ -278,6 +288,10 @@ private void ExtraFeatureSet(Type key, object? value) { feature = Unsafe.As(ref _currentIStreamIdFeature); } + else if (typeof(TFeature) == typeof(IStreamAbortFeature)) + { + feature = Unsafe.As(ref _currentIStreamAbortFeature); + } else if (typeof(TFeature) == typeof(ITlsConnectionFeature)) { feature = Unsafe.As(ref _currentITlsConnectionFeature); @@ -337,6 +351,10 @@ private void ExtraFeatureSet(Type key, object? value) { _currentIStreamIdFeature = Unsafe.As(ref feature); } + else if (typeof(TFeature) == typeof(IStreamAbortFeature)) + { + _currentIStreamAbortFeature = Unsafe.As(ref feature); + } else if (typeof(TFeature) == typeof(ITlsConnectionFeature)) { _currentITlsConnectionFeature = Unsafe.As(ref feature); @@ -389,6 +407,10 @@ private IEnumerable> FastEnumerable() { yield return new KeyValuePair(typeof(IStreamIdFeature), _currentIStreamIdFeature); } + if (_currentIStreamAbortFeature != null) + { + yield return new KeyValuePair(typeof(IStreamAbortFeature), _currentIStreamAbortFeature); + } if (_currentITlsConnectionFeature != null) { yield return new KeyValuePair(typeof(ITlsConnectionFeature), _currentITlsConnectionFeature); diff --git a/src/Servers/Kestrel/tools/CodeGenerator/TransportConnectionFeatureCollection.cs b/src/Servers/Kestrel/tools/CodeGenerator/TransportConnectionFeatureCollection.cs index c25420ac8d1e..70a479259028 100644 --- a/src/Servers/Kestrel/tools/CodeGenerator/TransportConnectionFeatureCollection.cs +++ b/src/Servers/Kestrel/tools/CodeGenerator/TransportConnectionFeatureCollection.cs @@ -24,6 +24,7 @@ public static string GenerateFile() "IProtocolErrorCodeFeature", "IStreamDirectionFeature", "IStreamIdFeature", + "IStreamAbortFeature", "ITlsConnectionFeature" }; From 6cf9e32b35159269b902e24dfacfa60ddf79b762 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Tue, 3 Aug 2021 09:23:33 +1200 Subject: [PATCH 3/7] Review feedback --- .../Connections.Abstractions/src/BaseConnectionContext.cs | 2 +- src/Servers/Connections.Abstractions/src/ConnectionContext.cs | 2 +- .../src/Features/IStreamAbortFeature.cs | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Servers/Connections.Abstractions/src/BaseConnectionContext.cs b/src/Servers/Connections.Abstractions/src/BaseConnectionContext.cs index 9b97a73c848a..23c1886faedc 100644 --- a/src/Servers/Connections.Abstractions/src/BaseConnectionContext.cs +++ b/src/Servers/Connections.Abstractions/src/BaseConnectionContext.cs @@ -53,7 +53,7 @@ public abstract class BaseConnectionContext : IAsyncDisposable /// /// Aborts the underlying connection. /// - /// An optional describing the reason the connection is being terminated. + /// A describing the reason the connection is being terminated. public abstract void Abort(ConnectionAbortedException abortReason); /// diff --git a/src/Servers/Connections.Abstractions/src/ConnectionContext.cs b/src/Servers/Connections.Abstractions/src/ConnectionContext.cs index dea270291c67..696a15bc1005 100644 --- a/src/Servers/Connections.Abstractions/src/ConnectionContext.cs +++ b/src/Servers/Connections.Abstractions/src/ConnectionContext.cs @@ -20,7 +20,7 @@ public abstract class ConnectionContext : BaseConnectionContext, IAsyncDisposabl /// /// Aborts the underlying connection. /// - /// An optional describing the reason the connection is being terminated. + /// A describing the reason the connection is being terminated. public override void Abort(ConnectionAbortedException abortReason) { // We expect this to be overridden, but this helps maintain back compat diff --git a/src/Servers/Connections.Abstractions/src/Features/IStreamAbortFeature.cs b/src/Servers/Connections.Abstractions/src/Features/IStreamAbortFeature.cs index 58c37a5a4eef..f7d2b7a239cc 100644 --- a/src/Servers/Connections.Abstractions/src/Features/IStreamAbortFeature.cs +++ b/src/Servers/Connections.Abstractions/src/Features/IStreamAbortFeature.cs @@ -12,14 +12,14 @@ public interface IStreamAbortFeature /// Abort the read side of the connection stream. /// /// The error code to send with the abort. - /// An optional describing the reason to abort the read side of the connection stream. + /// A describing the reason to abort the read side of the connection stream. void AbortRead(long errorCode, ConnectionAbortedException abortReason); /// /// Abort the write side of the connection stream. /// /// The error code to send with the abort. - /// An optional describing the reason to abort the write side of the connection stream. + /// A describing the reason to abort the write side of the connection stream. void AbortWrite(long errorCode, ConnectionAbortedException abortReason); } } From ed7e7097fc3fd990b20ef5317df91b5256b19fe2 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Tue, 3 Aug 2021 09:41:54 +1200 Subject: [PATCH 4/7] Fix --- .../Kestrel/Transport.Quic/test/QuicStreamContextTests.cs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs b/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs index 3f53182c8570..7fd025f57258 100644 --- a/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs +++ b/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs @@ -306,12 +306,11 @@ public async Task StreamAbortFeature_AbortWrite_ClientReceivesAbort() await serverStream.Transport.Output.CompleteAsync().DefaultTimeout(); // Client successfully reads data to end - var buffer = new byte[1024]; - var readCount = await clientStream.ReadUntilEndAsync(buffer).DefaultTimeout(); - Assert.Equal(TestData.Length, readCount); + var data = await clientStream.ReadUntilEndAsync().DefaultTimeout(); + Assert.Equal(TestData, data); // Client errors when writing - var clientEx = await Assert.ThrowsAsync(() => clientStream.WriteAsync(buffer).AsTask()).DefaultTimeout(); + var clientEx = await Assert.ThrowsAsync(() => clientStream.WriteAsync(data).AsTask()).DefaultTimeout(); Assert.Equal((long)Http3ErrorCode.InternalError, clientEx.ErrorCode); // Server errors when reading From 1e82ddc8823e808e02c804520e24ced3d747aa76 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Tue, 3 Aug 2021 10:02:59 +1200 Subject: [PATCH 5/7] Fix --- src/Servers/Kestrel/Transport.Quic/src/Internal/QuicTrace.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicTrace.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicTrace.cs index 14889a38994b..28ffe37e88a2 100644 --- a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicTrace.cs +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicTrace.cs @@ -161,7 +161,7 @@ public void StreamAbortRead(QuicStreamContext streamContext, string reason) { if (_logger.IsEnabled(LogLevel.Debug)) { - _streamAbortRead(_logger, streamContext.ConnectionId, reason, null); + StreamAbortRead(_logger, streamContext.ConnectionId, reason); } } @@ -172,7 +172,7 @@ public void StreamAbortWrite(QuicStreamContext streamContext, string reason) { if (_logger.IsEnabled(LogLevel.Debug)) { - _streamAbortWrite(_logger, streamContext.ConnectionId, reason, null); + StreamAbortWrite(_logger, streamContext.ConnectionId, reason); } } From 2b8557cba74c8e36e8701475e43fb49b39d43d65 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Tue, 3 Aug 2021 10:07:40 +1200 Subject: [PATCH 6/7] Fix --- src/Servers/Kestrel/Transport.Quic/src/Internal/QuicTrace.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicTrace.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicTrace.cs index 28ffe37e88a2..d9da03f82ad1 100644 --- a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicTrace.cs +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicTrace.cs @@ -154,7 +154,7 @@ public void StreamAbort(QuicStreamContext streamContext, string reason) } } - [LoggerMessage(12, LogLevel.Debug, @"Stream id ""{ConnectionId}"" read side aborted by application because: ""{Reason}"".", SkipEnabledCheck = true)] + [LoggerMessage(13, LogLevel.Debug, @"Stream id ""{ConnectionId}"" read side aborted by application because: ""{Reason}"".", SkipEnabledCheck = true)] private static partial void StreamAbortRead(ILogger logger, string connectionId, string reason); public void StreamAbortRead(QuicStreamContext streamContext, string reason) @@ -165,7 +165,7 @@ public void StreamAbortRead(QuicStreamContext streamContext, string reason) } } - [LoggerMessage(13, LogLevel.Debug, @"Stream id ""{ConnectionId}"" write side aborted by application because: ""{Reason}"".", SkipEnabledCheck = true)] + [LoggerMessage(14, LogLevel.Debug, @"Stream id ""{ConnectionId}"" write side aborted by application because: ""{Reason}"".", SkipEnabledCheck = true)] private static partial void StreamAbortWrite(ILogger logger, string connectionId, string reason); public void StreamAbortWrite(QuicStreamContext streamContext, string reason) From 1cd3478732826ca51b872349ba70e5ffd2d67c51 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Tue, 3 Aug 2021 17:26:46 +1200 Subject: [PATCH 7/7] Update --- .../src/Features/IStreamAbortFeature.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Servers/Connections.Abstractions/src/Features/IStreamAbortFeature.cs b/src/Servers/Connections.Abstractions/src/Features/IStreamAbortFeature.cs index f7d2b7a239cc..f43141001997 100644 --- a/src/Servers/Connections.Abstractions/src/Features/IStreamAbortFeature.cs +++ b/src/Servers/Connections.Abstractions/src/Features/IStreamAbortFeature.cs @@ -1,5 +1,5 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. namespace Microsoft.AspNetCore.Connections.Features {