Skip to content

HTTP/3: Add IStreamAbortFeature #34409

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public abstract class BaseConnectionContext : IAsyncDisposable
/// <summary>
/// Aborts the underlying connection.
/// </summary>
/// <param name="abortReason">An optional <see cref="ConnectionAbortedException"/> describing the reason the connection is being terminated.</param>
/// <param name="abortReason">A <see cref="ConnectionAbortedException"/> describing the reason the connection is being terminated.</param>
public abstract void Abort(ConnectionAbortedException abortReason);

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public abstract class ConnectionContext : BaseConnectionContext, IAsyncDisposabl
/// <summary>
/// Aborts the underlying connection.
/// </summary>
/// <param name="abortReason">An optional <see cref="ConnectionAbortedException"/> describing the reason the connection is being terminated.</param>
/// <param name="abortReason">A <see cref="ConnectionAbortedException"/> describing the reason the connection is being terminated.</param>
public override void Abort(ConnectionAbortedException abortReason)
{
// We expect this to be overridden, but this helps maintain back compat
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

namespace Microsoft.AspNetCore.Connections.Features
{
/// <summary>
/// Supports aborting individual sides of a connection stream.
/// </summary>
public interface IStreamAbortFeature
{
/// <summary>
/// Abort the read side of the connection stream.
/// </summary>
/// <param name="errorCode">The error code to send with the abort.</param>
/// <param name="abortReason">A <see cref="ConnectionAbortedException"/> describing the reason to abort the read side of the connection stream.</param>
void AbortRead(long errorCode, ConnectionAbortedException abortReason);

/// <summary>
/// Abort the write side of the connection stream.
/// </summary>
/// <param name="errorCode">The error code to send with the abort.</param>
/// <param name="abortReason">A <see cref="ConnectionAbortedException"/> describing the reason to abort the write side of the connection stream.</param>
void AbortWrite(long errorCode, ConnectionAbortedException abortReason);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ Microsoft.AspNetCore.Connections.Features.IConnectionSocketFeature
Microsoft.AspNetCore.Connections.Features.IConnectionSocketFeature.Socket.get -> System.Net.Sockets.Socket!
Microsoft.AspNetCore.Connections.Features.IPersistentStateFeature
Microsoft.AspNetCore.Connections.Features.IPersistentStateFeature.State.get -> System.Collections.Generic.IDictionary<object!, object?>!
Microsoft.AspNetCore.Connections.Features.IStreamAbortFeature
Microsoft.AspNetCore.Connections.Features.IStreamAbortFeature.AbortRead(long errorCode, Microsoft.AspNetCore.Connections.ConnectionAbortedException! abortReason) -> void
Microsoft.AspNetCore.Connections.Features.IStreamAbortFeature.AbortWrite(long errorCode, Microsoft.AspNetCore.Connections.ConnectionAbortedException! abortReason) -> void
Microsoft.AspNetCore.Connections.IConnectionListener.AcceptAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<Microsoft.AspNetCore.Connections.ConnectionContext?>
Microsoft.AspNetCore.Connections.IMultiplexedConnectionBuilder
Microsoft.AspNetCore.Connections.IMultiplexedConnectionBuilder.ApplicationServices.get -> System.IServiceProvider!
Expand Down
14 changes: 6 additions & 8 deletions src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ internal abstract partial class Http3Stream : HttpProtocol, IHttp3Stream, IHttpH
private Http3StreamContext _context = default!;
private IProtocolErrorCodeFeature _errorCodeFeature = default!;
private IStreamIdFeature _streamIdFeature = default!;
private IStreamAbortFeature _streamAbortFeature = default!;
private int _isClosed;
private readonly Http3RawFrame _incomingFrame = new Http3RawFrame();
protected RequestHeaderParsingState _requestHeaderParsingState;
Expand All @@ -58,7 +59,6 @@ internal abstract partial class Http3Stream : HttpProtocol, IHttp3Stream, IHttpH

public bool EndStreamReceived => (_completionState & StreamCompletionFlags.EndStreamReceived) == StreamCompletionFlags.EndStreamReceived;
private bool IsAborted => (_completionState & StreamCompletionFlags.Aborted) == StreamCompletionFlags.Aborted;
internal bool RstStreamReceived => (_completionState & StreamCompletionFlags.RstStreamReceived) == StreamCompletionFlags.RstStreamReceived;

public Pipe RequestBodyPipe { get; private set; } = default!;

Expand Down Expand Up @@ -87,6 +87,7 @@ public void Initialize(Http3StreamContext context)

_errorCodeFeature = _context.ConnectionFeatures.Get<IProtocolErrorCodeFeature>()!;
_streamIdFeature = _context.ConnectionFeatures.Get<IStreamIdFeature>()!;
_streamAbortFeature = _context.ConnectionFeatures.Get<IStreamAbortFeature>()!;

_appCompleted = null;
_isClosed = 0;
Expand Down Expand Up @@ -371,18 +372,15 @@ private void CompleteStream(bool errored)
Log.RequestBodyNotEntirelyRead(ConnectionIdFeature, TraceIdentifier);
}

var (oldState, newState) = ApplyCompletionFlag(StreamCompletionFlags.Aborted);
var (oldState, newState) = ApplyCompletionFlag(StreamCompletionFlags.AbortedRead);
if (oldState != newState)
{
// https://quicwg.org/base-drafts/draft-ietf-quic-http.html#section-4.1-15
// When the server does not need to receive the remainder of the request, it MAY abort reading
// the request stream, send a complete response, and cleanly close the sending part of the stream.
// The error code H3_NO_ERROR SHOULD be used when requesting that the client stop sending on the
// request stream.

// TODO(JamesNK): Abort the read half of the stream with H3_NO_ERROR
// https://github.com/dotnet/aspnetcore/issues/33575

_streamAbortFeature.AbortRead((long)Http3ErrorCode.NoError, new ConnectionAbortedException("The application completed without reading the entire request body."));
RequestBodyPipe.Writer.Complete();
}

Expand Down Expand Up @@ -940,8 +938,8 @@ private enum PseudoHeaderFields
private enum StreamCompletionFlags
{
None = 0,
RstStreamReceived = 1,
EndStreamReceived = 2,
EndStreamReceived = 1,
AbortedRead = 2,
Aborted = 4,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,7 @@ internal interface IQuicTrace : ILogger
void StreamShutdownWrite(QuicStreamContext streamContext, string reason);
void StreamAborted(QuicStreamContext streamContext, Exception ex);
void StreamAbort(QuicStreamContext streamContext, string reason);
void StreamAbortRead(QuicStreamContext streamContext, string reason);
void StreamAbortWrite(QuicStreamContext streamContext, string reason);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Internal
{
internal sealed partial class QuicStreamContext : IPersistentStateFeature, IStreamDirectionFeature, IProtocolErrorCodeFeature, IStreamIdFeature
internal sealed partial class QuicStreamContext : IPersistentStateFeature, IStreamDirectionFeature, IProtocolErrorCodeFeature, IStreamIdFeature, IStreamAbortFeature
{
private IDictionary<object, object?>? _persistentState;

Expand All @@ -27,12 +27,47 @@ internal sealed partial class QuicStreamContext : IPersistentStateFeature, IStre
}
}

public void AbortRead(long errorCode, ConnectionAbortedException abortReason)
{
lock (_shutdownLock)
{
if (_stream.CanRead)
{
_shutdownReadReason = abortReason;
_log.StreamAbortRead(this, abortReason.Message);
_stream.AbortRead(errorCode);
}
else
{
throw new InvalidOperationException("Unable to abort reading from a stream that doesn't support reading.");
}
}
}

public void AbortWrite(long errorCode, ConnectionAbortedException abortReason)
{
lock (_shutdownLock)
{
if (_stream.CanWrite)
{
_shutdownWriteReason = abortReason;
_log.StreamAbortWrite(this, abortReason.Message);
_stream.AbortWrite(errorCode);
}
else
{
throw new InvalidOperationException("Unable to abort writing to a stream that doesn't support writing.");
}
}
}

private void InitializeFeatures()
{
_currentIPersistentStateFeature = this;
_currentIStreamDirectionFeature = this;
_currentIProtocolErrorCodeFeature = this;
_currentIStreamIdFeature = this;
_currentIStreamAbortFeature = this;
_currentITlsConnectionFeature = _connection._currentITlsConnectionFeature;
}
}
Expand Down
142 changes: 67 additions & 75 deletions src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ internal partial class QuicStreamContext : TransportConnection, IPooledStream
private CancellationTokenSource _streamClosedTokenSource = default!;
private string? _connectionId;
private const int MinAllocBufferSize = 4096;
private volatile Exception? _shutdownReadReason;
private volatile Exception? _shutdownWriteReason;
private volatile Exception? _shutdownReason;
private bool _streamClosed;
private bool _serverAborted;
Expand Down Expand Up @@ -175,7 +177,42 @@ private async Task DoReceive()

try
{
await ProcessReceives();
var input = Input;
while (true)
{
var buffer = Input.GetMemory(MinAllocBufferSize);
var bytesReceived = await _stream.ReadAsync(buffer);

if (bytesReceived == 0)
{
// Read completed.
break;
}

input.Advance(bytesReceived);

var flushTask = input.FlushAsync();

var paused = !flushTask.IsCompleted;

if (paused)
{
_log.StreamPause(this);
}

var result = await flushTask;

if (paused)
{
_log.StreamResume(this);
}

if (result.IsCompleted || result.IsCanceled)
{
// Pipe consumer is shut down, do we stop writing
break;
}
}
}
catch (QuicStreamAbortedException ex)
{
Expand Down Expand Up @@ -204,54 +241,14 @@ private async Task DoReceive()
finally
{
// If Shutdown() has already bee called, assume that was the reason ProcessReceives() exited.
Input.Complete(_shutdownReason ?? error);
Input.Complete(_shutdownReadReason ?? _shutdownReason ?? error);

FireStreamClosed();

await _waitForConnectionClosedTcs.Task;
}
}

private async Task ProcessReceives()
{
var input = Input;
while (true)
{
var buffer = Input.GetMemory(MinAllocBufferSize);
var bytesReceived = await _stream.ReadAsync(buffer);

if (bytesReceived == 0)
{
// Read completed.
break;
}

input.Advance(bytesReceived);

var flushTask = input.FlushAsync();

var paused = !flushTask.IsCompleted;

if (paused)
{
_log.StreamPause(this);
}

var result = await flushTask;

if (paused)
{
_log.StreamResume(this);
}

if (result.IsCompleted || result.IsCanceled)
{
// Pipe consumer is shut down, do we stop writing
break;
}
}
}

private void FireStreamClosed()
{
// Guard against scheduling this multiple times
Expand Down Expand Up @@ -291,7 +288,34 @@ private async Task DoSend()

try
{
await ProcessSends();
// Resolve `output` PipeReader via the IDuplexPipe interface prior to loop start for performance.
var output = Output;
while (true)
{
var result = await output.ReadAsync();

if (result.IsCanceled)
{
break;
}

var buffer = result.Buffer;

var end = buffer.End;
var isCompleted = result.IsCompleted;
if (!buffer.IsEmpty)
{
await _stream.WriteAsync(buffer, endStream: isCompleted);
}

output.AdvanceTo(end);

if (isCompleted)
{
// Once the stream pipe is closed, shutdown the stream.
break;
}
}
}
catch (QuicStreamAbortedException ex)
{
Expand Down Expand Up @@ -329,38 +353,6 @@ private async Task DoSend()
}
}

private async Task ProcessSends()
{
// Resolve `output` PipeReader via the IDuplexPipe interface prior to loop start for performance.
var output = Output;
while (true)
{
var result = await output.ReadAsync();

if (result.IsCanceled)
{
break;
}

var buffer = result.Buffer;

var end = buffer.End;
var isCompleted = result.IsCompleted;
if (!buffer.IsEmpty)
{
await _stream.WriteAsync(buffer, endStream: isCompleted);
}

output.AdvanceTo(end);

if (isCompleted)
{
// Once the stream pipe is closed, shutdown the stream.
break;
}
}
}

public override void Abort(ConnectionAbortedException abortReason)
{
// This abort is called twice, make sure that doesn't happen.
Expand Down
22 changes: 22 additions & 0 deletions src/Servers/Kestrel/Transport.Quic/src/Internal/QuicTrace.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,28 @@ public void StreamAbort(QuicStreamContext streamContext, string reason)
}
}

[LoggerMessage(13, LogLevel.Debug, @"Stream id ""{ConnectionId}"" read side aborted by application because: ""{Reason}"".", SkipEnabledCheck = true)]
private static partial void StreamAbortRead(ILogger logger, string connectionId, string reason);

public void StreamAbortRead(QuicStreamContext streamContext, string reason)
{
if (_logger.IsEnabled(LogLevel.Debug))
{
StreamAbortRead(_logger, streamContext.ConnectionId, reason);
}
}

[LoggerMessage(14, LogLevel.Debug, @"Stream id ""{ConnectionId}"" write side aborted by application because: ""{Reason}"".", SkipEnabledCheck = true)]
private static partial void StreamAbortWrite(ILogger logger, string connectionId, string reason);

public void StreamAbortWrite(QuicStreamContext streamContext, string reason)
{
if (_logger.IsEnabled(LogLevel.Debug))
{
StreamAbortWrite(_logger, streamContext.ConnectionId, reason);
}
}

private static StreamType GetStreamType(QuicStreamContext streamContext) =>
streamContext.CanRead && streamContext.CanWrite
? StreamType.Bidirectional
Expand Down
Loading