Skip to content

Commit 6f197a9

Browse files
authored
Using IAsyncEnumerable in the .NET Client (#8935)
1 parent 69f4b6d commit 6f197a9

File tree

8 files changed

+760
-3
lines changed

8 files changed

+760
-3
lines changed

src/SignalR/clients/csharp/Client.Core/ref/Microsoft.AspNetCore.SignalR.Client.Core.csproj

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<!-- This file is automatically generated. -->
22
<Project Sdk="Microsoft.NET.Sdk">
33
<PropertyGroup>
4-
<TargetFrameworks>netstandard2.0</TargetFrameworks>
4+
<TargetFrameworks>netstandard2.0;netcoreapp3.0</TargetFrameworks>
55
</PropertyGroup>
66
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
77
<Compile Include="Microsoft.AspNetCore.SignalR.Client.Core.netstandard2.0.cs" />
@@ -11,4 +11,12 @@
1111
<Reference Include="Microsoft.Extensions.Logging" />
1212
<Reference Include="System.Threading.Channels" />
1313
</ItemGroup>
14+
<ItemGroup Condition="'$(TargetFramework)' == 'netcoreapp3.0'">
15+
<Compile Include="Microsoft.AspNetCore.SignalR.Client.Core.netcoreapp3.0.cs" />
16+
<Reference Include="Microsoft.AspNetCore.SignalR.Common" />
17+
<Reference Include="Microsoft.AspNetCore.SignalR.Protocols.NewtonsoftJson" />
18+
<Reference Include="Microsoft.Extensions.DependencyInjection" />
19+
<Reference Include="Microsoft.Extensions.Logging" />
20+
<Reference Include="System.Threading.Channels" />
21+
</ItemGroup>
1422
</Project>

src/SignalR/clients/csharp/Client.Core/ref/Microsoft.AspNetCore.SignalR.Client.Core.netcoreapp3.0.cs

Lines changed: 165 additions & 0 deletions
Large diffs are not rendered by default.

src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
using Microsoft.AspNetCore.Connections.Features;
1818
using Microsoft.AspNetCore.Internal;
1919
using Microsoft.AspNetCore.SignalR.Client.Internal;
20+
using Microsoft.AspNetCore.SignalR.Internal;
2021
using Microsoft.AspNetCore.SignalR.Protocol;
2122
using Microsoft.Extensions.Logging;
2223
using Microsoft.Extensions.Logging.Abstractions;
@@ -436,6 +437,46 @@ private async Task StopAsyncCore(bool disposing)
436437
}
437438
}
438439

