Skip to content

Fix ConcurrentPipeWriterTests #12383

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 24, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 141 additions & 71 deletions src/Servers/Kestrel/Core/test/ConcurrentPipeWriterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public async Task PassthroughIfAllFlushesAreAwaited()
};

var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray);

// No need to pass in a real sync object since all the calls in this test are passthrough.
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, new object());

var memory = concurrentPipeWriter.GetMemory();
Expand Down Expand Up @@ -71,24 +73,31 @@ public async Task QueuesIfFlushIsNotAwaited()
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
};

var sync = new object();
var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray);
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, new object());
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, sync);
var flushTask0 = default(ValueTask<FlushResult>);
var flushTask1 = default(ValueTask<FlushResult>);
var completeTask = default(ValueTask);

var memory = concurrentPipeWriter.GetMemory();
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
lock (sync)
{
var memory = concurrentPipeWriter.GetMemory();
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);

concurrentPipeWriter.Advance(memory.Length);
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
concurrentPipeWriter.Advance(memory.Length);
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);

var flushTask0 = concurrentPipeWriter.FlushAsync();
Assert.Equal(1, mockPipeWriter.FlushCallCount);
flushTask0 = concurrentPipeWriter.FlushAsync();
Assert.Equal(1, mockPipeWriter.FlushCallCount);

Assert.False(flushTask0.IsCompleted);

// Since the flush was not awaited, the following API calls are queued.
memory = concurrentPipeWriter.GetMemory();
concurrentPipeWriter.Advance(memory.Length);
var flushTask1 = concurrentPipeWriter.FlushAsync();
Assert.False(flushTask0.IsCompleted);

// Since the flush was not awaited, the following API calls are queued.
memory = concurrentPipeWriter.GetMemory();
concurrentPipeWriter.Advance(memory.Length);
flushTask1 = concurrentPipeWriter.FlushAsync();
}

Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
Expand All @@ -99,11 +108,15 @@ public async Task QueuesIfFlushIsNotAwaited()

mockPipeWriter.FlushTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
pipeWriterFlushTcsArray[0].SetResult(default);

await mockPipeWriter.FlushTcs.Task.DefaultTimeout();

// Since the flush was not awaited, the following API calls are queued.
memory = concurrentPipeWriter.GetMemory();
concurrentPipeWriter.Advance(memory.Length);
lock (sync)
{
// Since the flush was not awaited, the following API calls are queued.
var memory = concurrentPipeWriter.GetMemory();
concurrentPipeWriter.Advance(memory.Length);
}

// We do not need to flush the final bytes, since the incomplete flush will pick it up.
Assert.Equal(2, mockPipeWriter.GetMemoryCallCount);
Expand All @@ -115,6 +128,7 @@ public async Task QueuesIfFlushIsNotAwaited()

mockPipeWriter.FlushTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
pipeWriterFlushTcsArray[1].SetResult(default);

await mockPipeWriter.FlushTcs.Task.DefaultTimeout();

// Even though we only called flush on the ConcurrentPipeWriter twice, the inner PipeWriter was flushed three times.
Expand All @@ -126,7 +140,12 @@ public async Task QueuesIfFlushIsNotAwaited()
Assert.False(flushTask1.IsCompleted);

var completeEx = new Exception();
await concurrentPipeWriter.CompleteAsync(completeEx);
lock (sync)
{
completeTask = concurrentPipeWriter.CompleteAsync(completeEx);
}

await completeTask.DefaultTimeout();

// Complete isn't called on the inner PipeWriter until the inner flushes have completed.
Assert.Null(mockPipeWriter.CompleteException);
Expand All @@ -151,21 +170,29 @@ public async Task KeepsQueueIfInnerFlushFinishesBetweenGetMemoryAndAdvance()
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
};

var sync = new object();
var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray);
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, new object());
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, sync);
var memory = default(Memory<byte>);
var flushTask0 = default(ValueTask<FlushResult>);
var flushTask1 = default(ValueTask<FlushResult>);
var completeTask = default(ValueTask);

var memory = concurrentPipeWriter.GetMemory();
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
lock (sync)
{
memory = concurrentPipeWriter.GetMemory();
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);

concurrentPipeWriter.Advance(memory.Length);
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
concurrentPipeWriter.Advance(memory.Length);
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);

