diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index 5195525d1..b17d6da20 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -8,7 +8,7 @@ Current package versions: ## Unreleased -No pending changes for the next release yet. +- Fix [#1520](https://github.com/StackExchange/StackExchange.Redis/issues/1520) & [#1660](https://github.com/StackExchange/StackExchange.Redis/issues/1660): When `MOVED` is encountered from a cluster, a reconfigure will happen proactively to react to cluster changes ASAP ([#2286 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2286)) ## 2.6.80 diff --git a/src/StackExchange.Redis/ConnectionMultiplexer.cs b/src/StackExchange.Redis/ConnectionMultiplexer.cs index 5630b28fe..e4080de77 100644 --- a/src/StackExchange.Redis/ConnectionMultiplexer.cs +++ b/src/StackExchange.Redis/ConnectionMultiplexer.cs @@ -49,6 +49,10 @@ public sealed partial class ConnectionMultiplexer : IInternalConnectionMultiplex ConfigurationOptions IInternalConnectionMultiplexer.RawConfig => RawConfig; + private int lastReconfigiureTicks = Environment.TickCount; + internal long LastReconfigureSecondsAgo => + unchecked(Environment.TickCount - Thread.VolatileRead(ref lastReconfigiureTicks)) / 1000; + private int _activeHeartbeatErrors, lastHeartbeatTicks; internal long LastHeartbeatSecondsAgo => pulse is null @@ -366,8 +370,19 @@ internal void CheckMessage(Message message) } } - internal bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isMoved) => - ServerSelectionStrategy.TryResend(hashSlot, message, endpoint, isMoved); + internal bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isMoved) + { + // If we're being told to re-send something because the hash slot moved, that means our topology is out of date + // ...and we should re-evaluate what's what. + // Allow for a 5-second back-off so we don't hammer this in a loop though + if (isMoved && LastReconfigureSecondsAgo > 5) + { + // Async kickoff a reconfigure + ReconfigureIfNeeded(endpoint, false, "MOVED encountered"); + } + + return ServerSelectionStrategy.TryResend(hashSlot, message, endpoint, isMoved); + } /// /// Wait for a given asynchronous operation to complete (or timeout). @@ -1214,6 +1229,7 @@ internal async Task ReconfigureAsync(bool first, bool reconfigureAll, LogP } Trace("Starting reconfiguration..."); Trace(blame != null, "Blaming: " + Format.ToString(blame)); + Interlocked.Exchange(ref lastReconfigiureTicks, Environment.TickCount); log?.WriteLine(RawConfig.ToString(includePassword: false)); log?.WriteLine(); @@ -1552,10 +1568,7 @@ public EndPoint[] GetEndPoints(bool configuredOnly = false) => foreach (EndPoint endpoint in clusterEndpoints) { serverEndpoint = GetServerEndPoint(endpoint); - if (serverEndpoint != null) - { - serverEndpoint.UpdateNodeRelations(clusterConfig); - } + serverEndpoint?.UpdateNodeRelations(clusterConfig); } return clusterEndpoints; } diff --git a/tests/StackExchange.Redis.Tests/CommandTimeoutTests.cs b/tests/StackExchange.Redis.Tests/CommandTimeoutTests.cs index 4ba77cfd8..f4b50a930 100644 --- a/tests/StackExchange.Redis.Tests/CommandTimeoutTests.cs +++ b/tests/StackExchange.Redis.Tests/CommandTimeoutTests.cs @@ -47,7 +47,7 @@ public async Task DefaultHeartbeatLowTimeout() using var conn = ConnectionMultiplexer.Connect(options); var pauseServer = GetServer(pauseConn); - var pauseTask = pauseServer.ExecuteAsync("CLIENT", "PAUSE", 500); + var pauseTask = pauseServer.ExecuteAsync("CLIENT", "PAUSE", 2000); var key = Me(); var db = conn.GetDatabase();