diff --git a/src/ReverseProxy.Core/Service/Proxy/AutoFlushingStream.cs b/src/ReverseProxy.Core/Service/Proxy/AutoFlushingStream.cs new file mode 100644 index 000000000..e023f7ed9 --- /dev/null +++ b/src/ReverseProxy.Core/Service/Proxy/AutoFlushingStream.cs @@ -0,0 +1,91 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.ReverseProxy.Utilities; + +namespace Microsoft.ReverseProxy.Core.Service.Proxy +{ + /// + /// Delegates to a wrapped stream and calls its + /// or + /// on every write. + /// + /// + /// This is used by to work around some undesirable behavior + /// in the HttpClient machinery (as of .NET Core 3.1.201) where an internal buffer + /// doesn't get written to the outgoing socket stream when we write to the outgoing stream. + /// Calling Flush on that stream sends the bytes to the underlying socket, + /// but does not call flush on the socket, so perf impact is expected to be small. + /// + internal sealed class AutoFlushingStream : Stream + { + private readonly Stream _stream; + + public AutoFlushingStream(Stream stream) + { + Contracts.CheckValue(stream, nameof(stream)); + _stream = stream; + } + + public override bool CanRead => _stream.CanRead; + + public override bool CanSeek => _stream.CanSeek; + + public override bool CanWrite => _stream.CanWrite; + + public override long Length => _stream.Length; + + public override long Position + { + get => _stream.Position; + set => _stream.Position = value; + } + + public override void Write(byte[] buffer, int offset, int count) + { + _stream.Write(buffer, offset, count); + _stream.Flush(); + } + + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + await _stream.WriteAsync(buffer, offset, count, cancellationToken); + await _stream.FlushAsync(cancellationToken); + } + + public override async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + await _stream.WriteAsync(buffer, cancellationToken); + await _stream.FlushAsync(cancellationToken); + } + + public override void Flush() + { + _stream.Flush(); + } + + public override Task FlushAsync(CancellationToken cancellationToken) + { + return _stream.FlushAsync(cancellationToken); + } + + public override int Read(byte[] buffer, int offset, int count) + { + return _stream.Read(buffer, offset, count); + } + + public override long Seek(long offset, SeekOrigin origin) + { + return _stream.Seek(offset, origin); + } + + public override void SetLength(long value) + { + _stream.SetLength(value); + } + } +} diff --git a/src/ReverseProxy.Core/Service/Proxy/HttpProxy.cs b/src/ReverseProxy.Core/Service/Proxy/HttpProxy.cs index ad559da36..e3b59b9d5 100644 --- a/src/ReverseProxy.Core/Service/Proxy/HttpProxy.cs +++ b/src/ReverseProxy.Core/Service/Proxy/HttpProxy.cs @@ -13,10 +13,10 @@ using System.Threading.Tasks; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Http.Features; +using Microsoft.AspNetCore.Server.Kestrel.Core.Features; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Primitives; using Microsoft.ReverseProxy.Common; -using Microsoft.ReverseProxy.Core.Abstractions; using Microsoft.ReverseProxy.Core.Service.Metrics; using Microsoft.ReverseProxy.Core.Service.Proxy.Infra; using Microsoft.ReverseProxy.Utilities; @@ -108,16 +108,16 @@ public Task ProxyAsync( /// /// /// Normal proxying comprises the following steps: - /// (1) Create outgoing HttpRequestMessage - /// (2) Setup copy of request body (background) Downstream --► Proxy --► Upstream - /// (3) Copy request headers Downstream --► Proxy --► Upstream - /// (4) Send the outgoing request using HttpMessageInvoker Downstream --► Proxy --► Upstream - /// (5) Copy response status line Downstream ◄-- Proxy ◄-- Upstream - /// (6) Copy response headers Downstream ◄-- Proxy ◄-- Upstream - /// (7) Send response headers Downstream ◄-- Proxy ◄-- Upstream - /// (8) Copy response body Downstream ◄-- Proxy ◄-- Upstream - /// (9) Wait for completion of step 2: copying request body Downstream --► Proxy --► Upstream - /// (10) Copy response trailer headers Downstream ◄-- Proxy ◄-- Upstream + /// (0) Disable ASP .NET Core limits for streaming requests + /// (1) Create outgoing HttpRequestMessage + /// (2) Setup copy of request body (background) Downstream --► Proxy --► Upstream + /// (3) Copy request headers Downstream --► Proxy --► Upstream + /// (4) Send the outgoing request using HttpMessageInvoker Downstream --► Proxy --► Upstream + /// (5) Copy response status line Downstream ◄-- Proxy ◄-- Upstream + /// (6) Copy response headers Downstream ◄-- Proxy ◄-- Upstream + /// (7) Copy response body Downstream ◄-- Proxy ◄-- Upstream + /// (8) Copy response trailer headers and finish response Downstream ◄-- Proxy ◄-- Upstream + /// (9) Wait for completion of step 2: copying request body Downstream --► Proxy --► Upstream /// /// ASP .NET Core (Kestrel) will finally send response trailers (if any) /// after we complete the steps above and relinquish control. @@ -134,6 +134,18 @@ private async Task NormalProxyAsync( Contracts.CheckValue(targetUri, nameof(targetUri)); Contracts.CheckValue(httpClient, nameof(httpClient)); + // ::::::::::::::::::::::::::::::::::::::::::::: + // :: Step 0: Disable ASP .NET Core limits for streaming requests + var isIncomingHttp2 = HttpProtocol.IsHttp2(context.Request.Protocol); + + // NOTE: We heuristically assume gRPC-looking requests may require streaming semantics. + // See https://github.com/microsoft/reverse-proxy/issues/118 for design discussion. + var isStreamingRequest = isIncomingHttp2 && GrpcProtocolHelper.IsGrpcContentType(context.Request.ContentType); + if (isStreamingRequest) + { + DisableMinRequestBodyDataRateAndMaxRequestBodySize(context); + } + // ::::::::::::::::::::::::::::::::::::::::::::: // :: Step 1: Create outgoing HttpRequestMessage var upstreamRequest = new HttpRequestMessage(HttpUtilities.GetHttpMethod(context.Request.Method), targetUri) @@ -147,7 +159,7 @@ private async Task NormalProxyAsync( // ::::::::::::::::::::::::::::::::::::::::::::: // :: Step 2: Setup copy of request body (background) Downstream --► Proxy --► Upstream // Note that we must do this before step (3) because step (3) may also add headers to the HttpContent that we set up here. - var bodyToUpstreamContent = SetupCopyBodyUpstream(context.Request.Body, upstreamRequest, in proxyTelemetryContext, longCancellation); + var bodyToUpstreamContent = SetupCopyBodyUpstream(context.Request.Body, upstreamRequest, in proxyTelemetryContext, isStreamingRequest, longCancellation); // ::::::::::::::::::::::::::::::::::::::::::::: // :: Step 3: Copy request headers Downstream --► Proxy --► Upstream @@ -159,7 +171,7 @@ private async Task NormalProxyAsync( var upstreamResponse = await httpClient.SendAsync(upstreamRequest, shortCancellation); // Detect connection downgrade, which may be problematic for e.g. gRPC. - if (upstreamResponse.Version.Major != 2 && HttpProtocol.IsHttp2(context.Request.Protocol)) + if (isIncomingHttp2 && upstreamResponse.Version.Major != 2) { // TODO: Do something on connection downgrade... Log.HttpDowngradeDeteced(_logger); @@ -184,18 +196,37 @@ private async Task NormalProxyAsync( // :: Step 6: Copy response headers Downstream ◄-- Proxy ◄-- Upstream CopyHeadersToDownstream(upstreamResponse, context.Response.Headers); - // ::::::::::::::::::::::::::::::::::::::::::::: - // :: Step 7: Send response headers Downstream ◄-- Proxy ◄-- Upstream - // This is important to avoid any extra delays in sending response headers - // e.g. if the upstream server is slow to provide its response body. - ////this.logger.LogInformation($" Starting downstream <-- Proxy response"); + // NOTE: it may *seem* wise to call `context.Response.StartAsync()` at this point + // since it looks like we are ready to send back response headers + // (and this might help reduce extra delays while we wait to receive the body from upstream). + // HOWEVER, this would produce the wrong result if it turns out that there is no content + // from the upstream -- instead of sending headers and terminating the stream at once, + // we would send headers thinking a body may be coming, and there is none. + // This is problematic on gRPC connections when the upstream server encounters an error, + // in which case it immediately returns the response headers and trailing headers, but no content, + // and clients misbehave if the initial headers response does not indicate stream end. + // TODO: Some of the tasks in steps (7) - (9) may go unobserved depending on what fails first. Needs more consideration. - await context.Response.StartAsync(shortCancellation); // ::::::::::::::::::::::::::::::::::::::::::::: - // :: Step 8: Copy response body Downstream ◄-- Proxy ◄-- Upstream + // :: Step 7: Copy response body Downstream ◄-- Proxy ◄-- Upstream await CopyBodyDownstreamAsync(upstreamResponse.Content, context.Response.Body, proxyTelemetryContext, longCancellation); + // ::::::::::::::::::::::::::::::::::::::::::::: + // :: Step 8: Copy response trailer headers and finish response Downstream ◄-- Proxy ◄-- Upstream + CopyTrailingHeadersToDownstream(upstreamResponse, context); + + if (isStreamingRequest) + { + // NOTE: We must call `CompleteAsync` so that Kestrel will flush all bytes to the client. + // In the case where there was no response body, + // this is also when headers and trailing headers are sent to the client. + // Without this, the client might wait forever waiting for response bytes, + // while we might wait forever waiting for request bytes, + // leading to a stuck connection and no way to make progress. + await context.Response.CompleteAsync(); + } + // ::::::::::::::::::::::::::::::::::::::::::::: // :: Step 9: Wait for completion of step 2: copying request body Downstream --► Proxy --► Upstream if (bodyToUpstreamContent != null) @@ -203,10 +234,6 @@ private async Task NormalProxyAsync( ////this.logger.LogInformation($" Waiting for downstream --> Proxy --> upstream body proxying to complete"); await bodyToUpstreamContent.ConsumptionTask; } - - // ::::::::::::::::::::::::::::::::::::::::::::: - // :: Step 10: Copy response trailer headers Downstream ◄-- Proxy ◄-- Upstream - CopyTrailingHeadersToDownstream(upstreamResponse, context); } /// @@ -312,13 +339,22 @@ private async Task UpgradableProxyAsync( await Task.WhenAll(upstreamTask, downstreamTask); } - private StreamCopyHttpContent SetupCopyBodyUpstream(Stream source, HttpRequestMessage upstreamRequest, in ProxyTelemetryContext proxyTelemetryContext, CancellationToken cancellation) + private StreamCopyHttpContent SetupCopyBodyUpstream(Stream source, HttpRequestMessage upstreamRequest, in ProxyTelemetryContext proxyTelemetryContext, bool isStreamingRequest, CancellationToken cancellation) { StreamCopyHttpContent contentToUpstream = null; if (source != null) { ////this.logger.LogInformation($" Setting up downstream --> Proxy --> upstream body proxying"); + // Note on `autoFlushHttpClientOutgoingStream: isStreamingRequest`: + // The.NET Core HttpClient stack keeps its own buffers on top of the underlying outgoing connection socket. + // We flush those buffers down to the socket on every write when this is set, + // but it does NOT result in calls to flush on the underlying socket. + // This is necessary because we proxy http2 transparently, + // and we are deliberately unaware of packet structure used e.g. in gRPC duplex channels. + // Because the sockets aren't flushed, the perf impact of this choice is expected to be small. + // Future: It may be wise to set this to true for *all* http2 incoming requests, + // but for now, out of an abundance of caution, we only do it for requests that look like gRPC. var streamCopier = new StreamCopier( _metrics, new StreamCopyTelemetryContext( @@ -326,7 +362,11 @@ private StreamCopyHttpContent SetupCopyBodyUpstream(Stream source, HttpRequestMe backendId: proxyTelemetryContext.BackendId, routeId: proxyTelemetryContext.RouteId, endpointId: proxyTelemetryContext.EndpointId)); - contentToUpstream = new StreamCopyHttpContent(source, streamCopier, cancellation); + contentToUpstream = new StreamCopyHttpContent( + source: source, + streamCopier: streamCopier, + autoFlushHttpClientOutgoingStream: isStreamingRequest, + cancellation: cancellation); upstreamRequest.Content = contentToUpstream; } @@ -417,6 +457,40 @@ private void CopyTrailingHeadersToDownstream(HttpResponseMessage source, HttpCon } } + /// + /// Disable some ASP .NET Core server limits so that we can handle long-running gRPC requests unconstrained. + /// Note that the gRPC server implementation on ASP .NET Core does the same for client-streaming and duplex methods. + /// Since in Gateway we have no way to determine if the current request requires client-streaming or duplex comm, + /// we do this for *all* incoming requests that look like they might be gRPC. + /// + /// + /// Inspired on + /// . + /// + private void DisableMinRequestBodyDataRateAndMaxRequestBodySize(HttpContext httpContext) + { + var minRequestBodyDataRateFeature = httpContext.Features.Get(); + if (minRequestBodyDataRateFeature != null) + { + minRequestBodyDataRateFeature.MinDataRate = null; + } + + var maxRequestBodySizeFeature = httpContext.Features.Get(); + if (maxRequestBodySizeFeature != null) + { + if (!maxRequestBodySizeFeature.IsReadOnly) + { + maxRequestBodySizeFeature.MaxRequestBodySize = null; + } + else + { + // IsReadOnly could be true if middleware has already started reading the request body + // In that case we can't disable the max request body size for the request stream + _logger.LogWarning("Unable to disable max request body size."); + } + } + } + private static class Log { private static readonly Action _httpDowngradeDeteced = LoggerMessage.Define( diff --git a/src/ReverseProxy.Core/Service/Proxy/StreamCopyHttpContent.cs b/src/ReverseProxy.Core/Service/Proxy/StreamCopyHttpContent.cs index 8e429d0cc..55181bef4 100644 --- a/src/ReverseProxy.Core/Service/Proxy/StreamCopyHttpContent.cs +++ b/src/ReverseProxy.Core/Service/Proxy/StreamCopyHttpContent.cs @@ -39,16 +39,18 @@ internal class StreamCopyHttpContent : HttpContent { private readonly Stream _source; private readonly IStreamCopier _streamCopier; + private readonly bool _autoFlushHttpClientOutgoingStream; private readonly CancellationToken _cancellation; private readonly TaskCompletionSource _tcs = new TaskCompletionSource(); - public StreamCopyHttpContent(Stream source, IStreamCopier streamCopier, CancellationToken cancellation) + public StreamCopyHttpContent(Stream source, IStreamCopier streamCopier, bool autoFlushHttpClientOutgoingStream, CancellationToken cancellation) { Contracts.CheckValue(source, nameof(source)); Contracts.CheckValue(streamCopier, nameof(streamCopier)); _source = source; _streamCopier = streamCopier; + _autoFlushHttpClientOutgoingStream = autoFlushHttpClientOutgoingStream; _cancellation = cancellation; } @@ -122,6 +124,16 @@ protected override async Task SerializeToStreamAsync(Stream stream, TransportCon Started = true; try { + if (_autoFlushHttpClientOutgoingStream) + { + // HttpClient's machinery keeps an internal buffer that doesn't get flushed to the socket on every write. + // Some protocols (e.g. gRPC) may rely on specific bytes being sent, and HttpClient's buffering would prevent it. + // AutoFlushingStream delegates to the provided stream, adding calls to FlushAsync on every WriteAsync. + // Note that HttpClient does NOT call Flush on the underlying socket, so the perf impact of this is expected to be small. + // This statement is based on current knowledge as of .NET Core 3.1.201. + stream = new AutoFlushingStream(stream); + } + // Immediately flush request stream to send headers // https://github.com/dotnet/corefx/issues/39586#issuecomment-516210081 await stream.FlushAsync(); diff --git a/src/ReverseProxy.Core/Util/GrpcProtocolHelper.cs b/src/ReverseProxy.Core/Util/GrpcProtocolHelper.cs new file mode 100644 index 000000000..436edfdea --- /dev/null +++ b/src/ReverseProxy.Core/Util/GrpcProtocolHelper.cs @@ -0,0 +1,52 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; + +namespace Microsoft.ReverseProxy.Core +{ + internal static class GrpcProtocolHelper + { + internal const string GrpcContentType = "application/grpc"; + + // NOTE: When https://github.com/dotnet/aspnetcore/issues/21265 is addressed, + // this can be replaced with `MediaTypeHeaderValue.IsSubsetOf(...)`. + /// + /// Checks whether the provided content type header value represents a gRPC request. + /// Takes inspiration from + /// . + /// + public static bool IsGrpcContentType(string contentType) + { + if (contentType == null) + { + return false; + } + + if (!contentType.StartsWith(GrpcContentType, StringComparison.OrdinalIgnoreCase)) + { + return false; + } + + if (contentType.Length == GrpcContentType.Length) + { + // Exact match + return true; + } + + // Support variations on the content-type (e.g. +proto, +json) + var nextChar = contentType[GrpcContentType.Length]; + if (nextChar == ';') + { + return true; + } + if (nextChar == '+') + { + // Accept any message format. Marshaller could be set to support third-party formats + return true; + } + + return false; + } + } +} diff --git a/test/ReverseProxy.Core.Tests/Service/Proxy/StreamCopyHttpContentTests.cs b/test/ReverseProxy.Core.Tests/Service/Proxy/StreamCopyHttpContentTests.cs index f0e24725f..b6e64937b 100644 --- a/test/ReverseProxy.Core.Tests/Service/Proxy/StreamCopyHttpContentTests.cs +++ b/test/ReverseProxy.Core.Tests/Service/Proxy/StreamCopyHttpContentTests.cs @@ -30,7 +30,7 @@ public async Task CopyToAsync_InvokesStreamCopier() .Setup(s => s.CopyAsync(source, destination, It.IsAny())) .Returns(() => source.CopyToAsync(destination)); - var sut = new StreamCopyHttpContent(source, streamCopierMock.Object, CancellationToken.None); + var sut = new StreamCopyHttpContent(source, streamCopierMock.Object, autoFlushHttpClientOutgoingStream: false, CancellationToken.None); // Act & Assert Assert.False(sut.ConsumptionTask.IsCompleted); @@ -42,6 +42,38 @@ public async Task CopyToAsync_InvokesStreamCopier() Assert.Equal(sourceBytes, destination.ToArray()); } + [Theory] + [InlineData(false, 1)] // we expect to always flush at least once to trigger sending request headers + [InlineData(true, 2)] + public async Task CopyToAsync_AutoFlushing(bool autoFlush, int expectedFlushes) + { + // Arrange + const int SourceSize = (128 * 1024) - 3; + + var sourceBytes = Enumerable.Range(0, SourceSize).Select(i => (byte)(i % 256)).ToArray(); + var source = new MemoryStream(sourceBytes); + var destination = new MemoryStream(); + var flushCountingDestination = new FlushCountingStream(destination); + + var streamCopierMock = new Mock(); + streamCopierMock + .Setup(s => s.CopyAsync(source, It.IsAny(), It.IsAny())) + .Returns((Stream source_, Stream destination_, CancellationToken cancellation_) => + source_.CopyToAsync(destination_)); + + var sut = new StreamCopyHttpContent(source, streamCopierMock.Object, autoFlushHttpClientOutgoingStream: autoFlush, CancellationToken.None); + + // Act & Assert + Assert.False(sut.ConsumptionTask.IsCompleted); + Assert.False(sut.Started); + await sut.CopyToAsync(flushCountingDestination); + + Assert.True(sut.Started); + Assert.True(sut.ConsumptionTask.IsCompleted); + Assert.Equal(sourceBytes, destination.ToArray()); + Assert.Equal(expectedFlushes, flushCountingDestination.NumFlushes); + } + [Fact] public async Task CopyToAsync_AsyncSequencing() { @@ -57,7 +89,7 @@ public async Task CopyToAsync_AsyncSequencing() await tcs.Task; }); - var sut = new StreamCopyHttpContent(source, streamCopierMock.Object, CancellationToken.None); + var sut = new StreamCopyHttpContent(source, streamCopierMock.Object, autoFlushHttpClientOutgoingStream: false, CancellationToken.None); // Act & Assert Assert.False(sut.ConsumptionTask.IsCompleted); @@ -78,7 +110,7 @@ public Task ReadAsStreamAsync_Throws() // Arrange var source = new MemoryStream(); var destination = new MemoryStream(); - var sut = new StreamCopyHttpContent(source, new Mock().Object, CancellationToken.None); + var sut = new StreamCopyHttpContent(source, new Mock().Object, autoFlushHttpClientOutgoingStream: false, CancellationToken.None); // Act Func func = () => sut.ReadAsStreamAsync(); @@ -93,7 +125,7 @@ public void AllowDuplex_ReturnsTrue() // Arrange var source = new MemoryStream(); var streamCopierMock = new Mock(); - var sut = new StreamCopyHttpContent(source, streamCopierMock.Object, CancellationToken.None); + var sut = new StreamCopyHttpContent(source, streamCopierMock.Object, autoFlushHttpClientOutgoingStream: false, CancellationToken.None); // Assert // This is an internal property that HttpClient and friends use internally and which must be true @@ -104,5 +136,72 @@ public void AllowDuplex_ReturnsTrue() var allowDuplex = (bool)allowDuplexProperty.GetValue(sut); Assert.True(allowDuplex); } + + private class FlushCountingStream : Stream + { + private readonly Stream _stream; + + public FlushCountingStream(Stream stream) + { + _stream = stream; + } + + public int NumFlushes { get; private set; } + + public override bool CanRead => _stream.CanRead; + + public override bool CanSeek => _stream.CanSeek; + + public override bool CanWrite => _stream.CanWrite; + + public override long Length => _stream.Length; + + public override long Position + { + get => _stream.Position; + set => _stream.Position = value; + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return _stream.WriteAsync(buffer, offset, count, cancellationToken); + } + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + return _stream.WriteAsync(buffer, cancellationToken); + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + await _stream.FlushAsync(cancellationToken); + NumFlushes++; + } + + public override void Flush() + { + _stream.Flush(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + return _stream.Read(buffer, offset, count); + } + + public override long Seek(long offset, SeekOrigin origin) + { + return _stream.Seek(offset, origin); + } + + public override void SetLength(long value) + { + _stream.SetLength(value); + } + + public override void Write(byte[] buffer, int offset, int count) + { + _stream.Write(buffer, offset, count); + } + } } }