diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index c3b401a47..96f86de19 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -6,6 +6,7 @@ - Fix integer overflow error (issue #1926) with 2GiB+ result payloads (#1928 via mgravell) - Update assumed redis versions to v2.8 or v4.0 in the Azure case (#1929 via NickCraver) - Fix profiler showing `EVAL` instead `EVALSHA` (#1930 via martinpotter) +- Moved tiebreaker fetching in connections into the handshake phase (streamline + simplification) (#1931 via NickCraver) ## 2.2.88 diff --git a/src/StackExchange.Redis/ConnectionMultiplexer.cs b/src/StackExchange.Redis/ConnectionMultiplexer.cs index f265b8e93..055ea3d12 100644 --- a/src/StackExchange.Redis/ConnectionMultiplexer.cs +++ b/src/StackExchange.Redis/ConnectionMultiplexer.cs @@ -1718,20 +1718,16 @@ internal async Task ReconfigureAsync(bool first, bool reconfigureAll, LogP } int standaloneCount = 0, clusterCount = 0, sentinelCount = 0; var endpoints = RawConfig.EndPoints; - log?.WriteLine($"{endpoints.Count} unique nodes specified"); + bool useTieBreakers = !string.IsNullOrWhiteSpace(RawConfig.TieBreaker); + log?.WriteLine($"{endpoints.Count} unique nodes specified ({(useTieBreakers ? "with" : "without")} tiebreaker)"); if (endpoints.Count == 0) { throw new InvalidOperationException("No nodes to consider"); } -#pragma warning disable CS0618 - const CommandFlags flags = CommandFlags.NoRedirect | CommandFlags.HighPriority; -#pragma warning restore CS0618 List masters = new List(endpoints.Count); - bool useTieBreakers = !string.IsNullOrWhiteSpace(RawConfig.TieBreaker); ServerEndPoint[] servers = null; - Task[] tieBreakers = null; bool encounteredConnectedClusterServer = false; Stopwatch watch = null; @@ -1747,7 +1743,6 @@ internal async Task ReconfigureAsync(bool first, bool reconfigureAll, LogP if (endpoints == null) break; var available = new Task[endpoints.Count]; - tieBreakers = useTieBreakers ? new Task[endpoints.Count] : null; servers = new ServerEndPoint[available.Length]; RedisKey tieBreakerKey = useTieBreakers ? (RedisKey)RawConfig.TieBreaker : default(RedisKey); @@ -1790,22 +1785,6 @@ internal async Task ReconfigureAsync(bool first, bool reconfigureAll, LogP log?.WriteLine($"{Format.ToString(server.EndPoint)}: Endpoint is {server.ConnectionState}"); } - // After we've successfully connected (and authenticated), kickoff tie breakers if needed - if (useTieBreakers) - { - log?.WriteLine("Election: Gathering tie-breakers..."); - for (int i = 0; i < available.Length; i++) - { - var server = servers[i]; - - log?.WriteLine($"{Format.ToString(server.EndPoint)}: Requesting tie-break (Key=\"{RawConfig.TieBreaker}\")..."); - Message msg = Message.Create(0, flags, RedisCommand.GET, tieBreakerKey); - msg.SetInternalCall(); - msg = LoggingMessage.Create(log, msg); - tieBreakers[i] = server.WriteDirectAsync(msg, ResultProcessor.String); - } - } - EndPointCollection updatedClusterEndpointCollection = null; for (int i = 0; i < available.Length; i++) { @@ -1919,7 +1898,7 @@ internal async Task ReconfigureAsync(bool first, bool reconfigureAll, LogP ServerSelectionStrategy.ServerType = ServerType.Standalone; } - var preferred = await NominatePreferredMaster(log, servers, useTieBreakers, tieBreakers, masters, timeoutMs: RawConfig.ConnectTimeout - checked((int)watch.ElapsedMilliseconds)).ObserveErrors().ForAwait(); + var preferred = NominatePreferredMaster(log, servers, useTieBreakers, masters); foreach (var master in masters) { if (master == preferred || master.IsReplica) @@ -2048,44 +2027,26 @@ private void ResetAllNonConnected() [System.Diagnostics.CodeAnalysis.SuppressMessage("Performance", "CA1822:Mark members as static", Justification = "Partial - may use instance data")] partial void OnTraceLog(LogProxy log, [CallerMemberName] string caller = null); - private static async Task NominatePreferredMaster(LogProxy log, ServerEndPoint[] servers, bool useTieBreakers, Task[] tieBreakers, List masters, int timeoutMs) + private static ServerEndPoint NominatePreferredMaster(LogProxy log, ServerEndPoint[] servers, bool useTieBreakers, List masters) { Dictionary uniques = null; if (useTieBreakers) { // count the votes uniques = new Dictionary(StringComparer.OrdinalIgnoreCase); - log?.WriteLine("Waiting for tiebreakers..."); - await WaitAllIgnoreErrorsAsync("tiebreaker", tieBreakers, Math.Max(timeoutMs, 200), log).ForAwait(); - for (int i = 0; i < tieBreakers.Length; i++) + for (int i = 0; i < servers.Length; i++) { - var ep = servers[i].EndPoint; - var status = tieBreakers[i].Status; - switch (status) + var server = servers[i]; + string serverResult = server.TieBreakerResult; + + if (string.IsNullOrWhiteSpace(serverResult)) { - case TaskStatus.RanToCompletion: - string s = tieBreakers[i].Result; - if (string.IsNullOrWhiteSpace(s)) - { - log?.WriteLine($"Election: {Format.ToString(ep)} had no tiebreaker set"); - } - else - { - log?.WriteLine($"Election: {Format.ToString(ep)} nominates: {s}"); - if (!uniques.TryGetValue(s, out int count)) count = 0; - uniques[s] = count + 1; - } - break; - case TaskStatus.Faulted: - log?.WriteLine($"Election: {Format.ToString(ep)} failed to nominate ({status})"); - foreach (var ex in tieBreakers[i].Exception.InnerExceptions) - { - if (ex.Message.StartsWith("MOVED ") || ex.Message.StartsWith("ASK ")) continue; - log?.WriteLine("> " + ex.Message); - } - break; - default: - log?.WriteLine($"Election: {Format.ToString(ep)} failed to nominate ({status})"); - break; + log?.WriteLine($"Election: {Format.ToString(server)} had no tiebreaker set"); + } + else + { + log?.WriteLine($"Election: {Format.ToString(server)} nominates: {serverResult}"); + if (!uniques.TryGetValue(serverResult, out int count)) count = 0; + uniques[serverResult] = count + 1; } } } diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs index af3a91dad..d17a1902a 100644 --- a/src/StackExchange.Redis/ResultProcessor.cs +++ b/src/StackExchange.Redis/ResultProcessor.cs @@ -121,6 +121,7 @@ public static readonly StreamPendingMessagesProcessor public static readonly ResultProcessor String = new StringProcessor(), + TieBreaker = new TieBreakerProcessor(), ClusterNodesRaw = new ClusterNodesRawProcessor(); #region Sentinel @@ -2068,6 +2069,32 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes } } + private sealed class TieBreakerProcessor : ResultProcessor + { + protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result) + { + switch (result.Type) + { + case ResultType.SimpleString: + case ResultType.BulkString: + var tieBreaker = result.GetString(); + SetResult(message, tieBreaker); + + try + { + if (connection.BridgeCouldBeNull?.ServerEndPoint is ServerEndPoint endpoint) + { + endpoint.TieBreakerResult = tieBreaker; + } + } + catch { } + + return true; + } + return false; + } + } + private class TracerProcessor : ResultProcessor { private readonly bool establishConnection; @@ -2146,6 +2173,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes { if (establishConnection) { + // This is what ultimately brings us to complete a connection, by advancing the state forward from a successful tracer after connection. connection.BridgeCouldBeNull?.OnFullyEstablished(connection, $"From command: {message.Command}"); } SetResult(message, happy); diff --git a/src/StackExchange.Redis/ServerEndPoint.cs b/src/StackExchange.Redis/ServerEndPoint.cs index afb67349a..f41e360a3 100755 --- a/src/StackExchange.Redis/ServerEndPoint.cs +++ b/src/StackExchange.Redis/ServerEndPoint.cs @@ -375,6 +375,17 @@ internal async Task AutoConfigureAsync(PhysicalConnection connection, LogProxy l msg.SetInternalCall(); await WriteDirectOrQueueFireAndForgetAsync(connection, msg, ResultProcessor.ClusterNodes).ForAwait(); } + // If we are going to fetch a tie breaker, do so last and we'll get it in before the tracer fires completing the connection + // But if GETs are disabled on this, do not fail the connection - we just don't get tiebreaker benefits + if (!string.IsNullOrEmpty(Multiplexer.RawConfig.TieBreaker) && Multiplexer.RawConfig.CommandMap.IsAvailable(RedisCommand.GET)) + { + RedisKey tieBreakerKey = Multiplexer.RawConfig.TieBreaker; + log?.WriteLine($"{Format.ToString(EndPoint)}: Requesting tie-break (Key=\"{tieBreakerKey}\")..."); + msg = Message.Create(0, flags, RedisCommand.GET, tieBreakerKey); + msg.SetInternalCall(); + msg = LoggingMessage.Create(log, msg); + await WriteDirectOrQueueFireAndForgetAsync(connection, msg, ResultProcessor.TieBreaker).ForAwait(); + } } private int _nextReplicaOffset; @@ -608,6 +619,11 @@ public EndPoint MasterEndPoint set { SetConfig(ref masterEndPoint, value); } } + /// + /// Result of the latest tie breaker (from the last reconfigure). + /// + internal string TieBreakerResult { get; set; } + internal bool CheckInfoReplication() { lastInfoReplicationCheckTicks = Environment.TickCount; diff --git a/tests/StackExchange.Redis.Tests/ConnectCustomConfig.cs b/tests/StackExchange.Redis.Tests/ConnectCustomConfig.cs new file mode 100644 index 000000000..2bed55caa --- /dev/null +++ b/tests/StackExchange.Redis.Tests/ConnectCustomConfig.cs @@ -0,0 +1,77 @@ +using Xunit; +using Xunit.Abstractions; + +namespace StackExchange.Redis.Tests +{ + public class ConnectCustomConfig : TestBase + { + public ConnectCustomConfig(ITestOutputHelper output) : base (output) { } + + // So we're triggering tiebreakers here + protected override string GetConfiguration() => TestConfig.Current.MasterServerAndPort + "," + TestConfig.Current.ReplicaServerAndPort; + + [Theory] + [InlineData("config")] + [InlineData("info")] + [InlineData("get")] + [InlineData("config,get")] + [InlineData("info,get")] + [InlineData("config,info,get")] + public void DisabledCommandsStillConnect(string disabledCommands) + { + using var muxer = Create(allowAdmin: true, disabledCommands: disabledCommands.Split(','), log: Writer); + + var db = muxer.GetDatabase(); + db.Ping(); + Assert.True(db.IsConnected(default(RedisKey))); + } + + [Fact] + public void TieBreakerIntact() + { + using var muxer = Create(allowAdmin: true, log: Writer) as ConnectionMultiplexer; + + var tiebreaker = muxer.GetDatabase().StringGet(muxer.RawConfig.TieBreaker); + Log($"Tiebreaker: {tiebreaker}"); + + var snapshot = muxer.GetServerSnapshot(); + foreach (var server in snapshot) + { + Assert.Equal(tiebreaker, server.TieBreakerResult); + } + } + + [Fact] + public void TieBreakerSkips() + { + using var muxer = Create(allowAdmin: true, disabledCommands: new[] { "get" }, log: Writer) as ConnectionMultiplexer; + Assert.Throws(() => muxer.GetDatabase().StringGet(muxer.RawConfig.TieBreaker)); + + var snapshot = muxer.GetServerSnapshot(); + foreach (var server in snapshot) + { + Assert.True(server.IsConnected); + Assert.Null(server.TieBreakerResult); + } + } + + [Fact] + public void TiebreakerIncorrectType() + { + var tiebreakerKey = Me(); + using var fubarMuxer = Create(allowAdmin: true, log: Writer); + // Store something nonsensical in the tiebreaker key: + fubarMuxer.GetDatabase().HashSet(tiebreakerKey, "foo", "bar"); + + // Ensure the next connection getting an invalid type still connects + using var muxer = Create(allowAdmin: true, tieBreaker: tiebreakerKey, log: Writer); + + var db = muxer.GetDatabase(); + db.Ping(); + Assert.True(db.IsConnected(default(RedisKey))); + + var ex = Assert.Throws(() => db.StringGet(tiebreakerKey)); + Assert.Contains("WRONGTYPE", ex.Message); + } + } +}