Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 29 additions & 16 deletions guidelines/mwm-sadd.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,14 @@ MWM provides the following APIs:

- **Workflows API**: A set of APIs to create, update and delete workflows.
- **Tasks API**: A set of APIs to query & update task status.
- **Credentials API**: Provides the ability to manager credentials to be used when the MWM communicates with external services.
- **Credentials API**: Provides the ability to manager credentials to be used when the MWM communicates with external services..
---

### Internal Services

- **Payload Listener**: A service responsible for consuming messages about new input data from a message queue, and executing the Workflow Manager service.
- **Workflow Manager**: A service that executes pre-registered workflows on given input data.
- **Task Manager**: A service that orchestrates the execution of a single workflow task, invoking specific task plugins. It also fulfils calls to the Tasks API.
- **Task Dispatcher**: A service that dispatches and records jobs sent to the orchestration engines.
- **Event Broker Adapter**: A service responsible for consuming messages from an event queue, and executing the relevant service based on the event type. This component also provides utility functions to publish new messages to the queue.
- **Workflow Executer Service**: A service that executes pre-registered workflows on given input data.
- **Task Runner Service**: A service that orchestrates the execution of a single workflow task, invoking specific task plugins. It also fulfils calls to the Tasks API.
- **Export Service**: A service responsible emitting output notification events when workflows complete so listeners can retrieve output files.
- **Data Retention Service**: Monitors storage usages, apply data retention policies, and cleans up storage.

Expand Down Expand Up @@ -95,6 +94,7 @@ _TBD_
The Workflows API allows clients to register Clinical Workflows. It will validate & store them.

