-
Notifications
You must be signed in to change notification settings - Fork 10.3k
[SignalR] Add client return results #40811
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
src/SignalR/clients/csharp/Client.Core/src/HubConnectionExtensions.OnResult.cs
Show resolved
Hide resolved
src/SignalR/clients/csharp/Client.Core/src/HubConnectionExtensions.OnResult.cs
Show resolved
Hide resolved
src/SignalR/common/Protocols.Json/src/Protocol/JsonHubProtocol.cs
Outdated
Show resolved
Hide resolved
if (resultType == typeof(RawResult)) | ||
{ | ||
Debug.Assert(((RawResult)message.Result).RawSerializedData.IsSingleSegment); | ||
writer.WriteRawValue(((RawResult)message.Result).RawSerializedData.First.Span, skipInputValidation: true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.First.Span? Shouldn't this write the whole thing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's what the assert is for, currently we're always allocating a byte[] to store the data.
reader.Skip(); | ||
var end = reader.BytesConsumed; | ||
var sequence = input.Slice(start, end - start); | ||
// Review: Technically we could pass the sequence without copying into a new array |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
super nit
// Review: Technically we could pass the sequence without copying into a new array | |
// REVIEW: Technically we could pass the sequence without copying into a new array |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should update this comment now to reflect we're not copying here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we going to update this?
var token = JToken.Load(reader); | ||
var str = token.ToString(Formatting.None); | ||
result = new RawResult(new ReadOnlySequence<byte>(Encoding.UTF8.GetBytes(str))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @JamesNK (JRaw?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think JRaw is useful here.
However, this code could be optimized. Rather than ToString + GetBytes, you could serialize the JToken directly to a MemoryStream then ToArray. Or even get the underlying array from the memory stream, trim it as Memory<bytes>
and create a sequence from that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you suggesting:
var token = JToken.Load(reader);
using var strm = new MemoryStream();
using var writer = new StreamWriter(strm);
using var jsonTextWriter = new JsonTextWriter(writer);
token.WriteTo(jsonTextWriter);
jsonTextWriter.Flush();
writer.Flush();
Memory<byte> buf;
if (strm.TryGetBuffer(out var segment))
{
buf = segment.Array.AsMemory(segment.Offset, segment.Count);
}
else
{
buf = strm.ToArray();
}
src/SignalR/common/Protocols.NewtonsoftJson/src/Protocol/NewtonsoftJsonHubProtocol.cs
Outdated
Show resolved
Hide resolved
public RawResult(ReadOnlySequence<byte> rawBytes) | ||
{ | ||
// Review: If we want to use an ArrayPool we would need some sort of release mechanism | ||
RawSerializedData = new ReadOnlySequence<byte>(rawBytes.ToArray()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we copying? We should make the caller copy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we can find a clean way to use a pool in this type then we can state in the doc comments that we'll do the copying so they don't need to allocate. But if we decide not to use a pool then we can say callers should copy.
/// <param name="args">The invocation arguments.</param> | ||
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.</param> | ||
/// <returns>The response from the connection.</returns> | ||
public virtual Task<T> InvokeConnectionAsync<T>(string connectionId, string methodName, object?[] args, CancellationToken cancellationToken = default) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we need a SupportsClientResults bool so we can throw a nicer error when implementations don't support it. We can check and throw in the ISingleClientProxy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure why a new API is needed. I updated the error message to:
$"{GetType().Name} does not support client return values."
so it's clear what the problem is.
_parallelEnabled = parallelEnabled; | ||
} | ||
|
||
private class NotParallelSingleClientProxy : ISingleClientProxy |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clever!
{ | ||
// Written as a MessagePack 'arr' containing at least these items: | ||
// * A MessagePack 'arr' of 'str's representing the excluded ids | ||
// * [The output of WriteSerializedHubMessage, which is an 'arr'] | ||
// For invocations expecting a result |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is non-breaking right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be
src/SignalR/server/StackExchangeRedis/src/RedisHubLifetimeManager.cs
Outdated
Show resolved
Hide resolved
src/SignalR/server/StackExchangeRedis/src/RedisHubLifetimeManager.cs
Outdated
Show resolved
Hide resolved
src/SignalR/common/Protocols.NewtonsoftJson/src/Protocol/NewtonsoftJsonHubProtocol.cs
Outdated
Show resolved
Hide resolved
src/SignalR/server/StackExchangeRedis/src/RedisHubLifetimeManager.cs
Outdated
Show resolved
Hide resolved
var connection = _connections[connectionId]; | ||
|
||
// Needs to be unique across servers, easiest way to do that is prefix with connection ID. | ||
var invocationId = $"{connectionId}{Interlocked.Increment(ref _lastInvocationId)}"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems fine as long as it's not being sent all the way to the client.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this is being sent all the way to the client and our idea of storing a tuple of (connectionId, invocationId) won't work.
The invocationId needs to be the main source of truth for identifying a specific invoke as it is used by the HubProtocols when querying for the expected type for deserialization. If we stored as a (connectionId, invocationId) then we won't have the connectionId when asking the dictionary for the type. We could in theory iterate over every item and find the invocationId, but then it's possible to have duplicates for IDs coming from different servers.
TLDR; The invocationId needs to be unique across all servers. There are 3 ways to do this, use the connection ID as part of the invocationId (what I did), use Redis to store a key and have all servers accessing and incrementing it (please no), or generate another unique key like we do for connection IDs to represent the invocationId.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still don't see why any of this would mean the connection id needs to be transmitted back and forth from the client. I could imagine some sort of low-level transport filter that added the client's connection id to the invocation id on the way to the server and strips it on the way out. This wouldn't have to change the internal logic on the server at all.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem is HubProtocol only has the invocationID when asking for the type. If servers don't generate a unique ID across all servers then there could be multiple of the same IDs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. We could pass the IHubProtocol a connection id outside of the payload, but then that'd require changing the interface which we've avoided. Even though the IHubProtocol does need to support RawResult
anyway. What happens again if you try using client return values with an IHubProtocol that doesn't support it?
Couldn't we pass in a per-connection, IInvocationBinder that adds the connection id prefix back to the invocation id so the IHubProtocol doesn't need to be aware of it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's an interesting idea! I'll think about it for preview5
src/SignalR/server/StackExchangeRedis/src/RedisHubLifetimeManager.cs
Outdated
Show resolved
Hide resolved
src/SignalR/server/StackExchangeRedis/src/RedisHubLifetimeManager.cs
Outdated
Show resolved
Hide resolved
src/SignalR/server/StackExchangeRedis/src/RedisHubLifetimeManager.cs
Outdated
Show resolved
Hide resolved
} | ||
)); | ||
|
||
// TODO: this isn't great |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this races with tokenRegistration?.Dispose();
right? Is there any impact other than not cleaning up the registration as soon as we could?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't think of any issues. My biggest issue with this is the closure allocation.
src/SignalR/server/StackExchangeRedis/src/RedisHubLifetimeManager.cs
Outdated
Show resolved
Hide resolved
|
||
public Task<T> InvokeCoreAsync<T>(string method, object?[] args, CancellationToken cancellationToken = default) | ||
{ | ||
throw new InvalidOperationException("Client results inside a Hub method requires HubOptions.MaximumParallelInvocationsPerClient to be greater than 1."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we abort the connection and log an error if we're awaiting MaximumParallelInvocationsPerClient
client invocations at once even though it's greater than 1? This is a huge footgun. I can see a lot of connections locking up for no apparent reason.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we file an issue for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Planning on having a list of known items on the original issue once this is merged.
src/SignalR/clients/csharp/Client.Core/src/HubConnectionExtensions.OnResult.cs
Outdated
Show resolved
Hide resolved
await hubConnection.StartAsync().DefaultTimeout(); | ||
|
||
// No result provided | ||
hubConnection.On("Result", () => { }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have a test for returning null
as the Task<TResult>
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean Task<object> () => null
or object () => null
?
Task<object> () => null
will null ref on ConfigureAwait
reader.Skip(); | ||
var end = reader.BytesConsumed; | ||
var sequence = input.Slice(start, end - start); | ||
// Review: Technically we could pass the sequence without copying into a new array |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we going to update this?
src/SignalR/common/Protocols.NewtonsoftJson/src/Protocol/NewtonsoftJsonHubProtocol.cs
Outdated
Show resolved
Hide resolved
} | ||
} | ||
)); | ||
Debug.Assert(result); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could a misbehaving server cause this to happen? If so, we should just throw instead of assert.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well lots of things can happen if there is a misbehaving server :)
We always generate a unique ID for the invocation so this will never be an issue for our servers right now. If we do collide then it's a big problem.
Log.CompletingStream(_logger, completionMessage); | ||
} | ||
// TODO: this relies on the lifetime manager keeping state for the return type after deserializing the message, is that ok? | ||
// InvocationId is always required on CompletionMessage, it's nullable because of the base type |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we love new
so much, we should add a non-nullable new public string InvocationId { get; }
thats non-nullable to CompletionMessage
Log.CompletingStream(_logger, streamCompleteMessage); | ||
Log.CompletingStream(_logger, completionMessage); | ||
} | ||
// TODO: this relies on the lifetime manager keeping state for the return type after deserializing the message, is that ok? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is fine. What's the alternative?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding yet another API, #40811 (comment), but I think this is fine as is.
|
||
public Task<T> InvokeCoreAsync<T>(string method, object?[] args, CancellationToken cancellationToken = default) | ||
{ | ||
throw new InvalidOperationException("Client results inside a Hub method requires HubOptions.MaximumParallelInvocationsPerClient to be greater than 1."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we file an issue for this?
var connection = _connections[connectionId]; | ||
|
||
// Needs to be unique across servers, easiest way to do that is prefix with connection ID. | ||
var invocationId = $"{connectionId}{Interlocked.Increment(ref _lastInvocationId)}"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. We could pass the IHubProtocol a connection id outside of the payload, but then that'd require changing the interface which we've avoided. Even though the IHubProtocol does need to support RawResult
anyway. What happens again if you try using client return values with an IHubProtocol that doesn't support it?
Couldn't we pass in a per-connection, IInvocationBinder that adds the connection id prefix back to the invocation id so the IHubProtocol doesn't need to be aware of it?
This is the majority of the #5280 work.
There is still some cleanup needed like adding logging and better exceptions/exception messages.
And some additional work needed after this change goes in that I'll list in the issue later.