Skip to content

Commit 8adb583

Browse files
committed
ParallelFlowable.as()
1 parent 1914776 commit 8adb583

File tree

3 files changed

+86
-0
lines changed

3 files changed

+86
-0
lines changed

src/main/java/io/reactivex/parallel/ParallelFlowable.java

+23
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,29 @@ public static <T> ParallelFlowable<T> from(@NonNull Publisher<? extends T> sourc
122122
return RxJavaPlugins.onAssembly(new ParallelFromPublisher<T>(source, parallelism, prefetch));
123123
}
124124

125+
/**
126+
* Calls the specified converter function during assembly time and returns its resulting value.
127+
* <p>
128+
* This allows fluent conversion to any other type.
129+
*
130+
* @param <R> the resulting object type
131+
* @param converter the function that receives the current ParallelFlowable instance and returns a value
132+
* @return the converted value
133+
* @throws NullPointerException if converter is null
134+
* @since 2.1.7 - experimental
135+
*/
136+
@Experimental
137+
@CheckReturnValue
138+
@NonNull
139+
public final <R> R as(@NonNull ParallelFlowableConverter<T, R> converter) {
140+
try {
141+
return ObjectHelper.requireNonNull(converter, "converter is null").apply(this);
142+
} catch (Throwable ex) {
143+
Exceptions.throwIfFatal(ex);
144+
throw ExceptionHelper.wrapOrThrow(ex);
145+
}
146+
}
147+
125148
/**
126149
* Maps the source values on each 'rail' to another value.
127150
* <p>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.parallel;
15+
16+
import io.reactivex.annotations.*;
17+
18+
/**
19+
* Convenience interface and callback used by the {@link ParallelFlowable#as} operator to turn a ParallelFlowable into
20+
* another value fluently.
21+
*
22+
* @param <T> the upstream type
23+
* @param <R> the output type
24+
* @since 2.1.7 - experimental
25+
*/
26+
@Experimental
27+
public interface ParallelFlowableConverter<T, R> {
28+
/**
29+
* Applies a function to the upstream ParallelFlowable and returns a converted value of type {@code R}.
30+
*
31+
* @param upstream the upstream ParallelFlowable instance
32+
* @return the converted value
33+
* @throws Exception on error
34+
*/
35+
@NonNull
36+
R apply(@NonNull ParallelFlowable<T> upstream) throws Exception;
37+
}

src/test/java/io/reactivex/parallel/ParallelFlowableTest.java

+26
Original file line numberDiff line numberDiff line change
@@ -1100,6 +1100,20 @@ public Flowable<Integer> apply(ParallelFlowable<Integer> pf) throws Exception {
11001100
.assertResult(1, 2, 3, 4, 5);
11011101
}
11021102

1103+
@Test
1104+
public void as() {
1105+
Flowable.range(1, 5)
1106+
.parallel()
1107+
.as(new ParallelFlowableConverter<Integer, Flowable<Integer>>() {
1108+
@Override
1109+
public Flowable<Integer> apply(ParallelFlowable<Integer> pf) throws Exception {
1110+
return pf.sequential();
1111+
}
1112+
})
1113+
.test()
1114+
.assertResult(1, 2, 3, 4, 5);
1115+
}
1116+
11031117
@Test(expected = TestException.class)
11041118
public void toThrows() {
11051119
Flowable.range(1, 5)
@@ -1112,6 +1126,18 @@ public Flowable<Integer> apply(ParallelFlowable<Integer> pf) throws Exception {
11121126
});
11131127
}
11141128

1129+
@Test(expected = TestException.class)
1130+
public void asThrows() {
1131+
Flowable.range(1, 5)
1132+
.parallel()
1133+
.as(new ParallelFlowableConverter<Integer, Flowable<Integer>>() {
1134+
@Override
1135+
public Flowable<Integer> apply(ParallelFlowable<Integer> pf) throws Exception {
1136+
throw new TestException();
1137+
}
1138+
});
1139+
}
1140+
11151141
@Test
11161142
public void compose() {
11171143
Flowable.range(1, 5)

0 commit comments

Comments
 (0)