var flushTask0 = concurrentPipeWriter.FlushAsync();
Assert.Equal(1, mockPipeWriter.FlushCallCount);
Assert.False(flushTask0.IsCompleted);
flushTask0 = concurrentPipeWriter.FlushAsync();
Assert.Equal(1, mockPipeWriter.FlushCallCount);
Assert.False(flushTask0.IsCompleted);

// Only GetMemory() is called but not Advance() is not called yet when the first inner flush complets.
memory = concurrentPipeWriter.GetMemory();
// Only GetMemory() is called but not Advance() is not called yet when the first inner flush complets.
memory = concurrentPipeWriter.GetMemory();
}

// If the inner flush completes between a call to GetMemory() and Advance(), the outer
// flush completes, and the next flush will pick up the buffered data.
Expand All @@ -177,11 +204,14 @@ public async Task KeepsQueueIfInnerFlushFinishesBetweenGetMemoryAndAdvance()
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
Assert.Equal(1, mockPipeWriter.FlushCallCount);

concurrentPipeWriter.Advance(memory.Length);
memory = concurrentPipeWriter.GetMemory();
concurrentPipeWriter.Advance(memory.Length);
lock (sync)
{
concurrentPipeWriter.Advance(memory.Length);
memory = concurrentPipeWriter.GetMemory();
concurrentPipeWriter.Advance(memory.Length);

var flushTask1 = concurrentPipeWriter.FlushAsync();
flushTask1 = concurrentPipeWriter.FlushAsync();
}

// Now that we flushed the ConcurrentPipeWriter again, the GetMemory() and Advance() calls are replayed.
// Make sure that MockPipeWriter.SlabMemoryPoolBlockSize matches SlabMemoryPool._blockSize or else
Expand All @@ -201,7 +231,14 @@ public async Task KeepsQueueIfInnerFlushFinishesBetweenGetMemoryAndAdvance()
Assert.Equal(2, mockPipeWriter.FlushCallCount);

var completeEx = new Exception();
await concurrentPipeWriter.CompleteAsync(completeEx);

lock (sync)
{
completeTask = concurrentPipeWriter.CompleteAsync(completeEx);
}

await completeTask.DefaultTimeout();

Assert.Same(completeEx, mockPipeWriter.CompleteException);
}
}
Expand All @@ -217,21 +254,28 @@ public async Task CompleteFlushesQueuedBytes()
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
};

var sync = new object();
var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray);
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, new object());
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, sync);
var memory = default(Memory<byte>);
var flushTask0 = default(ValueTask<FlushResult>);
var completeTask = default(ValueTask);

var memory = concurrentPipeWriter.GetMemory();
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
lock (sync)
{
memory = concurrentPipeWriter.GetMemory();
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);

concurrentPipeWriter.Advance(memory.Length);
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
concurrentPipeWriter.Advance(memory.Length);
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);

var flushTask0 = concurrentPipeWriter.FlushAsync();
Assert.Equal(1, mockPipeWriter.FlushCallCount);
Assert.False(flushTask0.IsCompleted);
flushTask0 = concurrentPipeWriter.FlushAsync();
Assert.Equal(1, mockPipeWriter.FlushCallCount);
Assert.False(flushTask0.IsCompleted);

// Only GetMemory() is called but not Advance() is not called yet when the first inner flush completes.
memory = concurrentPipeWriter.GetMemory();
// Only GetMemory() is called but not Advance() is not called yet when the first inner flush completes.
memory = concurrentPipeWriter.GetMemory();
}

// If the inner flush completes between a call to GetMemory() and Advance(), the outer
// flush completes, and the next flush will pick up the buffered data.
Expand All @@ -243,13 +287,19 @@ public async Task CompleteFlushesQueuedBytes()
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
Assert.Equal(1, mockPipeWriter.FlushCallCount);

concurrentPipeWriter.Advance(memory.Length);
memory = concurrentPipeWriter.GetMemory();
concurrentPipeWriter.Advance(memory.Length);

// Complete the ConcurrentPipeWriter without flushing any of the queued data.
var completeEx = new Exception();
await concurrentPipeWriter.CompleteAsync(completeEx);

