Skip to content

Commit 96000ad

Browse files
authored
Merge pull request #511 from fjtirado/Fix_#505
[Fix #505] Switching to CompletableFuture
2 parents a2ca9f7 + 8101d81 commit 96000ad

35 files changed

+1154
-559
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/LongFilter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,4 @@
1818
import java.util.function.BiFunction;
1919

2020
@FunctionalInterface
21-
public interface LongFilter extends BiFunction<WorkflowContext, TaskContext<?>, Long> {}
21+
public interface LongFilter extends BiFunction<WorkflowContext, TaskContext, Long> {}

impl/core/src/main/java/io/serverlessworkflow/impl/StringFilter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,4 @@
1818
import java.util.function.BiFunction;
1919

2020
@FunctionalInterface
21-
public interface StringFilter extends BiFunction<WorkflowContext, TaskContext<?>, String> {}
21+
public interface StringFilter extends BiFunction<WorkflowContext, TaskContext, String> {}

impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java

+52-54
Original file line numberDiff line numberDiff line change
@@ -16,76 +16,64 @@
1616
package io.serverlessworkflow.impl;
1717

1818
import com.fasterxml.jackson.databind.JsonNode;
19-
import io.serverlessworkflow.api.types.FlowDirective;
20-
import io.serverlessworkflow.api.types.FlowDirectiveEnum;
2119
import io.serverlessworkflow.api.types.TaskBase;
20+
import io.serverlessworkflow.impl.executors.TransitionInfo;
2221
import java.time.Instant;
2322
import java.util.HashMap;
2423
import java.util.Map;
24+
import java.util.Optional;
2525

26-
public class TaskContext<T extends TaskBase> {
26+
public class TaskContext {
2727

2828
private final JsonNode rawInput;
29-
private final T task;
29+
private final TaskBase task;
3030
private final WorkflowPosition position;
3131
private final Instant startedAt;
32+
private final String taskName;
33+
private final Map<String, Object> contextVariables;
34+
private final Optional<TaskContext> parentContext;
3235

3336
private JsonNode input;
3437
private JsonNode output;
3538
private JsonNode rawOutput;
36-
private FlowDirective flowDirective;
37-
private Map<String, Object> contextVariables;
3839
private Instant completedAt;
40+
private TransitionInfo transition;
3941

40-
public TaskContext(JsonNode input, WorkflowPosition position) {
41-
this(input, null, position, Instant.now(), input, input, input, null, new HashMap<>());
42-
}
43-
44-
public TaskContext(JsonNode input, TaskContext<?> taskContext, T task) {
45-
this(
46-
input,
47-
task,
48-
taskContext.position,
49-
Instant.now(),
50-
input,
51-
input,
52-
input,
53-
task.getThen(),
54-
new HashMap<>(taskContext.variables()));
42+
public TaskContext(
43+
JsonNode input,
44+
WorkflowPosition position,
45+
Optional<TaskContext> parentContext,
46+
String taskName,
47+
TaskBase task) {
48+
this(input, parentContext, taskName, task, position, Instant.now(), input, input, input);
5549
}
5650

5751
private TaskContext(
5852
JsonNode rawInput,
59-
T task,
53+
Optional<TaskContext> parentContext,
54+
String taskName,
55+
TaskBase task,
6056
WorkflowPosition position,
6157
Instant startedAt,
6258
JsonNode input,
6359
JsonNode output,
64-
JsonNode rawOutput,
65-
FlowDirective flowDirective,
66-
Map<String, Object> contextVariables) {
60+
JsonNode rawOutput) {
6761
this.rawInput = rawInput;
62+
this.parentContext = parentContext;
63+
this.taskName = taskName;
6864
this.task = task;
6965
this.position = position;
7066
this.startedAt = startedAt;
7167
this.input = input;
7268
this.output = output;
7369
this.rawOutput = rawOutput;
74-
this.flowDirective = flowDirective;
75-
this.contextVariables = contextVariables;
70+
this.contextVariables =
71+
parentContext.map(p -> new HashMap<>(p.contextVariables)).orElseGet(HashMap::new);
7672
}
7773

78-
public TaskContext<T> copy() {
79-
return new TaskContext<T>(
80-
rawInput,
81-
task,
82-
position.copy(),
83-
startedAt,
84-
input,
85-
output,
86-
rawOutput,
87-
flowDirective,
88-
new HashMap<>(contextVariables));
74+
public TaskContext copy() {
75+
return new TaskContext(
76+
rawInput, parentContext, taskName, task, position, startedAt, input, output, rawOutput);
8977
}
9078

9179
public void input(JsonNode input) {
@@ -102,54 +90,64 @@ public JsonNode rawInput() {
10290
return rawInput;
10391
}
10492

105-
public T task() {
93+
public TaskBase task() {
10694
return task;
10795
}
10896

109-
public void rawOutput(JsonNode output) {
97+
public TaskContext rawOutput(JsonNode output) {
11098
this.rawOutput = output;
11199
this.output = output;
100+
return this;
112101
}
113102

114103
public JsonNode rawOutput() {
115104
return rawOutput;
116105
}
117106

118-
public void output(JsonNode output) {
107+
public TaskContext output(JsonNode output) {
119108
this.output = output;
109+
return this;
120110
}
121111

122112
public JsonNode output() {
123113
return output;
124114
}
125115

126-
public void flowDirective(FlowDirective flowDirective) {
127-
this.flowDirective = flowDirective;
128-
}
129-
130-
public FlowDirective flowDirective() {
131-
return flowDirective == null
132-
? new FlowDirective().withFlowDirectiveEnum(FlowDirectiveEnum.CONTINUE)
133-
: flowDirective;
116+
public WorkflowPosition position() {
117+
return position;
134118
}
135119

136120
public Map<String, Object> variables() {
137121
return contextVariables;
138122
}
139123

140-
public WorkflowPosition position() {
141-
return position;
142-
}
143-
144124
public Instant startedAt() {
145125
return startedAt;
146126
}
147127

148-
public void completedAt(Instant instant) {
128+
public Optional<TaskContext> parent() {
129+
return parentContext;
130+
}
131+
132+
public String taskName() {
133+
return taskName;
134+
}
135+
136+
public TaskContext completedAt(Instant instant) {
149137
this.completedAt = instant;
138+
return this;
150139
}
151140

152141
public Instant completedAt() {
153142
return completedAt;
154143
}
144+
145+
public TransitionInfo transition() {
146+
return transition;
147+
}
148+
149+
public TaskContext transition(TransitionInfo transition) {
150+
this.transition = transition;
151+
return this;
152+
}
155153
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
public class WorkflowContext {
2121
private final WorkflowDefinition definition;
2222
private final WorkflowInstance instance;
23+
private JsonNode context;
2324

2425
WorkflowContext(WorkflowDefinition definition, WorkflowInstance instance) {
2526
this.definition = definition;
@@ -31,11 +32,11 @@ public WorkflowInstance instance() {
3132
}
3233

3334
public JsonNode context() {
34-
return instance.context();
35+
return context;
3536
}
3637

3738
public void context(JsonNode context) {
38-
this.instance.context(context);
39+
this.context = context;
3940
}
4041

4142
public WorkflowDefinition definition() {

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

+13-39
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,15 @@
1919

2020
import io.serverlessworkflow.api.types.Input;
2121
import io.serverlessworkflow.api.types.Output;
22-
import io.serverlessworkflow.api.types.TaskBase;
2322
import io.serverlessworkflow.api.types.Workflow;
2423
import io.serverlessworkflow.impl.executors.TaskExecutor;
25-
import io.serverlessworkflow.impl.executors.TaskExecutorFactory;
26-
import io.serverlessworkflow.impl.expressions.ExpressionFactory;
24+
import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
2725
import io.serverlessworkflow.impl.json.JsonUtils;
2826
import io.serverlessworkflow.impl.jsonschema.SchemaValidator;
29-
import io.serverlessworkflow.impl.jsonschema.SchemaValidatorFactory;
3027
import io.serverlessworkflow.impl.resources.ResourceLoader;
3128
import java.nio.file.Path;
3229
import java.util.Collection;
33-
import java.util.Map;
3430
import java.util.Optional;
35-
import java.util.concurrent.ConcurrentHashMap;
36-
import java.util.concurrent.ExecutorService;
3731

3832
public class WorkflowDefinition implements AutoCloseable {
3933

@@ -42,16 +36,13 @@ public class WorkflowDefinition implements AutoCloseable {
4236
private Optional<SchemaValidator> outputSchemaValidator = Optional.empty();
4337
private Optional<WorkflowFilter> inputFilter = Optional.empty();
4438
private Optional<WorkflowFilter> outputFilter = Optional.empty();
45-
private final Map<String, TaskExecutor<? extends TaskBase>> taskExecutors =
46-
new ConcurrentHashMap<>();
47-
private final ResourceLoader resourceLoader;
4839
private final WorkflowApplication application;
40+
private final TaskExecutor<?> taskExecutor;
4941

5042
private WorkflowDefinition(
5143
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
5244
this.workflow = workflow;
5345
this.application = application;
54-
this.resourceLoader = resourceLoader;
5546
if (workflow.getInput() != null) {
5647
Input input = workflow.getInput();
5748
this.inputSchemaValidator =
@@ -64,6 +55,13 @@ private WorkflowDefinition(
6455
getSchemaValidator(application.validatorFactory(), resourceLoader, output.getSchema());
6556
this.outputFilter = buildWorkflowFilter(application.expressionFactory(), output.getAs());
6657
}
58+
this.taskExecutor =
59+
TaskExecutorHelper.createExecutorList(
60+
application.positionFactory().get(),
61+
workflow.getDo(),
62+
workflow,
63+
application,
64+
resourceLoader);
6765
}
6866

6967
static WorkflowDefinition of(WorkflowApplication application, Workflow workflow) {
@@ -83,6 +81,10 @@ public Optional<SchemaValidator> inputSchemaValidator() {
8381
return inputSchemaValidator;
8482
}
8583

84+
public TaskExecutor<?> startTask() {
85+
return taskExecutor;
86+
}
87+
8688
public Optional<WorkflowFilter> inputFilter() {
8789
return inputFilter;
8890
}
@@ -95,14 +97,6 @@ public Collection<WorkflowExecutionListener> listeners() {
9597
return application.listeners();
9698
}
9799

98-
public Map<String, TaskExecutor<? extends TaskBase>> taskExecutors() {
99-
return taskExecutors;
100-
}
101-
102-
public TaskExecutorFactory taskFactory() {
103-
return application.taskFactory();
104-
}
105-
106100
public Optional<WorkflowFilter> outputFilter() {
107101
return outputFilter;
108102
}
@@ -115,26 +109,6 @@ public Optional<SchemaValidator> outputSchemaValidator() {
115109
return outputSchemaValidator;
116110
}
117111

118-
public ExpressionFactory expressionFactory() {
119-
return application.expressionFactory();
120-
}
121-
122-
public SchemaValidatorFactory validatorFactory() {
123-
return application.validatorFactory();
124-
}
125-
126-
public ResourceLoader resourceLoader() {
127-
return resourceLoader;
128-
}
129-
130-
public WorkflowPositionFactory positionFactory() {
131-
return application.positionFactory();
132-
}
133-
134-
public ExecutorService executorService() {
135-
return application.executorService();
136-
}
137-
138112
public RuntimeDescriptorFactory runtimeDescriptorFactory() {
139113
return application.runtimeDescriptorFactory();
140114
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ public static Builder error(String type, int status) {
2626
return new Builder(type, status);
2727
}
2828

29-
public static Builder communication(int status, TaskContext<?> context, Exception ex) {
29+
public static Builder communication(int status, TaskContext context, Exception ex) {
3030
return new Builder(COMM_TYPE, status)
3131
.instance(context.position().jsonPointer())
3232
.title(ex.getMessage());
3333
}
3434

35-
public static Builder runtime(int status, TaskContext<?> context, Exception ex) {
35+
public static Builder runtime(int status, TaskContext context, Exception ex) {
3636
return new Builder(RUNTIME_TYPE, status)
3737
.instance(context.position().jsonPointer())
3838
.title(ex.getMessage());

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowFilter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,5 @@
1919

2020
@FunctionalInterface
2121
public interface WorkflowFilter {
22-
JsonNode apply(WorkflowContext workflow, TaskContext<?> task, JsonNode node);
22+
JsonNode apply(WorkflowContext workflow, TaskContext task, JsonNode node);
2323
}

0 commit comments

Comments
 (0)