diff --git a/dsl-reference.md b/dsl-reference.md
index 789ca497..b10f3b3f 100644
--- a/dsl-reference.md
+++ b/dsl-reference.md
@@ -58,8 +58,9 @@
+ [Container Lifetime](#container-lifetime)
+ [Process Result](#process-result)
+ [AsyncAPI Server](#asyncapi-server)
- + [AsyncAPI Message](#asyncapi-message)
+ + [AsyncAPI Outbound Message](#asyncapi-outbound-message)
+ [AsyncAPI Subscription](#asyncapi-subscription)
+ + [Subscription Iterator](#subscription-iterator)
## Abstract
@@ -311,7 +312,7 @@ The [AsyncAPI Call](#asyncapi-call) enables workflows to interact with external
| operation | `string` | `yes` | A reference to the AsyncAPI [operation](https://www.asyncapi.com/docs/reference/specification/v3.0.0#operationObject) to call.
*Used only in case the referenced document uses AsyncAPI `v3.0.0`.* |
| server | [`asyncApiServer`](#asyncapi-server) | `no` | An object used to configure to the [server](https://www.asyncapi.com/docs/reference/specification/v3.0.0#serverObject) to call the specified AsyncAPI [operation](https://www.asyncapi.com/docs/reference/specification/v3.0.0#operationObject) on.
If not set, default to the first [server](https://www.asyncapi.com/docs/reference/specification/v3.0.0#serverObject) matching the operation's channel. |
| protocol | `string` | `no` | The [protocol](https://www.asyncapi.com/docs/reference/specification/v3.0.0#definitionsProtocol) to use to select the target [server](https://www.asyncapi.com/docs/reference/specification/v3.0.0#serverObject).
Ignored if `server` has been set.
*Supported values are: `amqp`, `amqp1`, `anypointmq`, `googlepubsub`, `http`, `ibmmq`, `jms`, `kafka`, `mercure`, `mqtt`, `mqtt5`, `nats`, `pulsar`, `redis`, `sns`, `solace`, `sqs`, `stomp` and `ws`* |
-| message | [`asyncApiMessage`](#asyncapi-message) | `no` | An object used to configure the message to publish using the target operation.
*Required if `subscription` has not been set.* |
+| message | [`asyncApiMessage`](#asyncapi-outbound-message) | `no` | An object used to configure the message to publish using the target operation.
*Required if `subscription` has not been set.* |
| subscription | [`asyncApiSubscription`](#asyncapi-subscription) | `no` | An object used to configure the subscription to messages consumed using the target operation.
*Required if `message` has not been set.* |
| authentication | `string`
[`authentication`](#authentication) | `no` | The authentication policy, or the name of the authentication policy, to use when calling the AsyncAPI operation. |
@@ -650,7 +651,19 @@ Provides a mechanism for workflows to await and react to external events, enabli
| Name | Type | Required | Description|
|:--|:---:|:---:|:---|
-| listen.to | [`eventConsumptionStrategy`](#event-consumption-strategy) | `yes` | Configures the event(s) the workflow must listen to. |
+| listen.to | [`eventConsumptionStrategy`](#event-consumption-strategy) | `yes` | Configures the [event(s)](https://cloudevents.io/) the workflow must listen to. |
+| listen.read | `string` | `no` | Specifies how [events](https://cloudevents.io/) are read during the listen operation.
*Supported values are:*
*- `data`: Reads the [event's](https://cloudevents.io/) data.*
*- `envelope`: Reads the [event's](https://cloudevents.io/) envelope, including its [context attributes](https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#context-attributes).*
*- `raw`: Reads the [event's](https://cloudevents.io/) raw data.*
*Defaults to `data`.*|
+| foreach | [`subscriptionIterator`](#subscription-iterator) | `no` | Configures the iterator, if any, for processing each consumed [event](https://cloudevents.io/). |
+
+> [!NOTE]
+> A `listen` task produces a sequentially ordered array of all the [events](https://cloudevents.io/) it has consumed, and potentially transformed using `foreach.output.as`.
+
+> [!NOTE]
+> When `foreach` is set, the configured operations for a [events](https://cloudevents.io/) must complete before moving on to the next one. As a result, consumed [events](https://cloudevents.io/) should be stored in a First-In-First-Out (FIFO) queue while awaiting iteration.
+
+> [!WARNING]
+> [Events](https://cloudevents.io/) consumed by an `until` clause should not be included in the task's output. These [events](https://cloudevents.io/) are used solely to determine when the until condition has been met, and they do not contribute to the result or data produced by the task itself
+
##### Examples
@@ -2038,7 +2051,7 @@ do:
bar: baz
```
-### AsyncAPI Message
+### AsyncAPI Outbound Message
Configures an AsyncAPI message to publish.
@@ -2073,6 +2086,29 @@ do:
bar: baz
```
+### AsyncAPI Inbound Message
+
+Configures an AsyncAPI message consumed by a subscription.
+
+#### Properties
+
+| Name | Type | Required | Description |
+|:-------|:------:|:----------:|:--------------|
+| payload | `object` | `no` | The message's payload, if any. |
+| headers | `object` | `no` | The message's headers, if any. |
+| correlationId | `string` | `no` | The message's correlation id, if any. |
+
+#### Examples
+
+```yaml
+payload:
+ greetings: Hello, World!
+headers:
+ foo: bar
+ bar: baz
+correlationid: '123456'
+```
+
### AsyncAPI Subscription
Configures a subscription to an AsyncAPI operation.
@@ -2081,8 +2117,15 @@ Configures a subscription to an AsyncAPI operation.
| Name | Type | Required | Description |
|:-------|:------:|:----------:|:--------------|
-| filter | `string` | `no` | A [runtime expression](dsl.md#runtime-expressions), if any, used to filter consumed messages. |
+| filter | `string` | `no` | A [runtime expression](dsl.md#runtime-expressions), if any, used to filter consumed [messages](#asyncapi-inbound-message). |
| consume | [`subscriptionLifetime`](#asyncapi-subscription-lifetime) | `yes` | An object used to configure the subscription's lifetime. |
+| foreach | [`subscriptionIterator`](#subscription-iterator) | `no` | Configures the iterator, if any, for processing each consumed [message](#asyncapi-inbound-message). |
+
+> [!NOTE]
+> An AsyncAPI subscribe operation call produces a sequentially ordered array of all the [messages](#asyncapi-inbound-message) it has consumed, and potentially transformed using `foreach.output.as`.
+
+> [!NOTE]
+> When `foreach` is set, the configured operations for a [message](#asyncapi-inbound-message) must complete before moving on to the next one. As a result, consumed [messages](#asyncapi-inbound-message) should be stored in a First-In-First-Out (FIFO) queue while awaiting iteration.
#### Examples
@@ -2115,7 +2158,7 @@ Configures the lifetime of an AsyncAPI subscription
#### Properties
| Name | Type | Required | Description |
-|:-------|:------:|:----------:|:--------------|
+|:-----|:----:|:--------:|:------------|
| amount | `integer` | `no` | The amount of messages to consume.
*Required if `while` and `until` have not been set.* |
| for | [`duration`](#duration) | `no` | The [`duration`](#duration) that defines for how long to consume messages. |
| while | `string` | `no` | A [runtime expression](dsl.md#runtime-expressions), if any, used to determine whether or not to keep consuming messages.
*Required if `amount` and `until` have not been set.* |
@@ -2143,4 +2186,54 @@ do:
until: '${ ($context.messages | length) == 5 }'
for:
seconds: 10
+```
+
+### Subscription Iterator
+
+Configures the iteration over each item (event or message) consumed by a subscription. It encapsulates configuration for processing tasks, output formatting, and export behavior for every item encountered.
+
+#### Properties
+
+| Name | Type | Required | Description |
+|:-----|:----:|:--------:|:------------|
+| item | `string` | `no` | The name of the variable used to store the current item being enumerated.
*Defaults to `item`.* |
+| at | `string` | `no` | The name of the variable used to store the index of the current item being enumerated.
*Defaults to `index`.* |
+| do | [`map[string, task][]`](#task) | `no` | The tasks to perform for each consumed item. |
+| output | [`output`](#output) | `no` | An object, if any, used to customize the item's output and to document its schema. |
+| export | [`export`](#export) | `no` | An object, if any, used to customize the content of the workflow context. |
+
+#### Examples
+
+```yaml
+document:
+ dsl: '1.0.0-alpha5'
+ namespace: test
+ name: asyncapi-example
+ version: '0.1.0'
+do:
+ - subscribeToChatInboxUntil:
+ call: asyncapi
+ with:
+ document:
+ endpoint: https://fake.com/docs/asyncapi.json
+ operation: chat-inbox
+ protocol: http
+ subscription:
+ filter: ${ . == $workflow.input.chat.roomId }
+ consume:
+ until: '${ ($context.messages | length) == 5 }'
+ for:
+ seconds: 10
+ foreach:
+ item: message
+ at: index
+ do:
+ - emitEvent:
+ emit:
+ event:
+ with:
+ source: https://serverlessworkflow.io/samples
+ type: io.serverlessworkflow.samples.asyncapi.message.consumed.v1
+ data:
+ message: '${ $message }'
```
\ No newline at end of file
diff --git a/examples/call-asyncapi-subscribe-consume-forever-foreach.yaml b/examples/call-asyncapi-subscribe-consume-forever-foreach.yaml
new file mode 100644
index 00000000..b9b9fe95
--- /dev/null
+++ b/examples/call-asyncapi-subscribe-consume-forever-foreach.yaml
@@ -0,0 +1,28 @@
+document:
+ dsl: '1.0.0-alpha5'
+ namespace: examples
+ name: bearer-auth
+ version: '0.1.0'
+do:
+ - getNotifications:
+ call: asyncapi
+ with:
+ document:
+ endpoint: https://fake.com/docs/asyncapi.json
+ operation: getNotifications
+ subscription:
+ filter: '${ .correlationId == $context.userId and .payload.from.firstName == $context.contact.firstName and .payload.from.lastName == $context.contact.lastName }'
+ consume:
+ while: '${ true }'
+ foreach:
+ item: message
+ do:
+ - publishCloudEvent:
+ emit:
+ event:
+ with:
+ source: https://serverlessworkflow.io/samples
+ type: io.serverlessworkflow.samples.asyncapi.message.consumed.v1
+ data:
+ message: '${ $message }'
+
diff --git a/examples/listen-to-all read-envelope.yaml b/examples/listen-to-all read-envelope.yaml
new file mode 100644
index 00000000..c54682e2
--- /dev/null
+++ b/examples/listen-to-all read-envelope.yaml
@@ -0,0 +1,17 @@
+document:
+ dsl: '1.0.0-alpha5'
+ namespace: test
+ name: listen-to-all-read-envelope
+ version: '0.1.0'
+do:
+ - callDoctor:
+ listen:
+ to:
+ all:
+ - with:
+ type: com.fake-hospital.vitals.measurements.temperature
+ data: ${ .temperature > 38 }
+ - with:
+ type: com.fake-hospital.vitals.measurements.bpm
+ data: ${ .bpm < 60 or .bpm > 100 }
+ read: envelope
\ No newline at end of file
diff --git a/examples/listen-to-any-forever-foreach.yaml b/examples/listen-to-any-forever-foreach.yaml
new file mode 100644
index 00000000..22c01a2e
--- /dev/null
+++ b/examples/listen-to-any-forever-foreach.yaml
@@ -0,0 +1,22 @@
+document:
+ dsl: '1.0.0-alpha1'
+ namespace: test
+ name: listen-to-any-while-foreach
+ version: '0.1.0'
+do:
+ - listenToGossips:
+ listen:
+ to:
+ any: []
+ until: '${ false }'
+ foreach:
+ item: event
+ at: i
+ do:
+ - postToChatApi:
+ call: http
+ with:
+ method: post
+ endpoint: https://fake-chat-api.com/room/{roomId}
+ body:
+ event: ${ $event }
\ No newline at end of file
diff --git a/schema/workflow.yaml b/schema/workflow.yaml
index daaedc41..59565a6d 100644
--- a/schema/workflow.yaml
+++ b/schema/workflow.yaml
@@ -558,7 +558,17 @@ $defs:
$ref: '#/$defs/eventConsumptionStrategy'
title: ListenTo
description: Defines the event(s) to listen to.
+ read:
+ type: string
+ enum: [ data, envelope, raw ]
+ default: data
+ title: ListenAndReadAs
+ description: Specifies how events are read during the listen operation.
required: [ to ]
+ foreach:
+ $ref: '#/$defs/subscriptionIterator'
+ title: ListenIterator
+ description: Configures the iterator, if any, for processing consumed event(s).
raiseTask:
type: object
$ref: '#/$defs/taskBase'
@@ -1710,6 +1720,10 @@ $defs:
$ref: '#/$defs/asyncApiMessageConsumptionPolicy'
title: AsyncApiMessageConsumptionPolicy
description: An object used to configure the subscription's message consumption policy.
+ foreach:
+ $ref: '#/$defs/subscriptionIterator'
+ title: AsyncApiSubscriptionIterator
+ description: Configures the iterator, if any, for processing consumed messages(s).
required: [ consume ]
asyncApiMessageConsumptionPolicy:
type: object
@@ -1740,3 +1754,31 @@ $defs:
title: AsyncApiMessageConsumptionPolicyUntil
description: A runtime expression evaluated before each consumed (filtered) message to decide if message consumption should continue.
required: [ until ]
+ subscriptionIterator:
+ type: object
+ title: SubscriptionIterator
+ description: Configures the iteration over each item (event or message) consumed by a subscription.
+ unevaluatedProperties: false
+ properties:
+ item:
+ type: string
+ title: SubscriptionIteratorItem
+ description: The name of the variable used to store the current item being enumerated.
+ default: item
+ at:
+ type: string
+ title: SubscriptionIteratorIndex
+ description: The name of the variable used to store the index of the current item being enumerated.
+ default: index
+ do:
+ $ref: '#/$defs/taskList'
+ title: SubscriptionIteratorTasks
+ description: The tasks to perform for each consumed item.
+ output:
+ $ref: '#/$defs/output'
+ title: SubscriptionIteratorOutput
+ description: An object, if any, used to customize the item's output and to document its schema.
+ export:
+ $ref: '#/$defs/export'
+ title: SubscriptionIteratorExport
+ description: An object, if any, used to customize the content of the workflow context.
\ No newline at end of file