From 714da52df4655f9ba92cf5fb4e1fa40563751843 Mon Sep 17 00:00:00 2001 From: Sarah Vu Date: Wed, 14 Dec 2022 13:04:38 -0800 Subject: [PATCH 01/24] wip prototype --- .../Channel/GrpcWorkerChannel.cs | 40 ++++++++++++++++++- .../Channel/GrpcWorkerChannelFactory.cs | 11 +++-- .../GrpcServiceCollectionsExtensions.cs | 2 + .../WebJobs.Script.Grpc.csproj | 1 + .../WebJobs.Script.WebHost.csproj | 1 + 5 files changed, 50 insertions(+), 5 deletions(-) diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs index d1954325a8..a6e338307a 100644 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs +++ b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs @@ -8,6 +8,7 @@ using System.Diagnostics; using System.IO; using System.Linq; +using System.Net.Http; using System.Reactive.Linq; using System.Text; using System.Threading; @@ -16,6 +17,7 @@ using System.Threading.Tasks.Dataflow; using Google.Protobuf.Collections; using Google.Protobuf.WellKnownTypes; +using Microsoft.AspNetCore.Http; using Microsoft.Azure.WebJobs.Logging; using Microsoft.Azure.WebJobs.Script.Config; using Microsoft.Azure.WebJobs.Script.Description; @@ -31,6 +33,7 @@ using Microsoft.Azure.WebJobs.Script.Workers.SharedMemoryDataTransfer; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using Yarp.ReverseProxy.Forwarder; using static Microsoft.Azure.WebJobs.Script.Grpc.Messages.RpcLog.Types; using FunctionMetadata = Microsoft.Azure.WebJobs.Script.Description.FunctionMetadata; @@ -81,6 +84,7 @@ internal class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable private bool _isSharedMemoryDataTransferEnabled; private bool? _cancelCapabilityEnabled; private bool _isWorkerApplicationInsightsLoggingEnabled; + private IHttpForwarder _httpForwarder; private System.Timers.Timer _timer; @@ -96,7 +100,8 @@ internal GrpcWorkerChannel( IOptionsMonitor applicationHostOptions, ISharedMemoryManager sharedMemoryManager, IOptions workerConcurrencyOptions, - IOptions hostingConfigOptions) + IOptions hostingConfigOptions, + IHttpForwarder httpForwarder) { _workerId = workerId; _eventManager = eventManager; @@ -112,6 +117,7 @@ internal GrpcWorkerChannel( _processInbound = state => ProcessItem((InboundGrpcEvent)state); _hostingConfigOptions = hostingConfigOptions; + _httpForwarder = httpForwarder; _workerCapabilities = new GrpcCapabilities(_workerChannelLogger); if (!_eventManager.TryGetGrpcChannels(workerId, out var inbound, out var outbound)) @@ -738,6 +744,38 @@ await SendStreamingMessageAsync(new StreamingMessage { context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId)); } + + if (context.FunctionMetadata.Trigger.Type.Contains("httpTrigger")) + { + var handler = new SocketsHttpHandler(); + var invoker = new HttpMessageInvoker(handler); + var options = new ForwarderRequestConfig(); + HttpRequest httpRequest = context.Inputs.FirstOrDefault(i => i.val is HttpRequest).val as HttpRequest; + HttpContext httpContext = httpRequest.HttpContext; + // so http request comes in as an asp.net type (3rd input). + await _httpForwarder.SendAsync(httpContext, "http://localhost:5555/", invoker, options, static (context, request) => + { + return ValueTask.CompletedTask; + }); + } + else + { + var invocationRequest = await context.ToRpcInvocationRequest(_workerChannelLogger, _workerCapabilities, _isSharedMemoryDataTransferEnabled, _sharedMemoryManager); + AddAdditionalTraceContext(invocationRequest.TraceContext.Attributes, context); + _executingInvocations.TryAdd(invocationRequest.InvocationId, context); + + if (_cancelCapabilityEnabled) + { + context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId)); + } + + _metricsLogger.LogEvent(string.Format(MetricEventNames.WorkerInvoked, Id), functionName: context.FunctionMetadata.Name); + + await SendStreamingMessageAsync(new StreamingMessage + { + InvocationRequest = invocationRequest + }); + } } catch (Exception invokeEx) { diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannelFactory.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannelFactory.cs index 04ed9b4554..e660544781 100644 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannelFactory.cs +++ b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannelFactory.cs @@ -13,6 +13,7 @@ using Microsoft.Azure.WebJobs.Script.Workers.SharedMemoryDataTransfer; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using Yarp.ReverseProxy.Forwarder; namespace Microsoft.Azure.WebJobs.Script.Grpc { @@ -26,10 +27,10 @@ public class GrpcWorkerChannelFactory : IRpcWorkerChannelFactory private readonly ISharedMemoryManager _sharedMemoryManager = null; private readonly IOptions _workerConcurrencyOptions; private readonly IOptions _hostingConfigOptions; - + private readonly IHttpForwarder _httpForwarder; public GrpcWorkerChannelFactory(IScriptEventManager eventManager, IEnvironment environment, ILoggerFactory loggerFactory, IOptionsMonitor applicationHostOptions, IRpcWorkerProcessFactory rpcWorkerProcessManager, ISharedMemoryManager sharedMemoryManager, - IOptions workerConcurrencyOptions, IOptions hostingConfigOptions) + IOptions workerConcurrencyOptions, IOptions hostingConfigOptions, IHttpForwarder httpForwarder) { _eventManager = eventManager; _loggerFactory = loggerFactory; @@ -39,6 +40,7 @@ public GrpcWorkerChannelFactory(IScriptEventManager eventManager, IEnvironment e _sharedMemoryManager = sharedMemoryManager; _workerConcurrencyOptions = workerConcurrencyOptions; _hostingConfigOptions = hostingConfigOptions; + _httpForwarder = httpForwarder; } public IRpcWorkerChannel Create(string scriptRootPath, string runtime, IMetricsLogger metricsLogger, int attemptCount, IEnumerable workerConfigs) @@ -59,7 +61,7 @@ public IRpcWorkerChannel Create(string scriptRootPath, string runtime, IMetricsL internal virtual IRpcWorkerChannel CreateInternal(string workerId, IScriptEventManager eventManager, RpcWorkerConfig languageWorkerConfig, IWorkerProcess rpcWorkerProcess, ILogger workerLogger, IMetricsLogger metricsLogger, int attemptCount, IEnvironment environment, IOptionsMonitor applicationHostOptions, - ISharedMemoryManager sharedMemoryManager, IOptions workerConcurrencyOptions, IOptions hostingConfigOptions) + ISharedMemoryManager sharedMemoryManager, IOptions workerConcurrencyOptions, IOptions hostingConfigOptions, IHttpForwarder httpForwarder) { return new GrpcWorkerChannel( workerId, @@ -73,7 +75,8 @@ internal virtual IRpcWorkerChannel CreateInternal(string workerId, IScriptEventM applicationHostOptions, sharedMemoryManager, workerConcurrencyOptions, - hostingConfigOptions); + hostingConfigOptions, + httpForwarder); } } } diff --git a/src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs b/src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs index 5294c4e810..72c7d411cc 100644 --- a/src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs +++ b/src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs @@ -16,6 +16,8 @@ public static IServiceCollection AddScriptGrpc(this IServiceCollection services) services.AddSingleton(); + services.AddHttpForwarder(); + return services; } } diff --git a/src/WebJobs.Script.Grpc/WebJobs.Script.Grpc.csproj b/src/WebJobs.Script.Grpc/WebJobs.Script.Grpc.csproj index 7d7bb7b1b1..30a3e743dc 100644 --- a/src/WebJobs.Script.Grpc/WebJobs.Script.Grpc.csproj +++ b/src/WebJobs.Script.Grpc/WebJobs.Script.Grpc.csproj @@ -31,6 +31,7 @@ all + diff --git a/src/WebJobs.Script.WebHost/WebJobs.Script.WebHost.csproj b/src/WebJobs.Script.WebHost/WebJobs.Script.WebHost.csproj index 68d14a629b..5335fddebf 100644 --- a/src/WebJobs.Script.WebHost/WebJobs.Script.WebHost.csproj +++ b/src/WebJobs.Script.WebHost/WebJobs.Script.WebHost.csproj @@ -90,6 +90,7 @@ + From baaae4b54a11902415b841818052a5e26e342211 Mon Sep 17 00:00:00 2001 From: Sarah Vu Date: Tue, 10 Jan 2023 11:44:11 -0800 Subject: [PATCH 02/24] resolve errors, send messages to both grpc and http pipeline --- .../Channel/GrpcWorkerChannel.cs | 46 +++++++------------ .../Channel/GrpcWorkerChannelFactory.cs | 3 +- 2 files changed, 19 insertions(+), 30 deletions(-) diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs index a6e338307a..18a4754422 100644 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs +++ b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs @@ -730,51 +730,39 @@ internal async Task SendInvocationRequest(ScriptInvocationContext context) } var invocationRequest = await context.ToRpcInvocationRequest(_workerChannelLogger, _workerCapabilities, _isSharedMemoryDataTransferEnabled, _sharedMemoryManager); - AddAdditionalTraceContext(invocationRequest.TraceContext.Attributes, context); - _executingInvocations.TryAdd(invocationRequest.InvocationId, context); - - _metricsLogger.LogEvent(string.Format(MetricEventNames.WorkerInvoked, Id), functionName: context.FunctionMetadata.Name); - - await SendStreamingMessageAsync(new StreamingMessage - { - InvocationRequest = invocationRequest - }); - - if (_cancelCapabilityEnabled != null && _cancelCapabilityEnabled.Value) - { - context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId)); - } + // hard code this for prototyping - automatically forward http requests for http trigger functions if (context.FunctionMetadata.Trigger.Type.Contains("httpTrigger")) { var handler = new SocketsHttpHandler(); var invoker = new HttpMessageInvoker(handler); var options = new ForwarderRequestConfig(); - HttpRequest httpRequest = context.Inputs.FirstOrDefault(i => i.val is HttpRequest).val as HttpRequest; + HttpRequest httpRequest = context.Inputs.FirstOrDefault(i => i.Val is HttpRequest).Val as HttpRequest; HttpContext httpContext = httpRequest.HttpContext; + + // add function id to headers as a correlation id + // httpRequest.Headers.Add("function-id", context.FunctionMetadata.GetFunctionId()); + // so http request comes in as an asp.net type (3rd input). await _httpForwarder.SendAsync(httpContext, "http://localhost:5555/", invoker, options, static (context, request) => { return ValueTask.CompletedTask; }); } - else - { - var invocationRequest = await context.ToRpcInvocationRequest(_workerChannelLogger, _workerCapabilities, _isSharedMemoryDataTransferEnabled, _sharedMemoryManager); - AddAdditionalTraceContext(invocationRequest.TraceContext.Attributes, context); - _executingInvocations.TryAdd(invocationRequest.InvocationId, context); - if (_cancelCapabilityEnabled) - { - context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId)); - } + AddAdditionalTraceContext(invocationRequest.TraceContext.Attributes, context); + _executingInvocations.TryAdd(invocationRequest.InvocationId, context); - _metricsLogger.LogEvent(string.Format(MetricEventNames.WorkerInvoked, Id), functionName: context.FunctionMetadata.Name); + _metricsLogger.LogEvent(string.Format(MetricEventNames.WorkerInvoked, Id), functionName: context.FunctionMetadata.Name); - await SendStreamingMessageAsync(new StreamingMessage - { - InvocationRequest = invocationRequest - }); + await SendStreamingMessageAsync(new StreamingMessage + { + InvocationRequest = invocationRequest + }); + + if (_cancelCapabilityEnabled != null && _cancelCapabilityEnabled.Value) + { + context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId)); } } catch (Exception invokeEx) diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannelFactory.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannelFactory.cs index e660544781..ed4089b9eb 100644 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannelFactory.cs +++ b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannelFactory.cs @@ -28,6 +28,7 @@ public class GrpcWorkerChannelFactory : IRpcWorkerChannelFactory private readonly IOptions _workerConcurrencyOptions; private readonly IOptions _hostingConfigOptions; private readonly IHttpForwarder _httpForwarder; + public GrpcWorkerChannelFactory(IScriptEventManager eventManager, IEnvironment environment, ILoggerFactory loggerFactory, IOptionsMonitor applicationHostOptions, IRpcWorkerProcessFactory rpcWorkerProcessManager, ISharedMemoryManager sharedMemoryManager, IOptions workerConcurrencyOptions, IOptions hostingConfigOptions, IHttpForwarder httpForwarder) @@ -56,7 +57,7 @@ public IRpcWorkerChannel Create(string scriptRootPath, string runtime, IMetricsL IWorkerProcess rpcWorkerProcess = _rpcWorkerProcessFactory.Create(workerId, runtime, scriptRootPath, languageWorkerConfig); return CreateInternal(workerId, _eventManager, languageWorkerConfig, rpcWorkerProcess, workerLogger, metricsLogger, attemptCount, - _environment, _applicationHostOptions, _sharedMemoryManager, _workerConcurrencyOptions, _hostingConfigOptions); + _environment, _applicationHostOptions, _sharedMemoryManager, _workerConcurrencyOptions, _hostingConfigOptions, _httpForwarder); } internal virtual IRpcWorkerChannel CreateInternal(string workerId, IScriptEventManager eventManager, RpcWorkerConfig languageWorkerConfig, IWorkerProcess rpcWorkerProcess, From 4257c2021f02b1e8df047ba5b68cbb312e85245b Mon Sep 17 00:00:00 2001 From: Sarah Vu Date: Fri, 20 Jan 2023 15:05:45 -0800 Subject: [PATCH 03/24] wip --- src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs index 18a4754422..bfed7fc7b9 100644 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs +++ b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs @@ -740,11 +740,11 @@ internal async Task SendInvocationRequest(ScriptInvocationContext context) HttpRequest httpRequest = context.Inputs.FirstOrDefault(i => i.Val is HttpRequest).Val as HttpRequest; HttpContext httpContext = httpRequest.HttpContext; - // add function id to headers as a correlation id - // httpRequest.Headers.Add("function-id", context.FunctionMetadata.GetFunctionId()); + // add invocation id as correlation id + httpRequest.Headers.TryAdd("invocation-id", context.ExecutionContext.InvocationId.ToString()); // so http request comes in as an asp.net type (3rd input). - await _httpForwarder.SendAsync(httpContext, "http://localhost:5555/", invoker, options, static (context, request) => + _ = _httpForwarder.SendAsync(httpContext, "http://localhost:5555/", invoker, options, static (context, request) => { return ValueTask.CompletedTask; }); From 7f4b66aec6d9c683e1d66f9ec7440dcd2532c638 Mon Sep 17 00:00:00 2001 From: Sarah Vu Date: Mon, 30 Jan 2023 13:53:42 -0800 Subject: [PATCH 04/24] e2e working, first iteration --- .../Channel/GrpcWorkerChannel.cs | 41 ++++++++++--------- .../FunctionInvocationMiddleware.cs | 12 ++++++ .../Host/HostFunctionMetadataProvider.cs | 2 +- .../Workers/Rpc/RpcWorkerConstants.cs | 1 + 4 files changed, 36 insertions(+), 20 deletions(-) diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs index bfed7fc7b9..c012c3e201 100644 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs +++ b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs @@ -731,8 +731,23 @@ internal async Task SendInvocationRequest(ScriptInvocationContext context) var invocationRequest = await context.ToRpcInvocationRequest(_workerChannelLogger, _workerCapabilities, _isSharedMemoryDataTransferEnabled, _sharedMemoryManager); + AddAdditionalTraceContext(invocationRequest.TraceContext.Attributes, context); + _executingInvocations.TryAdd(invocationRequest.InvocationId, context); + + _metricsLogger.LogEvent(string.Format(MetricEventNames.WorkerInvoked, Id), functionName: context.FunctionMetadata.Name); + + var grpcTask = SendStreamingMessageAsync(new StreamingMessage + { + InvocationRequest = invocationRequest + }); + + if (_cancelCapabilityEnabled) + { + context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId)); + } + // hard code this for prototyping - automatically forward http requests for http trigger functions - if (context.FunctionMetadata.Trigger.Type.Contains("httpTrigger")) + if (!string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.EnableHttpProxying))) { var handler = new SocketsHttpHandler(); var invoker = new HttpMessageInvoker(handler); @@ -741,28 +756,16 @@ internal async Task SendInvocationRequest(ScriptInvocationContext context) HttpContext httpContext = httpRequest.HttpContext; // add invocation id as correlation id + // TODO: add "invocation-id" as a constant somewhere / maybe find a better name httpRequest.Headers.TryAdd("invocation-id", context.ExecutionContext.InvocationId.ToString()); - // so http request comes in as an asp.net type (3rd input). - _ = _httpForwarder.SendAsync(httpContext, "http://localhost:5555/", invoker, options, static (context, request) => - { - return ValueTask.CompletedTask; - }); - } - - AddAdditionalTraceContext(invocationRequest.TraceContext.Attributes, context); - _executingInvocations.TryAdd(invocationRequest.InvocationId, context); - - _metricsLogger.LogEvent(string.Format(MetricEventNames.WorkerInvoked, Id), functionName: context.FunctionMetadata.Name); - - await SendStreamingMessageAsync(new StreamingMessage - { - InvocationRequest = invocationRequest - }); + var aspNetTask = _httpForwarder.SendAsync(httpContext, "http://localhost:5555/", invoker, options); - if (_cancelCapabilityEnabled != null && _cancelCapabilityEnabled.Value) + await Task.WhenAll(grpcTask.AsTask(), aspNetTask.AsTask()); + } + else { - context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId)); + await grpcTask; } } catch (Exception invokeEx) diff --git a/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs b/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs index 979d26b66c..3d4467ec06 100644 --- a/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs +++ b/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs @@ -19,6 +19,7 @@ using Microsoft.Azure.WebJobs.Script.WebHost.Authentication; using Microsoft.Azure.WebJobs.Script.WebHost.Features; using Microsoft.Azure.WebJobs.Script.WebHost.Security.Authorization; +using Microsoft.CodeAnalysis.CSharp.Syntax; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -41,6 +42,7 @@ public async Task Invoke(HttpContext context) } var functionExecution = context.Features.Get(); + if (functionExecution != null && !context.Response.HasStarted) { // LiveLogs session id is used to show only contextual logs in the "Code + Test" experience. The id is included in the custom dimension. @@ -61,6 +63,16 @@ public async Task Invoke(HttpContext context) } ActionContext actionContext = new ActionContext(context, context.GetRouteData(), new ActionDescriptor()); + + if (context.Response.Headers.TryGetValue("IsHttpProxying", out var value)) + { + if (value.ToString() == bool.TrueString) + { + return; + } + } + + // TODO: Skip this line and extraneous work to get to this step when HttpProxying await result.ExecuteResultAsync(actionContext); } } diff --git a/src/WebJobs.Script/Host/HostFunctionMetadataProvider.cs b/src/WebJobs.Script/Host/HostFunctionMetadataProvider.cs index efb069361d..75920dcc1c 100644 --- a/src/WebJobs.Script/Host/HostFunctionMetadataProvider.cs +++ b/src/WebJobs.Script/Host/HostFunctionMetadataProvider.cs @@ -203,7 +203,7 @@ internal static string ParseLanguage(string scriptFilePath, IEnumerable - /// Determines which script should be considered the "primary" entry point script. Returns null if Primary script file cannot be determined + /// Determines which script should be considered the "primary" entry point script. Returns null if Primary script file cannot be determined. /// internal static string DeterminePrimaryScriptFile(string scriptFile, string scriptDirectory, IFileSystem fileSystem = null) { diff --git a/src/WebJobs.Script/Workers/Rpc/RpcWorkerConstants.cs b/src/WebJobs.Script/Workers/Rpc/RpcWorkerConstants.cs index 22c7eb2522..8f938bc216 100644 --- a/src/WebJobs.Script/Workers/Rpc/RpcWorkerConstants.cs +++ b/src/WebJobs.Script/Workers/Rpc/RpcWorkerConstants.cs @@ -54,6 +54,7 @@ public static class RpcWorkerConstants public const string HandlesInvocationCancelMessage = "HandlesInvocationCancelMessage"; public const string HandlesWorkerWarmupMessage = "HandlesWorkerWarmupMessage"; public const string WorkerApplicationInsightsLoggingEnabled = nameof(WorkerApplicationInsightsLoggingEnabled); + public const string EnableHttpProxying = "EnableHttpProxying"; /// /// Indicates whether empty entries in the trigger message should be included when sending RpcInvocation data to OOP workers. From 3052efcdd2a1e1a98ff79d2a0d37af834eb16097 Mon Sep 17 00:00:00 2001 From: Sarah Vu Date: Tue, 31 Jan 2023 16:49:12 -0800 Subject: [PATCH 05/24] new iteration, use httpcontext items to signal proxying to middleware --- src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs | 12 +++++------- .../Middleware/FunctionInvocationMiddleware.cs | 2 +- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs index c012c3e201..27411cf34c 100644 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs +++ b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs @@ -736,7 +736,7 @@ internal async Task SendInvocationRequest(ScriptInvocationContext context) _metricsLogger.LogEvent(string.Format(MetricEventNames.WorkerInvoked, Id), functionName: context.FunctionMetadata.Name); - var grpcTask = SendStreamingMessageAsync(new StreamingMessage + await SendStreamingMessageAsync(new StreamingMessage { InvocationRequest = invocationRequest }); @@ -746,7 +746,6 @@ internal async Task SendInvocationRequest(ScriptInvocationContext context) context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId)); } - // hard code this for prototyping - automatically forward http requests for http trigger functions if (!string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.EnableHttpProxying))) { var handler = new SocketsHttpHandler(); @@ -755,17 +754,16 @@ internal async Task SendInvocationRequest(ScriptInvocationContext context) HttpRequest httpRequest = context.Inputs.FirstOrDefault(i => i.Val is HttpRequest).Val as HttpRequest; HttpContext httpContext = httpRequest.HttpContext; + // add IsHttpProxying to HttpContext Items (verify this is internal or not?) + httpContext.Items.Add("IsHttpProxying", bool.TrueString); + // add invocation id as correlation id // TODO: add "invocation-id" as a constant somewhere / maybe find a better name httpRequest.Headers.TryAdd("invocation-id", context.ExecutionContext.InvocationId.ToString()); var aspNetTask = _httpForwarder.SendAsync(httpContext, "http://localhost:5555/", invoker, options); - await Task.WhenAll(grpcTask.AsTask(), aspNetTask.AsTask()); - } - else - { - await grpcTask; + // actually use ScriptInvocationContext, add AspNet task to it and in InvokeResponse wait for that to come back too. } } catch (Exception invokeEx) diff --git a/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs b/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs index 3d4467ec06..c132b9074c 100644 --- a/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs +++ b/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs @@ -64,7 +64,7 @@ public async Task Invoke(HttpContext context) ActionContext actionContext = new ActionContext(context, context.GetRouteData(), new ActionDescriptor()); - if (context.Response.Headers.TryGetValue("IsHttpProxying", out var value)) + if (context.Items.TryGetValue("IsHttpProxying", out var value)) { if (value.ToString() == bool.TrueString) { From 0ff22ddf49fb2997e39646f5a1ea017c6618ce3e Mon Sep 17 00:00:00 2001 From: Sarah Vu Date: Wed, 1 Feb 2023 10:55:04 -0800 Subject: [PATCH 06/24] wip, waiting on aspnet pipeline to complete before finishing execution --- .../Channel/GrpcWorkerChannel.cs | 17 +++++++++++++++-- .../Workers/ScriptInvocationContext.cs | 2 ++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs index 27411cf34c..a2599911f9 100644 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs +++ b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs @@ -31,6 +31,7 @@ using Microsoft.Azure.WebJobs.Script.Workers; using Microsoft.Azure.WebJobs.Script.Workers.Rpc; using Microsoft.Azure.WebJobs.Script.Workers.SharedMemoryDataTransfer; +using Microsoft.CodeAnalysis.CSharp.Syntax; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Yarp.ReverseProxy.Forwarder; @@ -732,7 +733,6 @@ internal async Task SendInvocationRequest(ScriptInvocationContext context) var invocationRequest = await context.ToRpcInvocationRequest(_workerChannelLogger, _workerCapabilities, _isSharedMemoryDataTransferEnabled, _sharedMemoryManager); AddAdditionalTraceContext(invocationRequest.TraceContext.Attributes, context); - _executingInvocations.TryAdd(invocationRequest.InvocationId, context); _metricsLogger.LogEvent(string.Format(MetricEventNames.WorkerInvoked, Id), functionName: context.FunctionMetadata.Name); @@ -741,6 +741,8 @@ await SendStreamingMessageAsync(new StreamingMessage InvocationRequest = invocationRequest }); + _executingInvocations.TryAdd(invocationRequest.InvocationId, context); + if (_cancelCapabilityEnabled) { context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId)); @@ -763,7 +765,7 @@ await SendStreamingMessageAsync(new StreamingMessage var aspNetTask = _httpForwarder.SendAsync(httpContext, "http://localhost:5555/", invoker, options); - // actually use ScriptInvocationContext, add AspNet task to it and in InvokeResponse wait for that to come back too. + context.Properties.Add("HttpProxyingTask", aspNetTask); } } catch (Exception invokeEx) @@ -929,6 +931,17 @@ internal async Task InvokeResponse(InvocationResponse invokeResponse) { if (invokeResponse.Result.IsInvocationSuccess(context.ResultSource, capabilityEnabled)) { + // TODO: We need to check if the Asp.Net pipeline completed successfully or not + if (context.Properties.TryGetValue("HttpProxyingTask", out ValueTask httpProxyTask)) + { + ForwarderError httpProxyTaskResult = await httpProxyTask; + + if (httpProxyTaskResult is not ForwarderError.None) + { + // how do we handle an error if invocation succeeds? + } + } + _metricsLogger.LogEvent(string.Format(MetricEventNames.WorkerInvokeSucceeded, Id)); try diff --git a/src/WebJobs.Script/Description/Workers/ScriptInvocationContext.cs b/src/WebJobs.Script/Description/Workers/ScriptInvocationContext.cs index 31c2747401..094aaeaaa4 100644 --- a/src/WebJobs.Script/Description/Workers/ScriptInvocationContext.cs +++ b/src/WebJobs.Script/Description/Workers/ScriptInvocationContext.cs @@ -31,5 +31,7 @@ public class ScriptInvocationContext public ILogger Logger { get; set; } public System.Threading.ExecutionContext AsyncExecutionContext { get; set; } + + public Dictionary Properties { get; set; } = new Dictionary(); } } From 93c0b81a58d6386d62013ba21b232d0ade936025 Mon Sep 17 00:00:00 2001 From: Sarah Vu Date: Mon, 6 Feb 2023 16:55:58 -0800 Subject: [PATCH 07/24] working, functions pipeline returns dummy value, aspnet pipeline has the context --- .../Channel/GrpcWorkerChannel.cs | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs index a2599911f9..6f5541eb3f 100644 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs +++ b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs @@ -931,17 +931,6 @@ internal async Task InvokeResponse(InvocationResponse invokeResponse) { if (invokeResponse.Result.IsInvocationSuccess(context.ResultSource, capabilityEnabled)) { - // TODO: We need to check if the Asp.Net pipeline completed successfully or not - if (context.Properties.TryGetValue("HttpProxyingTask", out ValueTask httpProxyTask)) - { - ForwarderError httpProxyTaskResult = await httpProxyTask; - - if (httpProxyTaskResult is not ForwarderError.None) - { - // how do we handle an error if invocation succeeds? - } - } - _metricsLogger.LogEvent(string.Format(MetricEventNames.WorkerInvokeSucceeded, Id)); try @@ -970,6 +959,17 @@ internal async Task InvokeResponse(InvocationResponse invokeResponse) IDictionary bindingsDictionary = await invokeResponse.OutputData .ToDictionaryAsync(binding => binding.Name, binding => GetBindingDataAsync(binding, invokeResponse.InvocationId)); + // TODO: We need to check if the Asp.Net pipeline completed successfully or not + if (context.Properties.TryGetValue("HttpProxyingTask", out ValueTask httpProxyTask)) + { + ForwarderError httpProxyTaskResult = await httpProxyTask; + + if (httpProxyTaskResult is not ForwarderError.None) + { + // how do we handle this error if Functions pipeline suceeded and this failed? + } + } + var result = new ScriptInvocationResult() { Outputs = bindingsDictionary, From a0e4048b762f844eb1e67d80a4beafd55df74429 Mon Sep 17 00:00:00 2001 From: Sarah Vu Date: Tue, 7 Feb 2023 14:37:44 -0800 Subject: [PATCH 08/24] add env var for http proxy port --- src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs index 6f5541eb3f..9bbdd05b39 100644 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs +++ b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs @@ -763,7 +763,14 @@ await SendStreamingMessageAsync(new StreamingMessage // TODO: add "invocation-id" as a constant somewhere / maybe find a better name httpRequest.Headers.TryAdd("invocation-id", context.ExecutionContext.InvocationId.ToString()); - var aspNetTask = _httpForwarder.SendAsync(httpContext, "http://localhost:5555/", invoker, options); + var port = Environment.GetEnvironmentVariable("Azure_Functions_HttpProxyingPort"); + + if (port is null) + { + port = "5555"; + } + + var aspNetTask = _httpForwarder.SendAsync(httpContext, "http://localhost:" + port, invoker, options); context.Properties.Add("HttpProxyingTask", aspNetTask); } From 807b46cc30a168a5204bb9661ed256a2b474ec90 Mon Sep 17 00:00:00 2001 From: Sarah Vu Date: Tue, 14 Feb 2023 15:50:59 -0800 Subject: [PATCH 09/24] cleanup before move to feature branch --- .../Channel/GrpcWorkerChannel.cs | 22 +++++++++---------- .../FunctionInvocationMiddleware.cs | 1 + 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs index 9bbdd05b39..7346f68922 100644 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs +++ b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs @@ -938,6 +938,17 @@ internal async Task InvokeResponse(InvocationResponse invokeResponse) { if (invokeResponse.Result.IsInvocationSuccess(context.ResultSource, capabilityEnabled)) { + // TODO: We need to check if the Asp.Net pipeline completed successfully or not + if (context.Properties.TryGetValue("HttpProxyingTask", out ValueTask httpProxyTask)) + { + ForwarderError httpProxyTaskResult = await httpProxyTask; + + if (httpProxyTaskResult is not ForwarderError.None) + { + // how do we handle this error if Functions pipeline suceeded and this failed? + } + } + _metricsLogger.LogEvent(string.Format(MetricEventNames.WorkerInvokeSucceeded, Id)); try @@ -966,17 +977,6 @@ internal async Task InvokeResponse(InvocationResponse invokeResponse) IDictionary bindingsDictionary = await invokeResponse.OutputData .ToDictionaryAsync(binding => binding.Name, binding => GetBindingDataAsync(binding, invokeResponse.InvocationId)); - // TODO: We need to check if the Asp.Net pipeline completed successfully or not - if (context.Properties.TryGetValue("HttpProxyingTask", out ValueTask httpProxyTask)) - { - ForwarderError httpProxyTaskResult = await httpProxyTask; - - if (httpProxyTaskResult is not ForwarderError.None) - { - // how do we handle this error if Functions pipeline suceeded and this failed? - } - } - var result = new ScriptInvocationResult() { Outputs = bindingsDictionary, diff --git a/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs b/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs index c132b9074c..d9e1bea5bb 100644 --- a/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs +++ b/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs @@ -64,6 +64,7 @@ public async Task Invoke(HttpContext context) ActionContext actionContext = new ActionContext(context, context.GetRouteData(), new ActionDescriptor()); + // TODO: Create constant value for this var? if (context.Items.TryGetValue("IsHttpProxying", out var value)) { if (value.ToString() == bool.TrueString) From 9c7f9674294fdaadf552c59e46a0afa577f7d1fb Mon Sep 17 00:00:00 2001 From: Sarah Vu Date: Tue, 14 Feb 2023 15:58:56 -0800 Subject: [PATCH 10/24] cleanup following rebase --- src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs index 7346f68922..845da13bd7 100644 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs +++ b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs @@ -731,8 +731,8 @@ internal async Task SendInvocationRequest(ScriptInvocationContext context) } var invocationRequest = await context.ToRpcInvocationRequest(_workerChannelLogger, _workerCapabilities, _isSharedMemoryDataTransferEnabled, _sharedMemoryManager); - AddAdditionalTraceContext(invocationRequest.TraceContext.Attributes, context); + _executingInvocations.TryAdd(invocationRequest.InvocationId, context); _metricsLogger.LogEvent(string.Format(MetricEventNames.WorkerInvoked, Id), functionName: context.FunctionMetadata.Name); @@ -741,9 +741,7 @@ await SendStreamingMessageAsync(new StreamingMessage InvocationRequest = invocationRequest }); - _executingInvocations.TryAdd(invocationRequest.InvocationId, context); - - if (_cancelCapabilityEnabled) + if (_cancelCapabilityEnabled != null && _cancelCapabilityEnabled.Value) { context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId)); } From 8871f1872a62ef5427a24a07f0ba227cc072e3da Mon Sep 17 00:00:00 2001 From: Sarah Vu Date: Fri, 24 Feb 2023 14:30:58 -0800 Subject: [PATCH 11/24] bump yarp version --- src/WebJobs.Script.WebHost/WebJobs.Script.WebHost.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/WebJobs.Script.WebHost/WebJobs.Script.WebHost.csproj b/src/WebJobs.Script.WebHost/WebJobs.Script.WebHost.csproj index 5335fddebf..a944525268 100644 --- a/src/WebJobs.Script.WebHost/WebJobs.Script.WebHost.csproj +++ b/src/WebJobs.Script.WebHost/WebJobs.Script.WebHost.csproj @@ -90,7 +90,7 @@ - + From 142257f6e86ecf714463c48c7da901304d9863ff Mon Sep 17 00:00:00 2001 From: sarah <35204912+satvu@users.noreply.github.com> Date: Tue, 21 Mar 2023 11:19:17 -0700 Subject: [PATCH 12/24] [Enhanced Http] Move Http Proxying logic to its own service (#9149) * cleanup, move proxy logic to service --- .../Channel/GrpcWorkerChannel.cs | 31 ++------ .../Channel/GrpcWorkerChannelFactory.cs | 13 ++-- .../GrpcServiceCollectionsExtensions.cs | 1 + .../Server/DefaultHttpProxyService.cs | 75 +++++++++++++++++++ .../Server/IHttpProxyService.cs | 14 ++++ .../WebJobs.Script.Grpc.csproj | 2 +- .../FunctionInvocationMiddleware.cs | 22 +++--- .../WebJobs.Script.WebHost.csproj | 1 - .../Extensions/FunctionMetadataExtensions.cs | 11 ++- 9 files changed, 122 insertions(+), 48 deletions(-) create mode 100644 src/WebJobs.Script.Grpc/Server/DefaultHttpProxyService.cs create mode 100644 src/WebJobs.Script.Grpc/Server/IHttpProxyService.cs diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs index 845da13bd7..313e38cc04 100644 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs +++ b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs @@ -31,7 +31,6 @@ using Microsoft.Azure.WebJobs.Script.Workers; using Microsoft.Azure.WebJobs.Script.Workers.Rpc; using Microsoft.Azure.WebJobs.Script.Workers.SharedMemoryDataTransfer; -using Microsoft.CodeAnalysis.CSharp.Syntax; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Yarp.ReverseProxy.Forwarder; @@ -85,7 +84,7 @@ internal class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable private bool _isSharedMemoryDataTransferEnabled; private bool? _cancelCapabilityEnabled; private bool _isWorkerApplicationInsightsLoggingEnabled; - private IHttpForwarder _httpForwarder; + private IHttpProxyService _httpProxyService; private System.Timers.Timer _timer; @@ -102,7 +101,7 @@ internal GrpcWorkerChannel( ISharedMemoryManager sharedMemoryManager, IOptions workerConcurrencyOptions, IOptions hostingConfigOptions, - IHttpForwarder httpForwarder) + IHttpProxyService httpProxyService) { _workerId = workerId; _eventManager = eventManager; @@ -118,7 +117,7 @@ internal GrpcWorkerChannel( _processInbound = state => ProcessItem((InboundGrpcEvent)state); _hostingConfigOptions = hostingConfigOptions; - _httpForwarder = httpForwarder; + _httpProxyService = httpProxyService; _workerCapabilities = new GrpcCapabilities(_workerChannelLogger); if (!_eventManager.TryGetGrpcChannels(workerId, out var inbound, out var outbound)) @@ -746,29 +745,9 @@ await SendStreamingMessageAsync(new StreamingMessage context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId)); } - if (!string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.EnableHttpProxying))) + if (!string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.EnableHttpProxying)) && context.FunctionMetadata.IsHttpTriggerFunction()) { - var handler = new SocketsHttpHandler(); - var invoker = new HttpMessageInvoker(handler); - var options = new ForwarderRequestConfig(); - HttpRequest httpRequest = context.Inputs.FirstOrDefault(i => i.Val is HttpRequest).Val as HttpRequest; - HttpContext httpContext = httpRequest.HttpContext; - - // add IsHttpProxying to HttpContext Items (verify this is internal or not?) - httpContext.Items.Add("IsHttpProxying", bool.TrueString); - - // add invocation id as correlation id - // TODO: add "invocation-id" as a constant somewhere / maybe find a better name - httpRequest.Headers.TryAdd("invocation-id", context.ExecutionContext.InvocationId.ToString()); - - var port = Environment.GetEnvironmentVariable("Azure_Functions_HttpProxyingPort"); - - if (port is null) - { - port = "5555"; - } - - var aspNetTask = _httpForwarder.SendAsync(httpContext, "http://localhost:" + port, invoker, options); + var aspNetTask = _httpProxyService.Forward(context); context.Properties.Add("HttpProxyingTask", aspNetTask); } diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannelFactory.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannelFactory.cs index ed4089b9eb..ac1c12b9d3 100644 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannelFactory.cs +++ b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannelFactory.cs @@ -13,7 +13,6 @@ using Microsoft.Azure.WebJobs.Script.Workers.SharedMemoryDataTransfer; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using Yarp.ReverseProxy.Forwarder; namespace Microsoft.Azure.WebJobs.Script.Grpc { @@ -27,11 +26,11 @@ public class GrpcWorkerChannelFactory : IRpcWorkerChannelFactory private readonly ISharedMemoryManager _sharedMemoryManager = null; private readonly IOptions _workerConcurrencyOptions; private readonly IOptions _hostingConfigOptions; - private readonly IHttpForwarder _httpForwarder; + private readonly IHttpProxyService _httpProxyService; public GrpcWorkerChannelFactory(IScriptEventManager eventManager, IEnvironment environment, ILoggerFactory loggerFactory, IOptionsMonitor applicationHostOptions, IRpcWorkerProcessFactory rpcWorkerProcessManager, ISharedMemoryManager sharedMemoryManager, - IOptions workerConcurrencyOptions, IOptions hostingConfigOptions, IHttpForwarder httpForwarder) + IOptions workerConcurrencyOptions, IOptions hostingConfigOptions, IHttpProxyService httpProxyService) { _eventManager = eventManager; _loggerFactory = loggerFactory; @@ -41,7 +40,7 @@ public GrpcWorkerChannelFactory(IScriptEventManager eventManager, IEnvironment e _sharedMemoryManager = sharedMemoryManager; _workerConcurrencyOptions = workerConcurrencyOptions; _hostingConfigOptions = hostingConfigOptions; - _httpForwarder = httpForwarder; + _httpProxyService = httpProxyService; } public IRpcWorkerChannel Create(string scriptRootPath, string runtime, IMetricsLogger metricsLogger, int attemptCount, IEnumerable workerConfigs) @@ -57,12 +56,12 @@ public IRpcWorkerChannel Create(string scriptRootPath, string runtime, IMetricsL IWorkerProcess rpcWorkerProcess = _rpcWorkerProcessFactory.Create(workerId, runtime, scriptRootPath, languageWorkerConfig); return CreateInternal(workerId, _eventManager, languageWorkerConfig, rpcWorkerProcess, workerLogger, metricsLogger, attemptCount, - _environment, _applicationHostOptions, _sharedMemoryManager, _workerConcurrencyOptions, _hostingConfigOptions, _httpForwarder); + _environment, _applicationHostOptions, _sharedMemoryManager, _workerConcurrencyOptions, _hostingConfigOptions, _httpProxyService); } internal virtual IRpcWorkerChannel CreateInternal(string workerId, IScriptEventManager eventManager, RpcWorkerConfig languageWorkerConfig, IWorkerProcess rpcWorkerProcess, ILogger workerLogger, IMetricsLogger metricsLogger, int attemptCount, IEnvironment environment, IOptionsMonitor applicationHostOptions, - ISharedMemoryManager sharedMemoryManager, IOptions workerConcurrencyOptions, IOptions hostingConfigOptions, IHttpForwarder httpForwarder) + ISharedMemoryManager sharedMemoryManager, IOptions workerConcurrencyOptions, IOptions hostingConfigOptions, IHttpProxyService httpProxyService) { return new GrpcWorkerChannel( workerId, @@ -77,7 +76,7 @@ internal virtual IRpcWorkerChannel CreateInternal(string workerId, IScriptEventM sharedMemoryManager, workerConcurrencyOptions, hostingConfigOptions, - httpForwarder); + httpProxyService); } } } diff --git a/src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs b/src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs index 72c7d411cc..19a8578263 100644 --- a/src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs +++ b/src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs @@ -17,6 +17,7 @@ public static IServiceCollection AddScriptGrpc(this IServiceCollection services) services.AddSingleton(); services.AddHttpForwarder(); + services.AddSingleton(); return services; } diff --git a/src/WebJobs.Script.Grpc/Server/DefaultHttpProxyService.cs b/src/WebJobs.Script.Grpc/Server/DefaultHttpProxyService.cs new file mode 100644 index 0000000000..a4755f8dd7 --- /dev/null +++ b/src/WebJobs.Script.Grpc/Server/DefaultHttpProxyService.cs @@ -0,0 +1,75 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Http; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Http; +using Microsoft.Azure.WebJobs.Script.Description; +using Yarp.ReverseProxy.Forwarder; + +namespace Microsoft.Azure.WebJobs.Script.Grpc +{ + internal class DefaultHttpProxyService : IHttpProxyService, IDisposable + { + private SocketsHttpHandler _handler; + private IHttpForwarder _httpForwarder; + private HttpMessageInvoker _messageInvoker; + private ForwarderRequestConfig _forwarderRequestConfig; + private string _proxyEndpoint; + + public DefaultHttpProxyService(IHttpForwarder httpForwarder) + { + _httpForwarder = httpForwarder; + + _handler = new SocketsHttpHandler(); + _messageInvoker = new HttpMessageInvoker(_handler); + _forwarderRequestConfig = new ForwarderRequestConfig(); + + // TODO: Update this logic. Port should come through configuration. + var port = Environment.GetEnvironmentVariable("AZURE_FUNCTIONS_HTTP_PROXY_PORT") ?? "5555"; + + _proxyEndpoint = "http://localhost:" + port; + } + + public void Dispose() + { + _handler?.Dispose(); + _messageInvoker?.Dispose(); + } + + public ValueTask Forward(ScriptInvocationContext context) + { + if (context is null) + { + throw new ArgumentNullException(nameof(context)); + } + + if (context.Inputs is null) + { + throw new InvalidOperationException($"The function {context.FunctionMetadata.Name} can not be evaluated since it has no inputs."); + } + + HttpRequest httpRequest = context.Inputs.FirstOrDefault(i => i.Val is HttpRequest).Val as HttpRequest; + + if (httpRequest is null) + { + throw new InvalidOperationException($"Cannot proxy the HttpTrigger function {context.FunctionMetadata.Name} without an input of type {nameof(HttpRequest)}."); + } + + HttpContext httpContext = httpRequest.HttpContext; + + httpContext.Items.Add("IsHttpProxying", bool.TrueString); + + // add invocation id as correlation id + // TODO: add "invocation-id" as a constant somewhere / maybe find a better name + httpRequest.Headers.TryAdd("invocation-id", context.ExecutionContext.InvocationId.ToString()); + + var aspNetTask = _httpForwarder.SendAsync(httpContext, _proxyEndpoint, _messageInvoker, _forwarderRequestConfig); + + return aspNetTask; + } + } +} diff --git a/src/WebJobs.Script.Grpc/Server/IHttpProxyService.cs b/src/WebJobs.Script.Grpc/Server/IHttpProxyService.cs new file mode 100644 index 0000000000..a587b5c744 --- /dev/null +++ b/src/WebJobs.Script.Grpc/Server/IHttpProxyService.cs @@ -0,0 +1,14 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Script.Description; +using Yarp.ReverseProxy.Forwarder; + +namespace Microsoft.Azure.WebJobs.Script.Grpc +{ + public interface IHttpProxyService + { + public ValueTask Forward(ScriptInvocationContext context); + } +} \ No newline at end of file diff --git a/src/WebJobs.Script.Grpc/WebJobs.Script.Grpc.csproj b/src/WebJobs.Script.Grpc/WebJobs.Script.Grpc.csproj index 30a3e743dc..17183c2357 100644 --- a/src/WebJobs.Script.Grpc/WebJobs.Script.Grpc.csproj +++ b/src/WebJobs.Script.Grpc/WebJobs.Script.Grpc.csproj @@ -31,7 +31,7 @@ all - + diff --git a/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs b/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs index d9e1bea5bb..8809c81404 100644 --- a/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs +++ b/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs @@ -19,7 +19,6 @@ using Microsoft.Azure.WebJobs.Script.WebHost.Authentication; using Microsoft.Azure.WebJobs.Script.WebHost.Features; using Microsoft.Azure.WebJobs.Script.WebHost.Security.Authorization; -using Microsoft.CodeAnalysis.CSharp.Syntax; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -34,6 +33,7 @@ public FunctionInvocationMiddleware(RequestDelegate next) _next = next; } + // TODO: Confirm that only HttpTrigger requests would flow through here public async Task Invoke(HttpContext context) { if (_next != null) @@ -54,15 +54,6 @@ public async Task Invoke(HttpContext context) int nestedProxiesCount = GetNestedProxiesCount(context, functionExecution); IActionResult result = await GetResultAsync(context, functionExecution); - if (nestedProxiesCount > 0) - { - // if Proxy, the rest of the pipeline will be processed by Proxies in - // case there are response overrides and what not. - SetProxyResult(context, nestedProxiesCount, result); - return; - } - - ActionContext actionContext = new ActionContext(context, context.GetRouteData(), new ActionDescriptor()); // TODO: Create constant value for this var? if (context.Items.TryGetValue("IsHttpProxying", out var value)) @@ -73,7 +64,16 @@ public async Task Invoke(HttpContext context) } } - // TODO: Skip this line and extraneous work to get to this step when HttpProxying + if (nestedProxiesCount > 0) + { + // if Proxy, the rest of the pipeline will be processed by Proxies in + // case there are response overrides and what not. + SetProxyResult(context, nestedProxiesCount, result); + return; + } + + ActionContext actionContext = new ActionContext(context, context.GetRouteData(), new ActionDescriptor()); + await result.ExecuteResultAsync(actionContext); } } diff --git a/src/WebJobs.Script.WebHost/WebJobs.Script.WebHost.csproj b/src/WebJobs.Script.WebHost/WebJobs.Script.WebHost.csproj index a944525268..68d14a629b 100644 --- a/src/WebJobs.Script.WebHost/WebJobs.Script.WebHost.csproj +++ b/src/WebJobs.Script.WebHost/WebJobs.Script.WebHost.csproj @@ -90,7 +90,6 @@ - diff --git a/src/WebJobs.Script/Extensions/FunctionMetadataExtensions.cs b/src/WebJobs.Script/Extensions/FunctionMetadataExtensions.cs index 24fff23eb9..908734f58e 100644 --- a/src/WebJobs.Script/Extensions/FunctionMetadataExtensions.cs +++ b/src/WebJobs.Script/Extensions/FunctionMetadataExtensions.cs @@ -14,6 +14,8 @@ public static class FunctionMetadataExtensions private const string IsDisabledKey = "IsDisabled"; private const string IsCodelessKey = "IsCodeless"; private const string FunctionIdKey = "FunctionId"; + private const string HttpTriggerKey = "HttpTrigger"; + private const string HttpOutputKey = "Http"; public static bool IsHttpInAndOutFunction(this FunctionMetadata metadata) { @@ -24,8 +26,8 @@ public static bool IsHttpInAndOutFunction(this FunctionMetadata metadata) BindingMetadata inputBindingMetadata = metadata.InputBindings.ElementAt(0); BindingMetadata outputBindingMetadata = metadata.OutputBindings.ElementAt(0); - if (string.Equals("httptrigger", inputBindingMetadata.Type, StringComparison.OrdinalIgnoreCase) && - string.Equals("http", outputBindingMetadata.Type, StringComparison.OrdinalIgnoreCase)) + if (string.Equals(HttpTriggerKey, inputBindingMetadata.Type, StringComparison.OrdinalIgnoreCase) && + string.Equals(HttpOutputKey, outputBindingMetadata.Type, StringComparison.OrdinalIgnoreCase)) { return true; } @@ -33,6 +35,11 @@ public static bool IsHttpInAndOutFunction(this FunctionMetadata metadata) return false; } + public static bool IsHttpTriggerFunction(this FunctionMetadata metadata) + { + return metadata.InputBindings.Any(b => string.Equals(HttpTriggerKey, b.Type, StringComparison.OrdinalIgnoreCase)); + } + public static string GetFunctionId(this FunctionMetadata metadata) { if (!metadata.Properties.TryGetValue(FunctionIdKey, out object idObj) From 2b53757f16af4a12e9abd708b4667aa272ecb580 Mon Sep 17 00:00:00 2001 From: sarah <35204912+satvu@users.noreply.github.com> Date: Fri, 31 Mar 2023 12:09:32 -0700 Subject: [PATCH 13/24] [Enhanced HTTP] Use proxying endpoint from worker response (#9184) --- .../Channel/GrpcWorkerChannel.cs | 15 +++++++++++++-- .../Server/DefaultHttpProxyService.cs | 16 +++++----------- .../Server/IHttpProxyService.cs | 3 ++- .../Middleware/FunctionInvocationMiddleware.cs | 3 +-- src/WebJobs.Script/ScriptConstants.cs | 4 ++++ .../Workers/Rpc/RpcWorkerConstants.cs | 2 +- 6 files changed, 26 insertions(+), 17 deletions(-) diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs index 313e38cc04..6a5907d581 100644 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs +++ b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs @@ -10,6 +10,7 @@ using System.Linq; using System.Net.Http; using System.Reactive.Linq; +using System.Reflection.Metadata; using System.Text; using System.Threading; using System.Threading.Channels; @@ -85,6 +86,8 @@ internal class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable private bool? _cancelCapabilityEnabled; private bool _isWorkerApplicationInsightsLoggingEnabled; private IHttpProxyService _httpProxyService; + private Uri _httpProxyEndpoint; + private bool _isHttpProxyingWorker = false; private System.Timers.Timer _timer; @@ -426,6 +429,14 @@ internal void WorkerInitResponse(GrpcEvent initEvent) _isWorkerApplicationInsightsLoggingEnabled = true; } + // If http proxying is enabled, we need to get the proxying endpoint of this worker + var httpUri = _workerCapabilities.GetCapabilityState(RpcWorkerConstants.HttpUri); + if (!string.IsNullOrEmpty(httpUri)) + { + _httpProxyEndpoint = new Uri(httpUri); + _isHttpProxyingWorker = true; + } + _workerInitTask.TrySetResult(true); } @@ -745,9 +756,9 @@ await SendStreamingMessageAsync(new StreamingMessage context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId)); } - if (!string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.EnableHttpProxying)) && context.FunctionMetadata.IsHttpTriggerFunction()) + if (_isHttpProxyingWorker && context.FunctionMetadata.IsHttpTriggerFunction()) { - var aspNetTask = _httpProxyService.Forward(context); + var aspNetTask = _httpProxyService.Forward(context, _httpProxyEndpoint); context.Properties.Add("HttpProxyingTask", aspNetTask); } diff --git a/src/WebJobs.Script.Grpc/Server/DefaultHttpProxyService.cs b/src/WebJobs.Script.Grpc/Server/DefaultHttpProxyService.cs index a4755f8dd7..3369902567 100644 --- a/src/WebJobs.Script.Grpc/Server/DefaultHttpProxyService.cs +++ b/src/WebJobs.Script.Grpc/Server/DefaultHttpProxyService.cs @@ -8,6 +8,7 @@ using System.Threading.Tasks; using Microsoft.AspNetCore.Http; using Microsoft.Azure.WebJobs.Script.Description; +using Microsoft.Azure.WebJobs.Script.Workers.Rpc; using Yarp.ReverseProxy.Forwarder; namespace Microsoft.Azure.WebJobs.Script.Grpc @@ -18,7 +19,6 @@ internal class DefaultHttpProxyService : IHttpProxyService, IDisposable private IHttpForwarder _httpForwarder; private HttpMessageInvoker _messageInvoker; private ForwarderRequestConfig _forwarderRequestConfig; - private string _proxyEndpoint; public DefaultHttpProxyService(IHttpForwarder httpForwarder) { @@ -27,11 +27,6 @@ public DefaultHttpProxyService(IHttpForwarder httpForwarder) _handler = new SocketsHttpHandler(); _messageInvoker = new HttpMessageInvoker(_handler); _forwarderRequestConfig = new ForwarderRequestConfig(); - - // TODO: Update this logic. Port should come through configuration. - var port = Environment.GetEnvironmentVariable("AZURE_FUNCTIONS_HTTP_PROXY_PORT") ?? "5555"; - - _proxyEndpoint = "http://localhost:" + port; } public void Dispose() @@ -40,7 +35,7 @@ public void Dispose() _messageInvoker?.Dispose(); } - public ValueTask Forward(ScriptInvocationContext context) + public ValueTask Forward(ScriptInvocationContext context, Uri httpUri) { if (context is null) { @@ -61,13 +56,12 @@ public ValueTask Forward(ScriptInvocationContext context) HttpContext httpContext = httpRequest.HttpContext; - httpContext.Items.Add("IsHttpProxying", bool.TrueString); + httpContext.Items.Add(ScriptConstants.IsHttpProxyingEnabled, bool.TrueString); // add invocation id as correlation id - // TODO: add "invocation-id" as a constant somewhere / maybe find a better name - httpRequest.Headers.TryAdd("invocation-id", context.ExecutionContext.InvocationId.ToString()); + httpRequest.Headers.TryAdd(ScriptConstants.HttpProxyCorrelationHeader, context.ExecutionContext.InvocationId.ToString()); - var aspNetTask = _httpForwarder.SendAsync(httpContext, _proxyEndpoint, _messageInvoker, _forwarderRequestConfig); + var aspNetTask = _httpForwarder.SendAsync(httpContext, httpUri.ToString(), _messageInvoker, _forwarderRequestConfig); return aspNetTask; } diff --git a/src/WebJobs.Script.Grpc/Server/IHttpProxyService.cs b/src/WebJobs.Script.Grpc/Server/IHttpProxyService.cs index a587b5c744..40e11e7ebc 100644 --- a/src/WebJobs.Script.Grpc/Server/IHttpProxyService.cs +++ b/src/WebJobs.Script.Grpc/Server/IHttpProxyService.cs @@ -1,6 +1,7 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using System; using System.Threading.Tasks; using Microsoft.Azure.WebJobs.Script.Description; using Yarp.ReverseProxy.Forwarder; @@ -9,6 +10,6 @@ namespace Microsoft.Azure.WebJobs.Script.Grpc { public interface IHttpProxyService { - public ValueTask Forward(ScriptInvocationContext context); + public ValueTask Forward(ScriptInvocationContext context, Uri httpUri); } } \ No newline at end of file diff --git a/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs b/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs index 8809c81404..facaa8c964 100644 --- a/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs +++ b/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs @@ -55,8 +55,7 @@ public async Task Invoke(HttpContext context) int nestedProxiesCount = GetNestedProxiesCount(context, functionExecution); IActionResult result = await GetResultAsync(context, functionExecution); - // TODO: Create constant value for this var? - if (context.Items.TryGetValue("IsHttpProxying", out var value)) + if (context.Items.TryGetValue(ScriptConstants.IsHttpProxyingEnabled, out var value)) { if (value.ToString() == bool.TrueString) { diff --git a/src/WebJobs.Script/ScriptConstants.cs b/src/WebJobs.Script/ScriptConstants.cs index f0073a29e6..1598e95003 100644 --- a/src/WebJobs.Script/ScriptConstants.cs +++ b/src/WebJobs.Script/ScriptConstants.cs @@ -225,5 +225,9 @@ public static class ScriptConstants public static readonly string LiveLogsSessionAIKey = "#AzFuncLiveLogsSessionId"; public static readonly string FunctionsHostingConfigSectionName = "FunctionsHostingConfig"; + + // HTTP Proxying constants + public static readonly string IsHttpProxyingEnabled = "IsHttpProxying"; + public static readonly string HttpProxyCorrelationHeader = "x-invocation-id"; } } diff --git a/src/WebJobs.Script/Workers/Rpc/RpcWorkerConstants.cs b/src/WebJobs.Script/Workers/Rpc/RpcWorkerConstants.cs index 8f938bc216..ec7894afdc 100644 --- a/src/WebJobs.Script/Workers/Rpc/RpcWorkerConstants.cs +++ b/src/WebJobs.Script/Workers/Rpc/RpcWorkerConstants.cs @@ -54,7 +54,7 @@ public static class RpcWorkerConstants public const string HandlesInvocationCancelMessage = "HandlesInvocationCancelMessage"; public const string HandlesWorkerWarmupMessage = "HandlesWorkerWarmupMessage"; public const string WorkerApplicationInsightsLoggingEnabled = nameof(WorkerApplicationInsightsLoggingEnabled); - public const string EnableHttpProxying = "EnableHttpProxying"; + public const string HttpUri = "HttpUri"; /// /// Indicates whether empty entries in the trigger message should be included when sending RpcInvocation data to OOP workers. From b2dbedbcd6cd23a50b5a544567247d5ec271a392 Mon Sep 17 00:00:00 2001 From: Sarah Vu Date: Fri, 31 Mar 2023 13:36:12 -0700 Subject: [PATCH 14/24] cleanup --- src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs | 7 ++----- .../Middleware/FunctionInvocationMiddleware.cs | 1 - src/WebJobs.Script/Host/HostFunctionMetadataProvider.cs | 2 +- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs index 6a5907d581..f3c405d52f 100644 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs +++ b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs @@ -8,9 +8,7 @@ using System.Diagnostics; using System.IO; using System.Linq; -using System.Net.Http; using System.Reactive.Linq; -using System.Reflection.Metadata; using System.Text; using System.Threading; using System.Threading.Channels; @@ -18,7 +16,6 @@ using System.Threading.Tasks.Dataflow; using Google.Protobuf.Collections; using Google.Protobuf.WellKnownTypes; -using Microsoft.AspNetCore.Http; using Microsoft.Azure.WebJobs.Logging; using Microsoft.Azure.WebJobs.Script.Config; using Microsoft.Azure.WebJobs.Script.Description; @@ -926,14 +923,14 @@ internal async Task InvokeResponse(InvocationResponse invokeResponse) { if (invokeResponse.Result.IsInvocationSuccess(context.ResultSource, capabilityEnabled)) { - // TODO: We need to check if the Asp.Net pipeline completed successfully or not if (context.Properties.TryGetValue("HttpProxyingTask", out ValueTask httpProxyTask)) { ForwarderError httpProxyTaskResult = await httpProxyTask; if (httpProxyTaskResult is not ForwarderError.None) { - // how do we handle this error if Functions pipeline suceeded and this failed? + // TODO: Understand scenarios where function invocation succeeds but there is an error proxying + // need to investigate different ForwarderErrors and consider how they will be relayed through other services and to users } } diff --git a/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs b/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs index facaa8c964..2e11924c53 100644 --- a/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs +++ b/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs @@ -72,7 +72,6 @@ public async Task Invoke(HttpContext context) } ActionContext actionContext = new ActionContext(context, context.GetRouteData(), new ActionDescriptor()); - await result.ExecuteResultAsync(actionContext); } } diff --git a/src/WebJobs.Script/Host/HostFunctionMetadataProvider.cs b/src/WebJobs.Script/Host/HostFunctionMetadataProvider.cs index 75920dcc1c..efb069361d 100644 --- a/src/WebJobs.Script/Host/HostFunctionMetadataProvider.cs +++ b/src/WebJobs.Script/Host/HostFunctionMetadataProvider.cs @@ -203,7 +203,7 @@ internal static string ParseLanguage(string scriptFilePath, IEnumerable - /// Determines which script should be considered the "primary" entry point script. Returns null if Primary script file cannot be determined. + /// Determines which script should be considered the "primary" entry point script. Returns null if Primary script file cannot be determined /// internal static string DeterminePrimaryScriptFile(string scriptFile, string scriptDirectory, IFileSystem fileSystem = null) { From 2e094eedf9f81a95e0ed3f75207574d3b9c57ff0 Mon Sep 17 00:00:00 2001 From: Sarah Vu Date: Fri, 31 Mar 2023 13:44:21 -0700 Subject: [PATCH 15/24] add service provider to grpcworkerchannel tests --- .../Middleware/FunctionInvocationMiddleware.cs | 1 - .../ApplicationInsightsTestFixture.cs | 12 ++++++------ .../Workers/Rpc/GrpcWorkerChannelTests.cs | 16 +++++++++++----- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs b/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs index 2e11924c53..b5688011f0 100644 --- a/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs +++ b/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs @@ -42,7 +42,6 @@ public async Task Invoke(HttpContext context) } var functionExecution = context.Features.Get(); - if (functionExecution != null && !context.Response.HasStarted) { // LiveLogs session id is used to show only contextual logs in the "Code + Test" experience. The id is included in the custom dimension. diff --git a/test/WebJobs.Script.Tests.Integration/ApplicationInsights/ApplicationInsightsTestFixture.cs b/test/WebJobs.Script.Tests.Integration/ApplicationInsights/ApplicationInsightsTestFixture.cs index ffb67c3c98..1130cc9016 100644 --- a/test/WebJobs.Script.Tests.Integration/ApplicationInsights/ApplicationInsightsTestFixture.cs +++ b/test/WebJobs.Script.Tests.Integration/ApplicationInsights/ApplicationInsightsTestFixture.cs @@ -97,23 +97,23 @@ public void Dispose() private class TestGrpcWorkerChannelFactory : GrpcWorkerChannelFactory { - public TestGrpcWorkerChannelFactory(IScriptEventManager eventManager, IEnvironment environment, ILoggerFactory loggerFactory, IOptionsMonitor applicationHostOptions, IRpcWorkerProcessFactory rpcWorkerProcessManager, ISharedMemoryManager sharedMemoryManager, IOptions workerConcurrencyOptions, IOptions hostingConfigOptions) - : base(eventManager, environment, loggerFactory, applicationHostOptions, rpcWorkerProcessManager, sharedMemoryManager, workerConcurrencyOptions, hostingConfigOptions) + public TestGrpcWorkerChannelFactory(IScriptEventManager eventManager, IEnvironment environment, ILoggerFactory loggerFactory, IOptionsMonitor applicationHostOptions, IRpcWorkerProcessFactory rpcWorkerProcessManager, ISharedMemoryManager sharedMemoryManager, IOptions workerConcurrencyOptions, IOptions hostingConfigOptions, IHttpProxyService httpProxyService) + : base(eventManager, environment, loggerFactory, applicationHostOptions, rpcWorkerProcessManager, sharedMemoryManager, workerConcurrencyOptions, hostingConfigOptions, httpProxyService) { } internal override IRpcWorkerChannel CreateInternal(string workerId, IScriptEventManager eventManager, RpcWorkerConfig languageWorkerConfig, IWorkerProcess rpcWorkerProcess, ILogger workerLogger, IMetricsLogger metricsLogger, int attemptCount, IEnvironment environment, IOptionsMonitor applicationHostOptions, - ISharedMemoryManager sharedMemoryManager, IOptions workerConcurrencyOptions, IOptions hostingConfigOptions) + ISharedMemoryManager sharedMemoryManager, IOptions workerConcurrencyOptions, IOptions hostingConfigOptions, IHttpProxyService httpProxyService) { return new TestGrpcWorkerChannel(workerId, eventManager, languageWorkerConfig, rpcWorkerProcess, workerLogger, metricsLogger, - attemptCount, environment, applicationHostOptions, sharedMemoryManager, workerConcurrencyOptions, hostingConfigOptions); + attemptCount, environment, applicationHostOptions, sharedMemoryManager, workerConcurrencyOptions, hostingConfigOptions, httpProxyService); } private class TestGrpcWorkerChannel : GrpcWorkerChannel { - internal TestGrpcWorkerChannel(string workerId, IScriptEventManager eventManager, RpcWorkerConfig workerConfig, IWorkerProcess rpcWorkerProcess, ILogger logger, IMetricsLogger metricsLogger, int attemptCount, IEnvironment environment, IOptionsMonitor applicationHostOptions, ISharedMemoryManager sharedMemoryManager, IOptions workerConcurrencyOptions, IOptions hostingConfigOptions) - : base(workerId, eventManager, workerConfig, rpcWorkerProcess, logger, metricsLogger, attemptCount, environment, applicationHostOptions, sharedMemoryManager, workerConcurrencyOptions, hostingConfigOptions) + internal TestGrpcWorkerChannel(string workerId, IScriptEventManager eventManager, RpcWorkerConfig workerConfig, IWorkerProcess rpcWorkerProcess, ILogger logger, IMetricsLogger metricsLogger, int attemptCount, IEnvironment environment, IOptionsMonitor applicationHostOptions, ISharedMemoryManager sharedMemoryManager, IOptions workerConcurrencyOptions, IOptions hostingConfigOptions, IHttpProxyService httpProxyService) + : base(workerId, eventManager, workerConfig, rpcWorkerProcess, logger, metricsLogger, attemptCount, environment, applicationHostOptions, sharedMemoryManager, workerConcurrencyOptions, hostingConfigOptions, httpProxyService) { } diff --git a/test/WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs b/test/WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs index 0a7ca47787..d68bbba65b 100644 --- a/test/WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs +++ b/test/WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs @@ -57,6 +57,7 @@ public class GrpcWorkerChannelTests : IDisposable private readonly IOptions _workerConcurrencyOptions; private readonly ITestOutputHelper _testOutput; private readonly IOptions _hostingConfigOptions; + private readonly IHttpProxyService _httpProxyService; private GrpcWorkerChannel _workerChannel; public GrpcWorkerChannelTests(ITestOutputHelper testOutput) @@ -118,7 +119,8 @@ private Task CreateDefaultWorkerChannel(bool autoStart = true, IDictionary(async () => await _workerChannel.StartWorkerProcessAsync(CancellationToken.None)); } @@ -508,7 +511,8 @@ public async Task Drain_Verify() _hostOptionsMonitor, _sharedMemoryManager, _workerConcurrencyOptions, - _hostingConfigOptions); + _hostingConfigOptions, + _httpProxyService); channel.SetupFunctionInvocationBuffers(GetTestFunctionsList("node")); ScriptInvocationContext scriptInvocationContext = GetTestScriptInvocationContext(invocationId, resultSource); await channel.SendInvocationRequest(scriptInvocationContext); @@ -1170,7 +1174,8 @@ public async Task GetLatencies_StartsTimer_WhenDynamicConcurrencyEnabled() _hostOptionsMonitor, _sharedMemoryManager, _workerConcurrencyOptions, - _hostingConfigOptions); + _hostingConfigOptions, + _httpProxyService); IEnumerable latencyHistory = null; @@ -1209,7 +1214,8 @@ public async Task GetLatencies_DoesNot_StartTimer_WhenDynamicConcurrencyDisabled _hostOptionsMonitor, _sharedMemoryManager, _workerConcurrencyOptions, - _hostingConfigOptions); + _hostingConfigOptions, + _httpProxyService); // wait 10 seconds await Task.Delay(10000); From 43bd487b1fa0824f201ca106ca2e8587723941ce Mon Sep 17 00:00:00 2001 From: Sarah Vu Date: Fri, 31 Mar 2023 14:03:28 -0700 Subject: [PATCH 16/24] mock proxy service --- .../WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test/WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs b/test/WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs index d68bbba65b..8cb06f5704 100644 --- a/test/WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs +++ b/test/WebJobs.Script.Tests/Workers/Rpc/GrpcWorkerChannelTests.cs @@ -57,6 +57,7 @@ public class GrpcWorkerChannelTests : IDisposable private readonly IOptions _workerConcurrencyOptions; private readonly ITestOutputHelper _testOutput; private readonly IOptions _hostingConfigOptions; + private readonly Mock _mockHttpProxyService = new Mock(); private readonly IHttpProxyService _httpProxyService; private GrpcWorkerChannel _workerChannel; @@ -103,6 +104,8 @@ public GrpcWorkerChannelTests(ITestOutputHelper testOutput) _testEnvironment.SetEnvironmentVariable("APPLICATIONINSIGHTS_ENABLE_AGENT", "true"); _hostingConfigOptions = Options.Create(new FunctionsHostingConfigOptions()); + + _httpProxyService = _mockHttpProxyService.Object; } private Task CreateDefaultWorkerChannel(bool autoStart = true, IDictionary capabilities = null) From 06cb2adb52b12de19096f0156ec2da8f2969c56d Mon Sep 17 00:00:00 2001 From: Sarah Vu Date: Mon, 3 Apr 2023 10:27:57 -0700 Subject: [PATCH 17/24] add new pkgs to deps test --- ...oft.Azure.WebJobs.Script.WebHost.deps.json | 37 ++++++++++++++----- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/test/WebJobs.Script.Tests/Microsoft.Azure.WebJobs.Script.WebHost.deps.json b/test/WebJobs.Script.Tests/Microsoft.Azure.WebJobs.Script.WebHost.deps.json index 9904de41c4..57de69cf1f 100644 --- a/test/WebJobs.Script.Tests/Microsoft.Azure.WebJobs.Script.WebHost.deps.json +++ b/test/WebJobs.Script.Tests/Microsoft.Azure.WebJobs.Script.WebHost.deps.json @@ -118,7 +118,7 @@ "Azure.Storage.Common/12.12.0": { "dependencies": { "Azure.Core": "1.25.0", - "System.IO.Hashing": "6.0.0" + "System.IO.Hashing": "7.0.0" }, "runtime": { "lib/netstandard2.0/Azure.Storage.Common.dll": { @@ -2190,11 +2190,11 @@ "System.Runtime": "4.3.1" } }, - "System.IO.Hashing/6.0.0": { + "System.IO.Hashing/7.0.0": { "runtime": { "lib/net6.0/System.IO.Hashing.dll": { - "assemblyVersion": "6.0.0.0", - "fileVersion": "6.0.21.52210" + "assemblyVersion": "7.0.0.0", + "fileVersion": "7.0.22.51805" } } }, @@ -2905,6 +2905,17 @@ "System.Xml.XmlDocument": "4.3.0" } }, + "Yarp.ReverseProxy/2.0.0": { + "dependencies": { + "System.IO.Hashing": "7.0.0" + }, + "runtime": { + "lib/net6.0/Yarp.ReverseProxy.dll": { + "assemblyVersion": "2.0.0.0", + "fileVersion": "2.0.23.11306" + } + } + }, "Microsoft.Azure.WebJobs.Script/4.17.0": { "dependencies": { "Azure.Core": "1.25.0", @@ -2956,7 +2967,8 @@ "Microsoft.ApplicationInsights.WindowsServer.TelemetryChannel": "2.21.0", "Microsoft.Azure.WebJobs.Script": "4.17.0", "System.IO.FileSystem.Primitives": "4.3.0", - "System.Threading.Channels": "6.0.0" + "System.Threading.Channels": "6.0.0", + "Yarp.ReverseProxy": "2.0.0" }, "runtime": { "Microsoft.Azure.WebJobs.Script.Grpc.dll": {} @@ -4484,12 +4496,12 @@ "path": "system.io.filesystem.primitives/4.3.0", "hashPath": "system.io.filesystem.primitives.4.3.0.nupkg.sha512" }, - "System.IO.Hashing/6.0.0": { + "System.IO.Hashing/7.0.0": { "type": "package", "serviceable": true, - "sha512": "sha512-Rfm2jYCaUeGysFEZjDe7j1R4x6Z6BzumS/vUT5a1AA/AWJuGX71PoGB0RmpyX3VmrGqVnAwtfMn39OHR8Y/5+g==", - "path": "system.io.hashing/6.0.0", - "hashPath": "system.io.hashing.6.0.0.nupkg.sha512" + "sha512": "sha512-sDnWM0N3AMCa86LrKTWeF3BZLD2sgWyYUc7HL6z4+xyDZNQRwzmxbo4qP2rX2MqC+Sy1/gOSRDah5ltxY5jPxw==", + "path": "system.io.hashing/7.0.0", + "hashPath": "system.io.hashing.7.0.0.nupkg.sha512" }, "System.Linq/4.3.0": { "type": "package", @@ -4981,6 +4993,13 @@ "path": "system.xml.xmlserializer/4.3.0", "hashPath": "system.xml.xmlserializer.4.3.0.nupkg.sha512" }, + "Yarp.ReverseProxy/2.0.0": { + "type": "package", + "serviceable": true, + "sha512": "sha512-9yMPNkJRPn/hHssKKf7eucF5o1f0nIrQ9QlSybR2RPdO4+Ws3aDd8qAg40YFZMbZKIIqtE3u4fCkXO8vNwm0yQ==", + "path": "yarp.reverseproxy/2.0.0", + "hashPath": "yarp.reverseproxy.2.0.0.nupkg.sha512" + }, "Microsoft.Azure.WebJobs.Script/4.17.0": { "type": "project", "serviceable": false, From 24cfbe702657d3eebc5ae23b035e830370833020 Mon Sep 17 00:00:00 2001 From: Sarah Vu Date: Mon, 3 Apr 2023 16:39:06 -0700 Subject: [PATCH 18/24] add feature flag --- .../Channel/GrpcWorkerChannel.cs | 13 ++++++++++--- src/WebJobs.Script/ScriptConstants.cs | 1 + 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs index f3c405d52f..fa8ec3899b 100644 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs +++ b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs @@ -428,10 +428,17 @@ internal void WorkerInitResponse(GrpcEvent initEvent) // If http proxying is enabled, we need to get the proxying endpoint of this worker var httpUri = _workerCapabilities.GetCapabilityState(RpcWorkerConstants.HttpUri); - if (!string.IsNullOrEmpty(httpUri)) + if (!string.IsNullOrEmpty(httpUri) && FeatureFlags.IsEnabled(ScriptConstants.FeatureFlagEnableHttpProxying)) { - _httpProxyEndpoint = new Uri(httpUri); - _isHttpProxyingWorker = true; + try + { + _httpProxyEndpoint = new Uri(httpUri); + _isHttpProxyingWorker = true; + } + catch (Exception ex) + { + HandleWorkerInitError(ex); + } } _workerInitTask.TrySetResult(true); diff --git a/src/WebJobs.Script/ScriptConstants.cs b/src/WebJobs.Script/ScriptConstants.cs index 1598e95003..2147f17192 100644 --- a/src/WebJobs.Script/ScriptConstants.cs +++ b/src/WebJobs.Script/ScriptConstants.cs @@ -125,6 +125,7 @@ public static class ScriptConstants public const string FeatureFlagEnableWorkerIndexing = "EnableWorkerIndexing"; public const string FeatureFlagEnableDebugTracing = "EnableDebugTracing"; public const string FeatureFlagEnableProxies = "EnableProxies"; + public const string FeatureFlagEnableHttpProxying = "EnableHttpProxying"; public const string HostingConfigDisableLinuxAppServiceDetailedExecutionEvents = "DisableLinuxExecutionDetails"; public const string HostingConfigDisableLinuxAppServiceExecutionEventLogBackoff = "DisableLinuxLogBackoff"; From 2360393ff8e4ea5fe789c8c4ebb352883840ddbe Mon Sep 17 00:00:00 2001 From: Sarah Vu Date: Mon, 3 Apr 2023 20:08:01 -0700 Subject: [PATCH 19/24] resolve comments --- src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs | 4 ++-- src/WebJobs.Script.Grpc/Server/DefaultHttpProxyService.cs | 4 ++-- .../Middleware/FunctionInvocationMiddleware.cs | 4 ++-- src/WebJobs.Script/ScriptConstants.cs | 3 ++- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs index fa8ec3899b..06460f7b36 100644 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs +++ b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs @@ -764,7 +764,7 @@ await SendStreamingMessageAsync(new StreamingMessage { var aspNetTask = _httpProxyService.Forward(context, _httpProxyEndpoint); - context.Properties.Add("HttpProxyingTask", aspNetTask); + context.Properties.Add(ScriptConstants.HttpProxyTask, aspNetTask); } } catch (Exception invokeEx) @@ -930,7 +930,7 @@ internal async Task InvokeResponse(InvocationResponse invokeResponse) { if (invokeResponse.Result.IsInvocationSuccess(context.ResultSource, capabilityEnabled)) { - if (context.Properties.TryGetValue("HttpProxyingTask", out ValueTask httpProxyTask)) + if (context.Properties.TryGetValue(ScriptConstants.HttpProxyTask, out ValueTask httpProxyTask)) { ForwarderError httpProxyTaskResult = await httpProxyTask; diff --git a/src/WebJobs.Script.Grpc/Server/DefaultHttpProxyService.cs b/src/WebJobs.Script.Grpc/Server/DefaultHttpProxyService.cs index 3369902567..2f453cf929 100644 --- a/src/WebJobs.Script.Grpc/Server/DefaultHttpProxyService.cs +++ b/src/WebJobs.Script.Grpc/Server/DefaultHttpProxyService.cs @@ -42,7 +42,7 @@ public ValueTask Forward(ScriptInvocationContext context, Uri ht throw new ArgumentNullException(nameof(context)); } - if (context.Inputs is null) + if (context.Inputs is null || context.Inputs?.Count() == 0) { throw new InvalidOperationException($"The function {context.FunctionMetadata.Name} can not be evaluated since it has no inputs."); } @@ -56,7 +56,7 @@ public ValueTask Forward(ScriptInvocationContext context, Uri ht HttpContext httpContext = httpRequest.HttpContext; - httpContext.Items.Add(ScriptConstants.IsHttpProxyingEnabled, bool.TrueString); + httpContext.Items.Add(ScriptConstants.HttpProxyingEnabled, bool.TrueString); // add invocation id as correlation id httpRequest.Headers.TryAdd(ScriptConstants.HttpProxyCorrelationHeader, context.ExecutionContext.InvocationId.ToString()); diff --git a/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs b/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs index b5688011f0..a3d35292c8 100644 --- a/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs +++ b/src/WebJobs.Script.WebHost/Middleware/FunctionInvocationMiddleware.cs @@ -54,9 +54,9 @@ public async Task Invoke(HttpContext context) int nestedProxiesCount = GetNestedProxiesCount(context, functionExecution); IActionResult result = await GetResultAsync(context, functionExecution); - if (context.Items.TryGetValue(ScriptConstants.IsHttpProxyingEnabled, out var value)) + if (context.Items.TryGetValue(ScriptConstants.HttpProxyingEnabled, out var value)) { - if (value.ToString() == bool.TrueString) + if (value?.ToString() == bool.TrueString) { return; } diff --git a/src/WebJobs.Script/ScriptConstants.cs b/src/WebJobs.Script/ScriptConstants.cs index 2147f17192..967c4202b8 100644 --- a/src/WebJobs.Script/ScriptConstants.cs +++ b/src/WebJobs.Script/ScriptConstants.cs @@ -228,7 +228,8 @@ public static class ScriptConstants public static readonly string FunctionsHostingConfigSectionName = "FunctionsHostingConfig"; // HTTP Proxying constants - public static readonly string IsHttpProxyingEnabled = "IsHttpProxying"; + public static readonly string HttpProxyingEnabled = "HttpProxyingEnabled"; public static readonly string HttpProxyCorrelationHeader = "x-invocation-id"; + public static readonly string HttpProxyTask = "HttpProxyTask"; } } From bfe5633557976a8261d9bea25dc3851781629084 Mon Sep 17 00:00:00 2001 From: Sarah Vu Date: Mon, 3 Apr 2023 20:33:12 -0700 Subject: [PATCH 20/24] adding services behind feature flag --- .../GrpcServiceCollectionsExtensions.cs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs b/src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs index 19a8578263..34a5163728 100644 --- a/src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs +++ b/src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs @@ -1,6 +1,7 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using Microsoft.Azure.WebJobs.Script.Config; using Microsoft.Azure.WebJobs.Script.Grpc.Messages; using Microsoft.Azure.WebJobs.Script.Workers.Rpc; using Microsoft.Extensions.DependencyInjection; @@ -16,8 +17,11 @@ public static IServiceCollection AddScriptGrpc(this IServiceCollection services) services.AddSingleton(); - services.AddHttpForwarder(); - services.AddSingleton(); + if (FeatureFlags.IsEnabled(ScriptConstants.FeatureFlagEnableHttpProxying)) + { + services.AddHttpForwarder(); + services.AddSingleton(); + } return services; } From 697880f66acca39facdeecdf31b291edf624bc43 Mon Sep 17 00:00:00 2001 From: Sarah Vu Date: Mon, 3 Apr 2023 20:35:04 -0700 Subject: [PATCH 21/24] revert adding services behind feature flag --- .../GrpcServiceCollectionsExtensions.cs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs b/src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs index 34a5163728..acad583083 100644 --- a/src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs +++ b/src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs @@ -17,11 +17,8 @@ public static IServiceCollection AddScriptGrpc(this IServiceCollection services) services.AddSingleton(); - if (FeatureFlags.IsEnabled(ScriptConstants.FeatureFlagEnableHttpProxying)) - { - services.AddHttpForwarder(); - services.AddSingleton(); - } + services.AddHttpForwarder(); + services.AddSingleton(); return services; } From 1a1d5fe524cc3ddeb5b92b26aa777a28edf9aa66 Mon Sep 17 00:00:00 2001 From: Sarah Vu Date: Mon, 3 Apr 2023 20:37:11 -0700 Subject: [PATCH 22/24] cleanup --- src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs b/src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs index acad583083..19a8578263 100644 --- a/src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs +++ b/src/WebJobs.Script.Grpc/GrpcServiceCollectionsExtensions.cs @@ -1,7 +1,6 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. -using Microsoft.Azure.WebJobs.Script.Config; using Microsoft.Azure.WebJobs.Script.Grpc.Messages; using Microsoft.Azure.WebJobs.Script.Workers.Rpc; using Microsoft.Extensions.DependencyInjection; From c9988cdc9a01190add369d23b458e61d3e7bda3b Mon Sep 17 00:00:00 2001 From: Sarah Vu Date: Tue, 4 Apr 2023 09:51:58 -0700 Subject: [PATCH 23/24] resolve comments --- .../Channel/GrpcWorkerChannel.cs | 9 ++++----- .../Server/DefaultHttpProxyService.cs | 17 +++++++---------- .../Server/IHttpProxyService.cs | 2 +- src/WebJobs.Script/ScriptConstants.cs | 2 +- 4 files changed, 13 insertions(+), 17 deletions(-) diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs index 06460f7b36..4d89ab514b 100644 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs +++ b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs @@ -84,8 +84,6 @@ internal class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable private bool _isWorkerApplicationInsightsLoggingEnabled; private IHttpProxyService _httpProxyService; private Uri _httpProxyEndpoint; - private bool _isHttpProxyingWorker = false; - private System.Timers.Timer _timer; internal GrpcWorkerChannel( @@ -139,6 +137,8 @@ internal GrpcWorkerChannel( _state = RpcWorkerChannelState.Default; } + private bool IsHttpProxyingWorker => _httpProxyEndpoint is not null; + public string Id => _workerId; public IDictionary> FunctionInputBuffers => _functionInputBuffers; @@ -433,7 +433,6 @@ internal void WorkerInitResponse(GrpcEvent initEvent) try { _httpProxyEndpoint = new Uri(httpUri); - _isHttpProxyingWorker = true; } catch (Exception ex) { @@ -760,9 +759,9 @@ await SendStreamingMessageAsync(new StreamingMessage context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId)); } - if (_isHttpProxyingWorker && context.FunctionMetadata.IsHttpTriggerFunction()) + if (IsHttpProxyingWorker && context.FunctionMetadata.IsHttpTriggerFunction()) { - var aspNetTask = _httpProxyService.Forward(context, _httpProxyEndpoint); + var aspNetTask = _httpProxyService.ForwardAsync(context, _httpProxyEndpoint); context.Properties.Add(ScriptConstants.HttpProxyTask, aspNetTask); } diff --git a/src/WebJobs.Script.Grpc/Server/DefaultHttpProxyService.cs b/src/WebJobs.Script.Grpc/Server/DefaultHttpProxyService.cs index 2f453cf929..500fd8a69c 100644 --- a/src/WebJobs.Script.Grpc/Server/DefaultHttpProxyService.cs +++ b/src/WebJobs.Script.Grpc/Server/DefaultHttpProxyService.cs @@ -15,14 +15,14 @@ namespace Microsoft.Azure.WebJobs.Script.Grpc { internal class DefaultHttpProxyService : IHttpProxyService, IDisposable { - private SocketsHttpHandler _handler; - private IHttpForwarder _httpForwarder; - private HttpMessageInvoker _messageInvoker; - private ForwarderRequestConfig _forwarderRequestConfig; + private readonly SocketsHttpHandler _handler; + private readonly IHttpForwarder _httpForwarder; + private readonly HttpMessageInvoker _messageInvoker; + private readonly ForwarderRequestConfig _forwarderRequestConfig; public DefaultHttpProxyService(IHttpForwarder httpForwarder) { - _httpForwarder = httpForwarder; + _httpForwarder = httpForwarder ?? throw new ArgumentNullException(nameof(httpForwarder)); _handler = new SocketsHttpHandler(); _messageInvoker = new HttpMessageInvoker(_handler); @@ -35,12 +35,9 @@ public void Dispose() _messageInvoker?.Dispose(); } - public ValueTask Forward(ScriptInvocationContext context, Uri httpUri) + public ValueTask ForwardAsync(ScriptInvocationContext context, Uri httpUri) { - if (context is null) - { - throw new ArgumentNullException(nameof(context)); - } + ArgumentNullException.ThrowIfNull(context); if (context.Inputs is null || context.Inputs?.Count() == 0) { diff --git a/src/WebJobs.Script.Grpc/Server/IHttpProxyService.cs b/src/WebJobs.Script.Grpc/Server/IHttpProxyService.cs index 40e11e7ebc..88153dce58 100644 --- a/src/WebJobs.Script.Grpc/Server/IHttpProxyService.cs +++ b/src/WebJobs.Script.Grpc/Server/IHttpProxyService.cs @@ -10,6 +10,6 @@ namespace Microsoft.Azure.WebJobs.Script.Grpc { public interface IHttpProxyService { - public ValueTask Forward(ScriptInvocationContext context, Uri httpUri); + public ValueTask ForwardAsync(ScriptInvocationContext context, Uri httpUri); } } \ No newline at end of file diff --git a/src/WebJobs.Script/ScriptConstants.cs b/src/WebJobs.Script/ScriptConstants.cs index 967c4202b8..547af39a54 100644 --- a/src/WebJobs.Script/ScriptConstants.cs +++ b/src/WebJobs.Script/ScriptConstants.cs @@ -229,7 +229,7 @@ public static class ScriptConstants // HTTP Proxying constants public static readonly string HttpProxyingEnabled = "HttpProxyingEnabled"; - public static readonly string HttpProxyCorrelationHeader = "x-invocation-id"; + public static readonly string HttpProxyCorrelationHeader = "x-ms-invocation-id"; public static readonly string HttpProxyTask = "HttpProxyTask"; } } From f22dfca0359203d950fe7d6b50f793635de1fd21 Mon Sep 17 00:00:00 2001 From: Sarah Vu Date: Tue, 4 Apr 2023 12:43:34 -0700 Subject: [PATCH 24/24] resolve comments, validated e2e --- .../Channel/GrpcWorkerChannel.cs | 19 +++++++++++-------- .../Server/IHttpProxyService.cs | 2 +- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs index 4d89ab514b..a30ba6b9de 100644 --- a/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs +++ b/src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs @@ -759,9 +759,9 @@ await SendStreamingMessageAsync(new StreamingMessage context.CancellationToken.Register(() => SendInvocationCancel(invocationRequest.InvocationId)); } - if (IsHttpProxyingWorker && context.FunctionMetadata.IsHttpTriggerFunction()) + if (IsHttpProxyingWorker && FeatureFlags.IsEnabled(ScriptConstants.FeatureFlagEnableHttpProxying) && context.FunctionMetadata.IsHttpTriggerFunction()) { - var aspNetTask = _httpProxyService.ForwardAsync(context, _httpProxyEndpoint); + var aspNetTask = _httpProxyService.ForwardAsync(context, _httpProxyEndpoint).AsTask(); context.Properties.Add(ScriptConstants.HttpProxyTask, aspNetTask); } @@ -929,14 +929,17 @@ internal async Task InvokeResponse(InvocationResponse invokeResponse) { if (invokeResponse.Result.IsInvocationSuccess(context.ResultSource, capabilityEnabled)) { - if (context.Properties.TryGetValue(ScriptConstants.HttpProxyTask, out ValueTask httpProxyTask)) + if (FeatureFlags.IsEnabled(ScriptConstants.FeatureFlagEnableHttpProxying) && IsHttpProxyingWorker) { - ForwarderError httpProxyTaskResult = await httpProxyTask; - - if (httpProxyTaskResult is not ForwarderError.None) + if (context.Properties.TryGetValue(ScriptConstants.HttpProxyTask, out Task httpProxyTask)) { - // TODO: Understand scenarios where function invocation succeeds but there is an error proxying - // need to investigate different ForwarderErrors and consider how they will be relayed through other services and to users + ForwarderError httpProxyTaskResult = await httpProxyTask; + + if (httpProxyTaskResult is not ForwarderError.None) + { + // TODO: Understand scenarios where function invocation succeeds but there is an error proxying + // need to investigate different ForwarderErrors and consider how they will be relayed through other services and to users + } } } diff --git a/src/WebJobs.Script.Grpc/Server/IHttpProxyService.cs b/src/WebJobs.Script.Grpc/Server/IHttpProxyService.cs index 88153dce58..241bd6f284 100644 --- a/src/WebJobs.Script.Grpc/Server/IHttpProxyService.cs +++ b/src/WebJobs.Script.Grpc/Server/IHttpProxyService.cs @@ -10,6 +10,6 @@ namespace Microsoft.Azure.WebJobs.Script.Grpc { public interface IHttpProxyService { - public ValueTask ForwardAsync(ScriptInvocationContext context, Uri httpUri); + ValueTask ForwardAsync(ScriptInvocationContext context, Uri httpUri); } } \ No newline at end of file