Skip to content

Commit 9ea16cd

Browse files
committed
[Fix #484] Removing SortedArrayList
Maybe for larger list that has to be kept sorte that one is useful, but in this case is faster to add everything unsorted (all insertions are really fast) and then sort (there is only one sort of a small amount of items) Signed-off-by: Francisco Javier Tirado Sarti <[email protected]>
1 parent 74fc958 commit 9ea16cd

21 files changed

+314
-283
lines changed

impl/core/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -50,5 +50,10 @@
5050
<artifactId>assertj-core</artifactId>
5151
<scope>test</scope>
5252
</dependency>
53+
<dependency>
54+
<groupId>ch.qos.logback</groupId>
55+
<artifactId>logback-classic</artifactId>
56+
<scope>test</scope>
57+
</dependency>
5358
</dependencies>
5459
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl;
17+
18+
import java.util.function.BiFunction;
19+
20+
@FunctionalInterface
21+
public interface LongFilter extends BiFunction<WorkflowContext, TaskContext<?>, Long> {}

impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java

+13-13
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,6 @@ public TaskContext(JsonNode input, WorkflowPosition position) {
4141
this(input, null, position, Instant.now(), input, input, input, null, new HashMap<>());
4242
}
4343

44-
public TaskContext<T> copy() {
45-
return new TaskContext<T>(
46-
rawInput,
47-
task,
48-
position.copy(),
49-
startedAt,
50-
input,
51-
output,
52-
rawOutput,
53-
flowDirective,
54-
new HashMap<>(contextVariables));
55-
}
56-
5744
public TaskContext(JsonNode input, TaskContext<?> taskContext, T task) {
5845
this(
5946
input,
@@ -88,6 +75,19 @@ private TaskContext(
8875
this.contextVariables = contextVariables;
8976
}
9077

78+
public TaskContext<T> copy() {
79+
return new TaskContext<T>(
80+
rawInput,
81+
task,
82+
position.copy(),
83+
startedAt,
84+
input,
85+
output,
86+
rawOutput,
87+
flowDirective,
88+
new HashMap<>(contextVariables));
89+
}
90+
9191
public void input(JsonNode input) {
9292
this.input = input;
9393
this.rawOutput = input;

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

+2-5
Original file line numberDiff line numberDiff line change
@@ -49,22 +49,19 @@ public class WorkflowDefinition implements AutoCloseable {
4949

5050
private WorkflowDefinition(
5151
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
52-
5352
this.workflow = workflow;
5453
this.application = application;
5554
this.resourceLoader = resourceLoader;
5655
if (workflow.getInput() != null) {
5756
Input input = workflow.getInput();
5857
this.inputSchemaValidator =
59-
getSchemaValidator(
60-
application.validatorFactory(), schemaToNode(resourceLoader, input.getSchema()));
58+
getSchemaValidator(application.validatorFactory(), resourceLoader, input.getSchema());
6159
this.inputFilter = buildWorkflowFilter(application.expressionFactory(), input.getFrom());
6260
}
6361
if (workflow.getOutput() != null) {
6462
Output output = workflow.getOutput();
6563
this.outputSchemaValidator =
66-
getSchemaValidator(
67-
application.validatorFactory(), schemaToNode(resourceLoader, output.getSchema()));
64+
getSchemaValidator(application.validatorFactory(), resourceLoader, output.getSchema());
6865
this.outputFilter = buildWorkflowFilter(application.expressionFactory(), output.getAs());
6966
}
7067
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.fasterxml.jackson.databind.JsonNode;
2121
import com.fasterxml.jackson.databind.node.NullNode;
22+
import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
2223
import java.time.Instant;
2324
import java.util.concurrent.atomic.AtomicReference;
2425

@@ -42,7 +43,7 @@ public class WorkflowInstance {
4243
.ifPresent(f -> taskContext.input(f.apply(workflowContext, taskContext, input)));
4344
state = new AtomicReference<>(WorkflowState.STARTED);
4445
context = new AtomicReference<>(NullNode.getInstance());
45-
WorkflowUtils.processTaskList(definition.workflow().getDo(), workflowContext, taskContext);
46+
TaskExecutorHelper.processTaskList(definition.workflow().getDo(), workflowContext, taskContext);
4647
definition
4748
.outputFilter()
4849
.ifPresent(

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java

+19-77
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,11 @@
1919
import com.fasterxml.jackson.databind.ObjectMapper;
2020
import io.serverlessworkflow.api.WorkflowFormat;
2121
import io.serverlessworkflow.api.types.ExportAs;
22-
import io.serverlessworkflow.api.types.FlowDirective;
2322
import io.serverlessworkflow.api.types.InputFrom;
2423
import io.serverlessworkflow.api.types.OutputAs;
2524
import io.serverlessworkflow.api.types.SchemaExternal;
2625
import io.serverlessworkflow.api.types.SchemaInline;
2726
import io.serverlessworkflow.api.types.SchemaUnion;
28-
import io.serverlessworkflow.api.types.TaskBase;
29-
import io.serverlessworkflow.api.types.TaskItem;
3027
import io.serverlessworkflow.impl.expressions.Expression;
3128
import io.serverlessworkflow.impl.expressions.ExpressionFactory;
3229
import io.serverlessworkflow.impl.expressions.ExpressionUtils;
@@ -38,8 +35,6 @@
3835
import java.io.IOException;
3936
import java.io.InputStream;
4037
import java.io.UncheckedIOException;
41-
import java.util.List;
42-
import java.util.ListIterator;
4338
import java.util.Map;
4439
import java.util.Optional;
4540

@@ -48,8 +43,8 @@ public class WorkflowUtils {
4843
private WorkflowUtils() {}
4944

5045
public static Optional<SchemaValidator> getSchemaValidator(
51-
SchemaValidatorFactory validatorFactory, Optional<JsonNode> node) {
52-
return node.map(n -> validatorFactory.getValidator(n));
46+
SchemaValidatorFactory validatorFactory, ResourceLoader resourceLoader, SchemaUnion schema) {
47+
return schemaToNode(resourceLoader, schema).map(n -> validatorFactory.getValidator(n));
5348
}
5449

5550
public static Optional<JsonNode> schemaToNode(ResourceLoader resourceLoader, SchemaUnion schema) {
@@ -94,18 +89,22 @@ public static Optional<WorkflowFilter> buildWorkflowFilter(
9489

9590
public static StringFilter buildStringFilter(
9691
ExpressionFactory exprFactory, String expression, String literal) {
97-
return expression != null ? from(buildWorkflowFilter(exprFactory, expression)) : from(literal);
92+
return expression != null
93+
? toString(buildWorkflowFilter(exprFactory, expression))
94+
: toString(literal);
9895
}
9996

10097
public static StringFilter buildStringFilter(ExpressionFactory exprFactory, String str) {
101-
return ExpressionUtils.isExpr(str) ? from(buildWorkflowFilter(exprFactory, str)) : from(str);
98+
return ExpressionUtils.isExpr(str)
99+
? toString(buildWorkflowFilter(exprFactory, str))
100+
: toString(str);
102101
}
103102

104-
public static StringFilter from(WorkflowFilter filter) {
103+
private static StringFilter toString(WorkflowFilter filter) {
105104
return (w, t) -> filter.apply(w, t, t.input()).asText();
106105
}
107106

108-
private static StringFilter from(String literal) {
107+
private static StringFilter toString(String literal) {
109108
return (w, t) -> literal;
110109
}
111110

@@ -124,76 +123,19 @@ private static WorkflowFilter buildWorkflowFilter(
124123
throw new IllegalStateException("Both object and str are null");
125124
}
126125

127-
private static TaskItem findTaskByName(ListIterator<TaskItem> iter, String taskName) {
128-
int currentIndex = iter.nextIndex();
129-
while (iter.hasPrevious()) {
130-
TaskItem item = iter.previous();
131-
if (item.getName().equals(taskName)) {
132-
return item;
133-
}
134-
}
135-
while (iter.nextIndex() < currentIndex) {
136-
iter.next();
137-
}
138-
while (iter.hasNext()) {
139-
TaskItem item = iter.next();
140-
if (item.getName().equals(taskName)) {
141-
return item;
142-
}
143-
}
144-
throw new IllegalArgumentException("Cannot find task with name " + taskName);
126+
public static LongFilter buildLongFilter(
127+
ExpressionFactory exprFactory, String expression, Long literal) {
128+
return expression != null
129+
? toLong(buildWorkflowFilter(exprFactory, expression))
130+
: toLong(literal);
145131
}
146132

147-
public static void processTaskList(
148-
List<TaskItem> tasks, WorkflowContext context, TaskContext<?> parentTask) {
149-
parentTask.position().addProperty("do");
150-
TaskContext<? extends TaskBase> currentContext = parentTask;
151-
if (!tasks.isEmpty()) {
152-
ListIterator<TaskItem> iter = tasks.listIterator();
153-
TaskItem nextTask = iter.next();
154-
while (nextTask != null) {
155-
TaskItem task = nextTask;
156-
parentTask.position().addIndex(iter.previousIndex());
157-
currentContext = executeTask(context, parentTask, task, currentContext.output());
158-
FlowDirective flowDirective = currentContext.flowDirective();
159-
if (flowDirective.getFlowDirectiveEnum() != null) {
160-
switch (flowDirective.getFlowDirectiveEnum()) {
161-
case CONTINUE:
162-
nextTask = iter.hasNext() ? iter.next() : null;
163-
break;
164-
case END:
165-
context.instance().state(WorkflowState.COMPLETED);
166-
case EXIT:
167-
nextTask = null;
168-
break;
169-
}
170-
} else {
171-
nextTask = WorkflowUtils.findTaskByName(iter, flowDirective.getString());
172-
}
173-
parentTask.position().back();
174-
}
175-
}
176-
parentTask.position().back();
177-
parentTask.rawOutput(currentContext.output());
133+
private static LongFilter toLong(WorkflowFilter filter) {
134+
return (w, t) -> filter.apply(w, t, t.input()).asLong();
178135
}
179136

180-
public static TaskContext<?> executeTask(
181-
WorkflowContext context, TaskContext<?> parentTask, TaskItem task, JsonNode input) {
182-
parentTask.position().addProperty(task.getName());
183-
TaskContext<?> result =
184-
context
185-
.definition()
186-
.taskExecutors()
187-
.computeIfAbsent(
188-
parentTask.position().jsonPointer(),
189-
k ->
190-
context
191-
.definition()
192-
.taskFactory()
193-
.getTaskExecutor(task.getTask(), context.definition()))
194-
.apply(context, parentTask, input);
195-
parentTask.position().back();
196-
return result;
137+
private static LongFilter toLong(Long literal) {
138+
return (w, t) -> literal;
197139
}
198140

199141
public static WorkflowFilter buildWorkflowFilter(ExpressionFactory exprFactory, String str) {

impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,7 @@ private void buildInputProcessors(WorkflowDefinition definition) {
5555
this.inputProcessor = buildWorkflowFilter(definition.expressionFactory(), input.getFrom());
5656
this.inputSchemaValidator =
5757
getSchemaValidator(
58-
definition.validatorFactory(),
59-
schemaToNode(definition.resourceLoader(), input.getSchema()));
58+
definition.validatorFactory(), definition.resourceLoader(), input.getSchema());
6059
}
6160
}
6261

@@ -66,8 +65,7 @@ private void buildOutputProcessors(WorkflowDefinition definition) {
6665
this.outputProcessor = buildWorkflowFilter(definition.expressionFactory(), output.getAs());
6766
this.outputSchemaValidator =
6867
getSchemaValidator(
69-
definition.validatorFactory(),
70-
schemaToNode(definition.resourceLoader(), output.getSchema()));
68+
definition.validatorFactory(), definition.resourceLoader(), output.getSchema());
7169
}
7270
}
7371

@@ -79,8 +77,7 @@ private void buildContextProcessors(WorkflowDefinition definition) {
7977
}
8078
this.contextSchemaValidator =
8179
getSchemaValidator(
82-
definition.validatorFactory(),
83-
schemaToNode(definition.resourceLoader(), export.getSchema()));
80+
definition.validatorFactory(), definition.resourceLoader(), export.getSchema());
8481
}
8582
}
8683

impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java

+2
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ public TaskExecutor<? extends TaskBase> getTaskExecutor(
7373
return new TryExecutor(task.getTryTask(), definition);
7474
} else if (task.getForkTask() != null) {
7575
return new ForkExecutor(task.getForkTask(), definition);
76+
} else if (task.getWaitTask() != null) {
77+
return new WaitExecutor(task.getWaitTask(), definition);
7678
}
7779
throw new UnsupportedOperationException(task.get().getClass().getName() + " not supported yet");
7880
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.serverlessworkflow.impl.TaskContext;
2020
import io.serverlessworkflow.impl.WorkflowContext;
2121
import io.serverlessworkflow.impl.WorkflowDefinition;
22-
import io.serverlessworkflow.impl.WorkflowUtils;
2322

2423
public class DoExecutor extends AbstractTaskExecutor<DoTask> {
2524

@@ -29,6 +28,6 @@ protected DoExecutor(DoTask task, WorkflowDefinition definition) {
2928

3029
@Override
3130
protected void internalExecute(WorkflowContext workflow, TaskContext<DoTask> taskContext) {
32-
WorkflowUtils.processTaskList(task.getDo(), workflow, taskContext);
31+
TaskExecutorHelper.processTaskList(task.getDo(), workflow, taskContext);
3332
}
3433
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ protected void internalExecute(WorkflowContext workflow, TaskContext<ForTask> ta
5252
JsonNode item = iter.next();
5353
taskContext.variables().put(task.getFor().getEach(), item);
5454
taskContext.variables().put(task.getFor().getAt(), i++);
55-
WorkflowUtils.processTaskList(task.getDo(), workflow, taskContext);
55+
TaskExecutorHelper.processTaskList(task.getDo(), workflow, taskContext);
5656
}
5757
}
5858
}

0 commit comments

Comments
 (0)