Skip to content

Commit 1516d2c

Browse files
committed
HTTP/3 response drain timeout
1 parent 62be6d1 commit 1516d2c

19 files changed

+315
-94
lines changed

src/Servers/Kestrel/Core/src/Internal/Http3/Http3Connection.cs

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -176,38 +176,64 @@ public void Tick(DateTimeOffset now)
176176
return;
177177
}
178178

179-
UpdateStartingStreams(now);
179+
UpdateStreamTimeouts(now);
180180
}
181181

182-
private void UpdateStartingStreams(DateTimeOffset now)
182+
private void UpdateStreamTimeouts(DateTimeOffset now)
183183
{
184+
// This method checks for timeouts:
185+
// 1. When a stream first starts and waits to receive headers.
186+
// Uses RequestHeadersTimeout.
187+
// 2. When a stream finished and is waiting for underlying transport to drain.
188+
// Uses MinResponseDataRate.
189+
184190
var ticks = now.Ticks;
185191

186192
lock (_streams)
187193
{
188194
foreach (var stream in _streams.Values)
189195
{
190-
if (stream.ReceivedHeader)
196+
if (stream.IsReceivingHeader)
191197
{
192-
continue;
193-
}
198+
if (stream.StreamTimeoutTicks == default)
199+
{
200+
// On expiration overflow, use max value.
201+
var expirationTicks = ticks + _context.ServiceContext.ServerOptions.Limits.RequestHeadersTimeout.Ticks;
202+
stream.StreamTimeoutTicks = expirationTicks >= 0 ? expirationTicks : long.MaxValue;
203+
}
194204

195-
if (stream.HeaderTimeoutTicks == default)
196-
{
197-
// On expiration overflow, use max value.
198-
var expirationTicks = ticks + _context.ServiceContext.ServerOptions.Limits.RequestHeadersTimeout.Ticks;
199-
stream.HeaderTimeoutTicks = expirationTicks >= 0 ? expirationTicks : long.MaxValue;
205+
if (stream.StreamTimeoutTicks < ticks)
206+
{
207+
if (stream.IsRequestStream)
208+
{
209+
stream.Abort(new ConnectionAbortedException(CoreStrings.BadRequest_RequestHeadersTimeout), Http3ErrorCode.RequestRejected);
210+
}
211+
else
212+
{
213+
stream.Abort(new ConnectionAbortedException(CoreStrings.Http3ControlStreamHeaderTimeout), Http3ErrorCode.StreamCreationError);
214+
}
215+
}
200216
}
201-
202-
if (stream.HeaderTimeoutTicks < ticks)
217+
else if (stream.IsDraining)
203218
{
204-
if (stream.IsRequestStream)
219+
var minDataRate = _context.ServiceContext.ServerOptions.Limits.MinResponseDataRate;
220+
if (minDataRate == null)
205221
{
206-
stream.Abort(new ConnectionAbortedException(CoreStrings.BadRequest_RequestHeadersTimeout), Http3ErrorCode.RequestRejected);
222+
continue;
207223
}
208-
else
224+
225+
if (stream.StreamTimeoutTicks == default)
226+
{
227+
var expirationTicks = Math.Max(
228+
_context.TimeoutControl.GetWriteTimingTimeoutTimestamp(),
229+
ticks + minDataRate.GracePeriod.Ticks);
230+
231+
stream.StreamTimeoutTicks = expirationTicks >= 0 ? expirationTicks : long.MaxValue;
232+
}
233+
234+
if (stream.StreamTimeoutTicks < ticks)
209235
{
210-
stream.Abort(new ConnectionAbortedException(CoreStrings.Http3ControlStreamHeaderTimeout), Http3ErrorCode.StreamCreationError);
236+
stream.Abort(new ConnectionAbortedException(CoreStrings.ConnectionTimedBecauseResponseMininumDataRateNotSatisfied), Http3ErrorCode.RequestCancelled);
211237
}
212238
}
213239
}
@@ -396,7 +422,6 @@ public async Task ProcessRequestsAsync<TContext>(IHttpApplication<TContext> appl
396422
}
397423

398424
_context.TimeoutControl.CancelTimeout();
399-
_context.TimeoutControl.StartDrainTimeout(Limits.MinResponseDataRate, Limits.MaxResponseBufferSize);
400425
}
401426
catch
402427
{
@@ -648,7 +673,7 @@ void IHttp3StreamLifetimeHandler.OnInboundControlStreamSetting(Http3SettingType
648673

649674
void IHttp3StreamLifetimeHandler.OnStreamHeaderReceived(IHttp3Stream stream)
650675
{
651-
Debug.Assert(stream.ReceivedHeader);
676+
Debug.Assert(!stream.IsReceivingHeader);
652677
}
653678

654679
public void HandleRequestHeadersTimeout()

src/Servers/Kestrel/Core/src/Internal/Http3/Http3ControlStream.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,9 @@ private void OnStreamClosed()
6767
public PipeReader Input => _context.Transport.Input;
6868
public IKestrelTrace Log => _context.ServiceContext.Log;
6969

70-
public long HeaderTimeoutTicks { get; set; }
71-
public bool ReceivedHeader => _headerType >= 0;
70+
public long StreamTimeoutTicks { get; set; }
71+
public bool IsReceivingHeader => _headerType == -1;
72+
public bool IsDraining => false;
7273

7374
public bool IsRequestStream => false;
7475

src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,15 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4-
using System;
54
using System.Buffers;
65
using System.Diagnostics;
76
using System.IO.Pipelines;
87
using System.Net.Http;
98
using System.Net.Http.QPack;
10-
using System.Net.Quic;
119
using System.Runtime.CompilerServices;
12-
using System.Threading;
13-
using System.Threading.Tasks;
1410
using Microsoft.AspNetCore.Connections;
1511
using Microsoft.AspNetCore.Connections.Features;
1612
using Microsoft.AspNetCore.Hosting.Server;
17-
using Microsoft.AspNetCore.Http.Features;
1813
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
1914
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
2015
using Microsoft.Extensions.Logging;
@@ -73,8 +68,9 @@ internal abstract partial class Http3Stream : HttpProtocol, IHttp3Stream, IHttpH
7368
public KestrelServerLimits Limits => _context.ServiceContext.ServerOptions.Limits;
7469
public long StreamId => _streamIdFeature.StreamId;
7570

76-
public long HeaderTimeoutTicks { get; set; }
77-
public bool ReceivedHeader => _appCompleted != null; // TCS is assigned once headers are received
71+
public long StreamTimeoutTicks { get; set; }
72+
public bool IsReceivingHeader => _appCompleted == null; // TCS is assigned once headers are received
73+
public bool IsDraining => _appCompleted?.Task.IsCompleted ?? false; // Draining starts once app is complete
7874

7975
public bool IsRequestStream => true;
8076

@@ -97,7 +93,7 @@ public void Initialize(Http3StreamContext context)
9793
_totalParsedHeaderSize = 0;
9894
_isMethodConnect = false;
9995
_completionState = default;
100-
HeaderTimeoutTicks = 0;
96+
StreamTimeoutTicks = 0;
10197

10298
if (_frameWriter == null)
10399
{
@@ -409,7 +405,6 @@ private bool TryClose()
409405
return true;
410406
}
411407

412-
// TODO make this actually close the Http3Stream by telling quic to close the stream.
413408
return false;
414409
}
415410

@@ -501,12 +496,21 @@ public async Task ProcessRequestAsync<TContext>(IHttpApplication<TContext> appli
501496
}
502497
finally
503498
{
504-
ApplyCompletionFlag(StreamCompletionFlags.Completed);
499+
// Completing the stream needs to happen in the middle of DisposeAsync.
500+
// After the stream has drained but before the stream is returned to the pool.
501+
// OnCompleted callback executes in the right place.
502+
_context.StreamContext.Features.Get<IConnectionCompleteFeature>()!.OnCompleted(static (state) =>
503+
{
504+
var s = (Http3Stream)state;
505+
506+
s.ApplyCompletionFlag(StreamCompletionFlags.Completed);
507+
508+
// Tells the connection to remove the stream from its active collection.
509+
s._context.StreamLifetimeHandler.OnStreamCompleted(s);
505510

506-
// Tells the connection to remove the stream from its active collection.
507-
_context.StreamLifetimeHandler.OnStreamCompleted(this);
511+
return Task.CompletedTask;
512+
}, this);
508513

509-
// Dispose must happen after stream is no longer active.
510514
await _context.StreamContext.DisposeAsync();
511515
}
512516
}
@@ -600,8 +604,6 @@ private async Task ProcessHeadersFrameAsync<TContext>(IHttpApplication<TContext>
600604
case RequestHeaderParsingState.Headers:
601605
break;
602606
case RequestHeaderParsingState.Trailers:
603-
// trailers
604-
// TODO figure out if there is anything else to do here.
605607
return;
606608
default:
607609
Debug.Fail("Unexpected header parsing state.");
@@ -627,6 +629,7 @@ private async Task ProcessHeadersFrameAsync<TContext>(IHttpApplication<TContext>
627629
}
628630

629631
_appCompleted = new TaskCompletionSource();
632+
StreamTimeoutTicks = default;
630633
_context.StreamLifetimeHandler.OnStreamHeaderReceived(this);
631634

632635
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);

src/Servers/Kestrel/Core/src/Internal/Http3/IHttp3Stream.cs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,25 @@ internal interface IHttp3Stream
1414
long StreamId { get; }
1515

1616
/// <summary>
17-
/// Used to track the timeout between when the stream was started by the client, and getting a header.
18-
/// Value is driven by <see cref="KestrelServerLimits.RequestHeadersTimeout"/>.
17+
/// Used to track the timeout in two situations:
18+
/// 1. Between when the stream was started by the client, and getting a header.
19+
/// Value is driven by <see cref="KestrelServerLimits.RequestHeadersTimeout"/>.
20+
/// 2. Between when the request delegate is complete and the transport draining.
21+
/// Value is driven by <see cref="KestrelServerLimits.MinResponseDataRate"/>.
1922
/// </summary>
20-
long HeaderTimeoutTicks { get; set; }
23+
long StreamTimeoutTicks { get; set; }
2124

2225
/// <summary>
23-
/// The stream has received and parsed the header frame.
26+
/// The stream is receiving the header frame.
2427
/// - Request streams = HEADERS frame.
2528
/// - Control streams = unidirectional stream header.
2629
/// </summary>
27-
bool ReceivedHeader { get; }
30+
bool IsReceivingHeader { get; }
31+
32+
/// <summary>
33+
/// The stream request delegate is complete and the transport is draining.
34+
/// </summary>
35+
bool IsDraining { get; }
2836

2937
bool IsRequestStream { get; }
3038

src/Servers/Kestrel/Core/src/Internal/Infrastructure/ITimeoutControl.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,6 @@ internal interface ITimeoutControl
2626
void StartTimingWrite();
2727
void StopTimingWrite();
2828
void BytesWrittenToBuffer(MinDataRate minRate, long count);
29+
long GetWriteTimingTimeoutTimestamp();
2930
}
3031
}

