Skip to content
Merged
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<!-- This file is automatically generated. -->
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>netstandard2.0</TargetFrameworks>
<TargetFrameworks>netstandard2.0;netcoreapp3.0</TargetFrameworks>
</PropertyGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<Compile Include="Microsoft.AspNetCore.SignalR.Client.Core.netstandard2.0.cs" />
Expand All @@ -11,4 +11,12 @@
<Reference Include="Microsoft.Extensions.Logging" />
<Reference Include="System.Threading.Channels" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'netcoreapp3.0'">
<Compile Include="Microsoft.AspNetCore.SignalR.Client.Core.netcoreapp3.0.cs" />
<Reference Include="Microsoft.AspNetCore.SignalR.Common" />
<Reference Include="Microsoft.AspNetCore.SignalR.Protocols.NewtonsoftJson" />
<Reference Include="Microsoft.Extensions.DependencyInjection" />
<Reference Include="Microsoft.Extensions.Logging" />
<Reference Include="System.Threading.Channels" />
</ItemGroup>
</Project>

Large diffs are not rendered by default.

41 changes: 41 additions & 0 deletions src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.SignalR.Client.Internal;
using Microsoft.AspNetCore.SignalR.Internal;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
Expand Down Expand Up @@ -436,6 +437,46 @@ private async Task StopAsyncCore(bool disposing)
}
}

#if NETCOREAPP3_0

/// <summary>
/// Invokes a streaming hub method on the server using the specified method name, return type and arguments.
/// </summary>
/// <typeparam name="TResult">The return type of the streaming server method.</typeparam>
/// <param name="methodName">The name of the server method to invoke.</param>
/// <param name="args">The arguments used to invoke the server method.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.</param>
/// <returns>
/// A <see cref="IAsyncEnumerable{TResult}"/> that represents the stream.
/// </returns>
public IAsyncEnumerable<TResult> StreamAsyncCore<TResult>(string methodName, object[] args, CancellationToken cancellationToken = default)
{
var cts = cancellationToken.CanBeCanceled ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken) : new CancellationTokenSource();
var stream = CastIAsyncEnumerable<TResult>(methodName, args, cts);
var cancelableStream = AsyncEnumerableAdapters.MakeCancelableTypedAsyncEnumerable(stream, cts);
return cancelableStream;
}

private async IAsyncEnumerable<T> CastIAsyncEnumerable<T>(string methodName, object[] args, CancellationTokenSource cts)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe rename StreamAsAsyncEnumerableCore now

{
var reader = await StreamAsChannelCoreAsync(methodName, typeof(T), args, cts.Token);
try
{
while (await reader.WaitToReadAsync(cts.Token))
{
while (reader.TryRead(out var item))
{
yield return (T)item;
}
}
}
finally
{
cts.Dispose();
}
}
#endif

private async Task<ChannelReader<object>> StreamAsChannelCoreAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken)
{
async Task OnStreamCanceled(InvocationRequest irq)
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<Description>Client for ASP.NET Core SignalR</Description>
<TargetFramework>netstandard2.0</TargetFramework>
<TargetFrameworks>netstandard2.0;netcoreapp3.0</TargetFrameworks>
<RootNamespace>Microsoft.AspNetCore.SignalR.Client</RootNamespace>
<IsShippingPackage>true</IsShippingPackage>
</PropertyGroup>
Expand All @@ -13,6 +13,7 @@
<Compile Include="$(SignalRSharedSourceRoot)PipeWriterStream.cs" Link="PipeWriterStream.cs" />
<Compile Include="$(SignalRSharedSourceRoot)ReflectionHelper.cs" Link="ReflectionHelper.cs" />
<Compile Include="$(SignalRSharedSourceRoot)TimerAwaitable.cs" Link="Internal\TimerAwaitable.cs" />
<Compile Include="$(SignalRSharedSourceRoot)AsyncEnumerableAdapters.cs" Link="Internal\AsyncEnumerableAdapters.cs" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,203 @@ public async Task CanInvokeFromOnHandler(string protocolName, HttpTransportType
}
}