lock (sync)
{
concurrentPipeWriter.Advance(memory.Length);
memory = concurrentPipeWriter.GetMemory();
concurrentPipeWriter.Advance(memory.Length);

// Complete the ConcurrentPipeWriter without flushing any of the queued data.
completeTask = concurrentPipeWriter.CompleteAsync(completeEx);
}

await completeTask.DefaultTimeout();

// Now that we completed the ConcurrentPipeWriter, the GetMemory() and Advance() calls are replayed.
// Make sure that MockPipeWriter.SlabMemoryPoolBlockSize matches SlabMemoryPool._blockSize or else
Expand All @@ -272,45 +322,58 @@ public async Task CancelPendingFlushInterruptsFlushLoop()
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
};

var sync = new object();
var mockPipeWriter = new MockPipeWriter(pipeWriterFlushTcsArray);
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, new object());
var concurrentPipeWriter = new ConcurrentPipeWriter(mockPipeWriter, diagnosticPool, sync);
var flushTask0 = default(ValueTask<FlushResult>);
var flushTask1 = default(ValueTask<FlushResult>);
var flushTask2 = default(ValueTask<FlushResult>);
var completeTask = default(ValueTask);

var memory = concurrentPipeWriter.GetMemory();
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
lock (sync)
{
var memory = concurrentPipeWriter.GetMemory();
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);

concurrentPipeWriter.Advance(memory.Length);
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
concurrentPipeWriter.Advance(memory.Length);
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);

var flushTask0 = concurrentPipeWriter.FlushAsync();
Assert.Equal(1, mockPipeWriter.FlushCallCount);
flushTask0 = concurrentPipeWriter.FlushAsync();
Assert.Equal(1, mockPipeWriter.FlushCallCount);

Assert.False(flushTask0.IsCompleted);

// Since the flush was not awaited, the following API calls are queued.
memory = concurrentPipeWriter.GetMemory();
concurrentPipeWriter.Advance(memory.Length);
var flushTask1 = concurrentPipeWriter.FlushAsync();
Assert.False(flushTask0.IsCompleted);

Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
Assert.Equal(1, mockPipeWriter.FlushCallCount);
// Since the flush was not awaited, the following API calls are queued.
memory = concurrentPipeWriter.GetMemory();
concurrentPipeWriter.Advance(memory.Length);
flushTask1 = concurrentPipeWriter.FlushAsync();

Assert.False(flushTask0.IsCompleted);
Assert.False(flushTask1.IsCompleted);
Assert.Equal(1, mockPipeWriter.GetMemoryCallCount);
Assert.Equal(1, mockPipeWriter.AdvanceCallCount);
Assert.Equal(1, mockPipeWriter.FlushCallCount);

// CancelPendingFlush() does not get queued.
concurrentPipeWriter.CancelPendingFlush();
Assert.Equal(1, mockPipeWriter.CancelPendingFlushCallCount);
Assert.False(flushTask0.IsCompleted);
Assert.False(flushTask1.IsCompleted);

// CancelPendingFlush() does not get queued.
concurrentPipeWriter.CancelPendingFlush();
Assert.Equal(1, mockPipeWriter.CancelPendingFlushCallCount);
}

pipeWriterFlushTcsArray[0].SetResult(new FlushResult(isCanceled: true, isCompleted: false));

Assert.True((await flushTask0.DefaultTimeout()).IsCanceled);
Assert.True((await flushTask1.DefaultTimeout()).IsCanceled);

var flushTask2 = concurrentPipeWriter.FlushAsync();
lock (sync)
{
flushTask2 = concurrentPipeWriter.FlushAsync();
}

Assert.False(flushTask2.IsCompleted);

pipeWriterFlushTcsArray[1].SetResult(default);

await flushTask2.DefaultTimeout();

// We do not need to flush the final bytes, since the incomplete flush will pick it up.
Expand All @@ -319,7 +382,14 @@ public async Task CancelPendingFlushInterruptsFlushLoop()
Assert.Equal(2, mockPipeWriter.FlushCallCount);

var completeEx = new Exception();
await concurrentPipeWriter.CompleteAsync(completeEx);

lock (sync)
{
completeTask = concurrentPipeWriter.CompleteAsync(completeEx);
}

await completeTask.DefaultTimeout();

Assert.Same(completeEx, mockPipeWriter.CompleteException);
}
}
Expand Down