src/Servers/Kestrel/Core/src/Internal/Infrastructure/TimeoutControl.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,5 +330,10 @@ void IConnectionTimeoutFeature.ResetTimeout(TimeSpan timeSpan)
330330

331331
ResetTimeout(timeSpan.Ticks, TimeoutReason.TimeoutFeature);
332332
}
333+
334+
public long GetWriteTimingTimeoutTimestamp()
335+
{
336+
return _writeTimingTimeoutTimestamp;
337+
}
333338
}
334339
}

src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.FeatureCollection.cs

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44
using System.Net.Sockets;
55
using Microsoft.AspNetCore.Connections;
66
using Microsoft.AspNetCore.Connections.Features;
7+
using Microsoft.Extensions.Logging;
78

89
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Internal
910
{
10-
internal sealed partial class QuicStreamContext : IPersistentStateFeature, IStreamDirectionFeature, IProtocolErrorCodeFeature, IStreamIdFeature, IStreamAbortFeature
11+
internal sealed partial class QuicStreamContext : IPersistentStateFeature, IStreamDirectionFeature, IProtocolErrorCodeFeature, IStreamIdFeature, IStreamAbortFeature, IConnectionCompleteFeature
1112
{
1213
private IDictionary<object, object?>? _persistentState;
14+
private Stack<KeyValuePair<Func<object, Task>, object>>? _onCompleted;
1315

1416
public bool CanRead { get; private set; }
1517
public bool CanWrite { get; private set; }
@@ -31,15 +33,18 @@ public void AbortRead(long errorCode, ConnectionAbortedException abortReason)
3133
{
3234
lock (_shutdownLock)
3335
{
34-
if (_stream.CanRead)
36+
if (_stream != null)
3537
{
36-
_shutdownReadReason = abortReason;
37-
_log.StreamAbortRead(this, errorCode, abortReason.Message);
38-
_stream.AbortRead(errorCode);
39-
}
40-
else
41-
{
42-
throw new InvalidOperationException("Unable to abort reading from a stream that doesn't support reading.");
38+
if (_stream.CanRead)
39+
{
40+
_shutdownReadReason = abortReason;
41+
_log.StreamAbortRead(this, errorCode, abortReason.Message);
42+
_stream.AbortRead(errorCode);
43+
}
44+
else
45+
{
46+
throw new InvalidOperationException("Unable to abort reading from a stream that doesn't support reading.");
47+
}
4348
}
4449
}
4550
}
@@ -48,26 +53,39 @@ public void AbortWrite(long errorCode, ConnectionAbortedException abortReason)
4853
{
4954
lock (_shutdownLock)
5055
{
51-
if (_stream.CanWrite)
56+
if (_stream != null)
5257
{
53-
_shutdownWriteReason = abortReason;
54-
_log.StreamAbortWrite(this, errorCode, abortReason.Message);
55-
_stream.AbortWrite(errorCode);
56-
}
57-
else
58-
{
59-
throw new InvalidOperationException("Unable to abort writing to a stream that doesn't support writing.");
58+
if (_stream.CanWrite)
59+
{
60+
_shutdownWriteReason = abortReason;
61+
_log.StreamAbortWrite(this, errorCode, abortReason.Message);
62+
_stream.AbortWrite(errorCode);
63+
}
64+
else
65+
{
66+
throw new InvalidOperationException("Unable to abort writing to a stream that doesn't support writing.");
67+
}
6068
}
6169
}
6270
}
6371

72+
public void OnCompleted(Func<object, Task> callback, object state)
73+
{
74+
if (_onCompleted == null)
75+
{
76+
_onCompleted = new Stack<KeyValuePair<Func<object, Task>, object>>();
77+
}
78+
_onCompleted.Push(new KeyValuePair<Func<object, Task>, object>(callback, state));
79+
}
80+
6481
private void InitializeFeatures()
6582
{
6683
_currentIPersistentStateFeature = this;
6784
_currentIStreamDirectionFeature = this;
6885
_currentIProtocolErrorCodeFeature = this;
6986
_currentIStreamIdFeature = this;
7087
_currentIStreamAbortFeature = this;
88+
_currentIConnectionCompleteFeature = this;
7189
}
7290
}
7391
}

0 commit comments

Comments
 (0)