Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions guidelines/mwm-workflow-spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ Each Artifact contains at least two elements:
|------|------|------|-----|
|name|str|Always|The name of this artifact.|
|value|Artifact identifier|Required for inputs, optional for outputs.|The context variable for this artifact (see example). If defined for output, that value will be used instead of a Task-generated one.|
| mandatory| bool | No (default false) | Determines whether this artifact is mandatory. If a mandatory artifact doesn't exist the task is marked as a failure and workflow execution stops. |
| mandatory| bool | No (default true) | Determines whether this artifact is mandatory. If a mandatory artifact doesn't exist the task is marked as a failure and workflow execution stops. |


As you can see in the example below, input artifacts require a _value_. This is a reference to a previously generated artifact, or to `context.input` - a value for an artifact representing the original input that triggered the workflow.
Expand Down Expand Up @@ -271,7 +271,6 @@ The Execution object contains the following properties:
|------|------|------|
|execution_id|str|The unique ID identifying task execution.|
|task_id|str|The ID of this task in the workflow.|
|input_dir|str|Path to the input directory of this task.|
|output_dir|str|Path to the output directory of this task.|
|task|dict|The details of the executed task. Similar to the Task definition in the workflow spec.|
|start_time|timestamp|The UTC timestamp of when this task execution began.
Expand Down
41 changes: 41 additions & 0 deletions src/ConditionsResolver/Constants/ParameterConstants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2021-2022 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace Monai.Deploy.WorkflowManager.ConditionsResolver.Constants
{
public static class ParameterConstants
{
public const string TaskId = "task_id";
public const string Status = "status";
public const string ExecutionId = "execution_id";
public const string OutputDirectory = "output_dir";
public const string TaskType = "task_type";
public const string PreviousTaskId = "previous_task_id";
public const string ErrorMessage = "error_msg";
public const string Result = "result";
public const string StartTime = "start_time";

public const string Name = "name";
public const string Description = "description";

public const string PatientId = "id";
public const string PatientName = "name";
public const string PatientSex = "sex";
public const string PatientDob = "dob";
public const string PatientAge = "age";
public const string PatientHospitalId = "hospital_id";
}
}
88 changes: 69 additions & 19 deletions src/ConditionsResolver/Parser/ConditionalParameterParser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
* limitations under the License.
*/

using System.Globalization;
using System.Text.RegularExpressions;
using Ardalis.GuardClauses;
using Microsoft.Extensions.Logging;
using Monai.Deploy.WorkflowManager.Common.Interfaces;
using Monai.Deploy.WorkflowManager.ConditionsResolver.Constants;
using Monai.Deploy.WorkflowManager.ConditionsResolver.Resolver;
using Monai.Deploy.WorkflowManager.Contracts.Models;
using Monai.Deploy.WorkflowManager.Storage.Services;
Expand All @@ -36,7 +38,7 @@ public enum ParameterContext

