Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions src/ReverseProxy.Core/Service/Proxy/AutoFlushingStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ReverseProxy.Utilities;

namespace Microsoft.ReverseProxy.Core.Service.Proxy
{
/// <summary>
/// Delegates to a wrapped stream and calls its
/// <see cref="Stream.Flush"/> or <see cref="Stream.FlushAsync(CancellationToken)"/>
/// on every write.
/// </summary>
/// <remarks>
/// This is used by <see cref="StreamCopyHttpContent"/> to work around some undesirable behavior
/// in the HttpClient machinery (as of .NET Core 3.1.201) where an internal buffer
/// doesn't get written to the outgoing socket stream when we write to the outgoing stream.
/// Calling Flush on that stream sends the bytes to the underlying socket,
/// but does not call flush on the socket, so perf impact is expected to be small.
/// </remarks>
internal sealed class AutoFlushingStream : Stream
{
private readonly Stream _stream;

public AutoFlushingStream(Stream stream)
{
Contracts.CheckValue(stream, nameof(stream));
_stream = stream;
}

public override bool CanRead => _stream.CanRead;

public override bool CanSeek => _stream.CanSeek;

public override bool CanWrite => _stream.CanWrite;

public override long Length => _stream.Length;

public override long Position
{
get => _stream.Position;
set => _stream.Position = value;
}

public override void Write(byte[] buffer, int offset, int count)
{
_stream.Write(buffer, offset, count);
_stream.Flush();
}

public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await _stream.WriteAsync(buffer, offset, count, cancellationToken);
await _stream.FlushAsync(cancellationToken);
}

public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
await _stream.WriteAsync(buffer, cancellationToken);
await _stream.FlushAsync(cancellationToken);
}

public override void Flush()
{
_stream.Flush();
}

public override Task FlushAsync(CancellationToken cancellationToken)
{
return _stream.FlushAsync(cancellationToken);
}

public override int Read(byte[] buffer, int offset, int count)
{
return _stream.Read(buffer, offset, count);
}

public override long Seek(long offset, SeekOrigin origin)
{
return _stream.Seek(offset, origin);
}