[Theory]
[MemberData(nameof(HubProtocolsAndTransportsAndHubPaths))]
[LogLevel(LogLevel.Trace)]
public async Task StreamAsyncCoreTest(string protocolName, HttpTransportType transportType, string path)
{
var protocol = HubProtocols[protocolName];
using (StartServer<Startup>(out var server))
{
var connection = CreateHubConnection(server.Url, path, transportType, protocol, LoggerFactory);
try
{
await connection.StartAsync().OrTimeout();
var expectedValue = 0;
var streamTo = 5;
var asyncEnumerable = connection.StreamAsyncCore<int>("Stream", new object[] { streamTo });
await foreach (var streamValue in asyncEnumerable)
{
Assert.Equal(expectedValue, streamValue);
expectedValue++;
}

Assert.Equal(streamTo, expectedValue);
}
catch (Exception ex)
{
LoggerFactory.CreateLogger<HubConnectionTests>().LogError(ex, "{ExceptionType} from test", ex.GetType().FullName);
throw;
}
finally
{
await connection.DisposeAsync().OrTimeout();
}
}
}

[Theory]
[MemberData(nameof(HubProtocolsAndTransportsAndHubPaths))]
[LogLevel(LogLevel.Trace)]
public async Task StreamAsyncTest(string protocolName, HttpTransportType transportType, string path)
{
var protocol = HubProtocols[protocolName];
using (StartServer<Startup>(out var server))
{
var connection = CreateHubConnection(server.Url, path, transportType, protocol, LoggerFactory);
try
{
await connection.StartAsync().OrTimeout();
var expectedValue = 0;
var streamTo = 5;
var asyncEnumerable = connection.StreamAsync<int>("Stream", streamTo);
await foreach (var streamValue in asyncEnumerable)
{
Assert.Equal(expectedValue, streamValue);
expectedValue++;
}

Assert.Equal(streamTo, expectedValue);
}
catch (Exception ex)
{
LoggerFactory.CreateLogger<HubConnectionTests>().LogError(ex, "{ExceptionType} from test", ex.GetType().FullName);
throw;
}
finally
{
await connection.DisposeAsync().OrTimeout();
}
}
}

[Theory]
[MemberData(nameof(HubProtocolsAndTransportsAndHubPaths))]
[LogLevel(LogLevel.Trace)]
public async Task StreamAsyncDoesNotStartIfTokenAlreadyCanceled(string protocolName, HttpTransportType transportType, string path)
{
bool ExpectedErrors(WriteContext writeContext)
{
return writeContext.LoggerName == DefaultHubDispatcherLoggerName &&
writeContext.EventId.Name == "FailedInvokingHubMethod";
}
var protocol = HubProtocols[protocolName];
using (StartServer<Startup>(out var server, ExpectedErrors))
{
var connection = CreateHubConnection(server.Url, path, transportType, protocol, LoggerFactory);
try
{
await connection.StartAsync().OrTimeout();

var cts = new CancellationTokenSource();
cts.Cancel();

var ex = Assert.ThrowsAsync<OperationCanceledException>(async () =>
{
var stream = connection.StreamAsync<int>("Stream", 5, cts.Token);
await foreach (var streamValue in stream)
{
Assert.True(false, "Expected an exception from the streaming invocation.");
}
});
}
catch (Exception ex)
{
LoggerFactory.CreateLogger<HubConnectionTests>().LogError(ex, "{ExceptionType} from test", ex.GetType().FullName);
throw;
}
finally
{
await connection.DisposeAsync().OrTimeout();
}
}
}

