Skip to content

Commit 91c129a

Browse files
98 support router tasks (#217)
* support router tasks Signed-off-by: Jack Schofield <[email protected]>
1 parent 9bf0307 commit 91c129a

File tree

9 files changed

+1283
-935
lines changed

9 files changed

+1283
-935
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace Monai.Deploy.WorkflowManager.Contracts.Constants
2+
{
3+
public static class TaskTypeConstants
4+
{
5+
public const string RouterTask = "router";
6+
}
7+
}

src/WorkflowExecuter/Services/WorkflowExecuterService.cs

Lines changed: 44 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
using Monai.Deploy.WorkflowManager.Common.Interfaces;
1414
using Monai.Deploy.WorkflowManager.ConditionsResolver.Parser;
1515
using Monai.Deploy.WorkflowManager.Configuration;
16+
using Monai.Deploy.WorkflowManager.Contracts.Constants;
1617
using Monai.Deploy.WorkflowManager.Contracts.Models;
1718
using Monai.Deploy.WorkflowManager.Database.Interfaces;
1819
using Monai.Deploy.WorkflowManager.Logging.Logging;
@@ -125,6 +126,15 @@ await _workflowRepository.GetWorkflowsByAeTitleAsync(message.CalledAeTitle) as L
125126
continue;
126127
}
127128

129+
if (string.Equals(task.TaskType, TaskTypeConstants.RouterTask, StringComparison.InvariantCultureIgnoreCase))
130+
{
131+
var workflow = await _workflowRepository.GetByWorkflowIdAsync(workflowInstance.WorkflowId);
132+
133+
await HandleTaskDestinations(workflowInstance, workflow, task, message.CorrelationId);
134+
135+
continue;
136+
}
137+
128138
if (task.Status != TaskExecutionStatus.Created)
129139
{
130140
_logger.TaskPreviouslyDispatched(workflowInstance.PayloadId, task.TaskId);
@@ -213,20 +223,7 @@ public async Task<bool> ProcessTaskUpdate(TaskUpdateEvent message)
213223
_logger.ExportFilesNotFound(currentTask.TaskId, workflowInstance.Id);
214224
}
215225

216-
var newTaskExecutions = await CreateTaskDestinations(workflowInstance, workflow, message.TaskId);
217-
218-
if (!newTaskExecutions.Any())
219-
{
220-
await UpdateWorkflowInstanceStatus(workflowInstance, message.TaskId, message.Status);
221-
222-
return await CompleteTask(currentTask, workflowInstance, message.CorrelationId, message.Status);
223-
}
224-
225-
var processed = await HandleTaskDestinations(workflowInstance, message.CorrelationId, newTaskExecutions);
226-
227-
processed &= await CompleteTask(currentTask, workflowInstance, message.CorrelationId, message.Status);
228-
229-
return processed;
226+
return await HandleTaskDestinations(workflowInstance, workflow, currentTask, message.CorrelationId);
230227
}
231228

232229
public async Task<bool> ProcessExportComplete(ExportCompleteEvent message, string correlationId)
@@ -251,20 +248,7 @@ public async Task<bool> ProcessExportComplete(ExportCompleteEvent message, strin
251248
return false;
252249
}
253250

254-
var newTaskExecutions = await CreateTaskDestinations(workflowInstance, workflow, task.TaskId);
255-
256-
if (!newTaskExecutions.Any())
257-
{
258-
await UpdateWorkflowInstanceStatus(workflowInstance, task.TaskId, TaskExecutionStatus.Succeeded);
259-
260-
return await CompleteTask(task, workflowInstance, correlationId, TaskExecutionStatus.Succeeded);
261-
}
262-
263-
var processed = await HandleTaskDestinations(workflowInstance, correlationId, newTaskExecutions);
264-
265-
processed &= await CompleteTask(task, workflowInstance, correlationId, TaskExecutionStatus.Succeeded);
266-
267-
return processed;
251+
return await HandleTaskDestinations(workflowInstance, workflow, task, correlationId);
268252
}
269253

