Skip to content

Commit 34903da

Browse files
authored
Fix ConcurrentPipeWriterTests (#12383)
1 parent c54a720 commit 34903da

File tree

1 file changed

+141
-71
lines changed

1 file changed

+141
-71
lines changed

src/Servers/Kestrel/Core/test/ConcurrentPipeWriterTests.cs

Lines changed: 141 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ public async Task PassthroughIfAllFlushesAreAwaited()
2525
};
2626

2727
var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray);
28+
29+
// No need to pass in a real sync object since all the calls in this test are passthrough.
2830
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, new object());
2931

3032
var memory = concurrentPipeWriter.GetMemory();
@@ -71,24 +73,31 @@ public async Task QueuesIfFlushIsNotAwaited()
7173
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
7274
};
7375

76+
var sync = new object();
7477
var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray);
75-
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, new object());
78+
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, sync);
79+
var flushTask0 = default(ValueTask<FlushResult>);
80+
var flushTask1 = default(ValueTask<FlushResult>);
81+
var completeTask = default(ValueTask);
7682

77-
var memory = concurrentPipeWriter.GetMemory();
78-
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
83+
lock (sync)
84+
{
85+
var memory = concurrentPipeWriter.GetMemory();
86+
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
7987

80-
concurrentPipeWriter.Advance(memory.Length);
81-
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
88+
concurrentPipeWriter.Advance(memory.Length);
89+
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
8290

83-
var flushTask0 = concurrentPipeWriter.FlushAsync();
84-
Assert.Equal(1, mockPipeWriter.FlushCallCount);
91+
flushTask0 = concurrentPipeWriter.FlushAsync();
92+
Assert.Equal(1, mockPipeWriter.FlushCallCount);
8593

86-
Assert.False(flushTask0.IsCompleted);
87-
88-
// Since the flush was not awaited, the following API calls are queued.
89-
memory = concurrentPipeWriter.GetMemory();
90-
concurrentPipeWriter.Advance(memory.Length);
91-
var flushTask1 = concurrentPipeWriter.FlushAsync();
94+
Assert.False(flushTask0.IsCompleted);
95+
96+
// Since the flush was not awaited, the following API calls are queued.
97+
memory = concurrentPipeWriter.GetMemory();
98+
concurrentPipeWriter.Advance(memory.Length);
99+
flushTask1 = concurrentPipeWriter.FlushAsync();
100+
}
92101

93102
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
94103
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
@@ -99,11 +108,15 @@ public async Task QueuesIfFlushIsNotAwaited()
99108

100109
mockPipeWriter.FlushTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
101110
pipeWriterFlushTcsArray[0].SetResult(default);
111+
102112
await mockPipeWriter.FlushTcs.Task.DefaultTimeout();
103113

104-
// Since the flush was not awaited, the following API calls are queued.
105-
memory = concurrentPipeWriter.GetMemory();
106-
concurrentPipeWriter.Advance(memory.Length);
114+
lock (sync)
115+
{
116+
// Since the flush was not awaited, the following API calls are queued.
117+
var memory = concurrentPipeWriter.GetMemory();
118+
concurrentPipeWriter.Advance(memory.Length);
119+
}
107120

108121
// We do not need to flush the final bytes, since the incomplete flush will pick it up.
109122
Assert.Equal(2, mockPipeWriter.GetMemoryCallCount);
@@ -115,6 +128,7 @@ public async Task QueuesIfFlushIsNotAwaited()
115128

116129
mockPipeWriter.FlushTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
117130
pipeWriterFlushTcsArray[1].SetResult(default);
131+
118132
await mockPipeWriter.FlushTcs.Task.DefaultTimeout();
119133

120134
// Even though we only called flush on the ConcurrentPipeWriter twice, the inner PipeWriter was flushed three times.
@@ -126,7 +140,12 @@ public async Task QueuesIfFlushIsNotAwaited()
126140
Assert.False(flushTask1.IsCompleted);
127141

128142
var completeEx = new Exception();
129-
await concurrentPipeWriter.CompleteAsync(completeEx);
143+
lock (sync)
144+
{
145+
completeTask = concurrentPipeWriter.CompleteAsync(completeEx);
146+
}
147+
148+
await completeTask.DefaultTimeout();
130149

131150
// Complete isn't called on the inner PipeWriter until the inner flushes have completed.
132151
Assert.Null(mockPipeWriter.CompleteException);
@@ -151,21 +170,29 @@ public async Task KeepsQueueIfInnerFlushFinishesBetweenGetMemoryAndAdvance()
151170
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
152171
};
153172

173+
var sync = new object();
154174
var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray);
155-
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, new object());
175+
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, sync);
176+
var memory = default(Memory<byte>);
177+
var flushTask0 = default(ValueTask<FlushResult>);
178+
var flushTask1 = default(ValueTask<FlushResult>);
179+
var completeTask = default(ValueTask);
156180