[Theory]
[MemberData(nameof(HubProtocolsAndTransportsAndHubPaths))]
[LogLevel(LogLevel.Trace)]
public async Task StreamAsyncCanBeCanceled(string protocolName, HttpTransportType transportType, string path)
{
var protocol = HubProtocols[protocolName];
using (StartServer<Startup>(out var server))
{
var connection = CreateHubConnection(server.Url, path, transportType, protocol, LoggerFactory);
try
{
await connection.StartAsync().OrTimeout();

var cts = new CancellationTokenSource();

var stream = connection.StreamAsync<int>("Stream", 5, cts.Token);
var results = new List<int>();

var enumerator = stream.GetAsyncEnumerator();
await Assert.ThrowsAsync<TaskCanceledException>(async () =>
{
while (await enumerator.MoveNextAsync())
{
results.Add(enumerator.Current);
cts.Cancel();
}
});

Assert.Single(results);
Assert.Equal(0, results[0]);
}
catch (Exception ex)
{
LoggerFactory.CreateLogger<HubConnectionTests>().LogError(ex, "{ExceptionType} from test", ex.GetType().FullName);
throw;
}
finally
{
await connection.DisposeAsync().OrTimeout();
}
}
}

[Theory]
[MemberData(nameof(HubProtocolsAndTransportsAndHubPaths))]
[LogLevel(LogLevel.Trace)]
public async Task StreamAsyncWithException(string protocolName, HttpTransportType transportType, string path)
{
bool ExpectedErrors(WriteContext writeContext)
{
return writeContext.LoggerName == DefaultHubDispatcherLoggerName &&
writeContext.EventId.Name == "FailedInvokingHubMethod";
}

var protocol = HubProtocols[protocolName];
using (StartServer<Startup>(out var server, ExpectedErrors))
{
var connection = CreateHubConnection(server.Url, path, transportType, protocol, LoggerFactory);
try
{
await connection.StartAsync().OrTimeout();
var asyncEnumerable = connection.StreamAsync<int>("StreamException");
var ex = await Assert.ThrowsAsync<HubException>(async () =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: OrTimeout

{
await foreach (var streamValue in asyncEnumerable)
{
Assert.True(false, "Expected an exception from the streaming invocation.");
}
});

Assert.Equal("An unexpected error occurred invoking 'StreamException' on the server. InvalidOperationException: Error occurred while streaming.", ex.Message);

}
catch (Exception ex)
{
LoggerFactory.CreateLogger<HubConnectionTests>().LogError(ex, "{ExceptionType} from test", ex.GetType().FullName);
throw;
}
finally
{
await connection.DisposeAsync().OrTimeout();
}
}
}

[Theory]
[MemberData(nameof(HubProtocolsAndTransportsAndHubPaths))]
[LogLevel(LogLevel.Trace)]
Expand Down Expand Up @@ -465,6 +662,48 @@ public async Task CanStreamToAndFromClientInSameInvocation(string protocolName,
}
}

[Theory]
[MemberData(nameof(HubProtocolsAndTransportsAndHubPaths))]
[LogLevel(LogLevel.Trace)]
public async Task StreamAsyncCanBeCanceledThroughGetEnumerator(string protocolName, HttpTransportType transportType, string path)
{
var protocol = HubProtocols[protocolName];
using (StartServer<Startup>(out var server))
{
var connection = CreateHubConnection(server.Url, path, transportType, protocol, LoggerFactory);
try
{
await connection.StartAsync().OrTimeout();
var stream = connection.StreamAsync<int>("Stream", 5 );
var results = new List<int>();

var cts = new CancellationTokenSource();

var enumerator = stream.GetAsyncEnumerator(cts.Token);
await Assert.ThrowsAsync<TaskCanceledException>(async () =>
{
while (await enumerator.MoveNextAsync())
{
results.Add(enumerator.Current);
cts.Cancel();
}
});

Assert.Single(results);
Assert.Equal(0, results[0]);
}
catch (Exception ex)
{
LoggerFactory.CreateLogger<HubConnectionTests>().LogError(ex, "{ExceptionType} from test", ex.GetType().FullName);
throw;
}
finally
{
await connection.DisposeAsync().OrTimeout();
}
}
}

[Theory]
[MemberData(nameof(HubProtocolsAndTransportsAndHubPaths))]
[LogLevel(LogLevel.Trace)]
Expand Down
Loading