Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- Makes `StreamEntry` constructor public for better unit test experience (#1923 via WeihanLi)
- Fix integer overflow error (issue #1926) with 2GiB+ result payloads (#1928 via mgravell)
- Update assumed redis versions to v2.8 or v4.0 in the Azure case (#1929 via NickCraver)
- Moved tiebreaker fetching in connections into the handshake phase (streamline + simplification) (#1931 via NickCraver)

## 2.2.88

Expand Down
71 changes: 16 additions & 55 deletions src/StackExchange.Redis/ConnectionMultiplexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1719,20 +1719,16 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
}
int standaloneCount = 0, clusterCount = 0, sentinelCount = 0;
var endpoints = RawConfig.EndPoints;
log?.WriteLine($"{endpoints.Count} unique nodes specified");
bool useTieBreakers = !string.IsNullOrWhiteSpace(RawConfig.TieBreaker);
log?.WriteLine($"{endpoints.Count} unique nodes specified ({(useTieBreakers ? "with" : "without")} tiebreaker)");

if (endpoints.Count == 0)
{
throw new InvalidOperationException("No nodes to consider");
}
#pragma warning disable CS0618
const CommandFlags flags = CommandFlags.NoRedirect | CommandFlags.HighPriority;
#pragma warning restore CS0618
List<ServerEndPoint> masters = new List<ServerEndPoint>(endpoints.Count);
bool useTieBreakers = !string.IsNullOrWhiteSpace(RawConfig.TieBreaker);

ServerEndPoint[] servers = null;
Task<string>[] tieBreakers = null;
bool encounteredConnectedClusterServer = false;
Stopwatch watch = null;

Expand All @@ -1748,7 +1744,6 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
if (endpoints == null) break;

var available = new Task<string>[endpoints.Count];
tieBreakers = useTieBreakers ? new Task<string>[endpoints.Count] : null;
servers = new ServerEndPoint[available.Length];

RedisKey tieBreakerKey = useTieBreakers ? (RedisKey)RawConfig.TieBreaker : default(RedisKey);
Expand Down Expand Up @@ -1791,22 +1786,6 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
log?.WriteLine($"{Format.ToString(server.EndPoint)}: Endpoint is {server.ConnectionState}");
}

// After we've successfully connected (and authenticated), kickoff tie breakers if needed
if (useTieBreakers)
{
log?.WriteLine($"Election: Gathering tie-breakers...");
for (int i = 0; i < available.Length; i++)
{
var server = servers[i];

log?.WriteLine($"{Format.ToString(server.EndPoint)}: Requesting tie-break (Key=\"{RawConfig.TieBreaker}\")...");
Message msg = Message.Create(0, flags, RedisCommand.GET, tieBreakerKey);
msg.SetInternalCall();
msg = LoggingMessage.Create(log, msg);
tieBreakers[i] = server.WriteDirectAsync(msg, ResultProcessor.String);
}
}

EndPointCollection updatedClusterEndpointCollection = null;
for (int i = 0; i < available.Length; i++)
{
Expand Down Expand Up @@ -1920,7 +1899,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
ServerSelectionStrategy.ServerType = ServerType.Standalone;
}

var preferred = await NominatePreferredMaster(log, servers, useTieBreakers, tieBreakers, masters, timeoutMs: RawConfig.ConnectTimeout - checked((int)watch.ElapsedMilliseconds)).ObserveErrors().ForAwait();
var preferred = NominatePreferredMaster(log, servers, useTieBreakers, masters);
foreach (var master in masters)
{
if (master == preferred || master.IsReplica)
Expand Down Expand Up @@ -2050,44 +2029,26 @@ private void ResetAllNonConnected()
[System.Diagnostics.CodeAnalysis.SuppressMessage("Performance", "CA1822:Mark members as static", Justification = "Partial - may use instance data")]
partial void OnTraceLog(LogProxy log, [CallerMemberName] string caller = null);

private static async Task<ServerEndPoint> NominatePreferredMaster(LogProxy log, ServerEndPoint[] servers, bool useTieBreakers, Task<string>[] tieBreakers, List<ServerEndPoint> masters, int timeoutMs)
private static ServerEndPoint NominatePreferredMaster(LogProxy log, ServerEndPoint[] servers, bool useTieBreakers, List<ServerEndPoint> masters)
{
Dictionary<string, int> uniques = null;
if (useTieBreakers)
{ // count the votes
uniques = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
log?.WriteLine("Waiting for tiebreakers...");
await WaitAllIgnoreErrorsAsync("tiebreaker", tieBreakers, Math.Max(timeoutMs, 200), log).ForAwait();
for (int i = 0; i < tieBreakers.Length; i++)
for (int i = 0; i < servers.Length; i++)
{
var ep = servers[i].EndPoint;
var status = tieBreakers[i].Status;
switch (status)
var server = servers[i];
string serverResult = server.TieBreakerResult;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we nuke this here? or not worth the additional race conditions that would introduce for overlapped/staggered connects; meh, probably not worth the complexity risk - just wanted to share a thought

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm, I like the thinking - seems like we'd want to change where it's stored so it's transient in a connection state object or some such and not stored on the server but as a lookup that has the lifetime of the connect handshake perhaps?


if (string.IsNullOrWhiteSpace(serverResult))
{
case TaskStatus.RanToCompletion:
string s = tieBreakers[i].Result;
if (string.IsNullOrWhiteSpace(s))
{
log?.WriteLine($"Election: {Format.ToString(ep)} had no tiebreaker set");
}
else
{
log?.WriteLine($"Election: {Format.ToString(ep)} nominates: {s}");
if (!uniques.TryGetValue(s, out int count)) count = 0;
uniques[s] = count + 1;
}
break;
case TaskStatus.Faulted:
log?.WriteLine($"Election: {Format.ToString(ep)} failed to nominate ({status})");
foreach (var ex in tieBreakers[i].Exception.InnerExceptions)
{
if (ex.Message.StartsWith("MOVED ") || ex.Message.StartsWith("ASK ")) continue;
log?.WriteLine("> " + ex.Message);
}
break;
default:
log?.WriteLine($"Election: {Format.ToString(ep)} failed to nominate ({status})");
break;
log?.WriteLine($"Election: {Format.ToString(server)} had no tiebreaker set");
}
else
{
log?.WriteLine($"Election: {Format.ToString(server)} nominates: {serverResult}");
if (!uniques.TryGetValue(serverResult, out int count)) count = 0;
uniques[serverResult] = count + 1;
}
}
}
Expand Down
30 changes: 30 additions & 0 deletions src/StackExchange.Redis/ResultProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public static readonly StreamPendingMessagesProcessor

public static readonly ResultProcessor<string>
String = new StringProcessor(),
TieBreaker = new TieBreakerProcessor(),
ClusterNodesRaw = new ClusterNodesRawProcessor();

#region Sentinel
Expand Down Expand Up @@ -2068,6 +2069,34 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
}
}

private sealed class TieBreakerProcessor : ResultProcessor<string>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result)
{
switch (result.Type)
{
case ResultType.SimpleString:
case ResultType.BulkString:
var tieBreaker = result.GetString();
SetResult(message, tieBreaker);

var bridge = connection.BridgeCouldBeNull;
try
{
var endpoint = bridge?.ServerEndPoint;
if (endpoint != null)
{
endpoint.TieBreakerResult = tieBreaker;
}
}
catch { }

return true;
}
return false;
}
}