157-
var memory = concurrentPipeWriter.GetMemory();
158-
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
181+
lock (sync)
182+
{
183+
memory = concurrentPipeWriter.GetMemory();
184+
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
159185

160-
concurrentPipeWriter.Advance(memory.Length);
161-
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
186+
concurrentPipeWriter.Advance(memory.Length);
187+
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
162188

163-
var flushTask0 = concurrentPipeWriter.FlushAsync();
164-
Assert.Equal(1, mockPipeWriter.FlushCallCount);
165-
Assert.False(flushTask0.IsCompleted);
189+
flushTask0 = concurrentPipeWriter.FlushAsync();
190+
Assert.Equal(1, mockPipeWriter.FlushCallCount);
191+
Assert.False(flushTask0.IsCompleted);
166192

167-
// Only GetMemory() is called but not Advance() is not called yet when the first inner flush complets.
168-
memory = concurrentPipeWriter.GetMemory();
193+
// Only GetMemory() is called but not Advance() is not called yet when the first inner flush complets.
194+
memory = concurrentPipeWriter.GetMemory();
195+
}
169196

170197
// If the inner flush completes between a call to GetMemory() and Advance(), the outer
171198
// flush completes, and the next flush will pick up the buffered data.
@@ -177,11 +204,14 @@ public async Task KeepsQueueIfInnerFlushFinishesBetweenGetMemoryAndAdvance()
177204
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
178205
Assert.Equal(1, mockPipeWriter.FlushCallCount);
179206

180-
concurrentPipeWriter.Advance(memory.Length);
181-
memory = concurrentPipeWriter.GetMemory();
182-
concurrentPipeWriter.Advance(memory.Length);
207+
lock (sync)
208+
{
209+
concurrentPipeWriter.Advance(memory.Length);
210+
memory = concurrentPipeWriter.GetMemory();
211+
concurrentPipeWriter.Advance(memory.Length);
183212

184-
var flushTask1 = concurrentPipeWriter.FlushAsync();
213+
flushTask1 = concurrentPipeWriter.FlushAsync();
214+
}
185215

186216
// Now that we flushed the ConcurrentPipeWriter again, the GetMemory() and Advance() calls are replayed.
187217
// Make sure that MockPipeWriter.SlabMemoryPoolBlockSize matches SlabMemoryPool._blockSize or else
@@ -201,7 +231,14 @@ public async Task KeepsQueueIfInnerFlushFinishesBetweenGetMemoryAndAdvance()
201231
Assert.Equal(2, mockPipeWriter.FlushCallCount);
202232

203233
var completeEx = new Exception();
204-
await concurrentPipeWriter.CompleteAsync(completeEx);
234+
235+
lock (sync)
236+
{
237+
completeTask = concurrentPipeWriter.CompleteAsync(completeEx);
238+
}
239+
240+
await completeTask.DefaultTimeout();
241+
205242
Assert.Same(completeEx, mockPipeWriter.CompleteException);
206243
}
207244
}
@@ -217,21 +254,28 @@ public async Task CompleteFlushesQueuedBytes()
217254
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
218255
};
219256

257+
var sync = new object();
220258
var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray);
221-
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, new object());
259+
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, sync);
260+
var memory = default(Memory<byte>);
261+
var flushTask0 = default(ValueTask<FlushResult>);
262+
var completeTask = default(ValueTask);
222263

223-
var memory = concurrentPipeWriter.GetMemory();
224-
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
264+
lock (sync)
265+
{
266+
memory = concurrentPipeWriter.GetMemory();
267+
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
225268

226-
concurrentPipeWriter.Advance(memory.Length);
227-
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
269+
concurrentPipeWriter.Advance(memory.Length);
270+
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
228271

229-
var flushTask0 = concurrentPipeWriter.FlushAsync();
230-
Assert.Equal(1, mockPipeWriter.FlushCallCount);
231-
Assert.False(flushTask0.IsCompleted);
272+
flushTask0 = concurrentPipeWriter.FlushAsync();
273+
Assert.Equal(1, mockPipeWriter.FlushCallCount);
274+
Assert.False(flushTask0.IsCompleted);
232275

233-
// Only GetMemory() is called but not Advance() is not called yet when the first inner flush completes.
234-
memory = concurrentPipeWriter.GetMemory();
276+
// Only GetMemory() is called but not Advance() is not called yet when the first inner flush completes.
277+
memory = concurrentPipeWriter.GetMemory();
278+
}
235279

236280
// If the inner flush completes between a call to GetMemory() and Advance(), the outer
237281
// flush completes, and the next flush will pick up the buffered data.
@@ -243,13 +287,19 @@ public async Task CompleteFlushesQueuedBytes()
243287
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
244288
Assert.Equal(1, mockPipeWriter.FlushCallCount);
245289

