@@ -34,7 +34,7 @@ public sealed partial class ConnectionMultiplexer : IInternalConnectionMultiplex
3434 internal long syncOps , asyncOps ;
3535 private long syncTimeouts , fireAndForgets , asyncTimeouts ;
3636 private string ? failureMessage , activeConfigCause ;
37- private IDisposable ? pulse ;
37+ private TimerToken ? pulse ;
3838
3939 private readonly Hashtable servers = new Hashtable ( ) ;
4040 private volatile ServerSnapshot _serverSnapshot = ServerSnapshot . Empty ;
@@ -874,7 +874,7 @@ public ServerSnapshotFiltered(ServerEndPoint[] endpoints, int count, Func<Server
874874 }
875875 }
876876
877- [ return : NotNullIfNotNull ( " endpoint" ) ]
877+ [ return : NotNullIfNotNull ( nameof ( endpoint ) ) ]
878878 internal ServerEndPoint ? GetServerEndPoint ( EndPoint ? endpoint , LogProxy ? log = null , bool activate = true )
879879 {
880880 if ( endpoint == null ) return null ;
@@ -909,40 +909,130 @@ public ServerSnapshotFiltered(ServerEndPoint[] endpoints, int count, Func<Server
909909 return server ;
910910 }
911911
912- private sealed class TimerToken
912+ internal void Root ( ) => pulse ? . Root ( this ) ;
913+
914+ // note that this also acts (conditionally) as the GC root for the multiplexer
915+ // when there are in-flight messages; the timer can then acts as the heartbeat
916+ // to make sure that everything *eventually* completes
917+ private sealed class TimerToken : IDisposable
913918 {
914919 private TimerToken ( ConnectionMultiplexer muxer )
915920 {
916- _ref = new WeakReference ( muxer ) ;
921+ _weakRef = new ( muxer ) ;
917922 }
918923 private Timer ? _timer ;
919924 public void SetTimer ( Timer timer ) => _timer = timer ;
920- private readonly WeakReference _ref ;
925+
926+ private readonly WeakReference < ConnectionMultiplexer > _weakRef ;
927+
928+ private object StrongRefSyncLock => _weakRef ; // private and readonly? it'll do
929+ private ConnectionMultiplexer ? _strongRef ;
930+ private int _strongRefToken ;
921931
922932 private static readonly TimerCallback Heartbeat = state =>
923933 {
924934 var token = ( TimerToken ) state ! ;
925- var muxer = ( ConnectionMultiplexer ? ) ( token . _ref ? . Target ) ;
926- if ( muxer != null )
935+ if ( token . _weakRef . TryGetTarget ( out var muxer ) )
927936 {
928937 muxer . OnHeartbeat ( ) ;
929938 }
930939 else
931940 {
932941 // the muxer got disposed from out of us; kill the timer
933- var tmp = token . _timer ;
934- token . _timer = null ;
935- if ( tmp != null ) try { tmp . Dispose ( ) ; } catch { }
942+ token . Dispose ( ) ;
936943 }
937944 } ;
938945
939- internal static IDisposable Create ( ConnectionMultiplexer connection )
946+ internal static TimerToken Create ( ConnectionMultiplexer connection )
940947 {
941948 var token = new TimerToken ( connection ) ;
942949 var heartbeatMilliseconds = ( int ) connection . RawConfig . HeartbeatInterval . TotalMilliseconds ;
943950 var timer = new Timer ( Heartbeat , token , heartbeatMilliseconds , heartbeatMilliseconds ) ;
944951 token . SetTimer ( timer ) ;
945- return timer ;
952+ return token ;
953+ }
954+
955+ public void Dispose ( )
956+ {
957+ var tmp = _timer ;
958+ _timer = null ;
959+ if ( tmp is not null ) try { tmp . Dispose ( ) ; } catch { }
960+
961+ _strongRef = null ; // note that this shouldn't be relevant since we've unrooted the TimerToken
962+ }
963+
964+
965+ // explanation of rooting model:
966+ //
967+ // the timer has a reference to the TimerToken; this *always* has a weak-ref,
968+ // and *may* sometimes have a strong-ref; this is so that if a consumer
969+ // drops a multiplexer, it can be garbage collected, i.e. the heartbeat timer
970+ // doesn't keep the entire thing alive forever; instead, if the heartbeat detects
971+ // the weak-ref has been collected, it can cancel the timer and *itself* go away;
972+ // however: this leaves a problem where there is *in flight work* when the consumer
973+ // drops the multiplexer; in particular, if that happens when disconnected, there
974+ // could be consumer-visible pending TCS items *in the backlog queue*; we don't want
975+ // to leave those incomplete, as that fails the contractual expectations of async/await;
976+ // instead we need to root ourselves. The natural place to do this is by rooting the
977+ // multiplexer, allowing the heartbeat to keep poking things, so that the usual
978+ // message-processing and timeout rules apply. This is why we *sometimes* also keep
979+ // a strong-ref to the same multiplexer.
980+ //
981+ // The TimerToken is rooted by the timer callback; this then roots the multiplexer,
982+ // which keeps our bridges and connections in scope - until we're sure we're done
983+ // with them.
984+ //
985+ // 1) any bridge or connection will trigger rooting by calling Root when
986+ // they change from "empty" to "non-empty" i.e. whenever there
987+ // in-flight items; this always changes the token; this includes both the
988+ // backlog and awaiting-reply queues.
989+ //
990+ // 2) the heartbeat is responsible for unrooting, after processing timeouts
991+ // etc; first it checks whether it is needed (IsRooted), which also gives
992+ // it the current token.
993+ //
994+ // 3) if so, the heartbeat will (outside of the lock) query all sources to
995+ // see if they still have outstanding work; if everyone reports negatively,
996+ // then the heartbeat calls UnRoot passing in the old token; if this still
997+ // matches (i.e. no new work came in while we were looking away), then the
998+ // strong reference is removed; note that "has outstanding work" ignores
999+ // internal-call messages; we are only interested in consumer-facing items
1000+ // (but we need to check this *here* rather than when adding, as otherwise
1001+ // the definition of "is empty, should root" becomes more complicated, which
1002+ // impacts the write path, rather than the heartbeat path.
1003+ //
1004+ // This means that the multiplexer (via the timer) lasts as long as there are
1005+ // outstanding messages; if the consumer has dropped the multiplexer, then
1006+ // there will be no new incoming messages, and after timeouts: everything
1007+ // should drop.
1008+
1009+ public void Root ( ConnectionMultiplexer multiplexer )
1010+ {
1011+ lock ( StrongRefSyncLock )
1012+ {
1013+ _strongRef = multiplexer ;
1014+ _strongRefToken ++ ;
1015+ }
1016+ }
1017+
1018+ public bool IsRooted ( out int token )
1019+ {
1020+ lock ( StrongRefSyncLock )
1021+ {
1022+ token = _strongRefToken ;
1023+ return _strongRef is not null ;
1024+ }
1025+ }
1026+
1027+ public void UnRoot ( int token )
1028+ {
1029+ lock ( StrongRefSyncLock )
1030+ {
1031+ if ( token == _strongRefToken )
1032+ {
1033+ _strongRef = null ;
1034+ }
1035+ }
9461036 }
9471037 }
9481038
@@ -956,8 +1046,21 @@ private void OnHeartbeat()
9561046 Trace ( "heartbeat" ) ;
9571047
9581048 var tmp = GetServerSnapshot ( ) ;
1049+ int token = 0 ;
1050+ bool isRooted = pulse ? . IsRooted ( out token ) ?? false , hasPendingCallerFacingItems = false ;
1051+
9591052 for ( int i = 0 ; i < tmp . Length ; i ++ )
1053+ {
9601054 tmp [ i ] . OnHeartbeat ( ) ;
1055+ if ( isRooted && ! hasPendingCallerFacingItems )
1056+ {
1057+ hasPendingCallerFacingItems = tmp [ i ] . HasPendingCallerFacingItems ( ) ;
1058+ }
1059+ }
1060+ if ( isRooted && ! hasPendingCallerFacingItems )
1061+ { // release the GC root on the heartbeat *if* the token still matches
1062+ pulse ? . UnRoot ( token ) ;
1063+ }
9611064 }
9621065 catch ( Exception ex )
9631066 {
@@ -1935,7 +2038,7 @@ internal static void ThrowFailed<T>(TaskCompletionSource<T>? source, Exception u
19352038 }
19362039 }
19372040
1938- [ return : NotNullIfNotNull ( " defaultValue" ) ]
2041+ [ return : NotNullIfNotNull ( nameof ( defaultValue ) ) ]
19392042 internal T ? ExecuteSyncImpl < T > ( Message message , ResultProcessor < T > ? processor , ServerEndPoint ? server , T ? defaultValue = default )
19402043 {
19412044 if ( _isDisposed ) throw new ObjectDisposedException ( ToString ( ) ) ;
@@ -2049,7 +2152,7 @@ static async Task<T> ExecuteAsyncImpl_Awaited(ConnectionMultiplexer @this, Value
20492152
20502153 internal Task < T ? > ExecuteAsyncImpl < T > ( Message ? message , ResultProcessor < T > ? processor , object ? state , ServerEndPoint ? server )
20512154 {
2052- [ return : NotNullIfNotNull ( " tcs" ) ]
2155+ [ return : NotNullIfNotNull ( nameof ( tcs ) ) ]
20532156 static async Task < T ? > ExecuteAsyncImpl_Awaited ( ConnectionMultiplexer @this , ValueTask < WriteResult > write , TaskCompletionSource < T ? > ? tcs , Message message , ServerEndPoint ? server )
20542157 {
20552158 var result = await write . ForAwait ( ) ;
0 commit comments