Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ protected PipeReader() { }
public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Pipelines.PipeWriter destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Stream destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public static System.IO.Pipelines.PipeReader Create(System.IO.Stream stream, System.IO.Pipelines.StreamPipeReaderOptions? readerOptions = null) { throw null; }
public static System.IO.Pipelines.PipeReader Create(System.Buffers.ReadOnlySequence<byte> sequence) { throw null; }
[System.ObsoleteAttribute("OnWriterCompleted may not be invoked on all implementations of PipeReader. This will be removed in a future release.")]
public virtual void OnWriterCompleted(System.Action<System.Exception?, object?> callback, object? state) { }
public abstract System.Threading.Tasks.ValueTask<System.IO.Pipelines.ReadResult> ReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
<Compile Include="System\IO\Pipelines\PipeWriterStream.cs" />
<Compile Include="System\IO\Pipelines\ReadResult.cs" />
<Compile Include="System\IO\Pipelines\ResultFlags.cs" />
<Compile Include="System\IO\Pipelines\SequencePipeReader.cs" />
<Compile Include="System\IO\Pipelines\StreamPipeExtensions.cs" />
<Compile Include="System\IO\Pipelines\StreamPipeReader.cs" />
<Compile Include="System\IO\Pipelines\StreamPipeReaderOptions.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,16 @@ public static PipeReader Create(Stream stream, StreamPipeReaderOptions? readerOp
return new StreamPipeReader(stream, readerOptions ?? StreamPipeReaderOptions.s_default);
}

/// <summary>
/// Creates a <see cref="PipeReader"/> wrapping the specified <see cref="ReadOnlySequence{T}"/>.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this fine? It points to the docs for ReadOnlySequence<T>, with no indication of it being only for bytes. Is this just how things are?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that how the other docs are?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to google this issue (because you can't do ReadOnlySequence<byte>), and the idea here is that there is no docs page to link to for ReadOnlySequence<byte>, but the is for ReadOnlySequence<T>, so that is why I believe. I'm not sure where else I would find docs like this in .NET as I can't recall any off the top of my head at least.

/// </summary>
/// <param name="sequence">The sequence.</param>
/// <returns>A <see cref="PipeReader"/> that wraps the <see cref="ReadOnlySequence{T}"/>.</returns>
public static PipeReader Create(ReadOnlySequence<byte> sequence)
{
return new SequencePipeReader(sequence);
}

/// <summary>Asynchronously reads the bytes from the <see cref="System.IO.Pipelines.PipeReader" /> and writes them to the specified <see cref="System.IO.Pipelines.PipeWriter" />, using a specified buffer size and cancellation token.</summary>
/// <param name="destination">The pipe writer to which the contents of the current stream will be copied.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="System.Threading.CancellationToken.None" />.</param>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Buffers;
using System.Threading;
using System.Threading.Tasks;

namespace System.IO.Pipelines
{
internal sealed class SequencePipeReader : PipeReader
{
private ReadOnlySequence<byte> _sequence;
private bool _isReaderCompleted;

private int _cancelNext;

public SequencePipeReader(ReadOnlySequence<byte> sequence)
{
_sequence = sequence;
}

/// <inheritdoc />
public override void AdvanceTo(SequencePosition consumed)
{
AdvanceTo(consumed, consumed);
}

/// <inheritdoc />
public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
{
ThrowIfCompleted();

// Fast path: did we consume everything?
if (consumed.Equals(_sequence.End))
{
_sequence = ReadOnlySequence<byte>.Empty;
return;
}

_sequence = _sequence.Slice(consumed);
}

/// <inheritdoc />
public override void CancelPendingRead()
{
Interlocked.Exchange(ref _cancelNext, 1);
}

/// <inheritdoc />
public override void Complete(Exception? exception = null)
{
if (_isReaderCompleted)
{
return;
}

_isReaderCompleted = true;
_sequence = ReadOnlySequence<byte>.Empty;
}

/// <inheritdoc />
public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
if (TryRead(out ReadResult result))
{
return new ValueTask<ReadResult>(result);
}

result = new ReadResult(ReadOnlySequence<byte>.Empty, isCanceled: false, isCompleted: true);
return new ValueTask<ReadResult>(result);
}

/// <inheritdoc />
public override bool TryRead(out ReadResult result)
{
ThrowIfCompleted();

bool isCancellationRequested = Interlocked.Exchange(ref _cancelNext, 0) == 1;
if (isCancellationRequested || _sequence.Length > 0)
{
result = new ReadResult(_sequence, isCancellationRequested, isCompleted: true);
return true;
}

result = default;
return false;
}

private void ThrowIfCompleted()
{
if (_isReaderCompleted)
{
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
}
}
}
}
232 changes: 232 additions & 0 deletions src/libraries/System.IO.Pipelines/tests/SequencePipeReaderTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Xunit;

