Skip to content

HTTP/3: Fix incorrectly pooling aborted streams #35434

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -415,11 +415,15 @@ private void ShutdownWrite(Exception? shutdownReason)

public override async ValueTask DisposeAsync()
{
// Be conservative about what can be pooled.
// Only pool bidirectional streams whose pipes have completed successfully and haven't been aborted.
CanReuse = _stream.CanRead && _stream.CanWrite
&& _transportPipeReader.IsCompletedSuccessfully
&& _transportPipeWriter.IsCompletedSuccessfully
&& !_clientAbort
&& !_serverAborted;
&& !_serverAborted
&& _shutdownReadReason == null
&& _shutdownWriteReason == null;

_originalTransport.Input.Complete();
_originalTransport.Output.Complete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ public async Task ClientCertificate_Required_NotSent_ConnectionAborted()

// https://github.com/dotnet/runtime/issues/57246 The accept still completes even though the connection was rejected, but it's already failed.
var serverContext = await connectionListener.AcceptAndAddFeatureAsync().DefaultTimeout();
qex = await Assert.ThrowsAsync<QuicException>(() => serverContext.ConnectAsync().DefaultTimeout());
Assert.Equal("Failed to open stream to peer. Error Code: INVALID_STATE", qex.Message);
await Assert.ThrowsAsync<QuicException>(() => serverContext.ConnectAsync().DefaultTimeout());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the new exception?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't remember exactly. It was something like "Stream creation error BLAH BLAH BLAH" 😄

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,52 @@ public async Task BidirectionalStream_ServerReadsDataAndCompletes_GracefullyClos
Assert.Contains(TestSink.Writes, m => m.Message.Contains(@"shutting down writes because: ""The QUIC transport's send loop completed gracefully.""."));
}

[ConditionalFact]
[MsQuicSupported]
public async Task BidirectionalStream_ReadAborted_NotPooled()
{
// Arrange
await using var connectionListener = await QuicTestHelpers.CreateConnectionListenerFactory(LoggerFactory);

var options = QuicTestHelpers.CreateClientConnectionOptions(connectionListener.EndPoint);
using var clientConnection = new QuicConnection(QuicImplementationProviders.MsQuic, options);
await clientConnection.ConnectAsync().DefaultTimeout();

await using var serverConnection = await connectionListener.AcceptAndAddFeatureAsync().DefaultTimeout();

// Act
var clientStream = clientConnection.OpenBidirectionalStream();
await clientStream.WriteAsync(TestData).DefaultTimeout();
var serverStream = await serverConnection.AcceptAsync().DefaultTimeout();
var readResult = await serverStream.Transport.Input.ReadAtLeastAsync(TestData.Length).DefaultTimeout();
serverStream.Transport.Input.AdvanceTo(readResult.Buffer.End);

await clientStream.WriteAsync(TestData).DefaultTimeout();

// Complete writing.
await serverStream.Transport.Output.CompleteAsync();

// Abort read-side of the stream and then complete pipe.
// This simulates what Kestrel does when a request finishes without
// reading the request body to the end.
serverStream.Features.Get<IStreamAbortFeature>().AbortRead((long)Http3ErrorCode.NoError, new ConnectionAbortedException("Test message."));
await serverStream.Transport.Input.CompleteAsync();

var quicStreamContext = Assert.IsType<QuicStreamContext>(serverStream);

// Both send and receive loops have exited.
await quicStreamContext._processingTask.DefaultTimeout();
Assert.True(quicStreamContext.CanWrite);
Assert.True(quicStreamContext.CanRead);

await quicStreamContext.DisposeAsync();

var quicConnectionContext = Assert.IsType<QuicConnectionContext>(serverConnection);

// Assert
Assert.Equal(0, quicConnectionContext.StreamPool.Count);
}

[ConditionalTheory]
[MsQuicSupported]
[InlineData(1024)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,69 @@ public async Task GET_MultipleRequestsInSequence_ReusedState()
}
}

[ConditionalFact]
[MsQuicSupported]
public async Task StreamResponseContent_DelayAndTrailers_ClientSuccess()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Un-related unit test. IMO no harm in adding it. These tests are all quarantined.

{
// Arrange
var builder = CreateHostBuilder(async context =>
{
var feature = context.Features.Get<IHttpResponseTrailersFeature>();

for (var i = 1; i < 200; i++)
{
feature.Trailers.Append($"trailer-{i}", new string('!', i));
}

Logger.LogInformation($"Server trailer count: {feature.Trailers.Count}");

await context.Request.BodyReader.ReadAtLeastAsync(TestData.Length);

for (var i = 0; i < 3; i++)
{
await context.Response.BodyWriter.WriteAsync(TestData);

await Task.Delay(TimeSpan.FromMilliseconds(10));
}
});

using (var host = builder.Build())
using (var client = Http3Helpers.CreateClient())
{
await host.StartAsync();

// Act
var request = new HttpRequestMessage(HttpMethod.Post, $"https://127.0.0.1:{host.GetPort()}/");
request.Content = new ByteArrayContent(TestData);
request.Version = HttpVersion.Version30;
request.VersionPolicy = HttpVersionPolicy.RequestVersionExact;

var response = await client.SendAsync(request, CancellationToken.None);
response.EnsureSuccessStatusCode();

var responseStream = await response.Content.ReadAsStreamAsync();

await responseStream.ReadUntilEndAsync();

Logger.LogInformation($"Client trailer count: {response.TrailingHeaders.Count()}");

for (var i = 1; i < 200; i++)
{
try
{
var value = response.TrailingHeaders.GetValues($"trailer-{i}").Single();
Assert.Equal(new string('!', i), value);
}
catch (Exception ex)
{
throw new Exception($"Error checking trailer {i}", ex);
}
}

await host.StopAsync();
}
}

[ConditionalFact]
[MsQuicSupported]
public async Task GET_MultipleRequests_ConnectionAndTraceIdsUpdated()
Expand Down