@@ -41,6 +41,7 @@ internal sealed class PhysicalBridge : IDisposable
4141 private int _backlogProcessorIsRunning = 0 ;
4242 private int _backlogCurrentEnqueued = 0 ;
4343 private long _backlogTotalEnqueued = 0 ;
44+ private Exception ? _abandonPendingBacklogException ;
4445
4546 private int activeWriters = 0 ;
4647 private int beating ;
@@ -483,11 +484,18 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti
483484
484485 private void AbandonPendingBacklog ( Exception ex )
485486 {
487+ // Peeking at the backlog, checking message and then dequeuing is not thread-safe.
488+ // CheckBacklogForTimeouts() depends on this being set to properly complete dequeued messages.
489+ Volatile . Write ( ref _abandonPendingBacklogException , ex ) ;
490+
486491 while ( BacklogTryDequeue ( out Message ? next ) )
487492 {
488493 Multiplexer . OnMessageFaulted ( next , ex ) ;
489494 next . SetExceptionAndComplete ( ex , this ) ;
490495 }
496+
497+ // Best effort cleanup to avoid false positive thread safey check failures in CheckBacklogForTimeouts().
498+ if ( _backlogStatus != BacklogStatus . CheckingForTimeout ) Interlocked . CompareExchange ( ref _abandonPendingBacklogException , null , ex ) ;
491499 }
492500
493501 internal void OnFullyEstablished ( PhysicalConnection connection , string source )
@@ -888,24 +896,29 @@ private void CheckBacklogForTimeouts()
888896 var now = Environment . TickCount ;
889897 var timeout = TimeoutMilliseconds ;
890898
891- // Because peeking at the backlog, checking message and then dequeuing, is not thread-safe, we do have to use
892- // a lock here, for mutual exclusion of backlog DEQUEUERS. Unfortunately.
893- // But we reduce contention by only locking if we see something that looks timed out .
899+ // Peeking at the backlog, checking message and then dequeuing is not thread-safe.
900+ // Because AbandonPendingBacklog() is the only dequeuer that can run concurrently,
901+ // locking can be avoided by throwing the AbandonPendingBacklog() exception here .
894902 while ( _backlog . TryPeek ( out Message ? message ) )
895903 {
896904 // See if the message has pass our async timeout threshold
897905 // Note: All timed out messages must be dequeued, even when no completion is needed, to be able to dequeue and complete other timed out messages.
898906 if ( ! message . HasTimedOut ( now , timeout , out var _ ) ) break ; // not a timeout - we can stop looking
899- lock ( _backlog )
907+ if ( ! BacklogTryDequeue ( out var message2 ) ) message2 = null ; // consume it for real
908+ if ( message != message2 )
900909 {
901- // Peek again since we didn't have lock before...
902- // and rerun the exact same checks as above, note that it may be a different message now
903- if ( ! _backlog . TryPeek ( out message ) ) break ;
904- if ( ! message . HasTimedOut ( now , timeout , out var _ ) ) break ;
910+ var ex = Volatile . Read ( ref _abandonPendingBacklogException ) ;
911+ var isAbandonPendingBacklog = ex != null ;
912+ ex ??= new RedisException ( "Thread safety bug detected! A queue message disappeared when AbandonPendingBacklog() was not running." ) ;
913+ message2 ? . SetExceptionAndComplete ( ex , this ) ;
905914
906- if ( ! BacklogTryDequeue ( out var message2 ) || ( message != message2 ) ) // consume it for real
915+ if ( isAbandonPendingBacklog )
907916 {
908- throw new RedisException ( "Thread safety bug detected! A queue message disappeared while we had the backlog lock" ) ;
917+ break ;
918+ }
919+ else
920+ {
921+ throw ex ;
909922 }
910923 }
911924
@@ -976,20 +989,15 @@ private async Task ProcessBacklogAsync()
976989 if ( isDisposed && BacklogHasItems )
977990 {
978991 _backlogStatus = BacklogStatus . NotifyingDisposed ;
979- // Because peeking at the backlog, checking message and then dequeuing, is not thread-safe, we do have to use
980- // a lock here, for mutual exclusion of backlog DEQUEUERS. Unfortunately.
981- // But we reduce contention by only locking if we see something that looks timed out.
992+ // Peeking at the backlog, checking message and then dequeuing is not thread-safe.
993+ // CheckBacklogForTimeouts() depends on not running concurrently with this.
982994 while ( BacklogHasItems )
983995 {
984- Message ? message = null ;
985- lock ( _backlog )
996+ if ( ! BacklogTryDequeue ( out Message ? message ) )
986997 {
987- if ( ! BacklogTryDequeue ( out message ) )
988- {
989- break ;
990- }
998+ break ;
991999 }
992-
1000+
9931001 var ex = ExceptionFactory . Timeout ( Multiplexer , "The message was in the backlog when connection was disposed" , message , ServerEndPoint , WriteResult . TimeoutBeforeWrite , this ) ;
9941002 message . SetExceptionAndComplete ( ex , this ) ;
9951003 }
@@ -1073,17 +1081,13 @@ private async Task ProcessBridgeBacklogAsync()
10731081 // If we can't write them, abort and wait for the next heartbeat or activation to try this again.
10741082 while ( IsConnected && physical ? . HasOutputPipe == true )
10751083 {
1076- Message ? message ;
10771084 _backlogStatus = BacklogStatus . CheckingForWork ;
10781085
1079- lock ( _backlog )
1086+ // Note that we're actively taking it off the queue here, not peeking
1087+ // If there's nothing left in queue, we're done.
1088+ if ( ! BacklogTryDequeue ( out Message ? message ) )
10801089 {
1081- // Note that we're actively taking it off the queue here, not peeking
1082- // If there's nothing left in queue, we're done.
1083- if ( ! BacklogTryDequeue ( out message ) )
1084- {
1085- break ;
1086- }
1090+ break ;
10871091 }
10881092
10891093 try
0 commit comments