440+
#if NETCOREAPP3_0
441+
442+
/// <summary>
443+
/// Invokes a streaming hub method on the server using the specified method name, return type and arguments.
444+
/// </summary>
445+
/// <typeparam name="TResult">The return type of the streaming server method.</typeparam>
446+
/// <param name="methodName">The name of the server method to invoke.</param>
447+
/// <param name="args">The arguments used to invoke the server method.</param>
448+
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.</param>
449+
/// <returns>
450+
/// A <see cref="IAsyncEnumerable{TResult}"/> that represents the stream.
451+
/// </returns>
452+
public IAsyncEnumerable<TResult> StreamAsyncCore<TResult>(string methodName, object[] args, CancellationToken cancellationToken = default)
453+
{
454+
var cts = cancellationToken.CanBeCanceled ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken) : new CancellationTokenSource();
455+
var stream = CastIAsyncEnumerable<TResult>(methodName, args, cts);
456+
var cancelableStream = AsyncEnumerableAdapters.MakeCancelableTypedAsyncEnumerable(stream, cts);
457+
return cancelableStream;
458+
}
459+
460+
private async IAsyncEnumerable<T> CastIAsyncEnumerable<T>(string methodName, object[] args, CancellationTokenSource cts)
461+
{
462+
var reader = await StreamAsChannelCoreAsync(methodName, typeof(T), args, cts.Token);
463+
try
464+
{
465+
while (await reader.WaitToReadAsync(cts.Token))
466+
{
467+
while (reader.TryRead(out var item))
468+
{
469+
yield return (T)item;
470+
}
471+
}
472+
}
473+
finally
474+
{
475+
cts.Dispose();
476+
}
477+
}
478+
#endif
479+
439480
private async Task<ChannelReader<object>> StreamAsChannelCoreAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken)
440481
{
441482
async Task OnStreamCanceled(InvocationRequest irq)

src/SignalR/clients/csharp/Client.Core/src/HubConnectionExtensions.StreamAsync.cs

Lines changed: 242 additions & 0 deletions
Large diffs are not rendered by default.

src/SignalR/clients/csharp/Client.Core/src/Microsoft.AspNetCore.SignalR.Client.Core.csproj

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
1+
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
44
<Description>Client for ASP.NET Core SignalR</Description>
5-
<TargetFramework>netstandard2.0</TargetFramework>
5+
<TargetFrameworks>netstandard2.0;netcoreapp3.0</TargetFrameworks>
66
<RootNamespace>Microsoft.AspNetCore.SignalR.Client</RootNamespace>
77
<IsShippingPackage>true</IsShippingPackage>
88
</PropertyGroup>
@@ -13,6 +13,7 @@
1313
<Compile Include="$(SignalRSharedSourceRoot)PipeWriterStream.cs" Link="PipeWriterStream.cs" />
1414
<Compile Include="$(SignalRSharedSourceRoot)ReflectionHelper.cs" Link="ReflectionHelper.cs" />
1515
<Compile Include="$(SignalRSharedSourceRoot)TimerAwaitable.cs" Link="Internal\TimerAwaitable.cs" />
16+
<Compile Include="$(SignalRSharedSourceRoot)AsyncEnumerableAdapters.cs" Link="Internal\AsyncEnumerableAdapters.cs" />
1617
</ItemGroup>
1718

1819
<ItemGroup>

src/SignalR/clients/csharp/Client/test/FunctionalTests/HubConnectionTests.cs

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,203 @@ public async Task CanInvokeFromOnHandler(string protocolName, HttpTransportType
322322
}
323323
}
324324

