15
15
*/
16
16
package io .serverlessworkflow .impl ;
17
17
18
- import static io .serverlessworkflow .impl .JsonUtils .*;
18
+ import static io .serverlessworkflow .impl .json . JsonUtils .*;
19
19
20
- import com .fasterxml .jackson .core .JsonPointer ;
21
20
import com .fasterxml .jackson .databind .JsonNode ;
22
21
import io .serverlessworkflow .api .types .TaskBase ;
23
22
import io .serverlessworkflow .api .types .TaskItem ;
24
23
import io .serverlessworkflow .api .types .Workflow ;
24
+ import io .serverlessworkflow .impl .executors .DefaultTaskExecutorFactory ;
25
+ import io .serverlessworkflow .impl .executors .TaskExecutor ;
26
+ import io .serverlessworkflow .impl .executors .TaskExecutorFactory ;
27
+ import io .serverlessworkflow .impl .json .JsonUtils ;
25
28
import java .util .Collection ;
26
29
import java .util .Collections ;
27
30
import java .util .HashSet ;
@@ -43,7 +46,7 @@ private WorkflowDefinition(
43
46
private final Workflow workflow ;
44
47
private final Collection <WorkflowExecutionListener > listeners ;
45
48
private final TaskExecutorFactory taskFactory ;
46
- private final Map <JsonPointer , TaskExecutor <? extends TaskBase >> taskExecutors =
49
+ private final Map <String , TaskExecutor <? extends TaskBase >> taskExecutors =
47
50
new ConcurrentHashMap <>();
48
51
49
52
public static class Builder {
@@ -94,40 +97,32 @@ enum State {
94
97
95
98
public class WorkflowInstance {
96
99
97
- private final JsonNode input ;
98
100
private JsonNode output ;
99
101
private State state ;
100
-
101
- private JsonPointer currentPos ;
102
+ private WorkflowContext context ;
102
103
103
104
private WorkflowInstance (TaskExecutorFactory factory , JsonNode input ) {
104
- this .input = input ;
105
- this .output = object ();
105
+ this .output = input ;
106
106
this .state = State .STARTED ;
107
- this .currentPos = JsonPointer . compile ( "/" );
107
+ this .context = WorkflowContext . builder ( input ). build ( );
108
108
processDo (workflow .getDo ());
109
109
}
110
110
111
111
private void processDo (List <TaskItem > tasks ) {
112
- currentPos = currentPos . appendProperty ("do" );
112
+ context . position (). addProperty ("do" );
113
113
int index = 0 ;
114
114
for (TaskItem task : tasks ) {
115
- currentPos = currentPos . appendIndex ( index ).appendProperty (task .getName ());
116
- listeners .forEach (l -> l .onTaskStarted (currentPos , task .getTask ()));
115
+ context . position (). addIndex (++ index ).addProperty (task .getName ());
116
+ listeners .forEach (l -> l .onTaskStarted (context . position () , task .getTask ()));
117
117
this .output =
118
- MergeUtils . merge (
119
- taskExecutors
120
- . computeIfAbsent ( currentPos , k -> taskFactory . getTaskExecutor ( task . getTask ()))
121
- . apply ( input ),
122
- output );
123
- listeners .forEach (l -> l .onTaskEnded (currentPos , task .getTask ()));
124
- currentPos = currentPos . head ().head ();
118
+ taskExecutors
119
+ . computeIfAbsent (
120
+ context . position (). jsonPointer (),
121
+ k -> taskFactory . getTaskExecutor ( task . getTask ()))
122
+ . apply ( context , output );
123
+ listeners .forEach (l -> l .onTaskEnded (context . position () , task .getTask ()));
124
+ context . position ().back (). back ();
125
125
}
126
- currentPos = currentPos .head ();
127
- }
128
-
129
- public String currentPos () {
130
- return currentPos .toString ();
131
126
}
132
127
133
128
public State state () {
0 commit comments