246-
concurrentPipeWriter.Advance(memory.Length);
247-
memory = concurrentPipeWriter.GetMemory();
248-
concurrentPipeWriter.Advance(memory.Length);
249-
250-
// Complete the ConcurrentPipeWriter without flushing any of the queued data.
251290
var completeEx = new Exception();
252-
await concurrentPipeWriter.CompleteAsync(completeEx);
291+
292+
lock (sync)
293+
{
294+
concurrentPipeWriter.Advance(memory.Length);
295+
memory = concurrentPipeWriter.GetMemory();
296+
concurrentPipeWriter.Advance(memory.Length);
297+
298+
// Complete the ConcurrentPipeWriter without flushing any of the queued data.
299+
completeTask = concurrentPipeWriter.CompleteAsync(completeEx);
300+
}
301+
302+
await completeTask.DefaultTimeout();
253303

254304
// Now that we completed the ConcurrentPipeWriter, the GetMemory() and Advance() calls are replayed.
255305
// Make sure that MockPipeWriter.SlabMemoryPoolBlockSize matches SlabMemoryPool._blockSize or else
@@ -272,45 +322,58 @@ public async Task CancelPendingFlushInterruptsFlushLoop()
272322
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
273323
};
274324

325+
var sync = new object();
275326
var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray);
276-
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, new object());
327+
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, sync);
328+
var flushTask0 = default(ValueTask<FlushResult>);
329+
var flushTask1 = default(ValueTask<FlushResult>);
330+
var flushTask2 = default(ValueTask<FlushResult>);
331+
var completeTask = default(ValueTask);
277332

278-
var memory = concurrentPipeWriter.GetMemory();
279-
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
333+
lock (sync)
334+
{
335+
var memory = concurrentPipeWriter.GetMemory();
336+
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
280337

281-
concurrentPipeWriter.Advance(memory.Length);
282-
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
338+
concurrentPipeWriter.Advance(memory.Length);
339+
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
283340

284-
var flushTask0 = concurrentPipeWriter.FlushAsync();
285-
Assert.Equal(1, mockPipeWriter.FlushCallCount);
341+
flushTask0 = concurrentPipeWriter.FlushAsync();
342+
Assert.Equal(1, mockPipeWriter.FlushCallCount);
286343

287-
Assert.False(flushTask0.IsCompleted);
288-
289-
// Since the flush was not awaited, the following API calls are queued.
290-
memory = concurrentPipeWriter.GetMemory();
291-
concurrentPipeWriter.Advance(memory.Length);
292-
var flushTask1 = concurrentPipeWriter.FlushAsync();
344+
Assert.False(flushTask0.IsCompleted);
293345

294-
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
295-
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
296-
Assert.Equal(1, mockPipeWriter.FlushCallCount);
346+
// Since the flush was not awaited, the following API calls are queued.
347+
memory = concurrentPipeWriter.GetMemory();
348+
concurrentPipeWriter.Advance(memory.Length);
349+
flushTask1 = concurrentPipeWriter.FlushAsync();
297350

298-
Assert.False(flushTask0.IsCompleted);
299-
Assert.False(flushTask1.IsCompleted);
351+
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
352+
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
353+
Assert.Equal(1, mockPipeWriter.FlushCallCount);
300354

301-
// CancelPendingFlush() does not get queued.
302-
concurrentPipeWriter.CancelPendingFlush();
303-
Assert.Equal(1, mockPipeWriter.CancelPendingFlushCallCount);
355+
Assert.False(flushTask0.IsCompleted);
356+
Assert.False(flushTask1.IsCompleted);
357+
358+
// CancelPendingFlush() does not get queued.
359+
concurrentPipeWriter.CancelPendingFlush();
360+
Assert.Equal(1, mockPipeWriter.CancelPendingFlushCallCount);
361+
}
304362

305363
pipeWriterFlushTcsArray[0].SetResult(new FlushResult(isCanceled: true, isCompleted: false));
306364

307365
Assert.True((await flushTask0.DefaultTimeout()).IsCanceled);
308366
Assert.True((await flushTask1.DefaultTimeout()).IsCanceled);
309367

310-
var flushTask2 = concurrentPipeWriter.FlushAsync();
368+
lock (sync)
369+
{
370+
flushTask2 = concurrentPipeWriter.FlushAsync();
371+
}
372+
311373
Assert.False(flushTask2.IsCompleted);
312374

313375
pipeWriterFlushTcsArray[1].SetResult(default);
376+
314377
await flushTask2.DefaultTimeout();
315378

316379
// We do not need to flush the final bytes, since the incomplete flush will pick it up.
@@ -319,7 +382,14 @@ public async Task CancelPendingFlushInterruptsFlushLoop()
319382
Assert.Equal(2, mockPipeWriter.FlushCallCount);
320383

321384
var completeEx = new Exception();
322-
await concurrentPipeWriter.CompleteAsync(completeEx);
385+
386+
lock (sync)
387+
{
388+
completeTask = concurrentPipeWriter.CompleteAsync(completeEx);
389+
}
390+
391+
await completeTask.DefaultTimeout();
392+
323393
Assert.Same(completeEx, mockPipeWriter.CompleteException);
324394
}
325395
}

0 commit comments

Comments
 (0)