@@ -307,6 +307,40 @@ await connection.ReceiveJsonMessage(
307307 }
308308 }
309309
310+ [ Fact ]
311+ public async Task CanCancelTokenDuringStream_SendsCancelInvocation ( )
312+ {
313+ using ( StartVerifiableLog ( ) )
314+ {
315+ var connection = new TestConnection ( ) ;
316+ var hubConnection = CreateHubConnection ( connection , loggerFactory : LoggerFactory ) ;
317+
318+ await hubConnection . StartAsync ( ) . DefaultTimeout ( ) ;
319+
320+ using var cts = new CancellationTokenSource ( ) ;
321+ var asyncEnumerable = hubConnection . StreamAsync < int > ( "Stream" , 1 , cts . Token ) ;
322+
323+ await using var e = asyncEnumerable . GetAsyncEnumerator ( cts . Token ) ;
324+ var task = e . MoveNextAsync ( ) ;
325+
326+ var item = await connection . ReadSentJsonAsync ( ) . DefaultTimeout ( ) ;
327+ var invocationId = item [ "invocationId" ] ;
328+ await connection . ReceiveJsonMessage (
329+ new { type = HubProtocolConstants . StreamItemMessageType , invocationId , item = 1 }
330+ ) . DefaultTimeout ( ) ;
331+
332+ await task . DefaultTimeout ( ) ;
333+ cts . Cancel ( ) ;
334+
335+ item = await connection . ReadSentJsonAsync ( ) . DefaultTimeout ( ) ;
336+ Assert . Equal ( HubProtocolConstants . CancelInvocationMessageType , item [ "type" ] ) ;
337+ Assert . Equal ( invocationId , item [ "invocationId" ] ) ;
338+
339+ // Stream on client-side completes on cancellation
340+ await Assert . ThrowsAsync < TaskCanceledException > ( async ( ) => await e . MoveNextAsync ( ) ) . DefaultTimeout ( ) ;
341+ }
342+ }
343+
310344 [ Fact ]
311345 public async Task ConnectionTerminatedIfServerTimeoutIntervalElapsesWithNoMessages ( )
312346 {
0 commit comments