diff --git a/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs b/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs index b0388773d0f0f8..4f35df94ce11a2 100644 --- a/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs +++ b/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs @@ -50,6 +50,7 @@ protected PipeReader() { } public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Pipelines.PipeWriter destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Stream destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public static System.IO.Pipelines.PipeReader Create(System.IO.Stream stream, System.IO.Pipelines.StreamPipeReaderOptions? readerOptions = null) { throw null; } + public static System.IO.Pipelines.PipeReader Create(System.Buffers.ReadOnlySequence sequence) { throw null; } [System.ObsoleteAttribute("OnWriterCompleted may not be invoked on all implementations of PipeReader. This will be removed in a future release.")] public virtual void OnWriterCompleted(System.Action callback, object? state) { } public abstract System.Threading.Tasks.ValueTask ReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); diff --git a/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj b/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj index db4b9fb79535bd..539d5f0dcef310 100644 --- a/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj +++ b/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj @@ -31,6 +31,7 @@ + diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs index a8740209ab88ed..238fb5ba1c4f21 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs @@ -104,6 +104,16 @@ public static PipeReader Create(Stream stream, StreamPipeReaderOptions? readerOp return new StreamPipeReader(stream, readerOptions ?? StreamPipeReaderOptions.s_default); } + /// + /// Creates a wrapping the specified . + /// + /// The sequence. + /// A that wraps the . + public static PipeReader Create(ReadOnlySequence sequence) + { + return new SequencePipeReader(sequence); + } + /// Asynchronously reads the bytes from the and writes them to the specified , using a specified buffer size and cancellation token. /// The pipe writer to which the contents of the current stream will be copied. /// The token to monitor for cancellation requests. The default value is . diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/SequencePipeReader.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/SequencePipeReader.cs new file mode 100644 index 00000000000000..963a595cb1fbc0 --- /dev/null +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/SequencePipeReader.cs @@ -0,0 +1,97 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Buffers; +using System.Threading; +using System.Threading.Tasks; + +namespace System.IO.Pipelines +{ + internal sealed class SequencePipeReader : PipeReader + { + private ReadOnlySequence _sequence; + private bool _isReaderCompleted; + + private int _cancelNext; + + public SequencePipeReader(ReadOnlySequence sequence) + { + _sequence = sequence; + } + + /// + public override void AdvanceTo(SequencePosition consumed) + { + AdvanceTo(consumed, consumed); + } + + /// + public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) + { + ThrowIfCompleted(); + + // Fast path: did we consume everything? + if (consumed.Equals(_sequence.End)) + { + _sequence = ReadOnlySequence.Empty; + return; + } + + _sequence = _sequence.Slice(consumed); + } + + /// + public override void CancelPendingRead() + { + Interlocked.Exchange(ref _cancelNext, 1); + } + + /// + public override void Complete(Exception? exception = null) + { + if (_isReaderCompleted) + { + return; + } + + _isReaderCompleted = true; + _sequence = ReadOnlySequence.Empty; + } + + /// + public override ValueTask ReadAsync(CancellationToken cancellationToken = default) + { + if (TryRead(out ReadResult result)) + { + return new ValueTask(result); + } + + result = new ReadResult(ReadOnlySequence.Empty, isCanceled: false, isCompleted: true); + return new ValueTask(result); + } + + /// + public override bool TryRead(out ReadResult result) + { + ThrowIfCompleted(); + + bool isCancellationRequested = Interlocked.Exchange(ref _cancelNext, 0) == 1; + if (isCancellationRequested || _sequence.Length > 0) + { + result = new ReadResult(_sequence, isCancellationRequested, isCompleted: true); + return true; + } + + result = default; + return false; + } + + private void ThrowIfCompleted() + { + if (_isReaderCompleted) + { + ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed(); + } + } + } +} diff --git a/src/libraries/System.IO.Pipelines/tests/SequencePipeReaderTests.cs b/src/libraries/System.IO.Pipelines/tests/SequencePipeReaderTests.cs new file mode 100644 index 00000000000000..d12e4f1ec84704 --- /dev/null +++ b/src/libraries/System.IO.Pipelines/tests/SequencePipeReaderTests.cs @@ -0,0 +1,232 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Xunit; + +namespace System.IO.Pipelines.Tests +{ + public class SequencePipeReaderTests + { + [Fact] + public async Task CanRead() + { + var sequence = new ReadOnlySequence(Encoding.ASCII.GetBytes("Hello World")); + var reader = PipeReader.Create(sequence); + + ReadResult readResult = await reader.ReadAsync(); + ReadOnlySequence buffer = readResult.Buffer; + + Assert.Equal(11, buffer.Length); + Assert.True(buffer.IsSingleSegment); + Assert.Equal("Hello World", Encoding.ASCII.GetString(buffer.ToArray())); + + reader.AdvanceTo(buffer.End); + reader.Complete(); + } + + [Fact] + public async Task TryReadReturnsTrueIfBufferedBytesAndNotExaminedEverything() + { + var sequence = new ReadOnlySequence(Encoding.ASCII.GetBytes("Hello World")); + var reader = PipeReader.Create(sequence); + + ReadResult readResult = await reader.ReadAsync(); + ReadOnlySequence buffer = readResult.Buffer; + Assert.Equal(11, buffer.Length); + Assert.True(buffer.IsSingleSegment); + reader.AdvanceTo(buffer.Start, buffer.GetPosition(5)); + + Assert.True(reader.TryRead(out readResult)); + Assert.Equal(11, buffer.Length); + Assert.True(buffer.IsSingleSegment); + Assert.Equal("Hello World", Encoding.ASCII.GetString(buffer.ToArray())); + + reader.Complete(); + } + + [Fact] + public async Task TryReadReturnsFalseIfBufferedBytesAndEverythingExamined() + { + var sequence = new ReadOnlySequence(Encoding.ASCII.GetBytes("Hello World")); + var reader = PipeReader.Create(sequence); + + ReadResult readResult = await reader.ReadAsync(); + ReadOnlySequence buffer = readResult.Buffer; + Assert.Equal(11, buffer.Length); + Assert.True(buffer.IsSingleSegment); + reader.AdvanceTo(buffer.End); + + Assert.False(reader.TryRead(out readResult)); + reader.Complete(); + } + + [Fact] + public async Task ReadAsyncAfterReceivingCompletedReadResultDoesNotThrow() + { + var sequence = ReadOnlySequence.Empty; + PipeReader reader = PipeReader.Create(sequence); + ReadResult readResult = await reader.ReadAsync(); + Assert.True(readResult.Buffer.IsEmpty); + Assert.True(readResult.IsCompleted); + reader.AdvanceTo(readResult.Buffer.End); + + readResult = await reader.ReadAsync(); + Assert.True(readResult.Buffer.IsEmpty); + Assert.True(readResult.IsCompleted); + reader.AdvanceTo(readResult.Buffer.End); + reader.Complete(); + } + + [Fact] + public async Task DataCanBeReadMultipleTimes() + { + var helloBytes = Encoding.ASCII.GetBytes("Hello World"); + var sequence = new ReadOnlySequence(helloBytes); + PipeReader reader = PipeReader.Create(sequence); + + + ReadResult readResult = await reader.ReadAsync(); + ReadOnlySequence buffer = readResult.Buffer; + reader.AdvanceTo(buffer.Start, buffer.End); + + // Make sure IsCompleted is true + readResult = await reader.ReadAsync(); + buffer = readResult.Buffer; + reader.AdvanceTo(buffer.Start, buffer.End); + Assert.True(readResult.IsCompleted); + + var value = await ReadFromPipeAsString(reader); + Assert.Equal("Hello World", value); + reader.Complete(); + } + + [Fact] + public async Task NextReadAfterPartiallyExaminedReturnsImmediately() + { + var sequence = new ReadOnlySequence(Encoding.ASCII.GetBytes(new string('a', 10000))); + PipeReader reader = PipeReader.Create(sequence); + + ReadResult readResult = await reader.ReadAsync(); + reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.GetPosition(2048)); + + ValueTask task = reader.ReadAsync(); + + // This should complete synchronously since + Assert.True(task.IsCompleted); + + readResult = await task; + reader.AdvanceTo(readResult.Buffer.End); + reader.Complete(); + } + + [Fact] + public async Task CompleteReaderWithoutAdvanceDoesNotThrow() + { + PipeReader reader = PipeReader.Create(ReadOnlySequence.Empty); + await reader.ReadAsync(); + reader.Complete(); + } + + [Fact] + public async Task AdvanceAfterCompleteThrows() + { + PipeReader reader = PipeReader.Create(new ReadOnlySequence(new byte[100])); + ReadOnlySequence buffer = (await reader.ReadAsync()).Buffer; + + reader.Complete(); + + Assert.Throws(() => reader.AdvanceTo(buffer.End)); + } + + [Fact] + public async Task ThrowsOnReadAfterCompleteReader() + { + PipeReader reader = PipeReader.Create(ReadOnlySequence.Empty); + + reader.Complete(); + await Assert.ThrowsAsync(async () => await reader.ReadAsync()); + } + + [Fact] + public void TryReadAfterCancelPendingReadReturnsTrue() + { + PipeReader reader = PipeReader.Create(ReadOnlySequence.Empty); + + reader.CancelPendingRead(); + + Assert.True(reader.TryRead(out ReadResult result)); + Assert.True(result.IsCanceled); + reader.AdvanceTo(result.Buffer.End); + reader.Complete(); + } + + [Fact] + public async Task ReadAsyncReturnsCanceledIfCanceledBeforeRead() + { + var sequence = new ReadOnlySequence(new byte[10000]); + PipeReader reader = PipeReader.Create(sequence); + + // Make sure state isn't used from before + for (var i = 0; i < 3; i++) + { + reader.CancelPendingRead(); + ValueTask readResultTask = reader.ReadAsync(); + Assert.True(readResultTask.IsCompleted); + ReadResult readResult = readResultTask.GetAwaiter().GetResult(); + Assert.True(readResult.IsCanceled); + readResult = await reader.ReadAsync(); + reader.AdvanceTo(readResult.Buffer.End); + } + + reader.Complete(); + } + + [Fact] + public async Task ReadAsyncReturnsCanceledInterleaved() + { + var sequence = new ReadOnlySequence(new byte[10000]); + PipeReader reader = PipeReader.Create(sequence); + + // Cancel and Read interleaved to confirm cancellations are independent + for (var i = 0; i < 3; i++) + { + reader.CancelPendingRead(); + ValueTask readResultTask = reader.ReadAsync(); + Assert.True(readResultTask.IsCompleted); + ReadResult readResult = readResultTask.GetAwaiter().GetResult(); + Assert.True(readResult.IsCanceled); + + readResult = await reader.ReadAsync(); + Assert.False(readResult.IsCanceled); + } + + reader.Complete(); + } + + [Fact] + public void OnWriterCompletedNoops() + { + bool fired = false; + PipeReader reader = PipeReader.Create(ReadOnlySequence.Empty); +#pragma warning disable CS0618 // Type or member is obsolete + reader.OnWriterCompleted((_, __) => { fired = true; }, null); +#pragma warning restore CS0618 // Type or member is obsolete + reader.Complete(); + Assert.False(fired); + } + + private static async Task ReadFromPipeAsString(PipeReader reader) + { + ReadResult readResult = await reader.ReadAsync(); + var result = Encoding.ASCII.GetString(readResult.Buffer.ToArray()); + reader.AdvanceTo(readResult.Buffer.End); + return result; + } + } +} diff --git a/src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj b/src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj index d4b9d67b32f668..fe6ecb26ae0a92 100644 --- a/src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj +++ b/src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj @@ -30,6 +30,7 @@ +