Skip to content

Commit 5d6a4b5

Browse files
authored
Merge pull request #521 from fjtirado/fix_listen_bug
Fixing ListenTask
2 parents ff0e817 + ea7f04f commit 5d6a4b5

File tree

3 files changed

+57
-57
lines changed

3 files changed

+57
-57
lines changed

api/src/main/resources/schema/workflow.yaml

+4-8
Original file line numberDiff line numberDiff line change
@@ -499,10 +499,6 @@ $defs:
499499
description: Defines the properties of event to emit.
500500
required: [ source, type ]
501501
additionalProperties: true
502-
cc:
503-
$ref: '#/$defs/endpoint'
504-
title: EmitCarbonCopyDefinition
505-
description: Defines an additional endpoint, if any, to publish an event's carbon copy to.
506502
required: [ event ]
507503
forTask:
508504
type: object
@@ -1343,7 +1339,7 @@ $defs:
13431339
- properties:
13441340
until: false
13451341
title: AnyEventUntilConsumed
1346-
required: [ any ]
1342+
required: [ any ]
13471343
- title: OneEventConsumptionStrategy
13481344
properties:
13491345
one:
@@ -1717,20 +1713,20 @@ $defs:
17171713
- properties:
17181714
amount:
17191715
type: integer
1720-
title: AsyncApiMessageConsumptionPolicyAmount
17211716
description: The amount of (filtered) messages to consume before disposing of the subscription.
1717+
title: AsyncApiMessageConsumptionPolicyAmount
17221718
required: [ amount ]
17231719
- properties:
17241720
while:
17251721
$ref: '#/$defs/runtimeExpression'
1726-
title: AsyncApiMessageConsumptionPolicyWhile
17271722
description: A runtime expression evaluated after each consumed (filtered) message to decide if message consumption should continue.
1723+
title: AsyncApiMessageConsumptionPolicyWhile
17281724
required: [ while ]
17291725
- properties:
17301726
until:
17311727
$ref: '#/$defs/runtimeExpression'
1732-
title: AsyncApiMessageConsumptionPolicyUntil
17331728
description: A runtime expression evaluated before each consumed (filtered) message to decide if message consumption should continue.
1729+
title: AsyncApiMessageConsumptionPolicyUntil
17341730
required: [ until ]
17351731
subscriptionIterator:
17361732
type: object

impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java

+45-49
Original file line numberDiff line numberDiff line change
@@ -47,20 +47,16 @@
4747
import java.util.List;
4848
import java.util.Optional;
4949
import java.util.concurrent.CompletableFuture;
50-
import java.util.concurrent.atomic.AtomicBoolean;
5150
import java.util.function.BiConsumer;
5251
import java.util.function.Function;
5352
import java.util.stream.Collectors;
5453

5554
public abstract class ListenExecutor extends RegularTaskExecutor<ListenTask> {
5655

5756
protected final EventRegistrationBuilderCollection regBuilders;
58-
protected final EventRegistrationBuilderCollection untilRegBuilders;
59-
protected final Optional<WorkflowFilter> until;
6057
protected final Optional<TaskExecutor<?>> loop;
6158
protected final Function<CloudEvent, JsonNode> converter;
6259
protected final EventConsumer eventConsumer;
63-
protected final AtomicBoolean untilEvent = new AtomicBoolean(true);
6460

6561
private static record EventRegistrationBuilderCollection(
6662
Collection<EventRegistrationBuilder> registrations, boolean isAnd) {}
@@ -177,22 +173,37 @@ protected void internalProcessCe(
177173
arrayNode.add(node);
178174
future.complete(node);
179175
}
180-
181-
@Override
182-
protected CompletableFuture<?> combine(CompletableFuture<JsonNode>[] completables) {
183-
return CompletableFuture.allOf(completables);
184-
}
185176
}
186177

187178
public static class OrListenExecutor extends ListenExecutor {
188179

180+
private final Optional<WorkflowFilter> until;
181+
private final EventRegistrationBuilderCollection untilRegBuilders;
182+
189183
public OrListenExecutor(ListenExecutorBuilder builder) {
190184
super(builder);
185+
this.until = Optional.ofNullable(builder.until);
186+
this.untilRegBuilders = builder.untilRegistrations;
191187
}
192188

193189
@Override
194-
protected CompletableFuture<?> combine(CompletableFuture<JsonNode>[] completables) {
195-
return CompletableFuture.anyOf(completables);
190+
protected <T> CompletableFuture<?> buildFuture(
191+
EventRegistrationBuilderCollection regCollection,
192+
Collection<EventRegistration> registrations,
193+
BiConsumer<CloudEvent, CompletableFuture<T>> consumer) {
194+
CompletableFuture<?> combinedFuture =
195+
super.buildFuture(regCollection, registrations, consumer);
196+
if (untilRegBuilders != null) {
197+
Collection<EventRegistration> untilRegistrations = new ArrayList<>();
198+
CompletableFuture<?> untilFuture =
199+
combine(untilRegBuilders, untilRegistrations, (ce, f) -> f.complete(null));
200+
untilFuture.thenAccept(
201+
v -> {
202+
combinedFuture.complete(null);
203+
untilRegistrations.forEach(reg -> eventConsumer.unregister(reg));
204+
});
205+
}
206+
return combinedFuture;
196207
}
197208

198209
protected void internalProcessCe(
@@ -206,14 +217,12 @@ protected void internalProcessCe(
206217
|| until
207218
.filter(u -> u.apply(workflow, taskContext, arrayNode).asBoolean())
208219
.isPresent())
209-
&& untilEvent.get()) {
220+
&& untilRegBuilders == null) {
210221
future.complete(arrayNode);
211222
}
212223
}
213224
}
214225

