From 78de0b0ccb8efb8585f288157b7c5ad9c0cc874f Mon Sep 17 00:00:00 2001 From: David Fowler Date: Fri, 31 Jul 2020 20:11:53 -0700 Subject: [PATCH 1/4] Implement CopyToAsync in the FileBufferingReadStream - overrride Span and Memory overloads and implement array overloads in terms of those overloads. - Implemented CopyToAsync (but not CopyTo) - Added tests Fixes #24032 --- .../src/FileBufferingReadStream.cs | 89 ++++++++++++++----- .../test/FileBufferingReadStreamTests.cs | 75 +++++++++++++++- 2 files changed, 140 insertions(+), 24 deletions(-) diff --git a/src/Http/WebUtilities/src/FileBufferingReadStream.cs b/src/Http/WebUtilities/src/FileBufferingReadStream.cs index 66c53f695114..50b77f8518e2 100644 --- a/src/Http/WebUtilities/src/FileBufferingReadStream.cs +++ b/src/Http/WebUtilities/src/FileBufferingReadStream.cs @@ -208,39 +208,41 @@ private Stream CreateTempFile() FileOptions.Asynchronous | FileOptions.DeleteOnClose | FileOptions.SequentialScan); } - public override int Read(byte[] buffer, int offset, int count) + public override int Read(Span buffer) { ThrowIfDisposed(); - if (_buffer.Position < _buffer.Length || _completelyBuffered) + + if (_completelyBuffered) { // Just read from the buffer - return _buffer.Read(buffer, offset, (int)Math.Min(count, _buffer.Length - _buffer.Position)); + return _buffer.Read(buffer); } - int read = _inner.Read(buffer, offset, count); + var read = _inner.Read(buffer); if (_bufferLimit.HasValue && _bufferLimit - read < _buffer.Length) { - Dispose(); throw new IOException("Buffer limit exceeded."); } - if (_inMemory && _buffer.Length + read > _memoryThreshold) + // We're about to go over the threshold, switch to a file + if (_inMemory && _memoryThreshold - read < _buffer.Length) { _inMemory = false; var oldBuffer = _buffer; _buffer = CreateTempFile(); if (_rentedBuffer == null) { + // Copy data from the in memory buffer to the file stream using a pooled buffer oldBuffer.Position = 0; var rentedBuffer = _bytePool.Rent(Math.Min((int)oldBuffer.Length, _maxRentedBufferSize)); try { - var copyRead = oldBuffer.Read(rentedBuffer, 0, rentedBuffer.Length); + var copyRead = oldBuffer.Read(rentedBuffer); while (copyRead > 0) { - _buffer.Write(rentedBuffer, 0, copyRead); - copyRead = oldBuffer.Read(rentedBuffer, 0, rentedBuffer.Length); + _buffer.Write(rentedBuffer.AsSpan(0, copyRead)); + copyRead = oldBuffer.Read(rentedBuffer); } } finally @@ -250,7 +252,7 @@ public override int Read(byte[] buffer, int offset, int count) } else { - _buffer.Write(_rentedBuffer, 0, (int)oldBuffer.Length); + _buffer.Write(_rentedBuffer.AsSpan(0, (int)oldBuffer.Length)); _bytePool.Return(_rentedBuffer); _rentedBuffer = null; } @@ -258,7 +260,7 @@ public override int Read(byte[] buffer, int offset, int count) if (read > 0) { - _buffer.Write(buffer, offset, read); + _buffer.Write(buffer.Slice(0, read)); } else { @@ -268,24 +270,34 @@ public override int Read(byte[] buffer, int offset, int count) return read; } - public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + public override int Read(byte[] buffer, int offset, int count) + { + return Read(buffer.AsSpan(offset, count)); + } + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask(); + } + + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) { ThrowIfDisposed(); - if (_buffer.Position < _buffer.Length || _completelyBuffered) + + if (_completelyBuffered) { // Just read from the buffer - return await _buffer.ReadAsync(buffer, offset, (int)Math.Min(count, _buffer.Length - _buffer.Position), cancellationToken); + return await _buffer.ReadAsync(buffer, cancellationToken); } - int read = await _inner.ReadAsync(buffer, offset, count, cancellationToken); + var read = await _inner.ReadAsync(buffer, cancellationToken); if (_bufferLimit.HasValue && _bufferLimit - read < _buffer.Length) { - Dispose(); throw new IOException("Buffer limit exceeded."); } - if (_inMemory && _buffer.Length + read > _memoryThreshold) + if (_inMemory && _memoryThreshold - read < _buffer.Length) { _inMemory = false; var oldBuffer = _buffer; @@ -297,11 +309,11 @@ public override async Task ReadAsync(byte[] buffer, int offset, int count, try { // oldBuffer is a MemoryStream, no need to do async reads. - var copyRead = oldBuffer.Read(rentedBuffer, 0, rentedBuffer.Length); + var copyRead = oldBuffer.Read(rentedBuffer); while (copyRead > 0) { - await _buffer.WriteAsync(rentedBuffer, 0, copyRead, cancellationToken); - copyRead = oldBuffer.Read(rentedBuffer, 0, rentedBuffer.Length); + await _buffer.WriteAsync(rentedBuffer.AsMemory(0, copyRead), cancellationToken); + copyRead = oldBuffer.Read(rentedBuffer); } } finally @@ -311,7 +323,7 @@ public override async Task ReadAsync(byte[] buffer, int offset, int count, } else { - await _buffer.WriteAsync(_rentedBuffer, 0, (int)oldBuffer.Length, cancellationToken); + await _buffer.WriteAsync(_rentedBuffer.AsMemory(0, (int)oldBuffer.Length), cancellationToken); _bytePool.Return(_rentedBuffer); _rentedBuffer = null; } @@ -319,7 +331,7 @@ public override async Task ReadAsync(byte[] buffer, int offset, int count, if (read > 0) { - await _buffer.WriteAsync(buffer, offset, read, cancellationToken); + await _buffer.WriteAsync(buffer.Slice(0, read), cancellationToken); } else { @@ -349,6 +361,39 @@ public override void Flush() throw new NotSupportedException(); } + public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) + { + // If we're completed buffered then copy from the underlying source + if (_completelyBuffered) + { + return _buffer.CopyToAsync(destination, bufferSize, cancellationToken); + } + + async Task CopyToAsyncImpl() + { + // At least a 4K buffer + byte[] buffer = _bytePool.Rent(Math.Min(bufferSize, 4096)); + try + { + while (true) + { + int bytesRead = await ReadAsync(buffer, cancellationToken); + if (bytesRead == 0) + { + break; + } + await destination.WriteAsync(buffer.AsMemory(0, bytesRead), cancellationToken); + } + } + finally + { + _bytePool.Return(buffer); + } + } + + return CopyToAsyncImpl(); + } + protected override void Dispose(bool disposing) { if (!_disposed) diff --git a/src/Http/WebUtilities/test/FileBufferingReadStreamTests.cs b/src/Http/WebUtilities/test/FileBufferingReadStreamTests.cs index 1c4be6250813..c8c506ca7caf 100644 --- a/src/Http/WebUtilities/test/FileBufferingReadStreamTests.cs +++ b/src/Http/WebUtilities/test/FileBufferingReadStreamTests.cs @@ -4,6 +4,7 @@ using System; using System.Buffers; using System.IO; +using System.Linq; using System.Text; using System.Threading.Tasks; using Moq; @@ -157,7 +158,6 @@ public void FileBufferingReadStream_SyncReadWithOnDiskLimit_EnforcesLimit() Assert.Equal("Buffer limit exceeded.", exception.Message); Assert.False(stream.InMemory); Assert.NotNull(stream.TempFileName); - Assert.False(File.Exists(tempFileName)); } Assert.False(File.Exists(tempFileName)); @@ -287,7 +287,6 @@ public async Task FileBufferingReadStream_AsyncReadWithOnDiskLimit_EnforcesLimit Assert.Equal("Buffer limit exceeded.", exception.Message); Assert.False(stream.InMemory); Assert.NotNull(stream.TempFileName); - Assert.False(File.Exists(tempFileName)); } Assert.False(File.Exists(tempFileName)); @@ -351,6 +350,78 @@ public async Task FileBufferingReadStream_UsingMemoryStream_RentsAndReturnsRente Assert.False(File.Exists(tempFileName)); } + [Fact] + public async Task CopyToAsyncWorks() + { + var data = Enumerable.Range(0, 1024).Select(b => (byte)b).Reverse().ToArray(); + var inner = new MemoryStream(data); + + using var stream = new FileBufferingReadStream(inner, 1024 * 1024, bufferLimit: null, GetCurrentDirectory()); + + var withoutBufferMs = new MemoryStream(); + await stream.CopyToAsync(withoutBufferMs); + + var withBufferMs = new MemoryStream(); + stream.Position = 0; + await stream.CopyToAsync(withBufferMs); + + Assert.Equal(data, withoutBufferMs.ToArray()); + Assert.Equal(data, withBufferMs.ToArray()); + } + + [Fact] + public async Task CopyToAsyncWorksWithFileThreshold() + { + var data = Enumerable.Range(0, 1024).Select(b => (byte)b).Reverse().ToArray(); + var inner = new MemoryStream(data); + + using var stream = new FileBufferingReadStream(inner, 100, bufferLimit: null, GetCurrentDirectory()); + + var withoutBufferMs = new MemoryStream(); + await stream.CopyToAsync(withoutBufferMs); + + var withBufferMs = new MemoryStream(); + stream.Position = 0; + await stream.CopyToAsync(withBufferMs); + + Assert.Equal(data, withoutBufferMs.ToArray()); + Assert.Equal(data, withBufferMs.ToArray()); + } + + [Fact] + public async Task ReadAsyncThenCopyToAsyncWorks() + { + var data = Enumerable.Range(0, 1024).Select(b => (byte)b).ToArray(); + var inner = new MemoryStream(data); + + using var stream = new FileBufferingReadStream(inner, 1024 * 1024, bufferLimit: null, GetCurrentDirectory()); + + var withoutBufferMs = new MemoryStream(); + var buffer = new byte[100]; + await stream.ReadAsync(buffer); + await stream.CopyToAsync(withoutBufferMs); + + Assert.Equal(data.AsMemory(0, 100).ToArray(), buffer); + Assert.Equal(data.AsMemory(100).ToArray(), withoutBufferMs.ToArray()); + } + + [Fact] + public async Task ReadThenCopyToAsyncWorks() + { + var data = Enumerable.Range(0, 1024).Select(b => (byte)b).ToArray(); + var inner = new MemoryStream(data); + + using var stream = new FileBufferingReadStream(inner, 1024 * 1024, bufferLimit: null, GetCurrentDirectory()); + + var withoutBufferMs = new MemoryStream(); + var buffer = new byte[100]; + stream.Read(buffer); + await stream.CopyToAsync(withoutBufferMs); + + Assert.Equal(data.AsMemory(0, 100).ToArray(), buffer); + Assert.Equal(data.AsMemory(100).ToArray(), withoutBufferMs.ToArray()); + } + private static string GetCurrentDirectory() { return AppContext.BaseDirectory; From 63024625616e0147353f845d97ca64c3490caf67 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Fri, 31 Jul 2020 22:03:07 -0700 Subject: [PATCH 2/4] Fixed tests --- .../Formatters/JsonInputFormatterTestBase.cs | 27 +++++++++++++++--- ...ataContractSerializerInputFormatterTest.cs | 28 ++++++++++++++++--- .../test/XmlSerializerInputFormatterTest.cs | 27 +++++++++++++++--- 3 files changed, 70 insertions(+), 12 deletions(-) diff --git a/src/Mvc/Mvc.Core/test/Formatters/JsonInputFormatterTestBase.cs b/src/Mvc/Mvc.Core/test/Formatters/JsonInputFormatterTestBase.cs index 83380231e55e..1333f95fcbb6 100644 --- a/src/Mvc/Mvc.Core/test/Formatters/JsonInputFormatterTestBase.cs +++ b/src/Mvc/Mvc.Core/test/Formatters/JsonInputFormatterTestBase.cs @@ -497,8 +497,8 @@ public async Task ReadAsync_DoesNotDisposeBufferedReadStream() var content = "{\"name\": \"Test\"}"; var contentBytes = Encoding.UTF8.GetBytes(content); var httpContext = GetHttpContext(contentBytes); - var testBufferedReadStream = new Mock(httpContext.Request.Body, 1024) { CallBase = true }; - httpContext.Request.Body = testBufferedReadStream.Object; + var testBufferedReadStream = new VerifyDisposeFileBufferingReadStream(httpContext.Request.Body, 1024); + httpContext.Request.Body = testBufferedReadStream; var formatterContext = CreateInputFormatterContext(typeof(ComplexModel), httpContext); @@ -508,8 +508,7 @@ public async Task ReadAsync_DoesNotDisposeBufferedReadStream() // Assert var userModel = Assert.IsType(result.Model); Assert.Equal("Test", userModel.Name); - - testBufferedReadStream.Verify(v => v.DisposeAsync(), Times.Never()); + Assert.False(testBufferedReadStream.Disposed); } [Fact] @@ -635,5 +634,25 @@ protected sealed class ComplexModel public byte Small { get; set; } } + + private class VerifyDisposeFileBufferingReadStream : FileBufferingReadStream + { + public bool Disposed { get; private set; } + public VerifyDisposeFileBufferingReadStream(Stream inner, int memoryThreshold) : base(inner, memoryThreshold) + { + } + + protected override void Dispose(bool disposing) + { + Disposed = true; + base.Dispose(disposing); + } + + public override ValueTask DisposeAsync() + { + Disposed = true; + return base.DisposeAsync(); + } + } } } diff --git a/src/Mvc/Mvc.Formatters.Xml/test/XmlDataContractSerializerInputFormatterTest.cs b/src/Mvc/Mvc.Formatters.Xml/test/XmlDataContractSerializerInputFormatterTest.cs index aff89dfc780a..539d75583795 100644 --- a/src/Mvc/Mvc.Formatters.Xml/test/XmlDataContractSerializerInputFormatterTest.cs +++ b/src/Mvc/Mvc.Formatters.Xml/test/XmlDataContractSerializerInputFormatterTest.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Buffers; using System.IO; using System.Linq; using System.Runtime.Serialization; @@ -182,8 +183,8 @@ public async Task ReadAsync_DoesNotDisposeBufferedStreamIfItDidNotCreateIt() var contentBytes = Encoding.UTF8.GetBytes(input); var httpContext = new DefaultHttpContext(); - var testBufferedReadStream = new Mock(new MemoryStream(contentBytes), 1024) { CallBase = true }; - httpContext.Request.Body = testBufferedReadStream.Object; + var testBufferedReadStream = new VerifyDisposeFileBufferingReadStream(new MemoryStream(contentBytes), 1024); + httpContext.Request.Body = testBufferedReadStream; var context = GetInputFormatterContext(httpContext, typeof(TestLevelOne)); // Act @@ -196,8 +197,7 @@ public async Task ReadAsync_DoesNotDisposeBufferedStreamIfItDidNotCreateIt() Assert.Equal(expectedInt, model.SampleInt); Assert.Equal(expectedString, model.sampleString); - - testBufferedReadStream.Verify(v => v.DisposeAsync(), Times.Never()); + Assert.False(testBufferedReadStream.Disposed); } [Fact] @@ -773,5 +773,25 @@ public override void OnCompleted(Func callback, object state) // do not do anything } } + + private class VerifyDisposeFileBufferingReadStream : FileBufferingReadStream + { + public bool Disposed { get; private set; } + public VerifyDisposeFileBufferingReadStream(Stream inner, int memoryThreshold) : base(inner, memoryThreshold) + { + } + + protected override void Dispose(bool disposing) + { + Disposed = true; + base.Dispose(disposing); + } + + public override ValueTask DisposeAsync() + { + Disposed = true; + return base.DisposeAsync(); + } + } } } \ No newline at end of file diff --git a/src/Mvc/Mvc.Formatters.Xml/test/XmlSerializerInputFormatterTest.cs b/src/Mvc/Mvc.Formatters.Xml/test/XmlSerializerInputFormatterTest.cs index 366e1a8dbc6d..9b35c9f970f5 100644 --- a/src/Mvc/Mvc.Formatters.Xml/test/XmlSerializerInputFormatterTest.cs +++ b/src/Mvc/Mvc.Formatters.Xml/test/XmlSerializerInputFormatterTest.cs @@ -638,8 +638,8 @@ public async Task ReadAsync_DoesNotDisposeBufferedStreamIfItDidNotCreateIt() var contentBytes = Encoding.UTF8.GetBytes(input); var httpContext = new DefaultHttpContext(); - var testBufferedReadStream = new Mock(new MemoryStream(contentBytes), 1024) { CallBase = true }; - httpContext.Request.Body = testBufferedReadStream.Object; + var testBufferedReadStream = new VerifyDisposeFileBufferingReadStream(new MemoryStream(contentBytes), 1024); + httpContext.Request.Body = testBufferedReadStream; var context = GetInputFormatterContext(httpContext, typeof(TestLevelOne)); // Act @@ -652,8 +652,7 @@ public async Task ReadAsync_DoesNotDisposeBufferedStreamIfItDidNotCreateIt() Assert.Equal(expectedInt, model.SampleInt); Assert.Equal(expectedString, model.sampleString); - - testBufferedReadStream.Verify(v => v.DisposeAsync(), Times.Never()); + Assert.False(testBufferedReadStream.Disposed); } private InputFormatterContext GetInputFormatterContext(byte[] contentBytes, Type modelType) @@ -713,5 +712,25 @@ public override void OnCompleted(Func callback, object state) // do not do anything } } + + private class VerifyDisposeFileBufferingReadStream : FileBufferingReadStream + { + public bool Disposed { get; private set; } + public VerifyDisposeFileBufferingReadStream(Stream inner, int memoryThreshold) : base(inner, memoryThreshold) + { + } + + protected override void Dispose(bool disposing) + { + Disposed = true; + base.Dispose(disposing); + } + + public override ValueTask DisposeAsync() + { + Disposed = true; + return base.DisposeAsync(); + } + } } } From 9c32cf00919541e09ff5b4eb0ee3613bff1ac525 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Sat, 1 Aug 2020 00:53:40 -0700 Subject: [PATCH 3/4] Support partial reads then seeking and re-reading - Added more tests as this was a missing scenario --- .../src/FileBufferingReadStream.cs | 4 +- .../test/FileBufferingReadStreamTests.cs | 47 +++++++++++++++++-- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/src/Http/WebUtilities/src/FileBufferingReadStream.cs b/src/Http/WebUtilities/src/FileBufferingReadStream.cs index 50b77f8518e2..f03fd29edd25 100644 --- a/src/Http/WebUtilities/src/FileBufferingReadStream.cs +++ b/src/Http/WebUtilities/src/FileBufferingReadStream.cs @@ -212,7 +212,7 @@ public override int Read(Span buffer) { ThrowIfDisposed(); - if (_completelyBuffered) + if (_buffer.Position < _buffer.Length || _completelyBuffered) { // Just read from the buffer return _buffer.Read(buffer); @@ -284,7 +284,7 @@ public override async ValueTask ReadAsync(Memory buffer, Cancellation { ThrowIfDisposed(); - if (_completelyBuffered) + if (_buffer.Position < _buffer.Length || _completelyBuffered) { // Just read from the buffer return await _buffer.ReadAsync(buffer, cancellationToken); diff --git a/src/Http/WebUtilities/test/FileBufferingReadStreamTests.cs b/src/Http/WebUtilities/test/FileBufferingReadStreamTests.cs index c8c506ca7caf..1bb481763a2e 100644 --- a/src/Http/WebUtilities/test/FileBufferingReadStreamTests.cs +++ b/src/Http/WebUtilities/test/FileBufferingReadStreamTests.cs @@ -415,11 +415,52 @@ public async Task ReadThenCopyToAsyncWorks() var withoutBufferMs = new MemoryStream(); var buffer = new byte[100]; - stream.Read(buffer); + var read = stream.Read(buffer); await stream.CopyToAsync(withoutBufferMs); - Assert.Equal(data.AsMemory(0, 100).ToArray(), buffer); - Assert.Equal(data.AsMemory(100).ToArray(), withoutBufferMs.ToArray()); + Assert.Equal(100, read); + Assert.Equal(data.AsMemory(0, read).ToArray(), buffer); + Assert.Equal(data.AsMemory(read).ToArray(), withoutBufferMs.ToArray()); + } + + [Fact] + public void PartialReadThenSeekReplaysBuffer() + { + var data = Enumerable.Range(0, 1024).Select(b => (byte)b).ToArray(); + var inner = new MemoryStream(data); + + using var stream = new FileBufferingReadStream(inner, 1024 * 1024, bufferLimit: null, GetCurrentDirectory()); + + var withoutBufferMs = new MemoryStream(); + var buffer = new byte[100]; + var read1 = stream.Read(buffer); + stream.Position = 0; + var buffer2 = new byte[200]; + var read2 = stream.Read(buffer2); + Assert.Equal(100, read1); + Assert.Equal(100, read2); + Assert.Equal(data.AsMemory(0, read1).ToArray(), buffer); + Assert.Equal(data.AsMemory(0, read2).ToArray(), buffer2.AsMemory(0, read2).ToArray()); + } + + [Fact] + public async Task PartialReadAsyncThenSeekReplaysBuffer() + { + var data = Enumerable.Range(0, 1024).Select(b => (byte)b).ToArray(); + var inner = new MemoryStream(data); + + using var stream = new FileBufferingReadStream(inner, 1024 * 1024, bufferLimit: null, GetCurrentDirectory()); + + var withoutBufferMs = new MemoryStream(); + var buffer = new byte[100]; + var read1 = await stream.ReadAsync(buffer); + stream.Position = 0; + var buffer2 = new byte[200]; + var read2 = await stream.ReadAsync(buffer2); + Assert.Equal(100, read1); + Assert.Equal(100, read2); + Assert.Equal(data.AsMemory(0, read1).ToArray(), buffer); + Assert.Equal(data.AsMemory(0, read2).ToArray(), buffer2.AsMemory(0, read2).ToArray()); } private static string GetCurrentDirectory() From 2812f250cb09201d182256ff86706f04ce1a1c07 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Sat, 1 Aug 2020 00:59:32 -0700 Subject: [PATCH 4/4] Add a seek then CopyToAsync test --- .../test/FileBufferingReadStreamTests.cs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/Http/WebUtilities/test/FileBufferingReadStreamTests.cs b/src/Http/WebUtilities/test/FileBufferingReadStreamTests.cs index 1bb481763a2e..e00292acc1cb 100644 --- a/src/Http/WebUtilities/test/FileBufferingReadStreamTests.cs +++ b/src/Http/WebUtilities/test/FileBufferingReadStreamTests.cs @@ -423,6 +423,25 @@ public async Task ReadThenCopyToAsyncWorks() Assert.Equal(data.AsMemory(read).ToArray(), withoutBufferMs.ToArray()); } + [Fact] + public async Task ReadThenSeekThenCopyToAsyncWorks() + { + var data = Enumerable.Range(0, 1024).Select(b => (byte)b).ToArray(); + var inner = new MemoryStream(data); + + using var stream = new FileBufferingReadStream(inner, 1024 * 1024, bufferLimit: null, GetCurrentDirectory()); + + var withoutBufferMs = new MemoryStream(); + var buffer = new byte[100]; + var read = stream.Read(buffer); + stream.Position = 0; + await stream.CopyToAsync(withoutBufferMs); + + Assert.Equal(100, read); + Assert.Equal(data.AsMemory(0, read).ToArray(), buffer); + Assert.Equal(data.ToArray(), withoutBufferMs.ToArray()); + } + [Fact] public void PartialReadThenSeekReplaysBuffer() {