-
Notifications
You must be signed in to change notification settings - Fork 10.5k
Using IAsyncEnumerable in the .NET Client #8935
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
src/SignalR/clients/csharp/Client.Core/src/HubConnectionExtensions.StreamAsync.cs
Outdated
Show resolved
Hide resolved
| } | ||
|
|
||
| #if NETCOREAPP3_0 | ||
| public async IAsyncEnumerable<object> StreamAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken = default) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if we just did
return await StreamAsChannelCoreAsync(methodName, returnType, args, cancellationToken).ReadAllAsync();There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What we should actually be doing here is using the MakeCancelableAsyncEnumerableFromChannel<T> method on the AsyncEnumerableAdapters class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good. We can just share the source for that file for now.
|
Tagging @divega because I know you're passionate about |
| public async IAsyncEnumerable<object> StreamAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken = default) | ||
| { | ||
| var stream = (await StreamAsChannelCoreAsyncCore(methodName, returnType, args, cancellationToken)).ReadAllAsync(); | ||
| await foreach(var item in AsyncEnumerableAdapters.MakeCancelableAsyncEnumerable(stream, cancellationToken)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You shouldn't need the await foreach here anymore right? MakeCancelableAsyncEnumerable just returns the think we need. It can just be like this:
| await foreach(var item in AsyncEnumerableAdapters.MakeCancelableAsyncEnumerable(stream, cancellationToken)) { | |
| return AsyncEnumerableAdapters.MakeCancelableAsyncEnumerable(stream, cancellationToken); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah wait, I see why. Because that would end up being a Task<IAsyncEnumerable<T>> when we want an IAsyncEnumerable<T>. Unfortunate, but makes sense. Disregard!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you are going to be doing this a lot (creating IAsyncEnumerable<T> from async methods), you could consider creating a helper like MakeCancelableAsyncEnumerableFromChannel but that takes a Task<Channel<T>> and handles it to an IAsyncEnumerable<T> that defers the await until the first call to MoveNextAsync. Then you can return that directly without creating an iterator method.
The only observable difference would be that any exception that awaiting StreamAsChannelCoreAsyncCore would cause will be deferred.
This is what we do with ExecuteDataReaderAsync.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a cool idea. Probably worth making a new issue to track it. We could convert Task<ChannelReader<T>> to ChannelReader<T> in the same way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that's something we can look at if we encounter an issue. Let's get this rolling now and see how it lands :).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW, one correction to what I said before: You want to pass a Func<Task<Channel<T>>> to allow deferring the call (in this case to StreamAsChannelCoreAsyncCore) completely instead of passing an already initiated Task<T>.
Probably worth making a new issue to track it.
Sure. If you see value. I am back to lurker mode 😄
| public async IAsyncEnumerable<object> StreamAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken = default) | ||
| { | ||
| var stream = (await StreamAsChannelCoreAsyncCore(methodName, returnType, args, cancellationToken)).ReadAllAsync(); | ||
| await foreach(var item in AsyncEnumerableAdapters.MakeCancelableAsyncEnumerable(stream, cancellationToken)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a better way to do this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of iterating over MakeCancelableAsyncEnumerable()'s return value, just return it directly.
You might want to create another version of the method which actually uses the CancellationToken passed to GetAsyncEnumerator(), and returns IAsyncEnumerable<TResult> instead of IAsyncEnumerable<object>.
Returning a IAsyncEnumerable<TResult> is actually easier since you can always directly return the value from ChannelReader<TResult>.ReadAllAsync().GetAsyncEnumerator() from CancelableAsyncEnumerable<TResult>.GetAsyncEnumerator() without worrying about wrapping the async enumerators for value types with BoxedAsyncEnumerator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of iterating over MakeCancelableAsyncEnumerable()'s return value, just return it directly.
That doesn't really work because of the await of StreamAsChannelCoreAsyncCore beforehand though, right? That's why this has to be transformed into a generator using the yield syntax.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we return the IAsyncEnumerable directly, we'd have to make StreamAsync return Task<IAsyncEnumerable<TResult>> which @anurse and I decided isn't great. I think using an iterator method is OK then.
We still should use a version of MakeCancelableAsyncEnumerable<TResult> that returns IAsyncEnumerable<TResult> and uses the CancellationToken passed through to GetAsyncEnumerator(CancellationToken) linking it with the CancellationToken from StreamAsyncCore if both are non-default.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Task<IAsyncEnumerable> which @anurse and I decided isn't great. I think using an iterator method is OK then.
The problem with this is that if we use an iterator and yield return from the result of MakeCancelableAsyncEnumerable we actually lose the cancellation token which defeats the purpose.
We still should use a version of MakeCancelableAsyncEnumerable that returns IAsyncEnumerable
I worked down this route and realized that the streaming methods like StreamAsChannelCoreAsync all return ChannelReader<object>. This means that no matter what we would have to iterate and cast on every item.
| } | ||
|
|
||
| #if NETCOREAPP3_0 | ||
| public async IAsyncEnumerable<object> StreamAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken = default) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make this method generic and have it return IAsyncEnumerable<TResult> so you don't need to convert later in HubConnectionExtensions.StreamAsyncCore().
src/SignalR/clients/csharp/Client/test/FunctionalTests/HubConnectionTests.cs
Show resolved
Hide resolved
| <Compile Include="$(SignalRSharedSourceRoot)PipeWriterStream.cs" Link="PipeWriterStream.cs" /> | ||
| <Compile Include="$(SignalRSharedSourceRoot)ReflectionHelper.cs" Link="ReflectionHelper.cs" /> | ||
| <Compile Include="$(SignalRSharedSourceRoot)TimerAwaitable.cs" Link="Internal\TimerAwaitable.cs" /> | ||
| <Compile Include="$(SignalRRoot)server\Core\src\Internal\AsyncEnumerableAdapters.cs" Link="Internal\AsyncEnumerableAdapters.cs" /> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move AsyncEnumerableAdapters to SignalRSharedSourceRoot if you can.
halter73
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like us to create and use a new AsyncEnumerableAdapters .MakeCancelableAsyncEnumerable<TResult>() method that uses the CancellationToken from GetAsyncEnumerator().
| /// <returns> | ||
| /// A <see cref="IAsyncEnumerable{TResult}"/> that represents the stream. | ||
| /// </returns> | ||
| public async Task<IAsyncEnumerable<TResult>> StreamAsyncCore<TResult>(string methodName, object[] args, CancellationToken cancellationToken = default) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed this to return a Task<IAsyncEnumerable> to preserve the cancellation logic from the CancelableTypedAsyncEnumerable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be clear, I'd rather just return the IAsyncEnumerable but otherwise we lose the cancel logic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain this more?
Having StreamAsync return a Task<IAsyncEnumerable> is more consistent with StreamAsChannelAsync returning Task<ChannelReader<TResult>>, but in an ideal world where we had infinite development time neither would return a Task in 3.0.
Is there some reason cancellation cannot work if StreamAsync doesn't return a Task?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before we were doing something similar to
var stream= AsyncEnumerableAdapters.MakeCancelableTypedAsyncEnumerable(enumerable, cancellationToken)
await foreach(var item in stream)
{
yield return stream
}As I understand it this new AsyncEnumerable that from these returns wont have the previous cancellationToken that was passed in.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can just move the call to StreamAsChannelCoreAsync into CastIAsyncEnumerable and await it at the beginning. Then StreamAsyncCore doesn't need to be async or Task-returning.
src/SignalR/clients/csharp/Client/test/FunctionalTests/HubConnectionTests.cs
Outdated
Show resolved
Hide resolved
src/SignalR/server/Core/src/Internal/AsyncEnumerableAdapters.cs
Outdated
Show resolved
Hide resolved
src/SignalR/server/Core/src/Internal/AsyncEnumerableAdapters.cs
Outdated
Show resolved
Hide resolved
src/SignalR/server/Core/src/Internal/AsyncEnumerableAdapters.cs
Outdated
Show resolved
Hide resolved
We found that the implementation of |
|
This is ready for review again |
src/SignalR/clients/csharp/Client.Core/ref/Microsoft.AspNetCore.SignalR.Client.Core.csproj
Show resolved
Hide resolved
src/SignalR/clients/csharp/Client.Core/ref/Microsoft.AspNetCore.SignalR.Client.Core.csproj
Show resolved
Hide resolved
src/SignalR/clients/csharp/Client.Core/src/Microsoft.AspNetCore.SignalR.Client.Core.csproj
Outdated
Show resolved
Hide resolved
src/SignalR/clients/csharp/Client/test/FunctionalTests/HubConnectionTests.cs
Outdated
Show resolved
Hide resolved
src/SignalR/clients/csharp/Client/test/FunctionalTests/HubConnectionTests.cs
Outdated
Show resolved
Hide resolved
src/SignalR/clients/csharp/Client/test/FunctionalTests/HubConnectionTests.cs
Outdated
Show resolved
Hide resolved
src/SignalR/clients/csharp/Client/test/FunctionalTests/HubConnectionTests.cs
Outdated
Show resolved
Hide resolved
| } | ||
|
|
||
| private async IAsyncEnumerable<T> CastIAsyncEnumerable<T>(ChannelReader<object> reader, CancellationTokenSource cts) | ||
| private async IAsyncEnumerable<T> CastIAsyncEnumerable<T>(string methodName, object[] args, CancellationTokenSource cts) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe rename StreamAsAsyncEnumerableCore now
|
This comment was made automatically. If there is a problem contact ryanbrandenburg. I've triaged the above build. |
Fixes: #5376
We finally get to use the
StreamAsyncmethod name 😄Currently what we have is
This is adding support for