@@ -39,7 +39,6 @@ public class RedisHubLifetimeManager<THub> : HubLifetimeManager<THub>, IDisposab
39
39
40
40
private readonly AckHandler _ackHandler ;
41
41
private int _internalAckId ;
42
- private ulong _lastInvocationId ;
43
42
44
43
/// <summary>
45
44
/// Constructs the <see cref="RedisHubLifetimeManager{THub}"/> with types from Dependency Injection.
@@ -72,7 +71,7 @@ public RedisHubLifetimeManager(ILogger<RedisHubLifetimeManager<THub>> logger,
72
71
_logger = logger ;
73
72
_options = options . Value ;
74
73
_ackHandler = new AckHandler ( ) ;
75
- _channels = new RedisChannels ( typeof ( THub ) . FullName ! ) ;
74
+ _channels = new RedisChannels ( typeof ( THub ) . FullName ! , _serverName ) ;
76
75
if ( globalHubOptions != null && hubOptions != null )
77
76
{
78
77
_protocol = new RedisProtocol ( new DefaultHubMessageSerializer ( hubProtocolResolver , globalHubOptions . Value . SupportedProtocols , hubOptions . Value . SupportedProtocols ) ) ;
@@ -416,8 +415,8 @@ public override async Task<T> InvokeConnectionAsync<T>(string connectionId, stri
416
415
417
416
var connection = _connections [ connectionId ] ;
418
417
419
- // Needs to be unique across servers, easiest way to do that is prefix with connection ID .
420
- var invocationId = $ " { connectionId } { Interlocked . Increment ( ref _lastInvocationId ) } " ;
418
+ // ID needs to be unique for each invocation and across servers, we generate a GUID every time, that should provide enough uniqueness guarantees .
419
+ var invocationId = GenerateInvocationId ( ) ;
421
420
422
421
using var _ = CancellationTokenUtils . CreateLinkedToken ( cancellationToken ,
423
422
connection ? . ConnectionAborted ?? default , out var linkedToken ) ;
@@ -428,7 +427,7 @@ public override async Task<T> InvokeConnectionAsync<T>(string connectionId, stri
428
427
if ( connection == null )
429
428
{
430
429
// TODO: Need to handle other server going away while waiting for connection result
431
- var messageBytes = _protocol . WriteInvocation ( methodName , args , invocationId , returnChannel : _channels . ReturnResults ( _serverName ) ) ;
430
+ var messageBytes = _protocol . WriteInvocation ( methodName , args , invocationId , returnChannel : _channels . ReturnResults ) ;
432
431
var received = await PublishAsync ( _channels . Connection ( connectionId ) , messageBytes ) ;
433
432
if ( received < 1 )
434
433
{
@@ -674,7 +673,7 @@ private async Task SubscribeToGroupAsync(string groupChannel, HubConnectionStore
674
673
675
674
private async Task SubscribeToReturnResultsAsync ( )
676
675
{
677
- var channel = await _bus ! . SubscribeAsync ( _channels . ReturnResults ( _serverName ) ) ;
676
+ var channel = await _bus ! . SubscribeAsync ( _channels . ReturnResults ) ;
678
677
channel . OnMessage ( ( channelMessage ) =>
679
678
{
680
679
var completion = RedisProtocol . ReadCompletion ( channelMessage . Message ) ;
@@ -700,6 +699,7 @@ private async Task SubscribeToReturnResultsAsync()
700
699
Debug . Assert ( parseSuccess ) ;
701
700
702
701
var invocationInfo = _clientResultsManager . RemoveInvocation ( ( ( CompletionMessage ) hubMessage ! ) . InvocationId ! ) ;
702
+
703
703
invocationInfo ? . Completion ( invocationInfo ? . Tcs ! , ( CompletionMessage ) hubMessage ! ) ;
704
704
} ) ;
705
705
}
@@ -784,6 +784,21 @@ private static string GenerateServerName()
784
784
return $ "{ Environment . MachineName } _{ Guid . NewGuid ( ) : N} ";
785
785
}
786
786
787
+ private static string GenerateInvocationId ( )
788
+ {
789
+ Span < byte > buffer = stackalloc byte [ 16 ] ;
790
+ var success = Guid . NewGuid ( ) . TryWriteBytes ( buffer ) ;
791
+ Debug . Assert ( success ) ;
792
+ // 16 * 4/3 = 21.333 which means base64 encoding will use 22 characters of actual data and 2 characters of padding ('=')
793
+ Span < char > base64 = stackalloc char [ 24 ] ;
794
+ success = Convert . TryToBase64Chars ( buffer , base64 , out var written ) ;
795
+ Debug . Assert ( success ) ;
796
+ Debug . Assert ( written == 24 ) ;
797
+ // Trim the two '=='
798
+ Debug . Assert ( base64 . EndsWith ( "==" ) ) ;
799
+ return new string ( base64 [ ..^ 2 ] ) ;
800
+ }
801
+
787
802
private sealed class LoggerTextWriter : TextWriter
788
803
{
789
804
private readonly ILogger _logger ;
0 commit comments