@@ -14,6 +14,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
14
14
/// </summary>
15
15
internal sealed class Http1UpgradeMessageBody : Http1MessageBody
16
16
{
17
+ private int _userCanceled ;
18
+
17
19
public Http1UpgradeMessageBody ( Http1Connection context , bool keepAlive )
18
20
: base ( context , keepAlive )
19
21
{
@@ -23,13 +25,26 @@ public Http1UpgradeMessageBody(Http1Connection context, bool keepAlive)
23
25
// This returns IsEmpty so we can avoid draining the body (since it's basically an endless stream)
24
26
public override bool IsEmpty => true ;
25
27
28
+ public override ValueTask < ReadResult > ReadAsync ( CancellationToken cancellationToken = default )
29
+ {
30
+ ThrowIfReaderCompleted ( ) ;
31
+ return ReadAsyncInternal ( cancellationToken ) ;
32
+ }
33
+
34
+ public override bool TryRead ( out ReadResult result )
35
+ {
36
+ ThrowIfReaderCompleted ( ) ;
37
+ return TryReadInternal ( out result ) ;
38
+ }
39
+
26
40
public override void AdvanceTo ( SequencePosition consumed , SequencePosition examined )
27
41
{
28
42
_context . Input . AdvanceTo ( consumed , examined ) ;
29
43
}
30
44
31
45
public override void CancelPendingRead ( )
32
46
{
47
+ Interlocked . Exchange ( ref _userCanceled , 1 ) ;
33
48
_context . Input . CancelPendingRead ( ) ;
34
49
}
35
50
@@ -45,12 +60,49 @@ public override ValueTask StopAsync()
45
60
46
61
public override bool TryReadInternal ( out ReadResult readResult )
47
62
{
48
- return _context . Input . TryRead ( out readResult ) ;
63
+ // Ignore the canceled readResult unless it was canceled by the user.
64
+ do
65
+ {
66
+ if ( ! _context . Input . TryRead ( out readResult ) )
67
+ {
68
+ return false ;
69
+ }
70
+ } while ( readResult . IsCanceled && Interlocked . Exchange ( ref _userCanceled , 0 ) == 0 ) ;
71
+
72
+ return true ;
49
73
}
50
74
51
75
public override ValueTask < ReadResult > ReadAsyncInternal ( CancellationToken cancellationToken = default )
52
76
{
53
- return _context . Input . ReadAsync ( cancellationToken ) ;
77
+ ReadResult readResult ;
78
+
79
+ // Ignore the canceled readResult unless it was canceled by the user.
80
+ do
81
+ {
82
+ var readTask = _context . Input . ReadAsync ( cancellationToken ) ;
83
+
84
+ if ( ! readTask . IsCompletedSuccessfully )
85
+ {
86
+ return ReadAsyncInternalAwaited ( readTask , cancellationToken ) ;
87
+ }
88
+
89
+ readResult = readTask . GetAwaiter ( ) . GetResult ( ) ;
90
+ } while ( readResult . IsCanceled && Interlocked . Exchange ( ref _userCanceled , 0 ) == 0 ) ;
91
+
92
+ return new ValueTask < ReadResult > ( readResult ) ;
93
+ }
94
+
95
+ private async ValueTask < ReadResult > ReadAsyncInternalAwaited ( ValueTask < ReadResult > readTask , CancellationToken cancellationToken = default )
96
+ {
97
+ var readResult = await readTask ;
98
+
99
+ // Ignore the canceled readResult unless it was canceled by the user.
100
+ while ( readResult . IsCanceled && Interlocked . Exchange ( ref _userCanceled , 0 ) == 0 )
101
+ {
102
+ readResult = await _context . Input . ReadAsync ( cancellationToken ) ;
103
+ }
104
+
105
+ return readResult ;
54
106
}
55
107
}
56
108
}
0 commit comments