270254
if ((message.Status.Equals(ExportStatus.Failure) || message.Status.Equals(ExportStatus.PartialFailure)) &&
@@ -349,7 +333,7 @@ private async Task<bool> HandleOutputArtifacts(WorkflowInstance workflowInstance
349333
return true;
350334
}
351335

352-
private async Task<bool> HandleTaskDestinations(WorkflowInstance workflowInstance, string correlationId, IList<TaskExecution> taskExecutions)
336+
private async Task<bool> DispatchTaskDestinations(WorkflowInstance workflowInstance, WorkflowRevision workflow, string correlationId, IList<TaskExecution> taskExecutions)
353337
{
354338
workflowInstance.Tasks?.AddRange(taskExecutions);
355339

@@ -362,9 +346,23 @@ private async Task<bool> HandleTaskDestinations(WorkflowInstance workflowInstanc
362346

363347
foreach (var taskExec in taskExecutions)
364348
{
349+
if (string.Equals(taskExec.TaskType, TaskTypeConstants.RouterTask, StringComparison.InvariantCultureIgnoreCase))
350+
{
351+
processed &= await HandleTaskDestinations(workflowInstance, workflow, taskExec, correlationId);
352+
353+
if (processed is false)
354+
{
355+
continue;
356+
}
357+
358+
await _workflowInstanceRepository.UpdateTaskStatusAsync(workflowInstance.Id, taskExec.TaskId, TaskExecutionStatus.Succeeded);
359+
360+
continue;
361+
}
362+
365363
processed &= await DispatchTask(workflowInstance, taskExec, correlationId);
366364

367-
if (!processed)
365+
if (processed is false)
368366
{
369367
continue;
370368
}
@@ -375,6 +373,22 @@ private async Task<bool> HandleTaskDestinations(WorkflowInstance workflowInstanc
375373
return processed;
376374
}
377375

376+
private async Task<bool> HandleTaskDestinations(WorkflowInstance workflowInstance, WorkflowRevision workflow, TaskExecution task, string correlationId)
377+
{
378+
var newTaskExecutions = await CreateTaskDestinations(workflowInstance, workflow, task.TaskId);
379+
380+
if (newTaskExecutions.Any() is false)
381+
{
382+
await UpdateWorkflowInstanceStatus(workflowInstance, task.TaskId, TaskExecutionStatus.Succeeded);
383+
384+
return await CompleteTask(task, workflowInstance, correlationId, TaskExecutionStatus.Succeeded);
385+
}
386+
387+
await DispatchTaskDestinations(workflowInstance, workflow, correlationId, newTaskExecutions);
388+
389+
return await CompleteTask(task, workflowInstance, correlationId, TaskExecutionStatus.Succeeded);
390+
}
391+
378392
private async Task<List<TaskExecution>> CreateTaskDestinations(WorkflowInstance workflowInstance, WorkflowRevision workflow, string taskId)
379393
{
380394
var currentTaskDestinations = workflow.Workflow?.Tasks?.SingleOrDefault(t => t.Id == taskId)?.TaskDestinations;

src/WorkflowManager/appsettings.Development.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
"accessToken": "minioadmin",
1515
"bucket": "test-bucket",
1616
"region": "eu-west-2",
17-
"securedConnection": false
17+
"securedConnection": false,
18+
"executableLocation": "/.",
19+
"serviceName": "serviceName"
1820
}
1921
},
2022
"messaging": {

tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Features/WorkflowRequest.feature

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,17 @@ Scenario Outline: Publish a valid workflow request which creates a single workfl
1414
| Basic_Workflow_1 | Basic_AeTitle_WF_Request |
1515
| Basic_Workflow_1 | Basic_Id_WF_Request |
1616

17+
@WorkflowRequest
18+
Scenario Outline: Publish a valid workflow request which creates a single workflow instance with routing task
19+
Given I have a clinical workflow <workflow>
20+
And I have a bucket in MinIO bucket1
21+
When I publish a Workflow Request Message <workflowRequestMessage>
22+
Then I can see 1 Workflow Instance is created
23+
And 2 Task Dispatch event is published
24+
Examples:
25+
| workflow | workflowRequestMessage |
26+
| Routing_Workflow_1 | Routing_Id_WF_Request |
27+
1728
@WorkflowRequest
1829
Scenario Outline: Publish a valid workflow request which creates multiple workflow instances
1930
Given I have a clinical workflow <workflow_1>

0 commit comments

Comments
 (0)