Skip to content

Commit 68e0713

Browse files
akarnokdviktorklang
authored andcommitted
Java 9 Flow bridge: add Subscriber converters (#399)
* Java 9 Flow bridge: add Subscriber converters * Fix return type javadoc
1 parent 306ae92 commit 68e0713

File tree

2 files changed

+92
-1
lines changed

2 files changed

+92
-1
lines changed

flow-bridge/src/main/java/org/reactivestreams/ReactiveStreamsFlowBridge.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,33 @@ public static <T, U> Flow.Processor<T, U> toFlow(
110110
}
111111
return new FlowToReactiveProcessor<T, U>(reactiveStreamsProcessor);
112112
}
113-
113+
114+
/**
115+
* Converts a Reactive Streams Subscriber into a Flow Subscriber.
116+
* @param <T> the input and output value type
117+
* @param reactiveStreamsSubscriber the Reactive Streams Subscriber instance to convert
118+
* @return the equivalent Flow Subscriber
119+
*/
120+
public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) {
121+
if (reactiveStreamsSubscriber == null) {
122+
throw new NullPointerException("reactiveStreamsSubscriber");
123+
}
124+
return new FlowToReactiveSubscriber<T>(reactiveStreamsSubscriber);
125+
}
126+
127+
/**
128+
* Converts a Flow Subscriber into a Reactive Streams Subscriber.
129+
* @param <T> the input and output value type
130+
* @param flowSubscriber the Flow Subscriber instance to convert
131+
* @return the equivalent Reactive Streams Subscriber
132+
*/
133+
public static <T> org.reactivestreams.Subscriber<T> toReactiveStreamsSubscriber(Flow.Subscriber<T> flowSubscriber) {
134+
if (flowSubscriber == null) {
135+
throw new NullPointerException("flowSubscriber");
136+
}
137+
return new ReactiveToFlowSubscriber<T>(flowSubscriber);
138+
}
139+
114140
/**
115141
* Wraps a Reactive Streams Subscription and converts the calls to a Flow Subscription.
116142
*/

flow-bridge/src/test/java/org/reactivestreams/ReactiveStreamsFlowBridgeTest.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
package org.reactivestreams;
1313

14+
import org.testng.Assert;
1415
import org.testng.annotations.Test;
1516

1617
import java.io.IOException;
@@ -110,4 +111,68 @@ public void execute(Runnable command) {
110111

111112
tc.assertFailure(IOException.class, 1, 2, 3, 4, 5);
112113
}
114+
115+
@Test
116+
public void reactiveStreamsToFlowSubscriber() {
117+
TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();
118+
119+
Flow.Subscriber<Integer> fs = ReactiveStreamsFlowBridge.toFlowSubscriber(tc);
120+
121+
final Object[] state = { null, null };
122+
123+
fs.onSubscribe(new Flow.Subscription() {
124+
@Override
125+
public void request(long n) {
126+
state[0] = n;
127+
}
128+
129+
@Override
130+
public void cancel() {
131+
state[1] = true;
132+
}
133+
});
134+
135+
Assert.assertEquals(state[0], Long.MAX_VALUE);
136+
137+
fs.onNext(1);
138+
fs.onNext(2);
139+
fs.onNext(3);
140+
fs.onComplete();
141+
142+
tc.assertResult(1, 2, 3);
143+
144+
Assert.assertNull(state[1]);
145+
}
146+
147+
@Test
148+
public void flowToReactiveStreamsSubscriber() {
149+
TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();
150+
151+
org.reactivestreams.Subscriber<Integer> fs = ReactiveStreamsFlowBridge.toReactiveStreamsSubscriber(tc);
152+
153+
final Object[] state = { null, null };
154+
155+
fs.onSubscribe(new org.reactivestreams.Subscription() {
156+
@Override
157+
public void request(long n) {
158+
state[0] = n;
159+
}
160+
161+
@Override
162+
public void cancel() {
163+
state[1] = true;
164+
}
165+
});
166+
167+
Assert.assertEquals(state[0], Long.MAX_VALUE);
168+
169+
fs.onNext(1);
170+
fs.onNext(2);
171+
fs.onNext(3);
172+
fs.onComplete();
173+
174+
tc.assertResult(1, 2, 3);
175+
176+
Assert.assertNull(state[1]);
177+
}
113178
}

0 commit comments

Comments
 (0)