Skip to content
Merged
71 changes: 71 additions & 0 deletions src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,77 @@ private async Task StopAsyncCore(bool disposing)
}
}

#if NETCOREAPP3_0
public async IAsyncEnumerable<object> StreamAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken = default)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we just did

return await StreamAsChannelCoreAsync(methodName, returnType, args, cancellationToken).ReadAllAsync();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What we should actually be doing here is using the MakeCancelableAsyncEnumerableFromChannel<T> method on the AsyncEnumerableAdapters class

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. We can just share the source for that file for now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this method generic and have it return IAsyncEnumerable<TResult> so you don't need to convert later in HubConnectionExtensions.StreamAsyncCore().

{
async Task OnStreamCanceled(InvocationRequest irq)
{
// We need to take the connection lock in order to ensure we a) have a connection and b) are the only one accessing the write end of the pipe.
await WaitConnectionLockAsync();
try
{
if (_connectionState != null)
{
Log.SendingCancellation(_logger, irq.InvocationId);

// Fire and forget, if it fails that means we aren't connected anymore.
_ = SendHubMessage(new CancelInvocationMessage(irq.InvocationId), irq.CancellationToken);
}
else
{
Log.UnableToSendCancellation(_logger, irq.InvocationId);
}
}
finally
{
ReleaseConnectionLock();
}

// Cancel the invocation
irq.Dispose();
}

var readers = PackageStreamingParams(ref args, out var streamIds);

CheckDisposed();
await WaitConnectionLockAsync();

ChannelReader<object> channel;
try
{
CheckDisposed();
CheckConnectionActive(nameof(StreamAsyncCore));
cancellationToken.ThrowIfCancellationRequested();

// I just want an excuse to use 'irq' as a variable name...
var irq = InvocationRequest.Stream(cancellationToken, returnType, _connectionState.GetNextId(), _loggerFactory, this, out channel);
await InvokeStreamCore(methodName, irq, args, streamIds?.ToArray(), cancellationToken);

if (cancellationToken.CanBeCanceled)
{
cancellationToken.Register(state => _ = OnStreamCanceled((InvocationRequest)state), irq);
}
}
finally
{
ReleaseConnectionLock();
}

LaunchStreams(readers, cancellationToken);

//channel.ReadAllAsync() -> Returns IAsyncEnumerable

while (await channel.WaitToReadAsync())
{
while (channel.TryRead(out var streamItem))
{
yield return streamItem;
}
}
}

#endif

private async Task<ChannelReader<object>> StreamAsChannelCoreAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken)
{
async Task OnStreamCanceled(InvocationRequest irq)
Expand Down
Loading