From cbfac24dad847342c3a8cf7cb9da30b75e745197 Mon Sep 17 00:00:00 2001 From: Stephen Halter Date: Fri, 19 Jul 2019 14:49:04 -0700 Subject: [PATCH] Fix ConcurrentPipeWriterTests --- .../Core/test/ConcurrentPipeWriterTests.cs | 212 ++++++++++++------ 1 file changed, 141 insertions(+), 71 deletions(-) diff --git a/src/Servers/Kestrel/Core/test/ConcurrentPipeWriterTests.cs b/src/Servers/Kestrel/Core/test/ConcurrentPipeWriterTests.cs index 3a5ac7a956a0..b76c887a445e 100644 --- a/src/Servers/Kestrel/Core/test/ConcurrentPipeWriterTests.cs +++ b/src/Servers/Kestrel/Core/test/ConcurrentPipeWriterTests.cs @@ -25,6 +25,8 @@ public async Task PassthroughIfAllFlushesAreAwaited() }; var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray); + + // No need to pass in a real sync object since all the calls in this test are passthrough. var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, new object()); var memory = concurrentPipeWriter.GetMemory(); @@ -71,24 +73,31 @@ public async Task QueuesIfFlushIsNotAwaited() new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), }; + var sync = new object(); var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray); - var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, new object()); + var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, sync); + var flushTask0 = default(ValueTask); + var flushTask1 = default(ValueTask); + var completeTask = default(ValueTask); - var memory = concurrentPipeWriter.GetMemory(); - Assert.Equal(1, mockPipeWriter.GetMemoryCallCount); + lock (sync) + { + var memory = concurrentPipeWriter.GetMemory(); + Assert.Equal(1, mockPipeWriter.GetMemoryCallCount); - concurrentPipeWriter.Advance(memory.Length); - Assert.Equal(1, mockPipeWriter.AdvanceCallCount); + concurrentPipeWriter.Advance(memory.Length); + Assert.Equal(1, mockPipeWriter.AdvanceCallCount); - var flushTask0 = concurrentPipeWriter.FlushAsync(); - Assert.Equal(1, mockPipeWriter.FlushCallCount); + flushTask0 = concurrentPipeWriter.FlushAsync(); + Assert.Equal(1, mockPipeWriter.FlushCallCount); - Assert.False(flushTask0.IsCompleted); - - // Since the flush was not awaited, the following API calls are queued. - memory = concurrentPipeWriter.GetMemory(); - concurrentPipeWriter.Advance(memory.Length); - var flushTask1 = concurrentPipeWriter.FlushAsync(); + Assert.False(flushTask0.IsCompleted); + + // Since the flush was not awaited, the following API calls are queued. + memory = concurrentPipeWriter.GetMemory(); + concurrentPipeWriter.Advance(memory.Length); + flushTask1 = concurrentPipeWriter.FlushAsync(); + } Assert.Equal(1, mockPipeWriter.GetMemoryCallCount); Assert.Equal(1, mockPipeWriter.AdvanceCallCount); @@ -99,11 +108,15 @@ public async Task QueuesIfFlushIsNotAwaited() mockPipeWriter.FlushTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); pipeWriterFlushTcsArray[0].SetResult(default); + await mockPipeWriter.FlushTcs.Task.DefaultTimeout(); - // Since the flush was not awaited, the following API calls are queued. - memory = concurrentPipeWriter.GetMemory(); - concurrentPipeWriter.Advance(memory.Length); + lock (sync) + { + // Since the flush was not awaited, the following API calls are queued. + var memory = concurrentPipeWriter.GetMemory(); + concurrentPipeWriter.Advance(memory.Length); + } // We do not need to flush the final bytes, since the incomplete flush will pick it up. Assert.Equal(2, mockPipeWriter.GetMemoryCallCount); @@ -115,6 +128,7 @@ public async Task QueuesIfFlushIsNotAwaited() mockPipeWriter.FlushTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); pipeWriterFlushTcsArray[1].SetResult(default); + await mockPipeWriter.FlushTcs.Task.DefaultTimeout(); // Even though we only called flush on the ConcurrentPipeWriter twice, the inner PipeWriter was flushed three times. @@ -126,7 +140,12 @@ public async Task QueuesIfFlushIsNotAwaited() Assert.False(flushTask1.IsCompleted); var completeEx = new Exception(); - await concurrentPipeWriter.CompleteAsync(completeEx); + lock (sync) + { + completeTask = concurrentPipeWriter.CompleteAsync(completeEx); + } + + await completeTask.DefaultTimeout(); // Complete isn't called on the inner PipeWriter until the inner flushes have completed. Assert.Null(mockPipeWriter.CompleteException); @@ -151,21 +170,29 @@ public async Task KeepsQueueIfInnerFlushFinishesBetweenGetMemoryAndAdvance() new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), }; + var sync = new object(); var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray); - var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, new object()); + var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, sync); + var memory = default(Memory); + var flushTask0 = default(ValueTask); + var flushTask1 = default(ValueTask); + var completeTask = default(ValueTask); - var memory = concurrentPipeWriter.GetMemory(); - Assert.Equal(1, mockPipeWriter.GetMemoryCallCount); + lock (sync) + { + memory = concurrentPipeWriter.GetMemory(); + Assert.Equal(1, mockPipeWriter.GetMemoryCallCount); - concurrentPipeWriter.Advance(memory.Length); - Assert.Equal(1, mockPipeWriter.AdvanceCallCount); + concurrentPipeWriter.Advance(memory.Length); + Assert.Equal(1, mockPipeWriter.AdvanceCallCount); - var flushTask0 = concurrentPipeWriter.FlushAsync(); - Assert.Equal(1, mockPipeWriter.FlushCallCount); - Assert.False(flushTask0.IsCompleted); + flushTask0 = concurrentPipeWriter.FlushAsync(); + Assert.Equal(1, mockPipeWriter.FlushCallCount); + Assert.False(flushTask0.IsCompleted); - // Only GetMemory() is called but not Advance() is not called yet when the first inner flush complets. - memory = concurrentPipeWriter.GetMemory(); + // Only GetMemory() is called but not Advance() is not called yet when the first inner flush complets. + memory = concurrentPipeWriter.GetMemory(); + } // If the inner flush completes between a call to GetMemory() and Advance(), the outer // flush completes, and the next flush will pick up the buffered data. @@ -177,11 +204,14 @@ public async Task KeepsQueueIfInnerFlushFinishesBetweenGetMemoryAndAdvance() Assert.Equal(1, mockPipeWriter.AdvanceCallCount); Assert.Equal(1, mockPipeWriter.FlushCallCount); - concurrentPipeWriter.Advance(memory.Length); - memory = concurrentPipeWriter.GetMemory(); - concurrentPipeWriter.Advance(memory.Length); + lock (sync) + { + concurrentPipeWriter.Advance(memory.Length); + memory = concurrentPipeWriter.GetMemory(); + concurrentPipeWriter.Advance(memory.Length); - var flushTask1 = concurrentPipeWriter.FlushAsync(); + flushTask1 = concurrentPipeWriter.FlushAsync(); + } // Now that we flushed the ConcurrentPipeWriter again, the GetMemory() and Advance() calls are replayed. // Make sure that MockPipeWriter.SlabMemoryPoolBlockSize matches SlabMemoryPool._blockSize or else @@ -201,7 +231,14 @@ public async Task KeepsQueueIfInnerFlushFinishesBetweenGetMemoryAndAdvance() Assert.Equal(2, mockPipeWriter.FlushCallCount); var completeEx = new Exception(); - await concurrentPipeWriter.CompleteAsync(completeEx); + + lock (sync) + { + completeTask = concurrentPipeWriter.CompleteAsync(completeEx); + } + + await completeTask.DefaultTimeout(); + Assert.Same(completeEx, mockPipeWriter.CompleteException); } } @@ -217,21 +254,28 @@ public async Task CompleteFlushesQueuedBytes() new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), }; + var sync = new object(); var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray); - var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, new object()); + var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, sync); + var memory = default(Memory); + var flushTask0 = default(ValueTask); + var completeTask = default(ValueTask); - var memory = concurrentPipeWriter.GetMemory(); - Assert.Equal(1, mockPipeWriter.GetMemoryCallCount); + lock (sync) + { + memory = concurrentPipeWriter.GetMemory(); + Assert.Equal(1, mockPipeWriter.GetMemoryCallCount); - concurrentPipeWriter.Advance(memory.Length); - Assert.Equal(1, mockPipeWriter.AdvanceCallCount); + concurrentPipeWriter.Advance(memory.Length); + Assert.Equal(1, mockPipeWriter.AdvanceCallCount); - var flushTask0 = concurrentPipeWriter.FlushAsync(); - Assert.Equal(1, mockPipeWriter.FlushCallCount); - Assert.False(flushTask0.IsCompleted); + flushTask0 = concurrentPipeWriter.FlushAsync(); + Assert.Equal(1, mockPipeWriter.FlushCallCount); + Assert.False(flushTask0.IsCompleted); - // Only GetMemory() is called but not Advance() is not called yet when the first inner flush completes. - memory = concurrentPipeWriter.GetMemory(); + // Only GetMemory() is called but not Advance() is not called yet when the first inner flush completes. + memory = concurrentPipeWriter.GetMemory(); + } // If the inner flush completes between a call to GetMemory() and Advance(), the outer // flush completes, and the next flush will pick up the buffered data. @@ -243,13 +287,19 @@ public async Task CompleteFlushesQueuedBytes() Assert.Equal(1, mockPipeWriter.AdvanceCallCount); Assert.Equal(1, mockPipeWriter.FlushCallCount); - concurrentPipeWriter.Advance(memory.Length); - memory = concurrentPipeWriter.GetMemory(); - concurrentPipeWriter.Advance(memory.Length); - - // Complete the ConcurrentPipeWriter without flushing any of the queued data. var completeEx = new Exception(); - await concurrentPipeWriter.CompleteAsync(completeEx); + + lock (sync) + { + concurrentPipeWriter.Advance(memory.Length); + memory = concurrentPipeWriter.GetMemory(); + concurrentPipeWriter.Advance(memory.Length); + + // Complete the ConcurrentPipeWriter without flushing any of the queued data. + completeTask = concurrentPipeWriter.CompleteAsync(completeEx); + } + + await completeTask.DefaultTimeout(); // Now that we completed the ConcurrentPipeWriter, the GetMemory() and Advance() calls are replayed. // Make sure that MockPipeWriter.SlabMemoryPoolBlockSize matches SlabMemoryPool._blockSize or else @@ -272,45 +322,58 @@ public async Task CancelPendingFlushInterruptsFlushLoop() new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), }; + var sync = new object(); var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray); - var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, new object()); + var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, sync); + var flushTask0 = default(ValueTask); + var flushTask1 = default(ValueTask); + var flushTask2 = default(ValueTask); + var completeTask = default(ValueTask); - var memory = concurrentPipeWriter.GetMemory(); - Assert.Equal(1, mockPipeWriter.GetMemoryCallCount); + lock (sync) + { + var memory = concurrentPipeWriter.GetMemory(); + Assert.Equal(1, mockPipeWriter.GetMemoryCallCount); - concurrentPipeWriter.Advance(memory.Length); - Assert.Equal(1, mockPipeWriter.AdvanceCallCount); + concurrentPipeWriter.Advance(memory.Length); + Assert.Equal(1, mockPipeWriter.AdvanceCallCount); - var flushTask0 = concurrentPipeWriter.FlushAsync(); - Assert.Equal(1, mockPipeWriter.FlushCallCount); + flushTask0 = concurrentPipeWriter.FlushAsync(); + Assert.Equal(1, mockPipeWriter.FlushCallCount); - Assert.False(flushTask0.IsCompleted); - - // Since the flush was not awaited, the following API calls are queued. - memory = concurrentPipeWriter.GetMemory(); - concurrentPipeWriter.Advance(memory.Length); - var flushTask1 = concurrentPipeWriter.FlushAsync(); + Assert.False(flushTask0.IsCompleted); - Assert.Equal(1, mockPipeWriter.GetMemoryCallCount); - Assert.Equal(1, mockPipeWriter.AdvanceCallCount); - Assert.Equal(1, mockPipeWriter.FlushCallCount); + // Since the flush was not awaited, the following API calls are queued. + memory = concurrentPipeWriter.GetMemory(); + concurrentPipeWriter.Advance(memory.Length); + flushTask1 = concurrentPipeWriter.FlushAsync(); - Assert.False(flushTask0.IsCompleted); - Assert.False(flushTask1.IsCompleted); + Assert.Equal(1, mockPipeWriter.GetMemoryCallCount); + Assert.Equal(1, mockPipeWriter.AdvanceCallCount); + Assert.Equal(1, mockPipeWriter.FlushCallCount); - // CancelPendingFlush() does not get queued. - concurrentPipeWriter.CancelPendingFlush(); - Assert.Equal(1, mockPipeWriter.CancelPendingFlushCallCount); + Assert.False(flushTask0.IsCompleted); + Assert.False(flushTask1.IsCompleted); + + // CancelPendingFlush() does not get queued. + concurrentPipeWriter.CancelPendingFlush(); + Assert.Equal(1, mockPipeWriter.CancelPendingFlushCallCount); + } pipeWriterFlushTcsArray[0].SetResult(new FlushResult(isCanceled: true, isCompleted: false)); Assert.True((await flushTask0.DefaultTimeout()).IsCanceled); Assert.True((await flushTask1.DefaultTimeout()).IsCanceled); - var flushTask2 = concurrentPipeWriter.FlushAsync(); + lock (sync) + { + flushTask2 = concurrentPipeWriter.FlushAsync(); + } + Assert.False(flushTask2.IsCompleted); pipeWriterFlushTcsArray[1].SetResult(default); + await flushTask2.DefaultTimeout(); // We do not need to flush the final bytes, since the incomplete flush will pick it up. @@ -319,7 +382,14 @@ public async Task CancelPendingFlushInterruptsFlushLoop() Assert.Equal(2, mockPipeWriter.FlushCallCount); var completeEx = new Exception(); - await concurrentPipeWriter.CompleteAsync(completeEx); + + lock (sync) + { + completeTask = concurrentPipeWriter.CompleteAsync(completeEx); + } + + await completeTask.DefaultTimeout(); + Assert.Same(completeEx, mockPipeWriter.CompleteException); } }