325+
[Theory]
326+
[MemberData(nameof(HubProtocolsAndTransportsAndHubPaths))]
327+
[LogLevel(LogLevel.Trace)]
328+
public async Task StreamAsyncCoreTest(string protocolName, HttpTransportType transportType, string path)
329+
{
330+
var protocol = HubProtocols[protocolName];
331+
using (StartServer<Startup>(out var server))
332+
{
333+
var connection = CreateHubConnection(server.Url, path, transportType, protocol, LoggerFactory);
334+
try
335+
{
336+
await connection.StartAsync().OrTimeout();
337+
var expectedValue = 0;
338+
var streamTo = 5;
339+
var asyncEnumerable = connection.StreamAsyncCore<int>("Stream", new object[] { streamTo });
340+
await foreach (var streamValue in asyncEnumerable)
341+
{
342+
Assert.Equal(expectedValue, streamValue);
343+
expectedValue++;
344+
}
345+
346+
Assert.Equal(streamTo, expectedValue);
347+
}
348+
catch (Exception ex)
349+
{
350+
LoggerFactory.CreateLogger<HubConnectionTests>().LogError(ex, "{ExceptionType} from test", ex.GetType().FullName);
351+
throw;
352+
}
353+
finally
354+
{
355+
await connection.DisposeAsync().OrTimeout();
356+
}
357+
}
358+
}
359+
360+
[Theory]
361+
[MemberData(nameof(HubProtocolsAndTransportsAndHubPaths))]
362+
[LogLevel(LogLevel.Trace)]
363+
public async Task StreamAsyncTest(string protocolName, HttpTransportType transportType, string path)
364+
{
365+
var protocol = HubProtocols[protocolName];
366+
using (StartServer<Startup>(out var server))
367+
{
368+
var connection = CreateHubConnection(server.Url, path, transportType, protocol, LoggerFactory);
369+
try
370+
{
371+
await connection.StartAsync().OrTimeout();
372+
var expectedValue = 0;
373+
var streamTo = 5;
374+
var asyncEnumerable = connection.StreamAsync<int>("Stream", streamTo);
375+
await foreach (var streamValue in asyncEnumerable)
376+
{
377+
Assert.Equal(expectedValue, streamValue);
378+
expectedValue++;
379+
}
380+
381+
Assert.Equal(streamTo, expectedValue);
382+
}
383+
catch (Exception ex)
384+
{
385+
LoggerFactory.CreateLogger<HubConnectionTests>().LogError(ex, "{ExceptionType} from test", ex.GetType().FullName);
386+
throw;
387+
}
388+
finally
389+
{
390+
await connection.DisposeAsync().OrTimeout();
391+
}
392+
}
393+
}
394+
395+
[Theory]
396+
[MemberData(nameof(HubProtocolsAndTransportsAndHubPaths))]
397+
[LogLevel(LogLevel.Trace)]
398+
public async Task StreamAsyncDoesNotStartIfTokenAlreadyCanceled(string protocolName, HttpTransportType transportType, string path)
399+
{
400+
bool ExpectedErrors(WriteContext writeContext)
401+
{
402+
return writeContext.LoggerName == DefaultHubDispatcherLoggerName &&
403+
writeContext.EventId.Name == "FailedInvokingHubMethod";
404+
}
405+
var protocol = HubProtocols[protocolName];
406+
using (StartServer<Startup>(out var server, ExpectedErrors))
407+
{
408+
var connection = CreateHubConnection(server.Url, path, transportType, protocol, LoggerFactory);
409+
try
410+
{
411+
await connection.StartAsync().OrTimeout();
412+
413+
var cts = new CancellationTokenSource();
414+
cts.Cancel();
415+
416+
var ex = Assert.ThrowsAsync<OperationCanceledException>(async () =>
417+
{
418+
var stream = connection.StreamAsync<int>("Stream", 5, cts.Token);
419+
await foreach (var streamValue in stream)
420+
{
421+
Assert.True(false, "Expected an exception from the streaming invocation.");
422+
}
423+
});
424+
}
425+
catch (Exception ex)
426+
{
427+
LoggerFactory.CreateLogger<HubConnectionTests>().LogError(ex, "{ExceptionType} from test", ex.GetType().FullName);
428+
throw;
429+
}
430+
finally
431+
{
432+
await connection.DisposeAsync().OrTimeout();
433+
}
434+
}
435+
}
436+
437+
[Theory]
438+
[MemberData(nameof(HubProtocolsAndTransportsAndHubPaths))]
439+
[LogLevel(LogLevel.Trace)]
440+
public async Task StreamAsyncCanBeCanceled(string protocolName, HttpTransportType transportType, string path)
441+
{
442+
var protocol = HubProtocols[protocolName];
443+
using (StartServer<Startup>(out var server))
444+
{
445+
var connection = CreateHubConnection(server.Url, path, transportType, protocol, LoggerFactory);
446+
try
447+
{
448+
await connection.StartAsync().OrTimeout();
449+
450+
var cts = new CancellationTokenSource();
451+
452+
var stream = connection.StreamAsync<int>("Stream", 5, cts.Token);
453+
var results = new List<int>();
454+
455+
var enumerator = stream.GetAsyncEnumerator();
456+
await Assert.ThrowsAsync<TaskCanceledException>(async () =>
457+
{
458+
while (await enumerator.MoveNextAsync())
459+
{
460+
results.Add(enumerator.Current);
461+
cts.Cancel();
462+
}
463+
});
464+
465+
Assert.Single(results);
466+
Assert.Equal(0, results[0]);
467+
}
468+
catch (Exception ex)
469+
{
470+
LoggerFactory.CreateLogger<HubConnectionTests>().LogError(ex, "{ExceptionType} from test", ex.GetType().FullName);
471+
throw;
472+
}
473+
finally
474+
{
475+
await connection.DisposeAsync().OrTimeout();
476+
}
477+
}
478+
}
479+
480+
[Theory]
481+
[MemberData(nameof(HubProtocolsAndTransportsAndHubPaths))]
482+
[LogLevel(LogLevel.Trace)]
483+
public async Task StreamAsyncWithException(string protocolName, HttpTransportType transportType, string path)
484+
{
485+
bool ExpectedErrors(WriteContext writeContext)
486+
{
487+
return writeContext.LoggerName == DefaultHubDispatcherLoggerName &&
488+
writeContext.EventId.Name == "FailedInvokingHubMethod";
489+
}
490+
491+
var protocol = HubProtocols[protocolName];
492+
using (StartServer<Startup>(out var server, ExpectedErrors))
493+
{
494+
var connection = CreateHubConnection(server.Url, path, transportType, protocol, LoggerFactory);
495+
try
496+
{
497+
await connection.StartAsync().OrTimeout();
498+
var asyncEnumerable = connection.StreamAsync<int>("StreamException");
499+
var ex = await Assert.ThrowsAsync<HubException>(async () =>
500+
{
501+
await foreach (var streamValue in asyncEnumerable)
502+
{
503+
Assert.True(false, "Expected an exception from the streaming invocation.");
504+
}
505+
});
506+
507+
Assert.Equal("An unexpected error occurred invoking 'StreamException' on the server. InvalidOperationException: Error occurred while streaming.", ex.Message);
508+
509+
}
510+
catch (Exception ex)
511+
{
512+
LoggerFactory.CreateLogger<HubConnectionTests>().LogError(ex, "{ExceptionType} from test", ex.GetType().FullName);
513+
throw;
514+
}
515+
finally
516+
{
517+
await connection.DisposeAsync().OrTimeout();
518+
}
519+
}
520+
}
521+
325522
[Theory]
326523
[MemberData(nameof(HubProtocolsAndTransportsAndHubPaths))]
327524
[LogLevel(LogLevel.Trace)]
@@ -465,6 +662,48 @@ public async Task CanStreamToAndFromClientInSameInvocation(string protocolName,
465662
}
466663
}
467664

