Skip to content

Commit 2a0468e

Browse files
authored
Resolve sync-context issues (missing configureawait) in the multiplexer (#2229)
* add test to investigate #2223 * add connect test * assert zeros in SyncConfigure * add missing ForAwait uses * investigate rantocompletion fault * fix brittle test * stabilize tests * release notes
1 parent f3f3013 commit 2a0468e

File tree

6 files changed

+201
-18
lines changed

6 files changed

+201
-18
lines changed

docs/ReleaseNotes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
- Adds: `IConnectionMultiplexer.GetServers()` to get all `IServer` instances for a multiplexer ([#2203 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2203))
88
- Fix [#2016](https://github.com/StackExchange/StackExchange.Redis/issues/2016): Align server selection with supported commands (e.g. with writable servers) to reduce `Command cannot be issued to a replica` errors ([#2191 by slorello89](https://github.com/StackExchange/StackExchange.Redis/pull/2191))
99
- Performance: Optimization around timeout processing to reduce lock contention in the case of many items that haven't yet timed out during a heartbeat ([#2217 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2217))
10+
- Fix: Resolve sync-context issues (missing `ConfigureAwait(false)`) ([#2229 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2229))
1011

1112
## 2.6.48
1213

src/StackExchange.Redis/ConnectionMultiplexer.cs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ internal async Task MakePrimaryAsync(ServerEndPoint server, ReplicationChangeOpt
234234
log?.WriteLine($"Checking {Format.ToString(srv.EndPoint)} is available...");
235235
try
236236
{
237-
await srv.PingAsync(flags); // if it isn't happy, we're not happy
237+
await srv.PingAsync(flags).ForAwait(); // if it isn't happy, we're not happy
238238
}
239239
catch (Exception ex)
240240
{
@@ -257,7 +257,7 @@ internal async Task MakePrimaryAsync(ServerEndPoint server, ReplicationChangeOpt
257257
msg = Message.Create(0, flags | CommandFlags.FireAndForget, RedisCommand.SET, tieBreakerKey, newPrimary);
258258
try
259259
{
260-
await node.WriteDirectAsync(msg, ResultProcessor.DemandOK);
260+
await node.WriteDirectAsync(msg, ResultProcessor.DemandOK).ForAwait();
261261
}
262262
catch { }
263263
}
@@ -267,7 +267,7 @@ internal async Task MakePrimaryAsync(ServerEndPoint server, ReplicationChangeOpt
267267
log?.WriteLine($"Making {Format.ToString(srv.EndPoint)} a primary...");
268268
try
269269
{
270-
await srv.ReplicaOfAsync(null, flags);
270+
await srv.ReplicaOfAsync(null, flags).ForAwait();
271271
}
272272
catch (Exception ex)
273273
{
@@ -282,7 +282,7 @@ internal async Task MakePrimaryAsync(ServerEndPoint server, ReplicationChangeOpt
282282
msg = Message.Create(0, flags | CommandFlags.FireAndForget, RedisCommand.SET, tieBreakerKey, newPrimary);
283283
try
284284
{
285-
await server.WriteDirectAsync(msg, ResultProcessor.DemandOK);
285+
await server.WriteDirectAsync(msg, ResultProcessor.DemandOK).ForAwait();
286286
}
287287
catch { }
288288
}
@@ -311,13 +311,13 @@ async Task BroadcastAsync(ServerEndPoint[] serverNodes)
311311
if (!node.IsConnected) continue;
312312
log?.WriteLine($"Broadcasting via {Format.ToString(node.EndPoint)}...");
313313
msg = Message.Create(-1, flags | CommandFlags.FireAndForget, RedisCommand.PUBLISH, channel, newPrimary);
314-
await node.WriteDirectAsync(msg, ResultProcessor.Int64);
314+
await node.WriteDirectAsync(msg, ResultProcessor.Int64).ForAwait();
315315
}
316316
}
317317
}
318318

319319
// Send a message before it happens - because afterwards a new replica may be unresponsive
320-
await BroadcastAsync(nodes);
320+
await BroadcastAsync(nodes).ForAwait();
321321

322322
if (options.HasFlag(ReplicationChangeOptions.ReplicateToOtherEndpoints))
323323
{
@@ -327,14 +327,14 @@ async Task BroadcastAsync(ServerEndPoint[] serverNodes)
327327

328328
log?.WriteLine($"Replicating to {Format.ToString(node.EndPoint)}...");
329329
msg = RedisServer.CreateReplicaOfMessage(node, server.EndPoint, flags);
330-
await node.WriteDirectAsync(msg, ResultProcessor.DemandOK);
330+
await node.WriteDirectAsync(msg, ResultProcessor.DemandOK).ForAwait();
331331
}
332332
}
333333

334334
// ...and send one after it happens - because the first broadcast may have landed on a secondary client
335335
// and it can reconfigure before any topology change actually happened. This is most likely to happen
336336
// in low-latency environments.
337-
await BroadcastAsync(nodes);
337+
await BroadcastAsync(nodes).ForAwait();
338338

339339
// and reconfigure the muxer
340340
log?.WriteLine("Reconfiguring all endpoints...");
@@ -344,7 +344,7 @@ async Task BroadcastAsync(ServerEndPoint[] serverNodes)
344344
{
345345
Interlocked.Exchange(ref activeConfigCause, null);
346346
}
347-
if (!await ReconfigureAsync(first: false, reconfigureAll: true, log, srv.EndPoint, cause: nameof(MakePrimaryAsync)))
347+
if (!await ReconfigureAsync(first: false, reconfigureAll: true, log, srv.EndPoint, cause: nameof(MakePrimaryAsync)).ForAwait())
348348
{
349349
log?.WriteLine("Verifying the configuration was incomplete; please verify");
350350
}
@@ -1097,7 +1097,7 @@ public bool Configure(TextWriter? log = null)
10971097
public async Task<bool> ConfigureAsync(TextWriter? log = null)
10981098
{
10991099
using var logProxy = LogProxy.TryCreate(log);
1100-
return await ReconfigureAsync(first: false, reconfigureAll: true, logProxy, null, "configure").ObserveErrors();
1100+
return await ReconfigureAsync(first: false, reconfigureAll: true, logProxy, null, "configure").ObserveErrors().ForAwait();
11011101
}
11021102

11031103
internal int SyncConnectTimeout(bool forConnect)
@@ -2036,10 +2036,10 @@ public void Dispose()
20362036
public async ValueTask DisposeAsync()
20372037
{
20382038
GC.SuppressFinalize(this);
2039-
await CloseAsync(!_isDisposed);
2039+
await CloseAsync(!_isDisposed).ForAwait();
20402040
if (sentinelConnection is ConnectionMultiplexer sentinel)
20412041
{
2042-
await sentinel.DisposeAsync();
2042+
await sentinel.DisposeAsync().ForAwait();
20432043
}
20442044
var oldTimer = Interlocked.Exchange(ref sentinelPrimaryReconnectTimer, null);
20452045
oldTimer?.Dispose();

src/StackExchange.Redis/LoggingPipe.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ private async Task CloneAsync(string path, PipeReader from, PipeWriter to)
5454

5555
while(true)
5656
{
57-
var result = await from.ReadAsync();
57+
var result = await from.ReadAsync().ForAwait();
5858
var buffer = result.Buffer;
5959
if (result.IsCompleted && buffer.IsEmpty) break;
6060

src/StackExchange.Redis/RedisServer.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ public async Task MakePrimaryAsync(ReplicationChangeOptions options, TextWriter?
418418
{
419419
using (var proxy = LogProxy.TryCreate(log))
420420
{
421-
await multiplexer.MakePrimaryAsync(server, options, proxy);
421+
await multiplexer.MakePrimaryAsync(server, options, proxy).ForAwait();
422422
}
423423
}
424424

@@ -793,18 +793,18 @@ public async Task ReplicaOfAsync(EndPoint? master, CommandFlags flags = CommandF
793793
{
794794
try
795795
{
796-
await server.WriteDirectAsync(tieBreakerRemoval, ResultProcessor.Boolean);
796+
await server.WriteDirectAsync(tieBreakerRemoval, ResultProcessor.Boolean).ForAwait();
797797
}
798798
catch { }
799799
}
800800

801801
var msg = CreateReplicaOfMessage(server, master, flags);
802-
await ExecuteAsync(msg, ResultProcessor.DemandOK);
802+
await ExecuteAsync(msg, ResultProcessor.DemandOK).ForAwait();
803803

804804
// attempt to broadcast a reconfigure message to anybody listening to this server
805805
if (GetConfigChangeMessage() is Message configChangeMessage)
806806
{
807-
await server.WriteDirectAsync(configChangeMessage, ResultProcessor.Int64);
807+
await server.WriteDirectAsync(configChangeMessage, ResultProcessor.Int64).ForAwait();
808808
}
809809
}
810810

src/StackExchange.Redis/ServerEndPoint.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -939,7 +939,7 @@ private async Task HandshakeAsync(PhysicalConnection connection, LogProxy? log)
939939
var connType = bridge.ConnectionType;
940940
if (connType == ConnectionType.Interactive)
941941
{
942-
await AutoConfigureAsync(connection, log);
942+
await AutoConfigureAsync(connection, log).ForAwait();
943943
}
944944

945945
var tracer = GetTracerMessage(true);
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
using System;
2+
using System.IO;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using Xunit;
6+
using Xunit.Abstractions;
7+
8+
namespace StackExchange.Redis.Tests
9+
{
10+
public class SyncContextTests : TestBase
11+
{
12+
public SyncContextTests(ITestOutputHelper testOutput) : base(testOutput) { }
13+
14+
/* Note A (referenced below)
15+
*
16+
* When sync-context is *enabled*, we don't validate OpCount > 0 - this is because *with the additional checks*,
17+
* it can genuinely happen that by the time we actually await it, it has completed - which results in a brittle test.
18+
*/
19+
[Theory]
20+
[InlineData(true)]
21+
[InlineData(false)]
22+
public async Task DetectSyncContextUsafe(bool continueOnCapturedContext)
23+
{
24+
using var ctx = new MySyncContext(Writer);
25+
Assert.Equal(0, ctx.OpCount);
26+
await Task.Delay(100).ConfigureAwait(continueOnCapturedContext);
27+
28+
AssertState(continueOnCapturedContext, ctx);
29+
}
30+
31+
private void AssertState(bool continueOnCapturedContext, MySyncContext ctx)
32+
{
33+
LogNoTime($"Context in AssertState: {ctx}");
34+
if (continueOnCapturedContext)
35+
{
36+
Assert.True(ctx.IsCurrent, nameof(ctx.IsCurrent));
37+
// see note A re OpCount
38+
}
39+
else
40+
{
41+
// no guarantees on sync-context still being current; depends on sync vs async
42+
Assert.Equal(0, ctx.OpCount);
43+
}
44+
}
45+
46+
[Fact]
47+
public void SyncPing()
48+
{
49+
using var ctx = new MySyncContext(Writer);
50+
using var conn = Create();
51+
Assert.Equal(0, ctx.OpCount);
52+
var db = conn.GetDatabase();
53+
db.Ping();
54+
Assert.Equal(0, ctx.OpCount);
55+
}
56+
57+
[Theory]
58+
[InlineData(true)]
59+
[InlineData(false)]
60+
public async Task AsyncPing(bool continueOnCapturedContext)
61+
{
62+
using var ctx = new MySyncContext(Writer);
63+
using var conn = Create();
64+
Assert.Equal(0, ctx.OpCount);
65+
var db = conn.GetDatabase();
66+
LogNoTime($"Context before await: {ctx}");
67+
await db.PingAsync().ConfigureAwait(continueOnCapturedContext);
68+
69+
AssertState(continueOnCapturedContext, ctx);
70+
}
71+
72+
[Fact]
73+
public void SyncConfigure()
74+
{
75+
using var ctx = new MySyncContext(Writer);
76+
using var conn = Create();
77+
Assert.Equal(0, ctx.OpCount);
78+
Assert.True(conn.Configure());
79+
Assert.Equal(0, ctx.OpCount);
80+
}
81+
82+
[Theory]
83+
[InlineData(true)] // fail: Expected: Not RanToCompletion, Actual: RanToCompletion
84+
[InlineData(false)] // pass
85+
public async Task AsyncConfigure(bool continueOnCapturedContext)
86+
{
87+
using var ctx = new MySyncContext(Writer);
88+
using var conn = Create();
89+
90+
LogNoTime($"Context initial: {ctx}");
91+
await Task.Delay(500);
92+
await conn.GetDatabase().PingAsync(); // ensure we're all ready
93+
ctx.Reset();
94+
LogNoTime($"Context before: {ctx}");
95+
96+
Assert.Equal(0, ctx.OpCount);
97+
Assert.True(await conn.ConfigureAsync(Writer).ConfigureAwait(continueOnCapturedContext), "config ran");
98+
99+
AssertState(continueOnCapturedContext, ctx);
100+
}
101+
102+
[Theory]
103+
[InlineData(true)]
104+
[InlineData(false)]
105+
public async Task ConnectAsync(bool continueOnCapturedContext)
106+
{
107+
using var ctx = new MySyncContext(Writer);
108+
var config = GetConfiguration(); // not ideal, but sufficient
109+
await ConnectionMultiplexer.ConnectAsync(config, Writer).ConfigureAwait(continueOnCapturedContext);
110+
111+
AssertState(continueOnCapturedContext, ctx);
112+
}
113+
114+
public sealed class MySyncContext : SynchronizationContext, IDisposable
115+
{
116+
private readonly SynchronizationContext? _previousContext;
117+
private readonly TextWriter? _log;
118+
public MySyncContext(TextWriter? log)
119+
{
120+
_previousContext = Current;
121+
_log = log;
122+
SetSynchronizationContext(this);
123+
}
124+
public int OpCount => Thread.VolatileRead(ref _opCount);
125+
private int _opCount;
126+
private void Incr()
127+
{
128+
Interlocked.Increment(ref _opCount);
129+
}
130+
131+
public void Reset() => Thread.VolatileWrite(ref _opCount, 0);
132+
133+
public override string ToString() => $"Sync context ({(IsCurrent ? "active" : "inactive")}): {OpCount}";
134+
135+
void IDisposable.Dispose() => SetSynchronizationContext(_previousContext);
136+
137+
public override void Post(SendOrPostCallback d, object? state)
138+
{
139+
_log?.WriteLine("sync-ctx: Post");
140+
Incr();
141+
ThreadPool.QueueUserWorkItem(static state =>
142+
{
143+
var tuple = (Tuple<MySyncContext, SendOrPostCallback, object?>)state!;
144+
tuple.Item1.Invoke(tuple.Item2, tuple.Item3);
145+
}, Tuple.Create<MySyncContext, SendOrPostCallback, object?>(this, d, state));
146+
}
147+
148+
private void Invoke(SendOrPostCallback d, object? state)
149+
{
150+
_log?.WriteLine("sync-ctx: Invoke");
151+
if (!IsCurrent) SetSynchronizationContext(this);
152+
d(state);
153+
}
154+
155+
public override void Send(SendOrPostCallback d, object? state)
156+
{
157+
_log?.WriteLine("sync-ctx: Send");
158+
Incr();
159+
Invoke(d, state);
160+
}
161+
162+
public bool IsCurrent => ReferenceEquals(this, Current);
163+
164+
public override int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout)
165+
{
166+
Incr();
167+
return base.Wait(waitHandles, waitAll, millisecondsTimeout);
168+
}
169+
public override void OperationStarted()
170+
{
171+
Incr();
172+
base.OperationStarted();
173+
}
174+
public override void OperationCompleted()
175+
{
176+
Incr();
177+
base.OperationCompleted();
178+
}
179+
}
180+
181+
}
182+
}

0 commit comments

Comments
 (0)