private class TracerProcessor : ResultProcessor<bool>
{
private readonly bool establishConnection;
Expand Down Expand Up @@ -2146,6 +2175,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
{
if (establishConnection)
{
// This is what ultimately brings us to complete a connection, by advancing the state forward from a successful tracer after connection.
connection.BridgeCouldBeNull?.OnFullyEstablished(connection, $"From command: {message.Command}");
}
SetResult(message, happy);
Expand Down
15 changes: 15 additions & 0 deletions src/StackExchange.Redis/ServerEndPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,16 @@ internal async Task AutoConfigureAsync(PhysicalConnection connection, LogProxy l
msg.SetInternalCall();
await WriteDirectOrQueueFireAndForgetAsync(connection, msg, ResultProcessor.ClusterNodes).ForAwait();
}
// If we are ging to fetch a tie breaker, do so last and we'll get it in before the tracer fires completing the connection
if (!string.IsNullOrEmpty(Multiplexer.RawConfig.TieBreaker))
{
RedisKey tieBreakerKey = Multiplexer.RawConfig.TieBreaker;
log?.WriteLine($"{Format.ToString(EndPoint)}: Requesting tie-break (Key=\"{tieBreakerKey}\")...");
msg = Message.Create(0, flags, RedisCommand.GET, tieBreakerKey);
msg.SetInternalCall();
msg = LoggingMessage.Create(log, msg);
await WriteDirectOrQueueFireAndForgetAsync(connection, msg, ResultProcessor.TieBreaker).ForAwait();
}
}

private int _nextReplicaOffset;
Expand Down Expand Up @@ -608,6 +618,11 @@ public EndPoint MasterEndPoint
set { SetConfig(ref masterEndPoint, value); }
}

/// <summary>
/// Result of the latest tie breaker (from the last reconfigure).
/// </summary>
internal string TieBreakerResult { get; set; }

internal bool CheckInfoReplication()
{
lastInfoReplicationCheckTicks = Environment.TickCount;
Expand Down