665+
[Theory]
666+
[MemberData(nameof(HubProtocolsAndTransportsAndHubPaths))]
667+
[LogLevel(LogLevel.Trace)]
668+
public async Task StreamAsyncCanBeCanceledThroughGetEnumerator(string protocolName, HttpTransportType transportType, string path)
669+
{
670+
var protocol = HubProtocols[protocolName];
671+
using (StartServer<Startup>(out var server))
672+
{
673+
var connection = CreateHubConnection(server.Url, path, transportType, protocol, LoggerFactory);
674+
try
675+
{
676+
await connection.StartAsync().OrTimeout();
677+
var stream = connection.StreamAsync<int>("Stream", 5 );
678+
var results = new List<int>();
679+
680+
var cts = new CancellationTokenSource();
681+
682+
var enumerator = stream.GetAsyncEnumerator(cts.Token);
683+
await Assert.ThrowsAsync<TaskCanceledException>(async () =>
684+
{
685+
while (await enumerator.MoveNextAsync())
686+
{
687+
results.Add(enumerator.Current);
688+
cts.Cancel();
689+
}
690+
});
691+
692+
Assert.Single(results);
693+
Assert.Equal(0, results[0]);
694+
}
695+
catch (Exception ex)
696+
{
697+
LoggerFactory.CreateLogger<HubConnectionTests>().LogError(ex, "{ExceptionType} from test", ex.GetType().FullName);
698+
throw;
699+
}
700+
finally
701+
{
702+
await connection.DisposeAsync().OrTimeout();
703+
}
704+
}
705+
}
706+
468707
[Theory]
469708
[MemberData(nameof(HubProtocolsAndTransportsAndHubPaths))]
470709
[LogLevel(LogLevel.Trace)]

0 commit comments

Comments
 (0)