##### POST /workflows
This endpoint saves the a workflow.
###### Body
A workflow definition as per the [workflow definition spec](mwm-workflow-spec.md).
![register](static/mwm-workflows-register.png)
Expand All @@ -106,6 +106,7 @@ A workflow definition as per the [workflow definition spec](mwm-workflow-spec.md
- Content: `{"id": UUID}`

##### PUT /workflows/WORKFLOW_ID
Updates a workflow with the workflow specification sent in the body. This will create a new version of the workflow.
###### Body
A workflow definition as per the [workflow definition spec](mwm-workflow-spec.md).
May include a partial schema - the included attributes will be updated in the Workflow.
Expand All @@ -114,7 +115,7 @@ May include a partial schema - the included attributes will be updated in the Wo

- `200`:
- Description: Workflow updated successfully.
- Content: none.
- Content: `{"id": UUID, "version_id": string}`.

##### DELETE /workflows/WORKFLOW_ID
![delete](static/mwm-workflows-delete.png)
Expand All @@ -133,6 +134,7 @@ May include a partial schema - the included attributes will be updated in the Wo
[
{
"workflow_id": UUID,
"version_id": string,
"status": "active"
}
]
Expand All @@ -150,32 +152,43 @@ May include a partial schema - the included attributes will be updated in the Wo
URL: `/tasks`
Endpoints:
- Retrieve task by ID.
- Provide task result by ID: used by app servers to perform callbacks.
- Retrieve tasks (filter by current/time period)

_More details to be added._

### Internal Modules

#### Payload Listener
The Payload Listener monitors an input queue. The MIG (or a custom ingestion service) will add an event to that queue when new data is sent to the system.
#### Event Broker Adapter
And the adapter communicates with an event broker.

The adapter will listen on multiple queues, as follows:

| Queue Name | Publisher | Consumer |
|------|------|------|
|md.workflow.request|Informatics Gateway|Workflow Executer
|md.workflow.task_dispatch|Workflow Executer|Task Executer
|md.workflow.task_callback|App Servers|Task Executer
|md.export.complete|Workflow Executer|Informatics Gateway

##### The Workflow Request Queue
The MIG (or a custom ingestion service) will add an event to that queue when new data is sent to the system.
For more details & the event schema see [input](mwm-input.md#notification-message-schema)

![payloadlistener](static/mwm-payload-listener.png)

#### Workflow Manager
The workflow manager is responsible for running a workflow - executing tasks, passing metadata from one task to the other, and evaluating [Evaluators](mwm-workflow-spec.md#evaluators). Finally, it will hand over responsibility to the Export Notification service when the workflow is complete.
#### Workflow Executer
The workflow executer is responsible for running a workflow - scheduling tasks tasks, passing metadata from one task to the other, and evaluating [Evaluators](mwm-workflow-spec.md#evaluators). Finally, it will hand over responsibility to the Export Notification service when the workflow is complete.

#### Task Manager
The task manager is responsible for orchestrating tasks. The workflow manager triggers it for each Task in a workflow. It's responsible for:
#### Task Executer
The task executer is responsible for running tasks. It is invoked by the Event Broker Adapter whenever there is a new event in the task dispatch or task callback queues.
It's responsible for:
* Launching a specific task plugin
* Providing the task plugin with the metadata given to it by the Workflow Manager.
* Providing the task plugin with the path to an empty output directory.
* Providing the task plugin with the path to the input data.
* Listening for task Callbacks (see [Tasks API](#tasks-api)).
* Listening for task Callbacks.
* Adding result metadata from a Task to the workflow execution context.
* Notifying the Workflow Manager when a task has completed.
* Notifying the Workflow Executer when a task has completed.


#### Task Plugin
Expand All @@ -193,7 +206,7 @@ Responsibilities of plugins:
* Adding task output files to the output directory

#### Export Service
The export service is used when a Task has an [output destination](mwm-workflow-spec.md#output-destinations) external to the system (ie not another task). The export services publishes an output event to a Pub/Sub service. Once published, this export operation is considered complete.
The export service is used when a Task has an [output destination](mwm-workflow-spec.md#destinations) external to the system (ie not another task). The export services publishes an output event to a Pub/Sub service. Once published, this export operation is considered complete.

#### Data Retention Service

Expand Down
60 changes: 40 additions & 20 deletions guidelines/mwm-workflow-spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ The MONAI Workflow Manager is responsible for executing pre-registered clinical
- [HTTP](#http-calls)
- [Task Templates](#task-templates)
- [Evaluators](#evaluators)
- [Output Destinations](#output)
- [Task Destinations](#task-destinations)
- [Export Destinations](#export-destinations)
- [Pre Evaluators](#pre-evaluators)
- [Retention Policies](#retention-policies)

Expand Down Expand Up @@ -44,18 +45,18 @@ This section contains the IG configuration. Specifically, it contains the follow

| Property | Type | Description |
|------|------|------|
|ae-title|str|The AE title for this workflow. Only data sent to this AE title will be processed by this workflow.|
|data-origins|list[str]|List of possible origin systems. These should be registered with the informatics gateway.|
|export-destinations|list[str]|List of possible destinations for the output of tasks in this workflow. Informatics gateways can subscribe to notifications of output to these destinations.|
|ae_title|str|The AE title for this workflow. Only data sent to this AE title will be processed by this workflow.|
|data_origins|list[str]|List of possible origin systems. These should be registered with the informatics gateway.|
|export_destinations|list[str]|List of possible destinations for the output of tasks in this workflow. Informatics gateways can subscribe to notifications of output to these destinations.|

```json
{
"ae-title": "MY_AET",
"data-origins": ["MY_MODALITY"],
"export-destinations": ["PROD_PACS"]
"ae_title": "MY_AET",
"data_origins": ["MY_MODALITY"],
"export_destinations": ["PROD_PACS"]
}
```
The above specifies that the workflow should be triggered for inputs sent to the ae-title "MYAET" from "MY_MODALITY".
The above specifies that the workflow should be triggered for inputs sent to the ae-title "MY_AET" from "MY_MODALITY".
It also defines the "PROD_PACS" output destination, meaning that it can be used:
* By tasks as the [destination of their output](#output).
* By subscribers to [export notifications](mwm-sadd.md#export-service).
Expand All @@ -74,7 +75,8 @@ The task object contains the information:
|args|object|An object that will be available to the task plugin when executing this task. The expected contents differ based on the task type.|
|ref|Optional[str]|A reference to a [task template](#task-templates). Values provided by the template are overridden by the task's definition.|
|pre_evaluators|list[[PreEvaluator](#pre-evaluators)]||
|destinations|list[[Output](#output)]|List of possible destinations for the output of tasks in this workflow. Informatics gateways can subscribe to notifications of output to these destinations.|
|task_destinations|Optional[list[[TaskDestination](#task-destinations)]]|An optional list of possible tasks that could be executed following this task. They will be executed if their conditions are true.|
|export_destinations|Optional[list[[ExportDestination](#export-destinations)]]|An optional lists of possible export destinations to which the output of this task can be sent.|
|artifacts|[ArtifactMap](#artifacts)|Input & output artifacts of this task.


Expand Down Expand Up @@ -266,7 +268,12 @@ In order to check a certain tag across _all_ series, use the study level tags. F
```

### Destinations
Destinations define the next task to be executed. They can either be export destinations, or another task.
Destinations allow the workflow manager to determine what should happen to the output of a task. There are two types of destinations – task destinations, which reference another task in the workflow to be executed and export destinations, which reference a location external to the workflow manager.

#### Task Destinations

Task destinations define the next task to be executed.
Sometimes the destination will differ based on some condition. For this, [evaluators](#evaluators) can be used as conditions for output destinations.

The basic format is as follows:

Expand All @@ -275,35 +282,48 @@ The basic format is as follows:
|name|str|The name of the destination. This can either be an export destinations or a task's ID.|
|conditions|Optional[list[Evaluator]]|An optional array of [Evaluators](#evaluators) that need to be met in order for this destination to be used.|

Example (simple output to an export destination):

Example (run my-task-id when the patient is female):

```json
{
...task...
"destinations": [
"task_destinations": [
{
"name": "PROD_PACS",
}
"name": "my-task-id",
"conditions": ["{{context.dicom.tags[('0010','0040')]}} == 'F'"]
},
],
...
}
```

Sometimes the destination will differ based on some condition. For this, evaluators can be used as conditions for output destinations.

#### Export Destinations
Export destinations define an external location to which the output of the task can be sent. This will take the form of an event published to a pub/sub service notifying of an available export to a specific destination reference. Most commonly, the export location will be a PACs system and the notification will be picked up by the Monai Informatics Gateway.

| Property | Type | Description |
|------|------|------|
|name|str|The name of the destination. This can either be an export destinations already defined within the [Informatics Gateway](#informatics-gateway) section of the workflow configuration.|
|conditions|Optional[list[Evaluator]]|An optional array of [Evaluators](#evaluators) that need to be met in order for this destination to be used.|
|artifacts|list[Artifact]|An array of [Artifacts](#artifacts) that should be sent to this export destination.|

Example (output sent to another task if the patient is female, otherwise to PACS):
```json
{
...task...
"destinations": [
{
"name": "my-task-id",
"conditions": ["{{context.dicom.tags[('0010','0040')]}} == 'F'"]
},
"export_destinations": [
{
"name": "PROD_PACS",
"conditions": ["{{context.dicom.tags[('0010','0040')]}} != 'F'"]
}
],
"task_destinations": [
{
"name": "my-task-id",
"conditions": ["{{context.dicom.tags[('0010','0040')]}} == 'F'"]
}
],
...
}
```
Expand Down
Binary file modified guidelines/static/mwm-detailed.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.