Skip to content

Commit 5df0ef6

Browse files
committed
Fix Race in ReactiveStreamsTests
Messages sent/published before the async task has fully set up, thus it does not receive the messages.
1 parent 3d668d5 commit 5df0ef6

File tree

1 file changed

+10
-1
lines changed

1 file changed

+10
-1
lines changed

spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ public void testPollableReactiveFlow() throws InterruptedException, TimeoutExcep
115115
.take(6)
116116
.subscribe();
117117

118+
final CountDownLatch asyncSetupLatch = new CountDownLatch(1);
119+
118120
Future<List<Integer>> future =
119121
Executors.newSingleThreadExecutor().submit(() ->
120122
Flux.just("11,12,13")
@@ -126,7 +128,9 @@ public void testPollableReactiveFlow() throws InterruptedException, TimeoutExcep
126128
.map(Message::getPayload)
127129
.take(7)
128130
.collectList()
129-
.block(Duration.ofSeconds(10)));
131+
.block(getDuration(asyncSetupLatch)));
132+
133+
assertTrue(asyncSetupLatch.await(10, TimeUnit.SECONDS));
130134

131135
this.inputChannel.send(new GenericMessage<>("6,7,8,9,10"));
132136

@@ -136,6 +140,11 @@ public void testPollableReactiveFlow() throws InterruptedException, TimeoutExcep
136140
assertEquals(7, integers.size());
137141
}
138142

143+
private Duration getDuration(CountDownLatch asyncSetupLatch) {
144+
asyncSetupLatch.countDown();
145+
return Duration.ofSeconds(10);
146+
}
147+
139148
@Configuration
140149
@EnableIntegration
141150
public static class ContextConfiguration {

0 commit comments

Comments
 (0)