215-
protected abstract CompletableFuture<?> combine(CompletableFuture<JsonNode>[] completables);
216-
217226
protected abstract void internalProcessCe(
218227
JsonNode node,
219228
ArrayNode arrayNode,
@@ -226,48 +235,37 @@ protected CompletableFuture<JsonNode> internalExecute(
226235
WorkflowContext workflow, TaskContext taskContext) {
227236
ArrayNode output = JsonUtils.mapper().createArrayNode();
228237
Collection<EventRegistration> registrations = new ArrayList<>();
229-
if (untilRegBuilders != null) {
230-
untilEvent.set(false);
231-
}
232-
CompletableFuture<?> combinedFuture =
233-
combine(
234-
toCompletables(
235-
regBuilders,
236-
registrations,
237-
(ce, future) ->
238-
processCe(converter.apply(ce), output, workflow, taskContext, future)));
239-
CompletableFuture<JsonNode> resultFuture =
240-
combinedFuture.thenApply(
238+
return buildFuture(
239+
regBuilders,
240+
registrations,
241+
(BiConsumer<CloudEvent, CompletableFuture<JsonNode>>)
242+
((ce, future) ->
243+
processCe(converter.apply(ce), output, workflow, taskContext, future)))
244+
.thenApply(
241245
v -> {
242246
registrations.forEach(reg -> eventConsumer.unregister(reg));
243247
return output;
244248
});
245-
if (untilRegBuilders != null) {
246-
Collection<EventRegistration> untilRegistrations = new ArrayList<>();
247-
CompletableFuture<?>[] futures =
248-
toCompletables(
249-
untilRegBuilders, untilRegistrations, (ce, future) -> future.complete(null));
250-
CompletableFuture<?> untilFuture =
251-
untilRegBuilders.isAnd()
252-
? CompletableFuture.allOf(futures)
253-
: CompletableFuture.anyOf(futures);
254-
untilFuture.thenAccept(
255-
v -> {
256-
untilEvent.set(true);
257-
combinedFuture.complete(null);
258-
untilRegistrations.forEach(reg -> eventConsumer.unregister(reg));
259-
});
260-
}
261-
return resultFuture;
262249
}
263250

264-
private <T> CompletableFuture<T>[] toCompletables(
251+
protected <T> CompletableFuture<?> buildFuture(
252+
EventRegistrationBuilderCollection regCollection,
253+
Collection<EventRegistration> registrations,
254+
BiConsumer<CloudEvent, CompletableFuture<T>> consumer) {
255+
return combine(regCollection, registrations, consumer);
256+
}
257+
258+
protected final <T> CompletableFuture<?> combine(
265259
EventRegistrationBuilderCollection regCollection,
266260
Collection<EventRegistration> registrations,
267261
BiConsumer<CloudEvent, CompletableFuture<T>> consumer) {
268-
return regCollection.registrations().stream()
269-
.map(reg -> toCompletable(reg, registrations, consumer))
270-
.toArray(size -> new CompletableFuture[size]);
262+
CompletableFuture<T>[] futures =
263+
regCollection.registrations().stream()
264+
.map(reg -> toCompletable(reg, registrations, consumer))
265+
.toArray(size -> new CompletableFuture[size]);
266+
return regCollection.isAnd()
267+
? CompletableFuture.allOf(futures)
268+
: CompletableFuture.anyOf(futures);
271269
}
272270

273271
private <T> CompletableFuture<T> toCompletable(
@@ -307,9 +305,7 @@ protected ListenExecutor(ListenExecutorBuilder builder) {
307305
super(builder);
308306
this.eventConsumer = builder.application.eventConsumer();
309307
this.regBuilders = builder.registrations;
310-
this.until = Optional.ofNullable(builder.until);
311308
this.loop = Optional.ofNullable(builder.loop);
312309
this.converter = builder.converter;
313-
this.untilRegBuilders = builder.untilRegistrations;
314310
}
315311
}

impl/core/src/test/java/io/serverlessworkflow/impl/EventDefinitionTest.java

+8
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ void testUntilConsumed() throws IOException {
7777
emitOutDefinition.instance(Map.of()).start().join();
7878
assertThat(future).isCompleted();
7979
assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.COMPLETED);
80+
assertThat(waitingInstance.outputAsJsonNode()).isEqualTo(temperature());
8081
}
8182

8283
private static Stream<Arguments> eventListenerParameters() {
@@ -106,4 +107,11 @@ private static JsonNode doctor() {
106107
node.put("isSick", true);
107108
return mapper.createArrayNode().add(node);
108109
}
110+
111+
private static JsonNode temperature() {
112+
ObjectMapper mapper = JsonUtils.mapper();
113+
ObjectNode node = mapper.createObjectNode();
114+
node.put("temperature", 39);
115+
return mapper.createArrayNode().add(node);
116+
}
109117
}

0 commit comments

Comments
 (0)