Skip to content

[Enhanced Http] Support for http proxying scenarios #9193

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
using Microsoft.Azure.WebJobs.Script.Workers.SharedMemoryDataTransfer;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Yarp.ReverseProxy.Forwarder;

using static Microsoft.Azure.WebJobs.Script.Grpc.Messages.RpcLog.Types;
using FunctionMetadata = Microsoft.Azure.WebJobs.Script.Description.FunctionMetadata;
Expand Down Expand Up @@ -81,7 +82,8 @@ internal class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable
private bool _isSharedMemoryDataTransferEnabled;
private bool? _cancelCapabilityEnabled;
private bool _isWorkerApplicationInsightsLoggingEnabled;

private IHttpProxyService _httpProxyService;
private Uri _httpProxyEndpoint;
private System.Timers.Timer _timer;

internal GrpcWorkerChannel(
Expand All @@ -96,7 +98,8 @@ internal GrpcWorkerChannel(
IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions,
ISharedMemoryManager sharedMemoryManager,
IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions,
IOptions<FunctionsHostingConfigOptions> hostingConfigOptions)
IOptions<FunctionsHostingConfigOptions> hostingConfigOptions,
IHttpProxyService httpProxyService)
{
_workerId = workerId;
_eventManager = eventManager;
Expand All @@ -112,6 +115,7 @@ internal GrpcWorkerChannel(
_processInbound = state => ProcessItem((InboundGrpcEvent)state);
_hostingConfigOptions = hostingConfigOptions;

_httpProxyService = httpProxyService;
_workerCapabilities = new GrpcCapabilities(_workerChannelLogger);

if (!_eventManager.TryGetGrpcChannels(workerId, out var inbound, out var outbound))
Expand All @@ -133,6 +137,8 @@ internal GrpcWorkerChannel(
_state = RpcWorkerChannelState.Default;
}

private bool IsHttpProxyingWorker => _httpProxyEndpoint is not null;

public string Id => _workerId;

public IDictionary<string, BufferBlock<ScriptInvocationContext>> FunctionInputBuffers => _functionInputBuffers;
Expand Down Expand Up @@ -420,6 +426,20 @@ internal void WorkerInitResponse(GrpcEvent initEvent)
_isWorkerApplicationInsightsLoggingEnabled = true;
}

// If http proxying is enabled, we need to get the proxying endpoint of this worker
var httpUri = _workerCapabilities.GetCapabilityState(RpcWorkerConstants.HttpUri);
if (!string.IsNullOrEmpty(httpUri) && FeatureFlags.IsEnabled(ScriptConstants.FeatureFlagEnableHttpProxying))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FeatureFlags.IsEnabled(ScriptConstants.FeatureFlagEnableHttpProxying) will always be false for worker started in placeholder mode

{
try
{
_httpProxyEndpoint = new Uri(httpUri);
}
catch (Exception ex)
{
HandleWorkerInitError(ex);
}
}

_workerInitTask.TrySetResult(true);
}

Expand Down Expand Up @@ -738,6 +758,13 @@ await SendStreamingMessageAsync(new StreamingMessage
{
context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId));
}

