diff --git a/eng/Dependencies.props b/eng/Dependencies.props index eea85ba1bd4a..bd45447f355e 100644 --- a/eng/Dependencies.props +++ b/eng/Dependencies.props @@ -85,6 +85,7 @@ and are generated based on the last package release. + diff --git a/eng/Version.Details.xml b/eng/Version.Details.xml index d8b75f263acd..c14039a249d4 100644 --- a/eng/Version.Details.xml +++ b/eng/Version.Details.xml @@ -218,6 +218,10 @@ https://github.com/dotnet/runtime 8fac5af2b11dc98fa0504f6fd06df790164ec958 + + https://github.com/dotnet/runtime + 8fac5af2b11dc98fa0504f6fd06df790164ec958 + https://github.com/dotnet/runtime 8fac5af2b11dc98fa0504f6fd06df790164ec958 diff --git a/eng/Versions.props b/eng/Versions.props index 3c452cf9b06e..4ab313e07231 100644 --- a/eng/Versions.props +++ b/eng/Versions.props @@ -116,6 +116,7 @@ 9.0.0-preview.6.24307.2 9.0.0-preview.6.24307.2 9.0.0-preview.6.24307.2 + 9.0.0-preview.6.24307.2 9.0.0-preview.6.24307.2 9.0.0-preview.6.24307.2 9.0.0-preview.6.24307.2 diff --git a/src/SignalR/clients/csharp/Client/test/UnitTests/ServerSentEventsParserTests.cs b/src/SignalR/clients/csharp/Client/test/UnitTests/ServerSentEventsParserTests.cs deleted file mode 100644 index 996d6a2ad48e..000000000000 --- a/src/SignalR/clients/csharp/Client/test/UnitTests/ServerSentEventsParserTests.cs +++ /dev/null @@ -1,215 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -using System; -using System.Buffers; -using System.Collections.Generic; -using System.IO.Pipelines; -using System.Text; -using System.Threading.Tasks; -using Microsoft.AspNetCore.Http.Connections.Client.Internal; -using Xunit; - -namespace Microsoft.AspNetCore.SignalR.Client.Tests; - -public class ServerSentEventsParserTests -{ - [Theory] - [InlineData("\r\n", "")] - [InlineData("\r\n:\r\n", "")] - [InlineData("\r\n:comment\r\n", "")] - [InlineData("data: \r\r\n\n", "\r")] - [InlineData(":comment\r\ndata: \r\r\n\r\n", "\r")] - [InlineData("data: A\rB\r\n\r\n", "A\rB")] - [InlineData("data: Hello, World\r\n\r\n", "Hello, World")] - [InlineData("data: Hello, World\r\n\r\ndata: ", "Hello, World")] - [InlineData("data: Hello, World\r\n\r\n:comment\r\ndata: ", "Hello, World")] - [InlineData("data: Hello, World\r\n\r\n:comment", "Hello, World")] - [InlineData("data: Hello, World\r\n\r\n:comment\r\n", "Hello, World")] - [InlineData("data: Hello, World\r\n:comment\r\n\r\n", "Hello, World")] - [InlineData("data: SGVsbG8sIFdvcmxk\r\n\r\n", "SGVsbG8sIFdvcmxk")] - public void ParseSSEMessageSuccessCases(string encodedMessage, string expectedMessage) - { - var buffer = Encoding.UTF8.GetBytes(encodedMessage); - var readableBuffer = new ReadOnlySequence(buffer); - var parser = new ServerSentEventsMessageParser(); - - var parseResult = parser.ParseMessage(readableBuffer, out var consumed, out var examined, out var message); - Assert.Equal(ServerSentEventsMessageParser.ParseResult.Completed, parseResult); - Assert.Equal(consumed, examined); - - var result = Encoding.UTF8.GetString(message); - Assert.Equal(expectedMessage, result); - } - - [Theory] - [InlineData("data: T\r\nfoo: Hello, World\r\n\r\n", "Expected the message prefix 'data: '")] - [InlineData("foo: T\r\ndata: Hello, World\r\n\r\n", "Expected the message prefix 'data: '")] - [InlineData("food: T\r\ndata: Hello, World\r\n\r\n", "Expected the message prefix 'data: '")] - [InlineData("data: T\r\ndata: Hello\n, World\r\n\r\n", "Expected the message prefix 'data: '")] - public void ParseSSEMessageFailureCases(string encodedMessage, string expectedExceptionMessage) - { - var buffer = Encoding.UTF8.GetBytes(encodedMessage); - var readableBuffer = new ReadOnlySequence(buffer); - var parser = new ServerSentEventsMessageParser(); - - var ex = Assert.Throws(() => { parser.ParseMessage(readableBuffer, out var consumed, out var examined, out var message); }); - Assert.Equal(expectedExceptionMessage, ex.Message); - } - - [Theory] - [InlineData("")] - [InlineData(":")] - [InlineData(":comment")] - [InlineData(":comment\r\n")] - [InlineData("data:")] - [InlineData("data: \r")] - [InlineData("data: T\r\nda")] - [InlineData("data: T\r\ndata:")] - [InlineData("data: T\r\ndata: Hello, World")] - [InlineData("data: T\r\ndata: Hello, World\r")] - [InlineData("data: T\r\ndata: Hello, World\n")] - [InlineData("data: T\r\ndata: Hello, World\r\n")] - [InlineData("data: B\r\ndata: SGVsbG8sIFd")] - [InlineData(":\r\ndata:")] - [InlineData("data: T\r\n:\r\n")] - [InlineData("data: T\r\n:\r\ndata:")] - [InlineData("data: T\r\ndata: Hello, World\r\n:comment")] - public void ParseSSEMessageIncompleteParseResult(string encodedMessage) - { - var buffer = Encoding.UTF8.GetBytes(encodedMessage); - var readableBuffer = new ReadOnlySequence(buffer); - var parser = new ServerSentEventsMessageParser(); - - var parseResult = parser.ParseMessage(readableBuffer, out var consumed, out var examined, out var message); - - Assert.Equal(ServerSentEventsMessageParser.ParseResult.Incomplete, parseResult); - } - - [Theory] - [InlineData(new[] { "d", "ata: Hello, World\r\n\r\n" }, "Hello, World")] - [InlineData(new[] { "da", "ta: Hello, World\r\n\r\n" }, "Hello, World")] - [InlineData(new[] { "dat", "a: Hello, World\r\n\r\n" }, "Hello, World")] - [InlineData(new[] { "data", ": Hello, World\r\n\r\n" }, "Hello, World")] - [InlineData(new[] { "data:", " Hello, World\r\n\r\n" }, "Hello, World")] - [InlineData(new[] { "data: Hello, World", "\r\n\r\n" }, "Hello, World")] - [InlineData(new[] { "data: Hello, World\r\n", "\r\n" }, "Hello, World")] - [InlineData(new[] { "data: ", "Hello, World\r\n\r\n" }, "Hello, World")] - [InlineData(new[] { "data: ", "Hello, World\n\n" }, "Hello, World")] - [InlineData(new[] { "data: ", "Hello, World\r\n\n" }, "Hello, World")] - [InlineData(new[] { ":", "comment", "\r\n", "d", "ata: Hello, World\r\n\r\n" }, "Hello, World")] - [InlineData(new[] { ":comment", "\r\n", "data: Hello, World", "\r\n\r\n" }, "Hello, World")] - [InlineData(new[] { "data: Hello, World\r\n", ":comment\r\n", "\r\n" }, "Hello, World")] - public async Task ParseMessageAcrossMultipleReadsSuccess(string[] messageParts, string expectedMessage) - { - var parser = new ServerSentEventsMessageParser(); - var pipe = new Pipe(); - - byte[] message = null; - SequencePosition consumed = default, examined = default; - - for (var i = 0; i < messageParts.Length; i++) - { - var messagePart = messageParts[i]; - await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes(messagePart)); - var result = await pipe.Reader.ReadAsync(); - - var parseResult = parser.ParseMessage(result.Buffer, out consumed, out examined, out message); - pipe.Reader.AdvanceTo(consumed, examined); - - // parse result should be complete only after we parsed the last message part - var expectedResult = - i == messageParts.Length - 1 - ? ServerSentEventsMessageParser.ParseResult.Completed - : ServerSentEventsMessageParser.ParseResult.Incomplete; - - Assert.Equal(expectedResult, parseResult); - } - - Assert.Equal(consumed, examined); - - var resultMessage = Encoding.UTF8.GetString(message); - Assert.Equal(expectedMessage, resultMessage); - } - - [Theory] - [InlineData("data: T\r\nf", "oo: Hello, World\r\n\r\n", "Expected the message prefix 'data: '")] - [InlineData("foo", ": T\r\ndata: Hello, World\r\n\r\n", "Expected the message prefix 'data: '")] - [InlineData("food:", " T\r\ndata: Hello, World\r\n\r\n", "Expected the message prefix 'data: '")] - [InlineData("data: T\r\nda", "ta: Hello\n, World\r\n\r\n", "Expected the message prefix 'data: '")] - public async Task ParseMessageAcrossMultipleReadsFailure(string encodedMessagePart1, string encodedMessagePart2, string expectedMessage) - { - var pipe = new Pipe(); - - // Read the first part of the message - await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes(encodedMessagePart1)); - - var result = await pipe.Reader.ReadAsync(); - var parser = new ServerSentEventsMessageParser(); - - var parseResult = parser.ParseMessage(result.Buffer, out var consumed, out var examined, out var buffer); - Assert.Equal(ServerSentEventsMessageParser.ParseResult.Incomplete, parseResult); - - pipe.Reader.AdvanceTo(consumed, examined); - - // Send the rest of the data and parse the complete message - await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes(encodedMessagePart2)); - result = await pipe.Reader.ReadAsync(); - - var ex = Assert.Throws(() => parser.ParseMessage(result.Buffer, out consumed, out examined, out buffer)); - Assert.Equal(expectedMessage, ex.Message); - } - - [Theory] - [InlineData("data: foo\r\n\r\n", "data: bar\r\n\r\n")] - public async Task ParseMultipleMessagesText(string message1, string message2) - { - var pipe = new Pipe(); - - // Read the first part of the message - await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes(message1 + message2)); - - var result = await pipe.Reader.ReadAsync(); - var parser = new ServerSentEventsMessageParser(); - - var parseResult = parser.ParseMessage(result.Buffer, out var consumed, out var examined, out var message); - Assert.Equal(ServerSentEventsMessageParser.ParseResult.Completed, parseResult); - Assert.Equal("foo", Encoding.UTF8.GetString(message)); - Assert.Equal(consumed, result.Buffer.GetPosition(message1.Length)); - pipe.Reader.AdvanceTo(consumed, examined); - Assert.Equal(consumed, examined); - - parser.Reset(); - - result = await pipe.Reader.ReadAsync(); - parseResult = parser.ParseMessage(result.Buffer, out consumed, out examined, out message); - Assert.Equal(ServerSentEventsMessageParser.ParseResult.Completed, parseResult); - Assert.Equal("bar", Encoding.UTF8.GetString(message)); - pipe.Reader.AdvanceTo(consumed, examined); - } - - public static IEnumerable MultilineMessages - { - get - { - yield return new object[] { "data: Shaolin\r\ndata: Fantastic\r\n\r\n", "Shaolin" + Environment.NewLine + " Fantastic" }; - yield return new object[] { "data: The\r\ndata: Get\r\ndata: Down\r\n\r\n", "The" + Environment.NewLine + "Get" + Environment.NewLine + "Down" }; - } - } - - [Theory] - [MemberData(nameof(MultilineMessages))] - public void ParseMessagesWithMultipleDataLines(string encodedMessage, string expectedMessage) - { - var buffer = Encoding.UTF8.GetBytes(encodedMessage); - var readableBuffer = new ReadOnlySequence(buffer); - var parser = new ServerSentEventsMessageParser(); - - var parseResult = parser.ParseMessage(readableBuffer, out var consumed, out var examined, out var message); - Assert.Equal(ServerSentEventsMessageParser.ParseResult.Completed, parseResult); - Assert.Equal(consumed, examined); - - var result = Encoding.UTF8.GetString(message); - Assert.Equal(expectedMessage, result); - } -} diff --git a/src/SignalR/clients/csharp/Client/test/UnitTests/ServerSentEventsTransportTests.cs b/src/SignalR/clients/csharp/Client/test/UnitTests/ServerSentEventsTransportTests.cs index bb987398048a..ccf6d2cd1f46 100644 --- a/src/SignalR/clients/csharp/Client/test/UnitTests/ServerSentEventsTransportTests.cs +++ b/src/SignalR/clients/csharp/Client/test/UnitTests/ServerSentEventsTransportTests.cs @@ -1,24 +1,20 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -using System; -using System.IO; +using System.Diagnostics; using System.IO.Pipelines; +using System.Net; using System.Net.Http; using System.Net.Http.Headers; using System.Text; -using System.Threading; -using System.Threading.Tasks; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Http.Connections.Client.Internal; using Microsoft.AspNetCore.Internal; -using Microsoft.AspNetCore.SignalR.Tests; using Microsoft.AspNetCore.InternalTesting; +using Microsoft.AspNetCore.SignalR.Tests; using Microsoft.Extensions.Logging.Testing; using Moq; using Moq.Protected; -using Xunit; -using System.Net; namespace Microsoft.AspNetCore.SignalR.Client.Tests; @@ -121,7 +117,7 @@ await sseTransport.StartAsync( } [Fact] - public async Task SSETransportStopsWithErrorIfServerSendsIncompleteResults() + public async Task SSETransportStopIfServerSendsIncompleteResults() { var mockHttpHandler = new Mock(); var calls = 0; @@ -156,11 +152,9 @@ public async Task SSETransportStopsWithErrorIfServerSendsIncompleteResults() await sseTransport.StartAsync( new Uri("http://fakeuri.org"), TransferFormat.Text).DefaultTimeout(); - var exception = await Assert.ThrowsAsync(() => sseTransport.Input.ReadAllAsync()); + await sseTransport.Input.ReadAllAsync().DefaultTimeout(); await sseTransport.Running.DefaultTimeout(); - - Assert.Equal("Incomplete message.", exception.Message); } } @@ -444,4 +438,81 @@ public async Task StartAsyncSetsCorrectAcceptHeaderForSSE() Assert.Equal(HttpStatusCode.OK, response.StatusCode); } } + + [Theory] + [InlineData(new[] { "\r\n" }, "")] + [InlineData(new[] { "\r\n:\r\n" }, "")] + [InlineData(new[] { "\r\n:comment\r\n" }, "")] + [InlineData(new[] { "data: \r\r\n\n" }, "")] + [InlineData(new[] { ":comment\r\ndata: \r\r\n\r\n" }, "")] + [InlineData(new[] { "data: A\rB\r\n\r\n" }, "A")] + [InlineData(new[] { "data: Hello, World\r\n\r\n" }, "Hello, World")] + [InlineData(new[] { "data: Hello, World\r\n\r\ndata: " }, "Hello, World")] + [InlineData(new[] { "data: Hello, World\r\n\r\n:comment\r\ndata: " }, "Hello, World")] + [InlineData(new[] { "data: Hello, World\r\n\r\n:comment" }, "Hello, World")] + [InlineData(new[] { "data: Hello, World\r\n\r\n:comment\r\n" }, "Hello, World")] + [InlineData(new[] { "data: Hello, World\r\n:comment\r\n\r\n" }, "Hello, World")] + [InlineData(new[] { "data: SGVsbG8sIFdvcmxk\r\n\r\n" }, "SGVsbG8sIFdvcmxk")] + [InlineData(new[] { "d", "ata: Hello, World\r\n\r\n" }, "Hello, World")] + [InlineData(new[] { "da", "ta: Hello, World\r\n\r\n" }, "Hello, World")] + [InlineData(new[] { "dat", "a: Hello, World\r\n\r\n" }, "Hello, World")] + [InlineData(new[] { "data", ": Hello, World\r\n\r\n" }, "Hello, World")] + [InlineData(new[] { "data:", " Hello, World\r\n\r\n" }, "Hello, World")] + [InlineData(new[] { "data: Hello, World", "\r\n\r\n" }, "Hello, World")] + [InlineData(new[] { "data: Hello, World\r\n", "\r\n" }, "Hello, World")] + [InlineData(new[] { "data: ", "Hello, World\r\n\r\n" }, "Hello, World")] + [InlineData(new[] { "data: ", "Hello, World\n\n" }, "Hello, World")] + [InlineData(new[] { "data: ", "Hello, World\r\n\n" }, "Hello, World")] + [InlineData(new[] { ":", "comment", "\r\n", "d", "ata: Hello, World\r\n\r\n" }, "Hello, World")] + [InlineData(new[] { ":comment", "\r\n", "data: Hello, World", "\r\n\r\n" }, "Hello, World")] + [InlineData(new[] { "data: Hello, World\r\n", ":comment\r\n", "\r\n" }, "Hello, World")] + [InlineData(new[] { "data: Hello \r\n", "data: World\r\n\r\n" }, "Hello \nWorld")] + public async Task CanProcessMessagesSuccessfully(string[] messageParts, string expectedMessage) + { + var mockHttpHandler = new Mock(); + mockHttpHandler.Protected() + .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(async (request, cancellationToken) => + { + await Task.Yield(); + return new HttpResponseMessage { Content = new StreamContent(new OneAtATimeStream(messageParts)) }; + }); + + using (var httpClient = new HttpClient(mockHttpHandler.Object)) + using (StartVerifiableLog()) + { + var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory: LoggerFactory); + + await sseTransport.StartAsync( + new Uri("http://fakeuri.org"), TransferFormat.Text).DefaultTimeout(); + + var message = await sseTransport.Input.ReadAllAsync().DefaultTimeout(); + Assert.Equal(expectedMessage, Encoding.ASCII.GetString(message)); + + await sseTransport.Running.DefaultTimeout(); + } + } + + public sealed class OneAtATimeStream : MemoryStream + { + private readonly string[] _contents; + private int _index; + + public OneAtATimeStream(string[] contents) + { + _contents = contents; + } + + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + if (_index == _contents.Length) + { + return new(0); + } + + Debug.Assert(buffer.Length > _contents[_index].Length); + + return new(Encoding.UTF8.GetBytes(_contents[_index++], buffer.Span)); + } + } } diff --git a/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/ServerSentEventsMessageParser.cs b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/ServerSentEventsMessageParser.cs deleted file mode 100644 index 9c1d11eec965..000000000000 --- a/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/ServerSentEventsMessageParser.cs +++ /dev/null @@ -1,185 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -using System; -using System.Buffers; -using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; -using System.Runtime.CompilerServices; -using System.Text; - -namespace Microsoft.AspNetCore.Http.Connections.Client.Internal; - -internal sealed class ServerSentEventsMessageParser -{ - private const byte ByteCR = (byte)'\r'; - private const byte ByteLF = (byte)'\n'; - private const byte ByteColon = (byte)':'; - - // This uses C# compiler's ability to refer to static data directly. For more information see https://vcsjones.dev/2019/02/01/csharp-readonly-span-bytes-static - private static ReadOnlySpan DataPrefix => "data: "u8; - private static ReadOnlySpan SseLineEnding => "\r\n"u8; - private static readonly byte[] _newLine = Encoding.UTF8.GetBytes(Environment.NewLine); - - private InternalParseState _internalParserState = InternalParseState.ReadMessagePayload; - private readonly List _data = new List(); - - public ParseResult ParseMessage(ReadOnlySequence buffer, out SequencePosition consumed, out SequencePosition examined, out byte[]? message) - { - consumed = buffer.Start; - examined = buffer.End; - message = null; - - var start = consumed; - - while (buffer.Length > 0) - { - if (!(buffer.PositionOf(ByteLF) is SequencePosition lineEnd)) - { - // Partial message. We need to read more. - return ParseResult.Incomplete; - } - - // buffer, and thus line should atleast contain \n at this point. - lineEnd = buffer.GetPosition(1, lineEnd); - var line = ConvertBufferToSpan(buffer.Slice(start, lineEnd)); - buffer = buffer.Slice(line.Length); - - // Skip comments - if (line[0] == ByteColon) - { - start = lineEnd; - consumed = lineEnd; - continue; - } - - if (IsMessageEnd(line)) - { - _internalParserState = InternalParseState.ReadEndOfMessage; - } - else - { - EnsureStartsWithDataPrefix(line); - } - - var payload = Array.Empty(); - switch (_internalParserState) - { - case InternalParseState.ReadMessagePayload: - EnsureStartsWithDataPrefix(line); - - // Slice away the 'data: ' - var payloadLength = line.Length - DataPrefix.Length; - var lineWithEnding = line.Slice(DataPrefix.Length, payloadLength); - var newData = TrimEnding(lineWithEnding).ToArray(); - _data.Add(newData); - - start = lineEnd; - consumed = lineEnd; - break; - case InternalParseState.ReadEndOfMessage: - if (_data.Count == 1) - { - payload = _data[0]; - } - else if (_data.Count > 1) - { - // Find the final size of the payload - var payloadSize = 0; - foreach (var dataLine in _data) - { - payloadSize += dataLine.Length; - } - - payloadSize += _newLine.Length * _data.Count; - - // Allocate space in the payload buffer for the data and the new lines. - // Subtract newLine length because we don't want a trailing newline. - payload = new byte[payloadSize - _newLine.Length]; - - var offset = 0; - foreach (var dataLine in _data) - { - dataLine.CopyTo(payload, offset); - offset += dataLine.Length; - if (offset < payload.Length) - { - _newLine.CopyTo(payload, offset); - offset += _newLine.Length; - } - } - } - - message = payload; - consumed = lineEnd; - examined = consumed; - return ParseResult.Completed; - } - - if (buffer.Length > 0 && buffer.First.Span[0] == ByteCR) - { - _internalParserState = InternalParseState.ReadEndOfMessage; - } - } - return ParseResult.Incomplete; - } - - private static ReadOnlySpan TrimEnding(ReadOnlySpan lineWithEnding) - { - // Up above we ensure that we will have a line ending. - // that can be either CRLF or LF - return lineWithEnding.EndsWith(SseLineEnding) - ? lineWithEnding.Slice(0, lineWithEnding.Length - 2) // CRLF - : lineWithEnding.Slice(0, lineWithEnding.Length - 1); // LF - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static ReadOnlySpan ConvertBufferToSpan(in ReadOnlySequence buffer) - { - if (buffer.IsSingleSegment) - { - return buffer.First.Span; - } - return buffer.ToArray(); - } - - public void Reset() - { - _internalParserState = InternalParseState.ReadMessagePayload; - _data.Clear(); - } - - private static void EnsureStartsWithDataPrefix(ReadOnlySpan line) - { - if (!line.StartsWith(DataPrefix)) - { - throw new FormatException("Expected the message prefix 'data: '"); - } - } - - private static bool IsMessageEnd(ReadOnlySpan line) - { - if (line.Length == 2) - { - return line[0] == ByteCR && line[1] == ByteLF; - } - else if (line.Length == 1) - { - return line[0] == ByteLF; - } - return false; - } - - public enum ParseResult - { - Completed, - Incomplete, - } - - private enum InternalParseState - { - ReadMessagePayload, - ReadEndOfMessage, - Error - } -} diff --git a/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/ServerSentEventsTransport.Log.cs b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/ServerSentEventsTransport.Log.cs index 9e8b871e56cd..b97fc3868a04 100644 --- a/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/ServerSentEventsTransport.Log.cs +++ b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/ServerSentEventsTransport.Log.cs @@ -37,6 +37,7 @@ private static partial class Log [LoggerMessage(8, LogLevel.Debug, "Server-Sent Event Stream ended.", EventName = "EventStreamEnded")] public static partial void EventStreamEnded(ILogger logger); + // No longer used [LoggerMessage(9, LogLevel.Debug, "Received {Count} bytes. Parsing SSE frame.", EventName = "ParsingSSE")] public static partial void ParsingSSE(ILogger logger, long count); } diff --git a/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/ServerSentEventsTransport.cs b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/ServerSentEventsTransport.cs index 19926c8c0600..0921f132df3a 100644 --- a/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/ServerSentEventsTransport.cs +++ b/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/ServerSentEventsTransport.cs @@ -4,9 +4,9 @@ using System; using System.Diagnostics; using System.IO.Pipelines; -using System.Net; using System.Net.Http; using System.Net.Http.Headers; +using System.Net.ServerSentEvents; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Connections; @@ -25,7 +25,6 @@ internal sealed partial class ServerSentEventsTransport : ITransport private volatile Exception? _error; private readonly CancellationTokenSource _transportCts = new CancellationTokenSource(); private readonly CancellationTokenSource _inputCts = new CancellationTokenSource(); - private readonly ServerSentEventsMessageParser _parser = new ServerSentEventsMessageParser(); private IDuplexPipe? _transport; private IDuplexPipe? _application; @@ -130,80 +129,35 @@ private async Task ProcessEventStream(HttpResponseMessage response, Cancellation Log.StartReceive(_logger); - static void CancelReader(object? state) => ((PipeReader)state!).CancelPendingRead(); - using (response) #pragma warning disable CA2016 // Forward the 'CancellationToken' parameter to methods using (var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false)) #pragma warning restore CA2016 // Forward the 'CancellationToken' parameter to methods { - var reader = PipeReader.Create(stream); - - using var registration = cancellationToken.Register(CancelReader, reader); - try { - while (true) + var parser = SseParser.Create(stream, (eventType, bytes) => bytes.ToArray()); + await foreach (var item in parser.EnumerateAsync(cancellationToken).ConfigureAwait(false)) { - // We rely on the CancelReader callback to cancel pending reads. Do not pass the token to ReadAsync since that would result in an exception on cancelation. - var result = await reader.ReadAsync(default).ConfigureAwait(false); - var buffer = result.Buffer; - var consumed = buffer.Start; - var examined = buffer.End; + Log.MessageToApplication(_logger, item.Data.Length); - try - { - if (result.IsCanceled) - { - Log.ReceiveCanceled(_logger); - break; - } - - if (!buffer.IsEmpty) - { - Log.ParsingSSE(_logger, buffer.Length); - - var parseResult = _parser.ParseMessage(buffer, out consumed, out examined, out var message); - FlushResult flushResult = default; - - switch (parseResult) - { - case ServerSentEventsMessageParser.ParseResult.Completed: - Log.MessageToApplication(_logger, message!.Length); - - // When cancellationToken is canceled the next line will cancel pending flushes on the pipe unblocking the await. - // Avoid passing the passed in context. - flushResult = await _application.Output.WriteAsync(message, default).ConfigureAwait(false); - - _parser.Reset(); - break; - case ServerSentEventsMessageParser.ParseResult.Incomplete: - if (result.IsCompleted) - { - throw new FormatException("Incomplete message."); - } - break; - } - - // We canceled in the middle of applying back pressure - // or if the consumer is done - if (flushResult.IsCanceled || flushResult.IsCompleted) - { - Log.EventStreamEnded(_logger); - break; - } - } - else if (result.IsCompleted) - { - break; - } - } - finally + // When cancellationToken is canceled the next line will cancel pending flushes on the pipe unblocking the await. + // Avoid passing the passed in context. + var flushResult = await _application.Output.WriteAsync(item.Data, default).ConfigureAwait(false); + + // We canceled in the middle of applying back pressure + // or if the consumer is done + if (flushResult.IsCanceled || flushResult.IsCompleted) { - reader.AdvanceTo(consumed, examined); + Log.EventStreamEnded(_logger); + break; } } } + catch (OperationCanceledException) + { + Log.ReceiveCanceled(_logger); + } catch (Exception ex) { _error = ex; @@ -213,8 +167,6 @@ private async Task ProcessEventStream(HttpResponseMessage response, Cancellation _application.Output.Complete(_error); Log.ReceiveStopped(_logger); - - reader.Complete(); } } } diff --git a/src/SignalR/clients/csharp/Http.Connections.Client/src/Microsoft.AspNetCore.Http.Connections.Client.csproj b/src/SignalR/clients/csharp/Http.Connections.Client/src/Microsoft.AspNetCore.Http.Connections.Client.csproj index 2a04e266f950..7c9977be80e2 100644 --- a/src/SignalR/clients/csharp/Http.Connections.Client/src/Microsoft.AspNetCore.Http.Connections.Client.csproj +++ b/src/SignalR/clients/csharp/Http.Connections.Client/src/Microsoft.AspNetCore.Http.Connections.Client.csproj @@ -25,6 +25,7 @@ + diff --git a/src/SignalR/perf/Microbenchmarks/ServerSentEventsBenchmark.cs b/src/SignalR/perf/Microbenchmarks/ServerSentEventsBenchmark.cs index 330621bd093a..86bd860c4bbb 100644 --- a/src/SignalR/perf/Microbenchmarks/ServerSentEventsBenchmark.cs +++ b/src/SignalR/perf/Microbenchmarks/ServerSentEventsBenchmark.cs @@ -12,8 +12,6 @@ namespace Microsoft.AspNetCore.SignalR.Microbenchmarks; public class ServerSentEventsBenchmark { - private ServerSentEventsMessageParser _parser; - private byte[] _sseFormattedData; private ReadOnlySequence _rawData; [Params(Message.NoArguments, Message.FewArguments, Message.ManyArguments, Message.LargeArguments)] @@ -57,24 +55,9 @@ public void GlobalSetup() break; } - _parser = new ServerSentEventsMessageParser(); _rawData = new ReadOnlySequence(protocol.GetMessageBytes(hubMessage)); var ms = new MemoryStream(); ServerSentEventsMessageFormatter.WriteMessageAsync(_rawData, ms, default).GetAwaiter().GetResult(); - _sseFormattedData = ms.ToArray(); - } - - [Benchmark] - public void ReadSingleMessage() - { - var buffer = new ReadOnlySequence(_sseFormattedData); - - if (_parser.ParseMessage(buffer, out _, out _, out _) != ServerSentEventsMessageParser.ParseResult.Completed) - { - throw new InvalidOperationException("Parse failed!"); - } - - _parser.Reset(); } [Benchmark]