public override void SetLength(long value)
{
_stream.SetLength(value);
}
}
}
126 changes: 100 additions & 26 deletions src/ReverseProxy.Core/Service/Proxy/HttpProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core.Features;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Primitives;
using Microsoft.ReverseProxy.Common;
using Microsoft.ReverseProxy.Core.Abstractions;
using Microsoft.ReverseProxy.Core.Service.Metrics;
using Microsoft.ReverseProxy.Core.Service.Proxy.Infra;
using Microsoft.ReverseProxy.Utilities;
Expand Down Expand Up @@ -108,16 +108,16 @@ public Task ProxyAsync(
/// </summary>
/// <remarks>
/// Normal proxying comprises the following steps:
/// (1) Create outgoing HttpRequestMessage
/// (2) Setup copy of request body (background) Downstream --► Proxy --► Upstream
/// (3) Copy request headers Downstream --► Proxy --► Upstream
/// (4) Send the outgoing request using HttpMessageInvoker Downstream --► Proxy --► Upstream
/// (5) Copy response status line Downstream ◄-- Proxy ◄-- Upstream
/// (6) Copy response headers Downstream ◄-- Proxy ◄-- Upstream
/// (7) Send response headers Downstream ◄-- Proxy ◄-- Upstream
/// (8) Copy response body Downstream ◄-- Proxy ◄-- Upstream
/// (9) Wait for completion of step 2: copying request body Downstream --► Proxy --► Upstream
/// (10) Copy response trailer headers Downstream ◄-- Proxy ◄-- Upstream
/// (0) Disable ASP .NET Core limits for streaming requests
/// (1) Create outgoing HttpRequestMessage
/// (2) Setup copy of request body (background) Downstream --► Proxy --► Upstream
/// (3) Copy request headers Downstream --► Proxy --► Upstream
/// (4) Send the outgoing request using HttpMessageInvoker Downstream --► Proxy --► Upstream
/// (5) Copy response status line Downstream ◄-- Proxy ◄-- Upstream
/// (6) Copy response headers Downstream ◄-- Proxy ◄-- Upstream
/// (7) Copy response body Downstream ◄-- Proxy ◄-- Upstream
/// (8) Copy response trailer headers and finish response Downstream ◄-- Proxy ◄-- Upstream
/// (9) Wait for completion of step 2: copying request body Downstream --► Proxy --► Upstream
///
/// ASP .NET Core (Kestrel) will finally send response trailers (if any)
/// after we complete the steps above and relinquish control.
Expand All @@ -134,6 +134,18 @@ private async Task NormalProxyAsync(
Contracts.CheckValue(targetUri, nameof(targetUri));
Contracts.CheckValue(httpClient, nameof(httpClient));

// :::::::::::::::::::::::::::::::::::::::::::::
// :: Step 0: Disable ASP .NET Core limits for streaming requests
var isIncomingHttp2 = HttpProtocol.IsHttp2(context.Request.Protocol);

// NOTE: We heuristically assume gRPC-looking requests may require streaming semantics.
// See https://github.com/microsoft/reverse-proxy/issues/118 for design discussion.
var isStreamingRequest = isIncomingHttp2 && GrpcProtocolHelper.IsGRpcContentType(context.Request.ContentType);
if (isStreamingRequest)
{
DisableMinRequestBodyDataRateAndMaxRequestBodySize(context);
}

// :::::::::::::::::::::::::::::::::::::::::::::
// :: Step 1: Create outgoing HttpRequestMessage
var upstreamRequest = new HttpRequestMessage(HttpUtilities.GetHttpMethod(context.Request.Method), targetUri)
Expand All @@ -147,7 +159,7 @@ private async Task NormalProxyAsync(
// :::::::::::::::::::::::::::::::::::::::::::::
// :: Step 2: Setup copy of request body (background) Downstream --► Proxy --► Upstream
// Note that we must do this before step (3) because step (3) may also add headers to the HttpContent that we set up here.
var bodyToUpstreamContent = SetupCopyBodyUpstream(context.Request.Body, upstreamRequest, in proxyTelemetryContext, longCancellation);
var bodyToUpstreamContent = SetupCopyBodyUpstream(context.Request.Body, upstreamRequest, in proxyTelemetryContext, isStreamingRequest, longCancellation);

// :::::::::::::::::::::::::::::::::::::::::::::
// :: Step 3: Copy request headers Downstream --► Proxy --► Upstream
Expand All @@ -159,7 +171,7 @@ private async Task NormalProxyAsync(
var upstreamResponse = await httpClient.SendAsync(upstreamRequest, shortCancellation);

// Detect connection downgrade, which may be problematic for e.g. gRPC.
if (upstreamResponse.Version.Major != 2 && HttpProtocol.IsHttp2(context.Request.Protocol))
if (isIncomingHttp2 && upstreamResponse.Version.Major != 2)
{
// TODO: Do something on connection downgrade...
Log.HttpDowngradeDeteced(_logger);
Expand All @@ -184,29 +196,44 @@ private async Task NormalProxyAsync(
// :: Step 6: Copy response headers Downstream ◄-- Proxy ◄-- Upstream
CopyHeadersToDownstream(upstreamResponse, context.Response.Headers);

// :::::::::::::::::::::::::::::::::::::::::::::
// :: Step 7: Send response headers Downstream ◄-- Proxy ◄-- Upstream
// This is important to avoid any extra delays in sending response headers
// e.g. if the upstream server is slow to provide its response body.
////this.logger.LogInformation($" Starting downstream <-- Proxy response");
// NOTE: it may *seem* wise to call `context.Response.StartAsync()` at this point
// since it looks like we are ready to send back response headers
// (and this might help reduce extra delays while we wait to receive the body from upstream).
// HOWEVER, this would produce the wrong result if it turns out that there is no content
// from the upstream -- instead of sending headers and terminating the stream at once,
// we would send headers thinking a body may be coming, and there is none.
// This is problematic on gRPC connections when the upstream server encounters an error,
// in which case it immediately returns the response headers and trailing headers, but no content,
// and clients misbehave if the initial headers response does not indicate stream end.

// TODO: Some of the tasks in steps (7) - (9) may go unobserved depending on what fails first. Needs more consideration.
await context.Response.StartAsync(shortCancellation);

// :::::::::::::::::::::::::::::::::::::::::::::
// :: Step 8: Copy response body Downstream ◄-- Proxy ◄-- Upstream
// :: Step 7: Copy response body Downstream ◄-- Proxy ◄-- Upstream
await CopyBodyDownstreamAsync(upstreamResponse.Content, context.Response.Body, proxyTelemetryContext, longCancellation);

// :::::::::::::::::::::::::::::::::::::::::::::
// :: Step 8: Copy response trailer headers and finish response Downstream ◄-- Proxy ◄-- Upstream
CopyTrailingHeadersToDownstream(upstreamResponse, context);

if (isStreamingRequest)
{
// NOTE: We must call `CompleteAsync` so that Kestrel will flush all bytes to the client.
// In the case where there was no response body,
// this is also when headers and trailing headers are sent to the client.
// Without this, the client might wait forever waiting for response bytes,
// while we might wait forever waiting for request bytes,
// leading to a stuck connection and no way to make progress.
await context.Response.CompleteAsync();
}

// :::::::::::::::::::::::::::::::::::::::::::::
// :: Step 9: Wait for completion of step 2: copying request body Downstream --► Proxy --► Upstream
if (bodyToUpstreamContent != null)
{
////this.logger.LogInformation($" Waiting for downstream --> Proxy --> upstream body proxying to complete");
await bodyToUpstreamContent.ConsumptionTask;
}

// :::::::::::::::::::::::::::::::::::::::::::::
// :: Step 10: Copy response trailer headers Downstream ◄-- Proxy ◄-- Upstream
CopyTrailingHeadersToDownstream(upstreamResponse, context);
}

/// <summary>
Expand Down Expand Up @@ -312,21 +339,34 @@ private async Task UpgradableProxyAsync(
await Task.WhenAll(upstreamTask, downstreamTask);
}

private StreamCopyHttpContent SetupCopyBodyUpstream(Stream source, HttpRequestMessage upstreamRequest, in ProxyTelemetryContext proxyTelemetryContext, CancellationToken cancellation)
private StreamCopyHttpContent SetupCopyBodyUpstream(Stream source, HttpRequestMessage upstreamRequest, in ProxyTelemetryContext proxyTelemetryContext, bool isStreamingRequest, CancellationToken cancellation)
{
StreamCopyHttpContent contentToUpstream = null;
if (source != null)
{
////this.logger.LogInformation($" Setting up downstream --> Proxy --> upstream body proxying");

// Note on `autoFlushHttpClientOutgoingStream: isStreamingRequest`:
// The.NET Core HttpClient stack keeps its own buffers on top of the underlying outgoing connection socket.
// We flush those buffers down to the socket on every write when this is set,
// but it does NOT result in calls to flush on the underlying socket.
// This is necessary because we proxy http2 transparently,
// and we are deliberately unaware of packet structure used e.g. in gRPC duplex channels.
// Because the sockets aren't flushed, the perf impact of this choice is expected to be small.
// Future: It may be wise to set this to true for *all* http2 incoming requests,
// but for now, out of an abundance of caution, we only do it for requests that look like gRPC.
var streamCopier = new StreamCopier(
_metrics,
new StreamCopyTelemetryContext(
direction: "upstream",
backendId: proxyTelemetryContext.BackendId,
routeId: proxyTelemetryContext.RouteId,
endpointId: proxyTelemetryContext.EndpointId));
contentToUpstream = new StreamCopyHttpContent(source, streamCopier, cancellation);
contentToUpstream = new StreamCopyHttpContent(
source: source,
streamCopier: streamCopier,
autoFlushHttpClientOutgoingStream: isStreamingRequest,
cancellation: cancellation);
upstreamRequest.Content = contentToUpstream;
}

Expand Down Expand Up @@ -417,6 +457,40 @@ private void CopyTrailingHeadersToDownstream(HttpResponseMessage source, HttpCon
}
}

/// <summary>
/// Disable some ASP .NET Core server limits so that we can handle long-running gRPC requests unconstrained.
/// Note that the gRPC server implementation on ASP .NET Core does the same for client-streaming and duplex methods.
/// Since in Gateway we have no way to determine if the current request requires client-streaming or duplex comm,
/// we do this for *all* incoming requests that look like they might be gRPC.
/// </summary>
/// <remarks>
/// Inspired on
/// <see href="https://github.com/grpc/grpc-dotnet/blob/3ce9b104524a4929f5014c13cd99ba9a1c2431d4/src/Grpc.AspNetCore.Server/Internal/CallHandlers/ServerCallHandlerBase.cs#L127"/>.
/// </remarks>
private void DisableMinRequestBodyDataRateAndMaxRequestBodySize(HttpContext httpContext)
{
var minRequestBodyDataRateFeature = httpContext.Features.Get<IHttpMinRequestBodyDataRateFeature>();
if (minRequestBodyDataRateFeature != null)
{
minRequestBodyDataRateFeature.MinDataRate = null;
}

var maxRequestBodySizeFeature = httpContext.Features.Get<IHttpMaxRequestBodySizeFeature>();
if (maxRequestBodySizeFeature != null)
{
if (!maxRequestBodySizeFeature.IsReadOnly)
{
maxRequestBodySizeFeature.MaxRequestBodySize = null;
}
else
{
// IsReadOnly could be true if middleware has already started reading the request body
// In that case we can't disable the max request body size for the request stream
_logger.LogWarning("Unable to disable max request body size.");
}
}
}

private static class Log
{
private static readonly Action<ILogger, Exception> _httpDowngradeDeteced = LoggerMessage.Define(
Expand Down
14 changes: 13 additions & 1 deletion src/ReverseProxy.Core/Service/Proxy/StreamCopyHttpContent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,18 @@ internal class StreamCopyHttpContent : HttpContent
{
private readonly Stream _source;
private readonly IStreamCopier _streamCopier;
private readonly bool _autoFlushHttpClientOutgoingStream;
private readonly CancellationToken _cancellation;
private readonly TaskCompletionSource<bool> _tcs = new TaskCompletionSource<bool>();

public StreamCopyHttpContent(Stream source, IStreamCopier streamCopier, CancellationToken cancellation)
public StreamCopyHttpContent(Stream source, IStreamCopier streamCopier, bool autoFlushHttpClientOutgoingStream, CancellationToken cancellation)
{
Contracts.CheckValue(source, nameof(source));
Contracts.CheckValue(streamCopier, nameof(streamCopier));

_source = source;
_streamCopier = streamCopier;
_autoFlushHttpClientOutgoingStream = autoFlushHttpClientOutgoingStream;
_cancellation = cancellation;
}

Expand Down Expand Up @@ -122,6 +124,16 @@ protected override async Task SerializeToStreamAsync(Stream stream, TransportCon
Started = true;
try
{
if (_autoFlushHttpClientOutgoingStream)
{
// HttpClient's machinery keeps an internal buffer that doesn't get flushed to the socket on every write.
// Some protocols (e.g. gRPC) may rely on specific bytes being sent, and HttpClient's buffering would prevent it.
// AutoFlushingStream delegates to the provided stream, adding calls to FlushAsync on every WriteAsync.
// Note that HttpClient does NOT call Flush on the underlying socket, so the perf impact of this is expected to be small.
// This statement is based on current knowledge as of .NET Core 3.1.201.
stream = new AutoFlushingStream(stream);
}

// Immediately flush request stream to send headers
// https://github.com/dotnet/corefx/issues/39586#issuecomment-516210081
await stream.FlushAsync();
Expand Down
Loading