@@ -19,6 +19,7 @@ public class WebSocketsTransport : ITransport
19
19
private readonly ClientWebSocket _webSocket = new ClientWebSocket ( ) ;
20
20
private Channel < byte [ ] , SendMessage > _application ;
21
21
private readonly CancellationTokenSource _transportCts = new CancellationTokenSource ( ) ;
22
+ private readonly CancellationTokenSource _receiveCts = new CancellationTokenSource ( ) ;
22
23
private readonly ILogger _logger ;
23
24
private string _connectionId ;
24
25
@@ -80,7 +81,7 @@ private async Task ReceiveMessages(Uri pollUrl)
80
81
81
82
try
82
83
{
83
- while ( ! _transportCts . Token . IsCancellationRequested )
84
+ while ( ! _receiveCts . Token . IsCancellationRequested )
84
85
{
85
86
const int bufferSize = 4096 ;
86
87
var totalBytes = 0 ;
@@ -91,7 +92,7 @@ private async Task ReceiveMessages(Uri pollUrl)
91
92
var buffer = new ArraySegment < byte > ( new byte [ bufferSize ] ) ;
92
93
93
94
//Exceptions are handled above where the send and receive tasks are being run.
94
- receiveResult = await _webSocket . ReceiveAsync ( buffer , _transportCts . Token ) ;
95
+ receiveResult = await _webSocket . ReceiveAsync ( buffer , _receiveCts . Token ) ;
95
96
if ( receiveResult . MessageType == WebSocketMessageType . Close )
96
97
{
97
98
_logger . WebSocketClosed ( _connectionId , receiveResult . CloseStatus ) ;
@@ -129,15 +130,25 @@ private async Task ReceiveMessages(Uri pollUrl)
129
130
Buffer . BlockCopy ( incomingMessage [ 0 ] . Array , incomingMessage [ 0 ] . Offset , messageBuffer , 0 , incomingMessage [ 0 ] . Count ) ;
130
131
}
131
132
132
- _logger . MessageToApp ( _connectionId , messageBuffer . Length ) ;
133
- while ( await _application . Out . WaitToWriteAsync ( _transportCts . Token ) )
133
+ try
134
134
{
135
- if ( _application . Out . TryWrite ( messageBuffer ) )
135
+ if ( ! _transportCts . Token . IsCancellationRequested )
136
136
{
137
- incomingMessage . Clear ( ) ;
138
- break ;
137
+ _logger . MessageToApp ( _connectionId , messageBuffer . Length ) ;
138
+ while ( await _application . Out . WaitToWriteAsync ( _transportCts . Token ) )
139
+ {
140
+ if ( _application . Out . TryWrite ( messageBuffer ) )
141
+ {
142
+ incomingMessage . Clear ( ) ;
143
+ break ;
144
+ }
145
+ }
139
146
}
140
147
}
148
+ catch ( OperationCanceledException )
149
+ {
150
+ _logger . CancelMessage ( _connectionId ) ;
151
+ }
141
152
}
142
153
}
143
154
catch ( OperationCanceledException )
@@ -198,7 +209,7 @@ private async Task SendMessages(Uri sendUrl)
198
209
finally
199
210
{
200
211
_logger . SendStopped ( _connectionId ) ;
201
- _transportCts . Cancel ( ) ;
212
+ TriggerCancel ( ) ;
202
213
}
203
214
}
204
215
@@ -248,7 +259,7 @@ private async Task CloseWebSocket()
248
259
await _webSocket . CloseOutputAsync ( WebSocketCloseStatus . NormalClosure , null , CancellationToken . None ) ;
249
260
250
261
// shutdown the transport after a timeout in case the server does not send close frame
251
- _transportCts . CancelAfter ( TimeSpan . FromSeconds ( 5 ) ) ;
262
+ TriggerCancel ( ) ;
252
263
}
253
264
}
254
265
catch ( Exception ex )
@@ -258,5 +269,12 @@ private async Task CloseWebSocket()
258
269
_logger . ClosingWebSocketFailed ( _connectionId , ex ) ;
259
270
}
260
271
}
272
+
273
+ private void TriggerCancel ( )
274
+ {
275
+ // Give server 5 seconds to respond with a close frame for graceful close.
276
+ _receiveCts . CancelAfter ( TimeSpan . FromSeconds ( 5 ) ) ;
277
+ _transportCts . Cancel ( ) ;
278
+ }
261
279
}
262
280
}
0 commit comments