Skip to content

Commit b57b31f

Browse files
committed
[Fix serverlessworkflow#484] Removing SortedArrayList
Maybe for larger list that has to be kept sorte that one is useful, but in this case is faster to add everything unsorted (all insertions are really fast) and then sort (there is only one sort of a small amount of items) Signed-off-by: Francisco Javier Tirado Sarti <[email protected]>
1 parent 74fc958 commit b57b31f

19 files changed

+307
-270
lines changed

impl/core/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,5 +50,10 @@
5050
<artifactId>assertj-core</artifactId>
5151
<scope>test</scope>
5252
</dependency>
53+
<dependency>
54+
<groupId>ch.qos.logback</groupId>
55+
<artifactId>logback-classic</artifactId>
56+
<scope>test</scope>
57+
</dependency>
5358
</dependencies>
5459
</project>
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl;
17+
18+
import java.util.function.BiFunction;
19+
20+
@FunctionalInterface
21+
public interface LongFilter extends BiFunction<WorkflowContext, TaskContext<?>, Long> {}

impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,6 @@ public TaskContext(JsonNode input, WorkflowPosition position) {
4141
this(input, null, position, Instant.now(), input, input, input, null, new HashMap<>());
4242
}
4343

44-
public TaskContext<T> copy() {
45-
return new TaskContext<T>(
46-
rawInput,
47-
task,
48-
position.copy(),
49-
startedAt,
50-
input,
51-
output,
52-
rawOutput,
53-
flowDirective,
54-
new HashMap<>(contextVariables));
55-
}
56-
5744
public TaskContext(JsonNode input, TaskContext<?> taskContext, T task) {
5845
this(
5946
input,
@@ -88,6 +75,19 @@ private TaskContext(
8875
this.contextVariables = contextVariables;
8976
}
9077

78+
public TaskContext<T> copy() {
79+
return new TaskContext<T>(
80+
rawInput,
81+
task,
82+
position.copy(),
83+
startedAt,
84+
input,
85+
output,
86+
rawOutput,
87+
flowDirective,
88+
new HashMap<>(contextVariables));
89+
}
90+
9191
public void input(JsonNode input) {
9292
this.input = input;
9393
this.rawOutput = input;

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.fasterxml.jackson.databind.JsonNode;
2121
import com.fasterxml.jackson.databind.node.NullNode;
22+
import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
2223
import java.time.Instant;
2324
import java.util.concurrent.atomic.AtomicReference;
2425

@@ -42,7 +43,7 @@ public class WorkflowInstance {
4243
.ifPresent(f -> taskContext.input(f.apply(workflowContext, taskContext, input)));
4344
state = new AtomicReference<>(WorkflowState.STARTED);
4445
context = new AtomicReference<>(NullNode.getInstance());
45-
WorkflowUtils.processTaskList(definition.workflow().getDo(), workflowContext, taskContext);
46+
TaskExecutorHelper.processTaskList(definition.workflow().getDo(), workflowContext, taskContext);
4647
definition
4748
.outputFilter()
4849
.ifPresent(

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java

Lines changed: 17 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,11 @@
1919
import com.fasterxml.jackson.databind.ObjectMapper;
2020
import io.serverlessworkflow.api.WorkflowFormat;
2121
import io.serverlessworkflow.api.types.ExportAs;
22-
import io.serverlessworkflow.api.types.FlowDirective;
2322
import io.serverlessworkflow.api.types.InputFrom;
2423
import io.serverlessworkflow.api.types.OutputAs;
2524
import io.serverlessworkflow.api.types.SchemaExternal;
2625
import io.serverlessworkflow.api.types.SchemaInline;
2726
import io.serverlessworkflow.api.types.SchemaUnion;
28-
import io.serverlessworkflow.api.types.TaskBase;
29-
import io.serverlessworkflow.api.types.TaskItem;
3027
import io.serverlessworkflow.impl.expressions.Expression;
3128
import io.serverlessworkflow.impl.expressions.ExpressionFactory;
3229
import io.serverlessworkflow.impl.expressions.ExpressionUtils;
@@ -38,8 +35,6 @@
3835
import java.io.IOException;
3936
import java.io.InputStream;
4037
import java.io.UncheckedIOException;
41-
import java.util.List;
42-
import java.util.ListIterator;
4338
import java.util.Map;
4439
import java.util.Optional;
4540

@@ -94,18 +89,22 @@ public static Optional<WorkflowFilter> buildWorkflowFilter(
9489

9590
public static StringFilter buildStringFilter(
9691
ExpressionFactory exprFactory, String expression, String literal) {
97-
return expression != null ? from(buildWorkflowFilter(exprFactory, expression)) : from(literal);
92+
return expression != null
93+
? toString(buildWorkflowFilter(exprFactory, expression))
94+
: toString(literal);
9895
}
9996

10097
public static StringFilter buildStringFilter(ExpressionFactory exprFactory, String str) {
101-
return ExpressionUtils.isExpr(str) ? from(buildWorkflowFilter(exprFactory, str)) : from(str);
98+
return ExpressionUtils.isExpr(str)
99+
? toString(buildWorkflowFilter(exprFactory, str))
100+
: toString(str);
102101
}
103102

104-
public static StringFilter from(WorkflowFilter filter) {
103+
public static StringFilter toString(WorkflowFilter filter) {
105104
return (w, t) -> filter.apply(w, t, t.input()).asText();
106105
}
107106

108-
private static StringFilter from(String literal) {
107+
private static StringFilter toString(String literal) {
109108
return (w, t) -> literal;
110109
}
111110

@@ -124,76 +123,19 @@ private static WorkflowFilter buildWorkflowFilter(
124123
throw new IllegalStateException("Both object and str are null");
125124
}
126125

127-
private static TaskItem findTaskByName(ListIterator<TaskItem> iter, String taskName) {
128-
int currentIndex = iter.nextIndex();
129-
while (iter.hasPrevious()) {
130-
TaskItem item = iter.previous();
131-
if (item.getName().equals(taskName)) {
132-
return item;
133-
}
134-
}
135-
while (iter.nextIndex() < currentIndex) {
136-
iter.next();
137-
}
138-
while (iter.hasNext()) {
139-
TaskItem item = iter.next();
140-
if (item.getName().equals(taskName)) {
141-
return item;
142-
}
143-
}
144-
throw new IllegalArgumentException("Cannot find task with name " + taskName);
126+
public static LongFilter buildLongFilter(
127+
ExpressionFactory exprFactory, String expression, Long literal) {
128+
return expression != null
129+
? toLong(buildWorkflowFilter(exprFactory, expression))
130+
: toLong(literal);
145131
}
146132

147-
public static void processTaskList(
148-
List<TaskItem> tasks, WorkflowContext context, TaskContext<?> parentTask) {
149-
parentTask.position().addProperty("do");
150-
TaskContext<? extends TaskBase> currentContext = parentTask;
151-
if (!tasks.isEmpty()) {
152-
ListIterator<TaskItem> iter = tasks.listIterator();
153-
TaskItem nextTask = iter.next();
154-
while (nextTask != null) {
155-
TaskItem task = nextTask;
156-
parentTask.position().addIndex(iter.previousIndex());
157-
currentContext = executeTask(context, parentTask, task, currentContext.output());
158-
FlowDirective flowDirective = currentContext.flowDirective();
159-
if (flowDirective.getFlowDirectiveEnum() != null) {
160-
switch (flowDirective.getFlowDirectiveEnum()) {
161-
case CONTINUE:
162-
nextTask = iter.hasNext() ? iter.next() : null;
163-
break;
164-
case END:
165-
context.instance().state(WorkflowState.COMPLETED);
166-
case EXIT:
167-
nextTask = null;
168-
break;
169-
}
170-
} else {
171-
nextTask = WorkflowUtils.findTaskByName(iter, flowDirective.getString());
172-
}
173-
parentTask.position().back();
174-
}
175-
}
176-
parentTask.position().back();
177-
parentTask.rawOutput(currentContext.output());
133+
public static LongFilter toLong(WorkflowFilter filter) {
134+
return (w, t) -> filter.apply(w, t, t.input()).asLong();
178135
}
179136

180-
public static TaskContext<?> executeTask(
181-
WorkflowContext context, TaskContext<?> parentTask, TaskItem task, JsonNode input) {
182-
parentTask.position().addProperty(task.getName());
183-
TaskContext<?> result =
184-
context
185-
.definition()
186-
.taskExecutors()
187-
.computeIfAbsent(
188-
parentTask.position().jsonPointer(),
189-
k ->
190-
context
191-
.definition()
192-
.taskFactory()
193-
.getTaskExecutor(task.getTask(), context.definition()))
194-
.apply(context, parentTask, input);
195-
parentTask.position().back();
196-
return result;
137+
private static LongFilter toLong(Long literal) {
138+
return (w, t) -> literal;
197139
}
198140

199141
public static WorkflowFilter buildWorkflowFilter(ExpressionFactory exprFactory, String str) {

impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ public TaskExecutor<? extends TaskBase> getTaskExecutor(
7373
return new TryExecutor(task.getTryTask(), definition);
7474
} else if (task.getForkTask() != null) {
7575
return new ForkExecutor(task.getForkTask(), definition);
76+
} else if (task.getWaitTask() != null) {
77+
return new WaitExecutor(task.getWaitTask(), definition);
7678
}
7779
throw new UnsupportedOperationException(task.get().getClass().getName() + " not supported yet");
7880
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.serverlessworkflow.impl.TaskContext;
2020
import io.serverlessworkflow.impl.WorkflowContext;
2121
import io.serverlessworkflow.impl.WorkflowDefinition;
22-
import io.serverlessworkflow.impl.WorkflowUtils;
2322

2423
public class DoExecutor extends AbstractTaskExecutor<DoTask> {
2524

@@ -29,6 +28,6 @@ protected DoExecutor(DoTask task, WorkflowDefinition definition) {
2928

3029
@Override
3130
protected void internalExecute(WorkflowContext workflow, TaskContext<DoTask> taskContext) {
32-
WorkflowUtils.processTaskList(task.getDo(), workflow, taskContext);
31+
TaskExecutorHelper.processTaskList(task.getDo(), workflow, taskContext);
3332
}
3433
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ protected void internalExecute(WorkflowContext workflow, TaskContext<ForTask> ta
5252
JsonNode item = iter.next();
5353
taskContext.variables().put(task.getFor().getEach(), item);
5454
taskContext.variables().put(task.getFor().getAt(), i++);
55-
WorkflowUtils.processTaskList(task.getDo(), workflow, taskContext);
55+
TaskExecutorHelper.processTaskList(task.getDo(), workflow, taskContext);
5656
}
5757
}
5858
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.serverlessworkflow.impl.executors;
1717

18+
import com.fasterxml.jackson.databind.JsonNode;
1819
import io.serverlessworkflow.api.types.FlowDirectiveEnum;
1920
import io.serverlessworkflow.api.types.ForkTask;
2021
import io.serverlessworkflow.api.types.ForkTaskConfiguration;
@@ -23,17 +24,16 @@
2324
import io.serverlessworkflow.impl.WorkflowContext;
2425
import io.serverlessworkflow.impl.WorkflowDefinition;
2526
import io.serverlessworkflow.impl.WorkflowState;
26-
import io.serverlessworkflow.impl.WorkflowUtils;
27-
import io.serverlessworkflow.impl.generic.SortedArrayList;
2827
import io.serverlessworkflow.impl.json.JsonUtils;
2928
import java.lang.reflect.UndeclaredThrowableException;
29+
import java.util.ArrayList;
3030
import java.util.HashMap;
3131
import java.util.List;
3232
import java.util.Map;
3333
import java.util.concurrent.ExecutionException;
3434
import java.util.concurrent.ExecutorService;
3535
import java.util.concurrent.Future;
36-
import java.util.stream.Collectors;
36+
import java.util.stream.Stream;
3737
import org.slf4j.Logger;
3838
import org.slf4j.LoggerFactory;
3939

@@ -47,8 +47,6 @@ protected ForkExecutor(ForkTask task, WorkflowDefinition definition) {
4747
service = definition.executorService();
4848
}
4949

50-
private record BranchContext(String taskName, TaskContext<?> taskContext) {}
51-
5250
@Override
5351
protected void internalExecute(WorkflowContext workflow, TaskContext<ForkTask> taskContext) {
5452
ForkTaskConfiguration forkConfig = task.getFork();
@@ -62,13 +60,10 @@ protected void internalExecute(WorkflowContext workflow, TaskContext<ForkTask> t
6260
item.getName(),
6361
service.submit(() -> executeBranch(workflow, taskContext.copy(), item, i)));
6462
}
65-
List<BranchContext> results =
66-
new SortedArrayList<>(
67-
(arg1, arg2) ->
68-
arg1.taskContext.completedAt().compareTo(arg2.taskContext.completedAt()));
63+
List<Map.Entry<String, TaskContext<?>>> results = new ArrayList<>();
6964
for (Map.Entry<String, Future<TaskContext<?>>> entry : futures.entrySet()) {
7065
try {
71-
results.add(new BranchContext(entry.getKey(), entry.getValue().get()));
66+
results.add(Map.entry(entry.getKey(), entry.getValue().get()));
7267
} catch (ExecutionException ex) {
7368
Throwable cause = ex.getCause();
7469
if (cause instanceof RuntimeException) {
@@ -77,24 +72,25 @@ protected void internalExecute(WorkflowContext workflow, TaskContext<ForkTask> t
7772
throw new UndeclaredThrowableException(ex);
7873
}
7974
} catch (InterruptedException ex) {
80-
logger.warn(
81-
"Thred executing branch {} was interrupted, this branch will be ignored",
82-
entry.getKey(),
83-
ex);
75+
logger.warn("Branch {} was interrupted, no result will be recorded", entry.getKey(), ex);
8476
}
8577
}
8678
if (!results.isEmpty()) {
79+
Stream<Map.Entry<String, TaskContext<?>>> sortedStream =
80+
results.stream()
81+
.sorted(
82+
(arg1, arg2) ->
83+
arg1.getValue().completedAt().compareTo(arg2.getValue().completedAt()));
8784
taskContext.rawOutput(
8885
forkConfig.isCompete()
89-
? results.get(0).taskContext().output()
90-
: JsonUtils.fromValue(
91-
results.stream()
92-
.map(
93-
e ->
94-
JsonUtils.mapper()
95-
.createObjectNode()
96-
.set(e.taskName(), e.taskContext().output()))
97-
.collect(Collectors.toList())));
86+
? sortedStream.map(e -> e.getValue().output()).findFirst().orElseThrow()
87+
: sortedStream
88+
.<JsonNode>map(
89+
e ->
90+
JsonUtils.mapper()
91+
.createObjectNode()
92+
.set(e.getKey(), e.getValue().output()))
93+
.collect(JsonUtils.arrayNodeCollector()));
9894
}
9995
}
10096
}
@@ -103,7 +99,7 @@ private TaskContext<?> executeBranch(
10399
WorkflowContext workflow, TaskContext<ForkTask> taskContext, TaskItem taskItem, int index) {
104100
taskContext.position().addIndex(index);
105101
TaskContext<?> result =
106-
WorkflowUtils.executeTask(workflow, taskContext, taskItem, taskContext.input());
102+
TaskExecutorHelper.executeTask(workflow, taskContext, taskItem, taskContext.input());
107103
if (result.flowDirective() != null
108104
&& result.flowDirective().getFlowDirectiveEnum() == FlowDirectiveEnum.END) {
109105
workflow.instance().state(WorkflowState.COMPLETED);

0 commit comments

Comments
 (0)