Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions protobuf/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,6 @@ From within the Azure Functions language worker repo:
- `git commit -m "Updated subtree from https://github.com/azure/azure-functions-language-worker-protobuf. Tag: <tag-name>. Commit: <commit hash>"`
- `git push`

## Releasing a Language Worker Protobuf version

1. Draft a release in the GitHub UI
- Be sure to include details of the release
2. Create a release version, following semantic versioning guidelines ([semver.org](https://semver.org/))
3. Tag the version with the pattern: `v<M>.<m>.<p>-protofile` (example: `v1.1.0-protofile`)
4. Merge `dev` to `main`
5. Run the release you'd created

## Consuming FunctionRPC.proto
*Note: Update versionNumber before running following commands*

Expand Down
41 changes: 38 additions & 3 deletions protobuf/src/proto/FunctionRpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ message StreamingMessage {
WorkerInitRequest worker_init_request = 17;
// Worker responds after initializing with its capabilities & status
WorkerInitResponse worker_init_response = 16;

// Worker periodically sends empty heartbeat message to host
WorkerHeartbeat worker_heartbeat = 15;

// Host sends terminate message to worker.
// Worker terminates if it can, otherwise host terminates after a grace period
Expand Down Expand Up @@ -117,13 +120,35 @@ message WorkerInitRequest {

// Worker responds with the result of initializing itself
message WorkerInitResponse {
// Version of worker
// NOT USED
// TODO: Remove from protobuf during next breaking change release
string worker_version = 1;

// A map of worker supported features/capabilities
map<string, string> capabilities = 2;

// Status of the response
StatusResult result = 3;

// Worker metadata captured for telemetry purposes
WorkerMetadata worker_metadata = 4;
}

message WorkerMetadata {
// The runtime/stack name
string runtime_name = 1;

// The version of the runtime/stack
string runtime_version = 2;

// The version of the worker
string worker_version = 3;

// The worker bitness/architecture
string worker_bitness = 4;

// Optional additional custom properties
map<string, string> custom_properties = 5;
}

// Used by the host to determine success/failure/cancellation
Expand All @@ -134,6 +159,7 @@ message StatusResult {
Success = 1;
Cancelled = 2;
}

// Status for the given result
Status status = 4;

Expand All @@ -147,6 +173,10 @@ message StatusResult {
repeated RpcLog logs = 3;
}

// NOT USED
// TODO: Remove from protobuf during next breaking change release
message WorkerHeartbeat {}

// Warning before killing the process after grace_period
// Worker self terminates ..no response on this
message WorkerTerminate {
Expand Down Expand Up @@ -291,6 +321,11 @@ message RpcFunctionMetadata {

// A flag indicating if managed dependency is enabled or not
bool managed_dependency_enabled = 14;

// Properties for function metadata
// They're usually specific to a worker and largely passed along to the controller API for use
// outside the host
map<string,string> Properties = 16;
}

// Host tells worker it is ready to receive metadata
Expand Down Expand Up @@ -549,11 +584,11 @@ message RpcException {

// Worker specifies whether exception is a user exception,
// for purpose of application insights logging. Defaults to false.
optional bool is_user_exception = 4;
bool is_user_exception = 4;

// Type of exception. If it's a user exception, the type is passed along to app insights.
// Otherwise, it's ignored for now.
optional string type = 5;
string type = 5;
}

// Http cookie type. Note that only name and value are used for Http requests
Expand Down
44 changes: 38 additions & 6 deletions src/RequestProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ namespace Microsoft.Azure.Functions.PowerShellWorker
{
using System.Diagnostics;
using LogLevel = Microsoft.Azure.WebJobs.Script.Grpc.Messages.RpcLog.Types.Level;
using System.Runtime.InteropServices;

internal class RequestProcessor
{
private readonly MessagingStream _msgStream;
private readonly System.Management.Automation.PowerShell _firstPwshInstance;
private readonly PowerShellManagerPool _powershellPool;
private DependencyManager _dependencyManager;
private string _pwshVersion;

// Holds the exception if an issue is encountered while processing the function app dependencies.
private Exception _initTerminatingError;
Expand All @@ -37,11 +39,12 @@ internal class RequestProcessor
private Dictionary<StreamingMessage.ContentOneofCase, Func<StreamingMessage, StreamingMessage>> _requestHandlers =
new Dictionary<StreamingMessage.ContentOneofCase, Func<StreamingMessage, StreamingMessage>>();

internal RequestProcessor(MessagingStream msgStream, System.Management.Automation.PowerShell firstPwshInstance)
internal RequestProcessor(MessagingStream msgStream, System.Management.Automation.PowerShell firstPwshInstance, string pwshVersion)
{
_msgStream = msgStream;
_firstPwshInstance = firstPwshInstance;
_powershellPool = new PowerShellManagerPool(() => new RpcLogger(msgStream));
_pwshVersion = pwshVersion;

// Host sends capabilities/init data to worker
_requestHandlers.Add(StreamingMessage.ContentOneofCase.WorkerInitRequest, ProcessWorkerInitRequest);
Expand Down Expand Up @@ -95,6 +98,9 @@ internal async Task ProcessRequestLoop()

internal StreamingMessage ProcessWorkerInitRequest(StreamingMessage request)
{
var stopwatch = new Stopwatch();
stopwatch.Start();

var workerInitRequest = request.WorkerInitRequest;
Environment.SetEnvironmentVariable("AZUREPS_HOST_ENVIRONMENT", $"AzureFunctions/{workerInitRequest.HostVersion}");
Environment.SetEnvironmentVariable("POWERSHELL_DISTRIBUTION_CHANNEL", $"Azure-Functions:{workerInitRequest.HostVersion}");
Expand All @@ -117,6 +123,22 @@ internal StreamingMessage ProcessWorkerInitRequest(StreamingMessage request)
RemoteSessionNamedPipeServer.CreateCustomNamedPipeServer(pipeName);
}

try
{
var rpcLogger = new RpcLogger(_msgStream);
rpcLogger.SetContext(request.RequestId, null);

response.WorkerInitResponse.WorkerMetadata = GetWorkerMetadata(_pwshVersion);

rpcLogger.Log(isUserOnlyLog: false, LogLevel.Trace, string.Format(PowerShellWorkerStrings.WorkerInitCompleted, stopwatch.ElapsedMilliseconds));
}
catch (Exception e)
{
status.Status = StatusResult.Types.Status.Failure;
status.Exception = e.ToRpcException();
return response;
}

return response;
}

Expand Down Expand Up @@ -180,11 +202,10 @@ internal StreamingMessage ProcessFunctionLoadRequest(StreamingMessage request)
return response;
}

// Ideally, the initialization should happen when processing 'WorkerInitRequest', however, the 'WorkerInitRequest'
// message doesn't provide information about the FunctionApp. That information is not available until the first
// 'FunctionLoadRequest' comes in. Therefore, we run initialization here.
// Also, we receive a FunctionLoadRequest when a proxy is configured. Proxies don't have the Metadata.Directory set
// which would cause initialization issues with the PSModulePath. Since they don't have that set, we skip over them.
// Ideally, the initialization should happen when processing 'WorkerInitRequest'. However, we defer the initialization
// until the first 'FunctionLoadRequest' which contains the information about whether Managed Dependencies is enabled for the function app,
// and if it is, we add the Managed Dependencies path to the PSModulePath.
// Also, we receive a FunctionLoadRequest when a proxy is configured. This is just a no-op on the worker size, so we skip over them.
if (!_isFunctionAppInitialized && !functionLoadRequest.Metadata.IsProxy)
{
try
Expand Down Expand Up @@ -519,6 +540,17 @@ private void SetupAppRootPathAndModulePath(FunctionLoadRequest functionLoadReque
.InvokeAndClearCommands();
}

private WorkerMetadata GetWorkerMetadata(string pwshVersion)
{
var data = new WorkerMetadata();
data.WorkerBitness = RuntimeInformation.OSArchitecture.ToString();
data.WorkerVersion = typeof(Worker).Assembly.GetName().Version.ToString();
data.RuntimeVersion = pwshVersion;
data.RuntimeName = "powershell";

return data;
}

#endregion
}
}
103 changes: 86 additions & 17 deletions src/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,70 @@ public async static Task Main(string[] args)
LogLevel.Information,
string.Format(PowerShellWorkerStrings.PowerShellWorkerVersion, typeof(Worker).Assembly.GetName().Version));

WorkerArguments arguments = null;
Parser.Default.ParseArguments<WorkerArguments>(args)
.WithParsed(ops => arguments = ops)
.WithNotParsed(err => Environment.Exit(1));
var workerOptions = new WorkerOptions();

var parser = new Parser(settings =>
{
settings.EnableDashDash = true;
settings.IgnoreUnknownArguments = true;
});
parser.ParseArguments<WorkerArguments>(args)
.WithParsed(workerArgs =>
{
// TODO: Remove parsing old command-line arguments that are not prefixed with functions-<argumentname>
// for more information, see https://github.com/Azure/azure-functions-powershell-worker/issues/995
workerOptions.WorkerId = workerArgs.FunctionsWorkerId ?? workerArgs.WorkerId;
workerOptions.RequestId = workerArgs.FunctionsRequestId ?? workerArgs.RequestId;

if (!string.IsNullOrWhiteSpace(workerArgs.FunctionsUri))
{
try
{
// TODO: Update WorkerOptions to have a URI property instead of host name and port number
// for more information, see https://github.com/Azure/azure-functions-powershell-worker/issues/994
var uri = new Uri(workerArgs.FunctionsUri);
workerOptions.Host = uri.Host;
workerOptions.Port = uri.Port;
}
catch (UriFormatException formatEx)
{
var message = $"Invalid URI format: {workerArgs.FunctionsUri}. Error message: {formatEx.Message}";
throw new ArgumentException(message, nameof(workerArgs.FunctionsUri));
}
}
else
{
workerOptions.Host = workerArgs.Host;
workerOptions.Port = workerArgs.Port;
}

// Validate workerOptions
ValidateProperty("WorkerId", workerOptions.WorkerId);
ValidateProperty("RequestId", workerOptions.RequestId);
ValidateProperty("Host", workerOptions.Host);

if (workerOptions.Port <= 0)
{
throw new ArgumentException("Port number has not been initialized", nameof(workerOptions.Port));
}
});

// Create the very first Runspace so the debugger has the target to attach to.
// This PowerShell instance is shared by the first PowerShellManager instance created in the pool,
// and the dependency manager (used to download dependent modules if needed).
var firstPowerShellInstance = Utils.NewPwshInstance();
LogPowerShellVersion(firstPowerShellInstance);
var pwshVersion = Utils.GetPowerShellVersion(firstPowerShellInstance);
LogPowerShellVersion(pwshVersion);
WarmUpPowerShell(firstPowerShellInstance);

var msgStream = new MessagingStream(arguments.Host, arguments.Port);
var requestProcessor = new RequestProcessor(msgStream, firstPowerShellInstance);
var msgStream = new MessagingStream(workerOptions.Host, workerOptions.Port);
var requestProcessor = new RequestProcessor(msgStream, firstPowerShellInstance, pwshVersion);

// Send StartStream message
var startedMessage = new StreamingMessage()
{
RequestId = arguments.RequestId,
StartStream = new StartStream() { WorkerId = arguments.WorkerId }
RequestId = workerOptions.RequestId,
StartStream = new StartStream() { WorkerId = workerOptions.WorkerId }
};

msgStream.Write(startedMessage);
Expand All @@ -75,28 +119,53 @@ private static void WarmUpPowerShell(System.Management.Automation.PowerShell fir
.InvokeAndClearCommands();
}

private static void LogPowerShellVersion(System.Management.Automation.PowerShell pwsh)
private static void LogPowerShellVersion(string pwshVersion)
{
var message = string.Format(PowerShellWorkerStrings.PowerShellVersion, Utils.GetPowerShellVersion(pwsh));
var message = string.Format(PowerShellWorkerStrings.PowerShellVersion, pwshVersion);
RpcLogger.WriteSystemLog(LogLevel.Information, message);
}

private static void ValidateProperty(string name, string value)
{
if (string.IsNullOrWhiteSpace(value))
{
throw new ArgumentException($"{name} is null or empty", name);
}
}
}

internal class WorkerArguments
{
[Option("host", Required = true, HelpText = "IP Address used to connect to the Host via gRPC.")]
[Option("host", Required = false, HelpText = "IP Address used to connect to the Host via gRPC.")]
public string Host { get; set; }

[Option("port", Required = true, HelpText = "Port used to connect to the Host via gRPC.")]
[Option("port", Required = false, HelpText = "Port used to connect to the Host via gRPC.")]
public int Port { get; set; }

[Option("workerId", Required = true, HelpText = "Worker ID assigned to this language worker.")]
[Option("workerId", Required = false, HelpText = "Worker ID assigned to this language worker.")]
public string WorkerId { get; set; }

[Option("requestId", Required = true, HelpText = "Request ID used for gRPC communication with the Host.")]
[Option("requestId", Required = false, HelpText = "Request ID used for gRPC communication with the Host.")]
public string RequestId { get; set; }

[Option("grpcMaxMessageLength", Required = false, HelpText = "[Deprecated and ignored] gRPC Maximum message size.")]
public int MaxMessageLength { get; set; }
[Option("functions-uri", Required = false, HelpText = "URI with IP Address and Port used to connect to the Host via gRPC.")]
public string FunctionsUri { get; set; }

[Option("functions-workerid", Required = false, HelpText = "Worker ID assigned to this language worker.")]
public string FunctionsWorkerId { get; set; }

[Option("functions-requestid", Required = false, HelpText = "Request ID used for gRPC communication with the Host.")]
public string FunctionsRequestId { get; set; }
}

internal class WorkerOptions
{
public string Host { get; set; }

public int Port { get; set; }

public string WorkerId { get; set; }

public string RequestId { get; set; }
}
}
3 changes: 3 additions & 0 deletions src/resources/PowerShellWorkerStrings.resx
Original file line number Diff line number Diff line change
Expand Up @@ -355,4 +355,7 @@
<data name="AutomaticUpgradesAreDisabled" xml:space="preserve">
<value>Automatic upgrades are disabled in PowerShell 7.0 function apps. To enable this functionality back, please migrate your function app to PowerShell 7.2. For more details, see https://aka.ms/functions-powershell-7.0-to-7.2.</value>
</data>
<data name="WorkerInitCompleted" xml:space="preserve">
<value>Worker init request completed in {0} ms.</value>
</data>
</root>