From 86535bc9d642cdfee1c390fb7cca9e4428a2bf32 Mon Sep 17 00:00:00 2001 From: Charles d'Avernas Date: Tue, 7 Jan 2025 17:31:03 +0100 Subject: [PATCH 1/2] Initial commit Signed-off-by: Charles d'Avernas --- .../Services/XmlSchemaHandler.cs | 3 +- src/runner/Synapse.Runner/Program.cs | 5 + .../Executors/AsyncApiCallExecutor.cs | 214 ++++++++++++++++++ .../Services/Executors/OpenApiCallExecutor.cs | 6 +- .../Services/TaskExecutorFactory.cs | 1 + .../Services/WorkflowExecutor.cs | 1 - .../Synapse.Runner/Synapse.Runner.csproj | 2 + 7 files changed, 226 insertions(+), 6 deletions(-) create mode 100644 src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs diff --git a/src/core/Synapse.Core.Infrastructure/Services/XmlSchemaHandler.cs b/src/core/Synapse.Core.Infrastructure/Services/XmlSchemaHandler.cs index 41f919bba..6fc558d33 100644 --- a/src/core/Synapse.Core.Infrastructure/Services/XmlSchemaHandler.cs +++ b/src/core/Synapse.Core.Infrastructure/Services/XmlSchemaHandler.cs @@ -15,9 +15,8 @@ using Neuroglia.Serialization; using ServerlessWorkflow.Sdk; using System.Net; -using System.Xml.Schema; using System.Xml; -using Avro.Generic; +using System.Xml.Schema; namespace Synapse.Core.Infrastructure.Services; diff --git a/src/runner/Synapse.Runner/Program.cs b/src/runner/Synapse.Runner/Program.cs index 292bd6632..48662b4a6 100644 --- a/src/runner/Synapse.Runner/Program.cs +++ b/src/runner/Synapse.Runner/Program.cs @@ -12,6 +12,9 @@ // limitations under the License. using Moq; +using Neuroglia.AsyncApi; +using Neuroglia.AsyncApi.Client; +using Neuroglia.AsyncApi.Client.Bindings; using Neuroglia.Serialization.Xml; using NReco.Logging.File; using ServerlessWorkflow.Sdk.IO; @@ -97,6 +100,8 @@ services.AddServerlessWorkflowIO(); services.AddNodeJSScriptExecutor(); services.AddPythonScriptExecutor(); + services.AddAsyncApi(); + services.AddAsyncApiClient(options => options.AddAllBindingHandlers()); services.AddSingleton(); services.AddSingleton(provider => provider.GetRequiredService()); services.AddSingleton(provider => provider.GetRequiredService()); diff --git a/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs new file mode 100644 index 000000000..3d09ff2d8 --- /dev/null +++ b/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs @@ -0,0 +1,214 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Neuroglia; +using Neuroglia.AsyncApi; +using Neuroglia.AsyncApi.Client; +using Neuroglia.AsyncApi.Client.Services; +using Neuroglia.AsyncApi.IO; +using Neuroglia.AsyncApi.v3; +using Neuroglia.Data.Expressions; + +namespace Synapse.Runner.Services.Executors; + +/// +/// Represents an used to execute AsyncAPI s using an +/// +/// The current +/// The service used to perform logging +/// The service used to create s +/// The service used to create s +/// The current +/// The service used to provide implementations +/// The service used to serialize/deserialize objects to/from JSON +/// The service used to create s +/// The service used to read s +/// The service used to create s +public class AsyncApiCallExecutor(IServiceProvider serviceProvider, ILogger logger, ITaskExecutionContextFactory executionContextFactory, ITaskExecutorFactory executorFactory, + ITaskExecutionContext context, Core.Infrastructure.Services.ISchemaHandlerProvider schemaHandlerProvider, IJsonSerializer serializer, IHttpClientFactory httpClientFactory, IAsyncApiDocumentReader asyncApiDocumentReader, IAsyncApiClientFactory asyncApiClientFactory) + : TaskExecutor(serviceProvider, logger, executionContextFactory, executorFactory, context, schemaHandlerProvider, serializer) +{ + + /// + /// Gets the service used to create s + /// + protected IHttpClientFactory HttpClientFactory { get; } = httpClientFactory; + + /// + /// Gets the service used to read s + /// + protected IAsyncApiDocumentReader AsyncApiDocumentReader { get; } = asyncApiDocumentReader; + + /// + /// Gets the service used to create s + /// + protected IAsyncApiClientFactory AsyncApiClientFactory { get; } = asyncApiClientFactory; + + /// + /// Gets the definition of the AsyncAPI call to perform + /// + protected AsyncApiCallDefinition? AsyncApi { get; set; } + + /// + /// Gets/sets the that defines the AsyncAPI operation to call + /// + protected V3AsyncApiDocument? Document { get; set; } + + /// + /// Gets the to call + /// + protected KeyValuePair Operation { get; set; } + + /// + /// Gets an object used to describe the credentials, if any, used to authenticate a user agent with the AsyncAPI application + /// + protected AuthorizationInfo? Authorization { get; set; } + + /// + /// Gets/sets the payload, if any, of the message to publish, in case the 's has been set to + /// + protected object? MessagePayload { get; set; } + + /// + /// Gets/sets the headers, if any, of the message to publish, in case the 's has been set to + /// + protected object? MessageHeaders { get; set; } + + /// + protected override async Task DoInitializeAsync(CancellationToken cancellationToken) + { + this.AsyncApi = (AsyncApiCallDefinition)this.JsonSerializer.Convert(this.Task.Definition.With, typeof(AsyncApiCallDefinition))!; + using var httpClient = this.HttpClientFactory.CreateClient(); + await httpClient.ConfigureAuthenticationAsync(this.AsyncApi.Document.Endpoint.Authentication, this.ServiceProvider, this.Task.Workflow.Definition, cancellationToken).ConfigureAwait(false); + var uriString = StringFormatter.NamedFormat(this.AsyncApi.Document.EndpointUri.OriginalString, this.Task.Input.ToDictionary()); + if (uriString.IsRuntimeExpression()) uriString = await this.Task.Workflow.Expressions.EvaluateAsync(uriString, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false); + if (string.IsNullOrWhiteSpace(uriString)) throw new NullReferenceException("The AsyncAPI endpoint URI cannot be null or empty"); + if (!Uri.TryCreate(uriString, UriKind.RelativeOrAbsolute, out var uri) || uri == null) throw new Exception($"Failed to parse the specified string '{uriString}' into a new URI"); + using var request = new HttpRequestMessage(HttpMethod.Get, uriString); + using var response = await httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false); + if (!response.IsSuccessStatusCode) + { + var responseContent = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false); + this.Logger.LogInformation("Failed to retrieve the AsyncAPI document at location '{uri}'. The remote server responded with a non-success status code '{statusCode}'.", uri, response.StatusCode); + this.Logger.LogDebug("Response content:\r\n{responseContent}", responseContent ?? "None"); + response.EnsureSuccessStatusCode(); + } + using var responseStream = await response.Content!.ReadAsStreamAsync(cancellationToken)!; + var document = await this.AsyncApiDocumentReader.ReadAsync(responseStream, cancellationToken).ConfigureAwait(false); + if (document is not V3AsyncApiDocument v3Document) throw new NotSupportedException("Synapse only supports AsyncAPI v3.0.0 at the moment"); + this.Document = v3Document; + var operationId = this.AsyncApi.OperationRef; + if (operationId.IsRuntimeExpression()) operationId = await this.Task.Workflow.Expressions.EvaluateAsync(operationId, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false); + if (string.IsNullOrWhiteSpace(operationId)) throw new NullReferenceException("The operation ref cannot be null or empty"); + var operation = this.Document.Operations.FirstOrDefault(o => o.Key == operationId); + if (operation.Value == null) throw new NullReferenceException($"Failed to find an operation with id '{operationId}' in AsyncAPI document at '{uri}'"); + if (this.AsyncApi.Authentication != null) this.Authorization = await AuthorizationInfo.CreateAsync(this.AsyncApi.Authentication, this.ServiceProvider, this.Task.Workflow.Definition, cancellationToken).ConfigureAwait(false); + switch (this.Operation.Value.Action) + { + case V3OperationAction.Receive: + await this.BuildMessagePayloadAsync(cancellationToken).ConfigureAwait(false); + await this.BuildMessageHeadersAsync(cancellationToken).ConfigureAwait(false); + break; + case V3OperationAction.Send: + + break; + default: + throw new NotSupportedException($"The specified operation action '{this.Operation.Value.Action}' is not supported"); + } + } + + /// + /// Builds the payload, if any, of the message to publish, in case the 's has been set to + /// + /// A + /// A new awaitable + protected virtual async Task BuildMessagePayloadAsync(CancellationToken cancellationToken = default) + { + if (this.AsyncApi == null || this.Operation == null) throw new InvalidOperationException("The executor must be initialized before execution"); + if (this.Task.Input == null) this.MessagePayload = new { }; + if (this.AsyncApi.Payload == null) return; + var arguments = this.GetExpressionEvaluationArguments(); + if (this.Authorization != null) + { + arguments ??= new Dictionary(); + arguments.Add("authorization", this.Authorization); + } + this.MessagePayload = await this.Task.Workflow.Expressions.EvaluateAsync(this.AsyncApi.Payload, this.Task.Input!, arguments, cancellationToken).ConfigureAwait(false); + } + + /// + /// Builds the headers, if any, of the message to publish, in case the 's has been set to + /// + /// A + /// A new awaitable + protected virtual async Task BuildMessageHeadersAsync(CancellationToken cancellationToken = default) + { + if (this.AsyncApi == null || this.Operation == null) throw new InvalidOperationException("The executor must be initialized before execution"); + if (this.AsyncApi.Headers == null) return; + var arguments = this.GetExpressionEvaluationArguments(); + if (this.Authorization != null) + { + arguments ??= new Dictionary(); + arguments.Add("authorization", this.Authorization); + } + this.MessageHeaders = await this.Task.Workflow.Expressions.EvaluateAsync(this.AsyncApi.Headers, this.Task.Input!, arguments, cancellationToken).ConfigureAwait(false); + } + + /// + protected override Task DoExecuteAsync(CancellationToken cancellationToken) + { + if (this.AsyncApi == null || this.Document == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution"); + switch (this.Operation.Value.Action) + { + case V3OperationAction.Receive: + return this.DoExecutePublishOperationAsync(cancellationToken); + case V3OperationAction.Send: + return this.DoExecuteSubscribeOperationAsync(cancellationToken); + default: + throw new NotSupportedException($"The specified operation action '{this.Operation.Value.Action}' is not supported"); + } + } + + /// + /// Executes an AsyncAPI publish operation + /// + /// A + /// A new awaitable + protected virtual async Task DoExecutePublishOperationAsync(CancellationToken cancellationToken) + { + if (this.AsyncApi == null || this.Document == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution"); + await using var asyncApiClient = this.AsyncApiClientFactory.CreateFor(this.Document); + var parameters = new AsyncApiPublishOperationParameters(this.Operation.Key, this.AsyncApi.Server, this.AsyncApi.Protocol) + { + Payload = this.MessagePayload, + Headers = this.MessageHeaders + }; + await using var result = await asyncApiClient.PublishAsync(parameters, cancellationToken).ConfigureAwait(false); + if (!result.IsSuccessful) throw new Exception("Failed to execute the AsyncAPI publish operation"); + } + + /// + /// Executes an AsyncAPI subscribe operation + /// + /// A + /// A new awaitable + protected virtual async Task DoExecuteSubscribeOperationAsync(CancellationToken cancellationToken) + { + if (this.AsyncApi == null || this.Document == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution"); + await using var asyncApiClient = this.AsyncApiClientFactory.CreateFor(this.Document); + var parameters = new AsyncApiSubscribeOperationParameters(this.Operation.Key, this.AsyncApi.Server, this.AsyncApi.Protocol); + await using var result = await asyncApiClient.SubscribeAsync(parameters, cancellationToken).ConfigureAwait(false); + if (!result.IsSuccessful) throw new Exception("Failed to execute the AsyncAPI subscribe operation"); + } + +} \ No newline at end of file diff --git a/src/runner/Synapse.Runner/Services/Executors/OpenApiCallExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/OpenApiCallExecutor.cs index 8536661c6..179de2078 100644 --- a/src/runner/Synapse.Runner/Services/Executors/OpenApiCallExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/OpenApiCallExecutor.cs @@ -22,7 +22,7 @@ namespace Synapse.Runner.Services.Executors; /// -/// Represents an used to execute http s using an +/// Represents an used to execute OpenAPI s using an /// /// The current /// The service used to perform logging @@ -111,7 +111,7 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo await httpClient.ConfigureAuthenticationAsync(this.OpenApi.Document.Endpoint.Authentication, this.ServiceProvider, this.Task.Workflow.Definition, cancellationToken).ConfigureAwait(false); var uriString = StringFormatter.NamedFormat(this.OpenApi.Document.EndpointUri.OriginalString, this.Task.Input.ToDictionary()); if (uriString.IsRuntimeExpression()) uriString = await this.Task.Workflow.Expressions.EvaluateAsync(uriString, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false); - if (string.IsNullOrWhiteSpace(uriString)) throw new NullReferenceException("The OpenAPI endpoint URI cannot be null or whitespace"); + if (string.IsNullOrWhiteSpace(uriString)) throw new NullReferenceException("The OpenAPI endpoint URI cannot be null or empty"); if (!Uri.TryCreate(uriString, UriKind.RelativeOrAbsolute, out var uri) || uri == null) throw new Exception($"Failed to parse the specified string '{uriString}' into a new URI"); using var request = new HttpRequestMessage(HttpMethod.Get, uriString); using var response = await httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false); @@ -287,4 +287,4 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken await this.SetResultAsync(output, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); } -} \ No newline at end of file +} diff --git a/src/runner/Synapse.Runner/Services/TaskExecutorFactory.cs b/src/runner/Synapse.Runner/Services/TaskExecutorFactory.cs index 7804050fb..524a068bc 100644 --- a/src/runner/Synapse.Runner/Services/TaskExecutorFactory.cs +++ b/src/runner/Synapse.Runner/Services/TaskExecutorFactory.cs @@ -67,6 +67,7 @@ protected virtual ITaskExecutor CreateCallTaskExecutor(IServiceProvider serviceP ArgumentNullException.ThrowIfNull(context); return context.Definition.Call switch { + Function.AsyncApi => ActivatorUtilities.CreateInstance(serviceProvider, context), Function.Grpc => ActivatorUtilities.CreateInstance(serviceProvider, context), Function.Http => ActivatorUtilities.CreateInstance(serviceProvider, context), Function.OpenApi => ActivatorUtilities.CreateInstance(serviceProvider, context), diff --git a/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs b/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs index 26c3948dc..4fb81d77f 100644 --- a/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs +++ b/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs @@ -12,7 +12,6 @@ // limitations under the License. using Neuroglia.Data.Infrastructure.ResourceOriented; -using ServerlessWorkflow.Sdk.Models; namespace Synapse.Runner.Services; diff --git a/src/runner/Synapse.Runner/Synapse.Runner.csproj b/src/runner/Synapse.Runner/Synapse.Runner.csproj index 10b855089..bd007de3a 100644 --- a/src/runner/Synapse.Runner/Synapse.Runner.csproj +++ b/src/runner/Synapse.Runner/Synapse.Runner.csproj @@ -61,6 +61,8 @@ + + From 076ce9330befbfdad44695ee4e8fff1a39559999 Mon Sep 17 00:00:00 2001 From: Charles d'Avernas Date: Fri, 10 Jan 2025 15:14:24 +0100 Subject: [PATCH 2/2] feat(Runner): Implement the `AsyncApiCallExecutor` Signed-off-by: Charles d'Avernas --- .../Synapse.Api.Client.Http.csproj | 2 +- .../Synapse.Api.Http/Synapse.Api.Http.csproj | 2 +- .../Synapse.Api.Server.csproj | 2 +- src/cli/Synapse.Cli/Synapse.Cli.csproj | 2 +- .../Synapse.Core.Infrastructure.csproj | 2 +- src/core/Synapse.Core/Synapse.Core.csproj | 2 +- .../Synapse.Correlator.csproj | 4 +- .../Services/WorkflowGraphBuilder.cs | 4 +- .../Synapse.Dashboard.csproj | 2 +- .../Executors/AsyncApiCallExecutor.cs | 61 ++++++++++++------- .../Executors/FunctionCallExecutor.cs | 2 +- .../Synapse.Runner/Synapse.Runner.csproj | 6 +- .../Synapse.IntegrationTests.csproj | 8 +-- .../Synapse.UnitTests.csproj | 10 +-- 14 files changed, 63 insertions(+), 46 deletions(-) diff --git a/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj b/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj index e1837226a..d9b461d81 100644 --- a/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj +++ b/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj @@ -43,7 +43,7 @@ - + diff --git a/src/api/Synapse.Api.Http/Synapse.Api.Http.csproj b/src/api/Synapse.Api.Http/Synapse.Api.Http.csproj index 132ec7a17..06d312628 100644 --- a/src/api/Synapse.Api.Http/Synapse.Api.Http.csproj +++ b/src/api/Synapse.Api.Http/Synapse.Api.Http.csproj @@ -45,7 +45,7 @@ - + diff --git a/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj b/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj index 676057672..77eeac22f 100644 --- a/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj +++ b/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj @@ -35,7 +35,7 @@ - + diff --git a/src/cli/Synapse.Cli/Synapse.Cli.csproj b/src/cli/Synapse.Cli/Synapse.Cli.csproj index 8ceee1914..0808cab49 100644 --- a/src/cli/Synapse.Cli/Synapse.Cli.csproj +++ b/src/cli/Synapse.Cli/Synapse.Cli.csproj @@ -33,7 +33,7 @@ - + diff --git a/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj b/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj index fedb03923..90a9fdfcd 100644 --- a/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj +++ b/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj @@ -51,7 +51,7 @@ - + diff --git a/src/core/Synapse.Core/Synapse.Core.csproj b/src/core/Synapse.Core/Synapse.Core.csproj index e1deacf55..63e4011b5 100644 --- a/src/core/Synapse.Core/Synapse.Core.csproj +++ b/src/core/Synapse.Core/Synapse.Core.csproj @@ -70,7 +70,7 @@ - + diff --git a/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj b/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj index 7046016bc..d67a4e3cf 100644 --- a/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj +++ b/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj @@ -42,8 +42,8 @@ - - + + diff --git a/src/dashboard/Synapse.Dashboard/Services/WorkflowGraphBuilder.cs b/src/dashboard/Synapse.Dashboard/Services/WorkflowGraphBuilder.cs index 45d2acd3a..027730a64 100644 --- a/src/dashboard/Synapse.Dashboard/Services/WorkflowGraphBuilder.cs +++ b/src/dashboard/Synapse.Dashboard/Services/WorkflowGraphBuilder.cs @@ -226,7 +226,7 @@ protected INodeViewModel BuildTaskNode(TaskNodeRenderingContext context) protected virtual NodeViewModel BuildCallTaskNode(TaskNodeRenderingContext context) { ArgumentNullException.ThrowIfNull(context); - var content = string.Empty; + string content; ; string callType; switch (context.TaskDefinition.Call.ToLower()) { @@ -234,7 +234,7 @@ protected virtual NodeViewModel BuildCallTaskNode(TaskNodeRenderingContext - + diff --git a/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs index 3d09ff2d8..9f0843f2d 100644 --- a/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs @@ -107,11 +107,12 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo var document = await this.AsyncApiDocumentReader.ReadAsync(responseStream, cancellationToken).ConfigureAwait(false); if (document is not V3AsyncApiDocument v3Document) throw new NotSupportedException("Synapse only supports AsyncAPI v3.0.0 at the moment"); this.Document = v3Document; - var operationId = this.AsyncApi.OperationRef; + if (string.IsNullOrWhiteSpace(this.AsyncApi.Operation)) throw new NullReferenceException("The 'operation' parameter must be set when performing an AsyncAPI v3 call"); + var operationId = this.AsyncApi.Operation; if (operationId.IsRuntimeExpression()) operationId = await this.Task.Workflow.Expressions.EvaluateAsync(operationId, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false); if (string.IsNullOrWhiteSpace(operationId)) throw new NullReferenceException("The operation ref cannot be null or empty"); - var operation = this.Document.Operations.FirstOrDefault(o => o.Key == operationId); - if (operation.Value == null) throw new NullReferenceException($"Failed to find an operation with id '{operationId}' in AsyncAPI document at '{uri}'"); + this.Operation = this.Document.Operations.FirstOrDefault(o => o.Key == operationId); + if (this.Operation.Value == null) throw new NullReferenceException($"Failed to find an operation with id '{operationId}' in AsyncAPI document at '{uri}'"); if (this.AsyncApi.Authentication != null) this.Authorization = await AuthorizationInfo.CreateAsync(this.AsyncApi.Authentication, this.ServiceProvider, this.Task.Workflow.Definition, cancellationToken).ConfigureAwait(false); switch (this.Operation.Value.Action) { @@ -119,11 +120,8 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo await this.BuildMessagePayloadAsync(cancellationToken).ConfigureAwait(false); await this.BuildMessageHeadersAsync(cancellationToken).ConfigureAwait(false); break; - case V3OperationAction.Send: - - break; - default: - throw new NotSupportedException($"The specified operation action '{this.Operation.Value.Action}' is not supported"); + case V3OperationAction.Send: break; + default: throw new NotSupportedException($"The specified operation action '{this.Operation.Value.Action}' is not supported"); } } @@ -134,16 +132,16 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo /// A new awaitable protected virtual async Task BuildMessagePayloadAsync(CancellationToken cancellationToken = default) { - if (this.AsyncApi == null || this.Operation == null) throw new InvalidOperationException("The executor must be initialized before execution"); + if (this.AsyncApi == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution"); if (this.Task.Input == null) this.MessagePayload = new { }; - if (this.AsyncApi.Payload == null) return; + if (this.AsyncApi.Message?.Payload == null) return; var arguments = this.GetExpressionEvaluationArguments(); if (this.Authorization != null) { arguments ??= new Dictionary(); arguments.Add("authorization", this.Authorization); } - this.MessagePayload = await this.Task.Workflow.Expressions.EvaluateAsync(this.AsyncApi.Payload, this.Task.Input!, arguments, cancellationToken).ConfigureAwait(false); + this.MessagePayload = await this.Task.Workflow.Expressions.EvaluateAsync(this.AsyncApi.Message.Payload, this.Task.Input!, arguments, cancellationToken).ConfigureAwait(false); } /// @@ -153,30 +151,27 @@ protected virtual async Task BuildMessagePayloadAsync(CancellationToken cancella /// A new awaitable protected virtual async Task BuildMessageHeadersAsync(CancellationToken cancellationToken = default) { - if (this.AsyncApi == null || this.Operation == null) throw new InvalidOperationException("The executor must be initialized before execution"); - if (this.AsyncApi.Headers == null) return; + if (this.AsyncApi == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution"); + if (this.AsyncApi.Message?.Headers == null) return; var arguments = this.GetExpressionEvaluationArguments(); if (this.Authorization != null) { arguments ??= new Dictionary(); arguments.Add("authorization", this.Authorization); } - this.MessageHeaders = await this.Task.Workflow.Expressions.EvaluateAsync(this.AsyncApi.Headers, this.Task.Input!, arguments, cancellationToken).ConfigureAwait(false); + this.MessageHeaders = await this.Task.Workflow.Expressions.EvaluateAsync(this.AsyncApi.Message.Headers, this.Task.Input!, arguments, cancellationToken).ConfigureAwait(false); } /// protected override Task DoExecuteAsync(CancellationToken cancellationToken) { if (this.AsyncApi == null || this.Document == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution"); - switch (this.Operation.Value.Action) + return this.Operation.Value.Action switch { - case V3OperationAction.Receive: - return this.DoExecutePublishOperationAsync(cancellationToken); - case V3OperationAction.Send: - return this.DoExecuteSubscribeOperationAsync(cancellationToken); - default: - throw new NotSupportedException($"The specified operation action '{this.Operation.Value.Action}' is not supported"); - } + V3OperationAction.Receive => this.DoExecutePublishOperationAsync(cancellationToken), + V3OperationAction.Send => this.DoExecuteSubscribeOperationAsync(cancellationToken), + _ => throw new NotSupportedException($"The specified operation action '{this.Operation.Value.Action}' is not supported"), + }; } /// @@ -195,6 +190,7 @@ protected virtual async Task DoExecutePublishOperationAsync(CancellationToken ca }; await using var result = await asyncApiClient.PublishAsync(parameters, cancellationToken).ConfigureAwait(false); if (!result.IsSuccessful) throw new Exception("Failed to execute the AsyncAPI publish operation"); + await this.SetResultAsync(null, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); } /// @@ -205,10 +201,31 @@ protected virtual async Task DoExecutePublishOperationAsync(CancellationToken ca protected virtual async Task DoExecuteSubscribeOperationAsync(CancellationToken cancellationToken) { if (this.AsyncApi == null || this.Document == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution"); + if (this.AsyncApi.Subscription == null) throw new NullReferenceException("The 'subscription' must be set when performing an AsyncAPI v3 subscribe operation"); await using var asyncApiClient = this.AsyncApiClientFactory.CreateFor(this.Document); var parameters = new AsyncApiSubscribeOperationParameters(this.Operation.Key, this.AsyncApi.Server, this.AsyncApi.Protocol); await using var result = await asyncApiClient.SubscribeAsync(parameters, cancellationToken).ConfigureAwait(false); if (!result.IsSuccessful) throw new Exception("Failed to execute the AsyncAPI subscribe operation"); + if(result.Messages == null) + { + await this.SetResultAsync(null, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); + return; + } + var observable = result.Messages; + if (this.AsyncApi.Subscription.Consume.For != null) observable = observable.TakeUntil(Observable.Timer(this.AsyncApi.Subscription.Consume.For.ToTimeSpan())); + if (this.AsyncApi.Subscription.Consume.Amount.HasValue) observable = observable.Take(this.AsyncApi.Subscription.Consume.Amount.Value); + else if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.While)) observable = observable.Select(message => Observable.FromAsync(async () => + { + var keepGoing = await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.While, this.Task.Input!,this.GetExpressionEvaluationArguments(),cancellationToken).ConfigureAwait(false); + return (message, keepGoing); + })).Concat().TakeWhile(i => i.keepGoing).Select(i => i.message); + else if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.Until)) observable = observable.Select(message => Observable.FromAsync(async () => + { + var keepGoing = !(await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.Until, this.Task.Input!, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false)); + return (message, keepGoing); + })).Concat().TakeWhile(i => i.keepGoing).Select(i => i.message); + var messages = await observable.ToAsyncEnumerable().ToListAsync(cancellationToken).ConfigureAwait(false); + await this.SetResultAsync(messages, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); } } \ No newline at end of file diff --git a/src/runner/Synapse.Runner/Services/Executors/FunctionCallExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/FunctionCallExecutor.cs index 7d9a6e0a0..f3ad32534 100644 --- a/src/runner/Synapse.Runner/Services/Executors/FunctionCallExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/FunctionCallExecutor.cs @@ -113,7 +113,7 @@ protected virtual async Task GetCustomFunctionAsync(string funct if (components.Length != 2) throw new Exception($"The specified value '{functionName}' is not a valid custom function qualified name ({{name}}:{{version}})"); var name = components[0]; var version = components[1]; - if (!SemVersion.TryParse(version, SemVersionStyles.Strict, out _)) throw new Exception($"The specified value '{version}' is not a valid semantic version 2.0"); + if (!Semver.SemVersion.TryParse(version, SemVersionStyles.Strict, out _)) throw new Exception($"The specified value '{version}' is not a valid semantic version 2.0"); if (catalogName == SynapseDefaults.Tasks.CustomFunctions.Catalogs.Default) { var function = await this.Task.Workflow.CustomFunctions.GetAsync(name, cancellationToken).ConfigureAwait(false) ?? throw new NullReferenceException($"Failed to find the specified custom function '{name}'"); diff --git a/src/runner/Synapse.Runner/Synapse.Runner.csproj b/src/runner/Synapse.Runner/Synapse.Runner.csproj index bd007de3a..176f1413e 100644 --- a/src/runner/Synapse.Runner/Synapse.Runner.csproj +++ b/src/runner/Synapse.Runner/Synapse.Runner.csproj @@ -52,14 +52,14 @@ - + - + - + diff --git a/tests/Synapse.IntegrationTests/Synapse.IntegrationTests.csproj b/tests/Synapse.IntegrationTests/Synapse.IntegrationTests.csproj index 15a1cf9de..153c360c1 100644 --- a/tests/Synapse.IntegrationTests/Synapse.IntegrationTests.csproj +++ b/tests/Synapse.IntegrationTests/Synapse.IntegrationTests.csproj @@ -10,17 +10,17 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive - + - - + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/tests/Synapse.UnitTests/Synapse.UnitTests.csproj b/tests/Synapse.UnitTests/Synapse.UnitTests.csproj index 8952fcaf4..7006de666 100644 --- a/tests/Synapse.UnitTests/Synapse.UnitTests.csproj +++ b/tests/Synapse.UnitTests/Synapse.UnitTests.csproj @@ -10,7 +10,7 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive @@ -22,14 +22,14 @@ - - + + - + - + all runtime; build; native; contentfiles; analyzers; buildtransitive