if (IsHttpProxyingWorker && FeatureFlags.IsEnabled(ScriptConstants.FeatureFlagEnableHttpProxying) && context.FunctionMetadata.IsHttpTriggerFunction())
{
var aspNetTask = _httpProxyService.ForwardAsync(context, _httpProxyEndpoint).AsTask();

context.Properties.Add(ScriptConstants.HttpProxyTask, aspNetTask);
}
}
catch (Exception invokeEx)
{
Expand Down Expand Up @@ -902,6 +929,20 @@ internal async Task InvokeResponse(InvocationResponse invokeResponse)
{
if (invokeResponse.Result.IsInvocationSuccess(context.ResultSource, capabilityEnabled))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if IsInvocationSuccess is false but the proxy request made it through. Also what happens when IsInvocationSuccess is false and proxy request has failed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The design for these cases hasn't been discussed but for now if IsInvocationSuccess fails at all we will ignore any proxying results and log an error.

{
if (FeatureFlags.IsEnabled(ScriptConstants.FeatureFlagEnableHttpProxying) && IsHttpProxyingWorker)
{
if (context.Properties.TryGetValue(ScriptConstants.HttpProxyTask, out Task<ForwarderError> httpProxyTask))
{
ForwarderError httpProxyTaskResult = await httpProxyTask;

if (httpProxyTaskResult is not ForwarderError.None)
{
// TODO: Understand scenarios where function invocation succeeds but there is an error proxying
// need to investigate different ForwarderErrors and consider how they will be relayed through other services and to users
}
}
}

_metricsLogger.LogEvent(string.Format(MetricEventNames.WorkerInvokeSucceeded, Id));

try
Expand Down
11 changes: 7 additions & 4 deletions src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ public class GrpcWorkerChannelFactory : IRpcWorkerChannelFactory
private readonly ISharedMemoryManager _sharedMemoryManager = null;
private readonly IOptions<WorkerConcurrencyOptions> _workerConcurrencyOptions;
private readonly IOptions<FunctionsHostingConfigOptions> _hostingConfigOptions;
private readonly IHttpProxyService _httpProxyService;

public GrpcWorkerChannelFactory(IScriptEventManager eventManager, IEnvironment environment, ILoggerFactory loggerFactory,
IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions, IRpcWorkerProcessFactory rpcWorkerProcessManager, ISharedMemoryManager sharedMemoryManager,
IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions, IOptions<FunctionsHostingConfigOptions> hostingConfigOptions)
IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions, IOptions<FunctionsHostingConfigOptions> hostingConfigOptions, IHttpProxyService httpProxyService)
{
_eventManager = eventManager;
_loggerFactory = loggerFactory;
Expand All @@ -39,6 +40,7 @@ public GrpcWorkerChannelFactory(IScriptEventManager eventManager, IEnvironment e
_sharedMemoryManager = sharedMemoryManager;
_workerConcurrencyOptions = workerConcurrencyOptions;
_hostingConfigOptions = hostingConfigOptions;
_httpProxyService = httpProxyService;
}

public IRpcWorkerChannel Create(string scriptRootPath, string runtime, IMetricsLogger metricsLogger, int attemptCount, IEnumerable<RpcWorkerConfig> workerConfigs)
Expand All @@ -54,12 +56,12 @@ public IRpcWorkerChannel Create(string scriptRootPath, string runtime, IMetricsL
IWorkerProcess rpcWorkerProcess = _rpcWorkerProcessFactory.Create(workerId, runtime, scriptRootPath, languageWorkerConfig);

return CreateInternal(workerId, _eventManager, languageWorkerConfig, rpcWorkerProcess, workerLogger, metricsLogger, attemptCount,
_environment, _applicationHostOptions, _sharedMemoryManager, _workerConcurrencyOptions, _hostingConfigOptions);
_environment, _applicationHostOptions, _sharedMemoryManager, _workerConcurrencyOptions, _hostingConfigOptions, _httpProxyService);
}

internal virtual IRpcWorkerChannel CreateInternal(string workerId, IScriptEventManager eventManager, RpcWorkerConfig languageWorkerConfig, IWorkerProcess rpcWorkerProcess,
ILogger workerLogger, IMetricsLogger metricsLogger, int attemptCount, IEnvironment environment, IOptionsMonitor<ScriptApplicationHostOptions> applicationHostOptions,
ISharedMemoryManager sharedMemoryManager, IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions, IOptions<FunctionsHostingConfigOptions> hostingConfigOptions)
ISharedMemoryManager sharedMemoryManager, IOptions<WorkerConcurrencyOptions> workerConcurrencyOptions, IOptions<FunctionsHostingConfigOptions> hostingConfigOptions, IHttpProxyService httpProxyService)
{
return new GrpcWorkerChannel(
workerId,
Expand All @@ -73,7 +75,8 @@ internal virtual IRpcWorkerChannel CreateInternal(string workerId, IScriptEventM
applicationHostOptions,
sharedMemoryManager,
workerConcurrencyOptions,
hostingConfigOptions);
hostingConfigOptions,
httpProxyService);
}
}
}
3 changes: 3 additions & 0 deletions src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ public static IServiceCollection AddScriptGrpc(this IServiceCollection services)