namespace System.IO.Pipelines.Tests
{
public class SequencePipeReaderTests
{
[Fact]
public async Task CanRead()
{
var sequence = new ReadOnlySequence<byte>(Encoding.ASCII.GetBytes("Hello World"));
var reader = PipeReader.Create(sequence);

ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;

Assert.Equal(11, buffer.Length);
Assert.True(buffer.IsSingleSegment);
Assert.Equal("Hello World", Encoding.ASCII.GetString(buffer.ToArray()));

reader.AdvanceTo(buffer.End);
reader.Complete();
}

[Fact]
public async Task TryReadReturnsTrueIfBufferedBytesAndNotExaminedEverything()
{
var sequence = new ReadOnlySequence<byte>(Encoding.ASCII.GetBytes("Hello World"));
var reader = PipeReader.Create(sequence);

ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
Assert.Equal(11, buffer.Length);
Assert.True(buffer.IsSingleSegment);
reader.AdvanceTo(buffer.Start, buffer.GetPosition(5));

Assert.True(reader.TryRead(out readResult));
Assert.Equal(11, buffer.Length);
Assert.True(buffer.IsSingleSegment);
Assert.Equal("Hello World", Encoding.ASCII.GetString(buffer.ToArray()));

reader.Complete();
}

[Fact]
public async Task TryReadReturnsFalseIfBufferedBytesAndEverythingExamined()
{
var sequence = new ReadOnlySequence<byte>(Encoding.ASCII.GetBytes("Hello World"));
var reader = PipeReader.Create(sequence);

ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
Assert.Equal(11, buffer.Length);
Assert.True(buffer.IsSingleSegment);
reader.AdvanceTo(buffer.End);

Assert.False(reader.TryRead(out readResult));
reader.Complete();
}

[Fact]
public async Task ReadAsyncAfterReceivingCompletedReadResultDoesNotThrow()
{
var sequence = ReadOnlySequence<byte>.Empty;
PipeReader reader = PipeReader.Create(sequence);
ReadResult readResult = await reader.ReadAsync();
Assert.True(readResult.Buffer.IsEmpty);
Assert.True(readResult.IsCompleted);
reader.AdvanceTo(readResult.Buffer.End);

readResult = await reader.ReadAsync();
Assert.True(readResult.Buffer.IsEmpty);
Assert.True(readResult.IsCompleted);
reader.AdvanceTo(readResult.Buffer.End);
reader.Complete();
}

[Fact]
public async Task DataCanBeReadMultipleTimes()
{
var helloBytes = Encoding.ASCII.GetBytes("Hello World");
var sequence = new ReadOnlySequence<byte>(helloBytes);
PipeReader reader = PipeReader.Create(sequence);


ReadResult readResult = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = readResult.Buffer;
reader.AdvanceTo(buffer.Start, buffer.End);

// Make sure IsCompleted is true
readResult = await reader.ReadAsync();
buffer = readResult.Buffer;
reader.AdvanceTo(buffer.Start, buffer.End);
Assert.True(readResult.IsCompleted);

var value = await ReadFromPipeAsString(reader);
Assert.Equal("Hello World", value);
reader.Complete();
}

[Fact]
public async Task NextReadAfterPartiallyExaminedReturnsImmediately()
{
var sequence = new ReadOnlySequence<byte>(Encoding.ASCII.GetBytes(new string('a', 10000)));
PipeReader reader = PipeReader.Create(sequence);

ReadResult readResult = await reader.ReadAsync();
reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.GetPosition(2048));

ValueTask<ReadResult> task = reader.ReadAsync();

// This should complete synchronously since
Assert.True(task.IsCompleted);

readResult = await task;
reader.AdvanceTo(readResult.Buffer.End);
reader.Complete();
}

[Fact]
public async Task CompleteReaderWithoutAdvanceDoesNotThrow()
{
PipeReader reader = PipeReader.Create(ReadOnlySequence<byte>.Empty);
await reader.ReadAsync();
reader.Complete();
}

[Fact]
public async Task AdvanceAfterCompleteThrows()
{
PipeReader reader = PipeReader.Create(new ReadOnlySequence<byte>(new byte[100]));
ReadOnlySequence<byte> buffer = (await reader.ReadAsync()).Buffer;

reader.Complete();

Assert.Throws<InvalidOperationException>(() => reader.AdvanceTo(buffer.End));
}

[Fact]
public async Task ThrowsOnReadAfterCompleteReader()
{
PipeReader reader = PipeReader.Create(ReadOnlySequence<byte>.Empty);

reader.Complete();
await Assert.ThrowsAsync<InvalidOperationException>(async () => await reader.ReadAsync());
}

[Fact]
public void TryReadAfterCancelPendingReadReturnsTrue()
{
PipeReader reader = PipeReader.Create(ReadOnlySequence<byte>.Empty);

reader.CancelPendingRead();

Assert.True(reader.TryRead(out ReadResult result));
Assert.True(result.IsCanceled);
reader.AdvanceTo(result.Buffer.End);
reader.Complete();
}

[Fact]
public async Task ReadAsyncReturnsCanceledIfCanceledBeforeRead()
{
var sequence = new ReadOnlySequence<byte>(new byte[10000]);
PipeReader reader = PipeReader.Create(sequence);

// Make sure state isn't used from before
for (var i = 0; i < 3; i++)
{
reader.CancelPendingRead();
ValueTask<ReadResult> readResultTask = reader.ReadAsync();
Assert.True(readResultTask.IsCompleted);
ReadResult readResult = readResultTask.GetAwaiter().GetResult();
Assert.True(readResult.IsCanceled);
readResult = await reader.ReadAsync();
reader.AdvanceTo(readResult.Buffer.End);
}

reader.Complete();
}

[Fact]
public async Task ReadAsyncReturnsCanceledInterleaved()
{
var sequence = new ReadOnlySequence<byte>(new byte[10000]);
PipeReader reader = PipeReader.Create(sequence);

// Cancel and Read interleaved to confirm cancellations are independent
for (var i = 0; i < 3; i++)
{
reader.CancelPendingRead();
ValueTask<ReadResult> readResultTask = reader.ReadAsync();
Assert.True(readResultTask.IsCompleted);
ReadResult readResult = readResultTask.GetAwaiter().GetResult();
Assert.True(readResult.IsCanceled);

readResult = await reader.ReadAsync();
Assert.False(readResult.IsCanceled);
}

reader.Complete();
}

[Fact]
public void OnWriterCompletedNoops()
{
bool fired = false;
PipeReader reader = PipeReader.Create(ReadOnlySequence<byte>.Empty);
#pragma warning disable CS0618 // Type or member is obsolete
reader.OnWriterCompleted((_, __) => { fired = true; }, null);
#pragma warning restore CS0618 // Type or member is obsolete
reader.Complete();
Assert.False(fired);
}

private static async Task<string> ReadFromPipeAsString(PipeReader reader)
{
ReadResult readResult = await reader.ReadAsync();
var result = Encoding.ASCII.GetString(readResult.Buffer.ToArray());
reader.AdvanceTo(readResult.Buffer.End);
return result;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
<Compile Include="ReadAsyncCompletionTests.cs" />
<Compile Include="ReadResultTests.cs" />
<Compile Include="SchedulerFacts.cs" />
<Compile Include="SequencePipeReaderTests.cs" />
<Compile Include="StreamPipeReaderTests.cs" />
<Compile Include="Infrastructure\TestMemoryPool.cs" />
<Compile Include="StreamPipeWriterTests.cs" />
Expand Down