@@ -14,6 +14,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
1414 /// </summary>
1515 internal sealed class Http1UpgradeMessageBody : Http1MessageBody
1616 {
17+ private int _userCanceled ;
18+
1719 public Http1UpgradeMessageBody ( Http1Connection context )
1820 : base ( context )
1921 {
@@ -26,13 +28,13 @@ public Http1UpgradeMessageBody(Http1Connection context)
2628 public override ValueTask < ReadResult > ReadAsync ( CancellationToken cancellationToken = default )
2729 {
2830 ThrowIfCompleted ( ) ;
29- return _context . Input . ReadAsync ( cancellationToken ) ;
31+ return ReadAsyncInternal ( cancellationToken ) ;
3032 }
3133
3234 public override bool TryRead ( out ReadResult result )
3335 {
3436 ThrowIfCompleted ( ) ;
35- return _context . Input . TryRead ( out result ) ;
37+ return TryReadInternal ( out result ) ;
3638 }
3739
3840 public override void AdvanceTo ( SequencePosition consumed )
@@ -54,6 +56,7 @@ public override void Complete(Exception exception)
5456
5557 public override void CancelPendingRead ( )
5658 {
59+ Interlocked . Exchange ( ref _userCanceled , 1 ) ;
5760 _context . Input . CancelPendingRead ( ) ;
5861 }
5962
@@ -69,12 +72,49 @@ public override Task StopAsync()
6972
7073 public override bool TryReadInternal ( out ReadResult readResult )
7174 {
72- return _context . Input . TryRead ( out readResult ) ;
75+ // Ignore the canceled readResult unless it was canceled by the user.
76+ do
77+ {
78+ if ( ! _context . Input . TryRead ( out readResult ) )
79+ {
80+ return false ;
81+ }
82+ } while ( readResult . IsCanceled && Interlocked . Exchange ( ref _userCanceled , 0 ) == 0 ) ;
83+
84+ return true ;
7385 }
7486
7587 public override ValueTask < ReadResult > ReadAsyncInternal ( CancellationToken cancellationToken = default )
7688 {
77- return _context . Input . ReadAsync ( cancellationToken ) ;
89+ ReadResult readResult ;
90+
91+ // Ignore the canceled readResult unless it was canceled by the user.
92+ do
93+ {
94+ var readTask = _context . Input . ReadAsync ( cancellationToken ) ;
95+
96+ if ( ! readTask . IsCompletedSuccessfully )
97+ {
98+ return ReadAsyncInternalAwaited ( readTask , cancellationToken ) ;
99+ }
100+
101+ readResult = readTask . GetAwaiter ( ) . GetResult ( ) ;
102+ } while ( readResult . IsCanceled && Interlocked . Exchange ( ref _userCanceled , 0 ) == 0 ) ;
103+
104+ return new ValueTask < ReadResult > ( readResult ) ;
105+ }
106+
107+ private async ValueTask < ReadResult > ReadAsyncInternalAwaited ( ValueTask < ReadResult > readTask , CancellationToken cancellationToken = default )
108+ {
109+ var readResult = await readTask ;
110+
111+ // Ignore the canceled readResult unless it was canceled by the user.
112+ while ( readResult . IsCanceled && Interlocked . Exchange ( ref _userCanceled , 0 ) == 0 )
113+ {
114+ readResult = await _context . Input . ReadAsync ( cancellationToken ) ;
115+ }
116+
117+ return readResult ;
78118 }
79119 }
80120}
0 commit comments