services.AddSingleton<IRpcServer, AspNetCoreGrpcServer>();

services.AddHttpForwarder();
services.AddSingleton<IHttpProxyService, DefaultHttpProxyService>();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, we can't get this behind the feature flag (can't access FeatureFlags yet at this point, host will crash) so these services will be added but not used.


return services;
}
}
Expand Down
66 changes: 66 additions & 0 deletions src/WebJobs.Script.Grpc/Server/DefaultHttpProxyService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.WebJobs.Script.Description;
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
using Yarp.ReverseProxy.Forwarder;

namespace Microsoft.Azure.WebJobs.Script.Grpc
{
internal class DefaultHttpProxyService : IHttpProxyService, IDisposable
{
private readonly SocketsHttpHandler _handler;
private readonly IHttpForwarder _httpForwarder;
private readonly HttpMessageInvoker _messageInvoker;
private readonly ForwarderRequestConfig _forwarderRequestConfig;

public DefaultHttpProxyService(IHttpForwarder httpForwarder)
{
_httpForwarder = httpForwarder ?? throw new ArgumentNullException(nameof(httpForwarder));

_handler = new SocketsHttpHandler();
_messageInvoker = new HttpMessageInvoker(_handler);
_forwarderRequestConfig = new ForwarderRequestConfig();
}

public void Dispose()
{
_handler?.Dispose();
_messageInvoker?.Dispose();
}

public ValueTask<ForwarderError> ForwardAsync(ScriptInvocationContext context, Uri httpUri)
{
ArgumentNullException.ThrowIfNull(context);

if (context.Inputs is null || context.Inputs?.Count() == 0)
{
throw new InvalidOperationException($"The function {context.FunctionMetadata.Name} can not be evaluated since it has no inputs.");
}

HttpRequest httpRequest = context.Inputs.FirstOrDefault(i => i.Val is HttpRequest).Val as HttpRequest;

if (httpRequest is null)
{
throw new InvalidOperationException($"Cannot proxy the HttpTrigger function {context.FunctionMetadata.Name} without an input of type {nameof(HttpRequest)}.");
}

HttpContext httpContext = httpRequest.HttpContext;

httpContext.Items.Add(ScriptConstants.HttpProxyingEnabled, bool.TrueString);

// add invocation id as correlation id
httpRequest.Headers.TryAdd(ScriptConstants.HttpProxyCorrelationHeader, context.ExecutionContext.InvocationId.ToString());

var aspNetTask = _httpForwarder.SendAsync(httpContext, httpUri.ToString(), _messageInvoker, _forwarderRequestConfig);

return aspNetTask;
}
}
}
15 changes: 15 additions & 0 deletions src/WebJobs.Script.Grpc/Server/IHttpProxyService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Script.Description;
using Yarp.ReverseProxy.Forwarder;

namespace Microsoft.Azure.WebJobs.Script.Grpc
{
public interface IHttpProxyService
{
ValueTask<ForwarderError> ForwardAsync(ScriptInvocationContext context, Uri httpUri);
}
}
1 change: 1 addition & 0 deletions src/WebJobs.Script.Grpc/WebJobs.Script.Grpc.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="System.Threading.Channels" Version="6.0.0" />
<PackageReference Include="Yarp.ReverseProxy" Version="2.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public FunctionInvocationMiddleware(RequestDelegate next)
_next = next;
}