public class ConditionalParameterParser : IConditionalParameterParser
{
private const string ExecutionsTask = "context.executions.task";
private const string ExecutionsTask = "context.executions";
private const string ContextDicomSeries = "context.dicom.series";
private const string PatientDetails = "context.input.patient_details";
private const string ContextWorkflow = "context.workflow";
Expand Down Expand Up @@ -182,7 +184,7 @@ private void ClearWorkflowParser()
/// <summary>
/// Resolves a query between two brackets {{ query }}
/// </summary>
/// <param name="value">The query Example: {{ context.executions.task['other task'].'Fred' }}</param>
/// <param name="value">The query Example: {{ context.executions.other_task.Result.'Fred' }}</param>
/// <returns>
/// Tuple:
/// Result of the resolution
Expand Down Expand Up @@ -243,29 +245,77 @@ private void ClearWorkflowParser()
private (string? Result, ParameterContext Context) ResolveExecutionTasks(string value)
{
var subValue = value.Trim().Substring(ExecutionsTask.Length, value.Length - ExecutionsTask.Length);
var subValues = subValue.Split('[', ']');
var subValues = subValue.Split('.');
var id = subValues[1].Trim('\'');

var task = WorkflowInstance?.Tasks.First(t => t.TaskId == id);
var task = WorkflowInstance?.Tasks.FirstOrDefault(t => t.TaskId == id);

if (task is null || task is not null && !task.Metadata.Any())
if (task is null)
{
return (Result: null, Context: ParameterContext.TaskExecutions);
}

var metadataKey = subValues[2].Split('\'')[1];
var subValueKey = subValues[2];
string? keyValue = null;

if (task is not null && task.Metadata.ContainsKey(metadataKey))
if (subValues.Length > 3)
{
var result = task.Metadata[metadataKey];
keyValue = subValues[3]?.Split('\'')[1];
}

var resultStr = null as string;
switch (subValueKey.ToLower())
{
case ParameterConstants.TaskId:
resultStr = task.TaskId;
break;
case ParameterConstants.Status:
resultStr = task.Status.ToString();
break;
case ParameterConstants.ExecutionId:
resultStr = task.ExecutionId;
break;
case ParameterConstants.OutputDirectory:
resultStr = task.OutputDirectory;
break;
case ParameterConstants.TaskType:
resultStr = task.TaskType;
break;
case ParameterConstants.PreviousTaskId:
resultStr = task.PreviousTaskId;
break;
case ParameterConstants.ErrorMessage:
resultStr = task.Reason.ToString();
break;
case ParameterConstants.Result:
resultStr = GetValueFromDictionary(task.ResultMetadata, keyValue);
break;
case ParameterConstants.StartTime:
resultStr = task.TaskStartTime?.ToString("dd/MM/yyyy HH:mm:ss");
break;
default:
break;
}

return (Result: resultStr, Context: ParameterContext.TaskExecutions);
}

if (result is string resultStr)
private static string? GetValueFromDictionary(Dictionary<string, object> dictionary, string? key)
{
if (key is null)
{
return null;
}

if (dictionary.TryGetValue(key, out var value))
{
if (value is string valueStr)
{
return (Result: resultStr, Context: ParameterContext.TaskExecutions);
return valueStr;
}
}

return (Result: null, Context: ParameterContext.TaskExecutions);
return null;
}

private (string? Result, ParameterContext Context) ResolveContextWorkflow(string value)
Expand All @@ -288,10 +338,10 @@ private void ClearWorkflowParser()
var resultStr = null as string;
switch (keyValue)
{
case "name":
case ParameterConstants.Name:
resultStr = workflowSpecValue.Name;
break;
case "description":
case ParameterConstants.Description:
resultStr = workflowSpecValue.Description;
break;
default:
Expand Down Expand Up @@ -324,22 +374,22 @@ private void ClearWorkflowParser()
var resultStr = null as string;
switch (keyValue)
{
case "id":
case ParameterConstants.PatientId:
resultStr = patientValue.PatientId;
break;
case "name":
case ParameterConstants.PatientName:
resultStr = patientValue.PatientName;
break;
case "sex":
case ParameterConstants.PatientSex:
resultStr = patientValue.PatientSex;
break;
case "dob":
case ParameterConstants.PatientDob:
resultStr = patientValue.PatientDob?.ToString("dd/MM/yyyy");
break;
case "age":
case ParameterConstants.PatientAge:
resultStr = patientValue.PatientAge;
break;
case "hospital_id":
case ParameterConstants.PatientHospitalId:
resultStr = patientValue.PatientHospitalId;
break;
default:
Expand Down
4 changes: 2 additions & 2 deletions src/Contracts/Models/TaskExecution.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ public class TaskExecution
[JsonProperty(PropertyName = "output_directory")]
public string OutputDirectory { get; set; }

[JsonProperty(PropertyName = "metadata")]
public Dictionary<string, object> Metadata { get; set; }
[JsonProperty(PropertyName = "result")]
public Dictionary<string, object> ResultMetadata { get; set; }

[JsonProperty(PropertyName = "input_parameters")]
public Dictionary<string, object> InputParameters { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class AideClinicalReviewPlugin : TaskPluginBase, IAsyncDisposable
private string _patientHospitalId;
private string _queueName;
private string _workflowName;
private string _reviewedTaskId;
private string _reviewedExecutionId;

public AideClinicalReviewPlugin(
IServiceScopeFactory serviceScopeFactory,
Expand Down Expand Up @@ -102,6 +104,16 @@ private void Initialize()
{
_workflowName = Event.TaskPluginArguments[Keys.WorkflowName];
}

if (Event.TaskPluginArguments.ContainsKey(Keys.ReviewedExecutionId))
{
_reviewedExecutionId = Event.TaskPluginArguments[Keys.ReviewedExecutionId];
}

if (Event.TaskPluginArguments.ContainsKey(Keys.ReviewedTaskId))
{
_reviewedTaskId = Event.TaskPluginArguments[Keys.ReviewedTaskId];
}
}

private void ValidateEventAndInit()
Expand Down Expand Up @@ -144,6 +156,8 @@ private JsonMessage<ClinicalReviewRequestEvent> GenerateClinicalReviewRequestEve
CorrelationId = Event.CorrelationId,
ExecutionId = Event.ExecutionId,
TaskId = Event.TaskId,
ReviewedTaskId = _reviewedTaskId,
ReviewedExecutionId = _reviewedExecutionId,
WorkflowName = _workflowName,
Files = Event.Inputs,
PatientMetadata = new PatientMetadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,20 @@ public class ClinicalReviewRequestEvent : EventBase
[JsonProperty(PropertyName = "task_id")]
public string TaskId { get; set; }

/// <summary>
/// Gets or sets the ID of the reviewed task ID.
/// </summary>
[Required]
[JsonProperty(PropertyName = "reviewed_task_id")]
public string ReviewedTaskId { get; set; }

/// <summary>
/// Gets or sets the ID of the reviewed execution ID.
/// </summary>
[Required]
[JsonProperty(PropertyName = "reviewed_execution_id")]
public string ReviewedExecutionId { get; set; }

/// <summary>
/// Gets or sets the correlation ID.
/// </summary>
Expand Down
12 changes: 9 additions & 3 deletions src/TaskManager/Plug-ins/AideClinicalReview/Keys.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,14 @@ public class Keys
public static readonly string WorkflowName = "workflow_name";

/// <summary>
/// Key for the reviewed task details.
/// Key for the reviewed task id.
/// </summary>
public static readonly string ReviewedTaskDetails = "reviewed_task_details";
public static readonly string ReviewedTaskId = "reviewed_task_id";

/// <summary>
/// Key for the reviewed execution id.
/// </summary>
public static readonly string ReviewedExecutionId = "reviewed_execution_id";

/// <summary>
/// Key for the queue name to send the clinical review message.
Expand All @@ -69,7 +74,8 @@ public class Keys
public static readonly IReadOnlyList<string> RequiredParameters =
new List<string> {
QueueName,
WorkflowName
WorkflowName,
ReviewedTaskId
};
}
}
2 changes: 1 addition & 1 deletion src/WorkflowExecuter/Common/EventMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public static TaskDispatchEvent ToTaskDispatchEvent(TaskExecution task, Workflow
Inputs = inputs,
Outputs = outputs,
TaskPluginType = task.TaskType,
Metadata = task.Metadata,
Metadata = { },
PayloadId = workflowInstance.PayloadId,
IntermediateStorage = new Messaging.Common.Storage
{
Expand Down
7 changes: 4 additions & 3 deletions src/WorkflowExecuter/Services/WorkflowExecuterService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public async Task<bool> ProcessTaskUpdate(TaskUpdateEvent message)

if (message.Metadata.Any())
{
currentTask.Metadata = message.Metadata;
currentTask.ResultMetadata = message.Metadata;
}

await HandleOutputArtifacts(workflowInstance, message.Outputs, currentTask);
Expand Down Expand Up @@ -592,9 +592,10 @@ private async Task<TaskExecution> CreateTaskExecutionAsync(TaskObject task,
TaskStartTime = DateTime.UtcNow,
TaskId = task.Id,
Status = TaskExecutionStatus.Created,
InputArtifacts = await _artifactMapper.ConvertArtifactVariablesToPath(task?.Artifacts?.Input ?? new Artifact[] { }, payloadId, workflowInstanceId, bucketName),
Reason = FailureReason.None,
InputArtifacts = await _artifactMapper.ConvertArtifactVariablesToPath(task?.Artifacts?.Input ?? Array.Empty<Artifact>(), payloadId, workflowInstanceId, bucketName),
OutputDirectory = $"{payloadId}/workflows/{workflowInstanceId}/{executionId}",
Metadata = { },
ResultMetadata = { },
InputParameters = newInputParameters,
PreviousTaskId = previousTaskId
};
Expand Down
Loading