// TODO: Confirm that only HttpTrigger requests would flow through here
public async Task Invoke(HttpContext context)
{
if (_next != null)
Expand All @@ -52,6 +53,15 @@ public async Task Invoke(HttpContext context)

int nestedProxiesCount = GetNestedProxiesCount(context, functionExecution);
IActionResult result = await GetResultAsync(context, functionExecution);

if (context.Items.TryGetValue(ScriptConstants.HttpProxyingEnabled, out var value))
{
if (value?.ToString() == bool.TrueString)
{
return;
}
}

if (nestedProxiesCount > 0)
{
// if Proxy, the rest of the pipeline will be processed by Proxies in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@ public class ScriptInvocationContext
public ILogger Logger { get; set; }

public System.Threading.ExecutionContext AsyncExecutionContext { get; set; }

public Dictionary<string, object> Properties { get; set; } = new Dictionary<string, object>();
}
}
11 changes: 9 additions & 2 deletions src/WebJobs.Script/Extensions/FunctionMetadataExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public static class FunctionMetadataExtensions
private const string IsDisabledKey = "IsDisabled";
private const string IsCodelessKey = "IsCodeless";
private const string FunctionIdKey = "FunctionId";
private const string HttpTriggerKey = "HttpTrigger";
private const string HttpOutputKey = "Http";

public static bool IsHttpInAndOutFunction(this FunctionMetadata metadata)
{
Expand All @@ -24,15 +26,20 @@ public static bool IsHttpInAndOutFunction(this FunctionMetadata metadata)

BindingMetadata inputBindingMetadata = metadata.InputBindings.ElementAt(0);
BindingMetadata outputBindingMetadata = metadata.OutputBindings.ElementAt(0);
if (string.Equals("httptrigger", inputBindingMetadata.Type, StringComparison.OrdinalIgnoreCase) &&
string.Equals("http", outputBindingMetadata.Type, StringComparison.OrdinalIgnoreCase))
if (string.Equals(HttpTriggerKey, inputBindingMetadata.Type, StringComparison.OrdinalIgnoreCase) &&
string.Equals(HttpOutputKey, outputBindingMetadata.Type, StringComparison.OrdinalIgnoreCase))
{
return true;
}

return false;
}

public static bool IsHttpTriggerFunction(this FunctionMetadata metadata)
{
return metadata.InputBindings.Any(b => string.Equals(HttpTriggerKey, b.Type, StringComparison.OrdinalIgnoreCase));
}

public static string GetFunctionId(this FunctionMetadata metadata)
{
if (!metadata.Properties.TryGetValue(FunctionIdKey, out object idObj)
Expand Down
6 changes: 6 additions & 0 deletions src/WebJobs.Script/ScriptConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public static class ScriptConstants
public const string FeatureFlagEnableWorkerIndexing = "EnableWorkerIndexing";
public const string FeatureFlagEnableDebugTracing = "EnableDebugTracing";
public const string FeatureFlagEnableProxies = "EnableProxies";
public const string FeatureFlagEnableHttpProxying = "EnableHttpProxying";
public const string HostingConfigDisableLinuxAppServiceDetailedExecutionEvents = "DisableLinuxExecutionDetails";
public const string HostingConfigDisableLinuxAppServiceExecutionEventLogBackoff = "DisableLinuxLogBackoff";

Expand Down Expand Up @@ -225,5 +226,10 @@ public static class ScriptConstants
public static readonly string LiveLogsSessionAIKey = "#AzFuncLiveLogsSessionId";

public static readonly string FunctionsHostingConfigSectionName = "FunctionsHostingConfig";

// HTTP Proxying constants
public static readonly string HttpProxyingEnabled = "HttpProxyingEnabled";
public static readonly string HttpProxyCorrelationHeader = "x-ms-invocation-id";
public static readonly string HttpProxyTask = "HttpProxyTask";
}
}
1 change: 1 addition & 0 deletions src/WebJobs.Script/Workers/Rpc/RpcWorkerConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public static class RpcWorkerConstants
public const string HandlesInvocationCancelMessage = "HandlesInvocationCancelMessage";
public const string HandlesWorkerWarmupMessage = "HandlesWorkerWarmupMessage";
public const string WorkerApplicationInsightsLoggingEnabled = nameof(WorkerApplicationInsightsLoggingEnabled);
public const string HttpUri = "HttpUri";

/// <summary>
/// Indicates whether empty entries in the trigger message should be included when sending RpcInvocation data to OOP workers.
Expand Down
Loading