Skip to content

Commit 2af58cd

Browse files
author
Aaron Tull
committed
Merge pull request #3539 from artem-zinnatullin/single-zip-iterable
Add Single.zip() for Iterable of Singles
2 parents 0c039ee + 1a99ffd commit 2af58cd

File tree

3 files changed

+229
-1
lines changed

3 files changed

+229
-1
lines changed

src/main/java/rx/Single.java

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
*/
1313
package rx;
1414

15+
import java.util.Collection;
1516
import java.util.concurrent.Callable;
1617
import java.util.concurrent.Future;
1718
import java.util.concurrent.TimeUnit;
@@ -32,6 +33,7 @@
3233
import rx.functions.Func7;
3334
import rx.functions.Func8;
3435
import rx.functions.Func9;
36+
import rx.functions.FuncN;
3537
import rx.annotations.Beta;
3638
import rx.internal.operators.*;
3739
import rx.internal.producers.SingleDelayedProducer;
@@ -1196,6 +1198,30 @@ public final static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Single<R> zip(Single
11961198
return just(new Observable<?>[] { asObservable(o1), asObservable(o2), asObservable(o3), asObservable(o4), asObservable(o5), asObservable(o6), asObservable(o7), asObservable(o8), asObservable(o9) }).lift(new OperatorZip<R>(zipFunction));
11971199
}
11981200

1201+
/**
1202+
* Returns a Single that emits the result of specified combiner function applied to combination of
1203+
* items emitted, in sequence, by an Iterable of other Singles.
1204+
* <p>
1205+
* {@code zip} applies this function in strict sequence.
1206+
* <p>
1207+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/zip.png" alt="">
1208+
* <dl>
1209+
* <dt><b>Scheduler:</b></dt>
1210+
* <dd>{@code zip} does not operate by default on a particular {@link Scheduler}.</dd>
1211+
* </dl>
1212+
*
1213+
* @param singles
1214+
* an Iterable of source Singles
1215+
* @param zipFunction
1216+
* a function that, when applied to an item emitted by each of the source Singles, results in
1217+
* an item that will be emitted by the resulting Single
1218+
* @return a Single that emits the zipped results
1219+
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
1220+
*/
1221+
public static <R> Single<R> zip(Iterable<? extends Single<?>> singles, FuncN<? extends R> zipFunction) {
1222+
return SingleOperatorZip.zip(iterableToArray(singles), zipFunction);
1223+
}
1224+
11991225
/**
12001226
* Returns an Observable that emits the item emitted by the source Single, then the item emitted by the
12011227
* specified Single.
@@ -1264,7 +1290,7 @@ public final <R> Observable<R> flatMapObservable(Func1<? super T, ? extends Obse
12641290
* <dt><b>Scheduler:</b></dt>
12651291
* <dd>{@code map} does not operate by default on a particular {@link Scheduler}.</dd>
12661292
* </dl>
1267-
*
1293+
*
12681294
* @param func
12691295
* a function to apply to the item emitted by the Single
12701296
* @return a Single that emits the item from the source Single, transformed by the specified function
@@ -2031,4 +2057,46 @@ public final Single<T> doOnUnsubscribe(final Action0 action) {
20312057
public final Single<T> doAfterTerminate(Action0 action) {
20322058
return lift(new OperatorDoAfterTerminate<T>(action));
20332059
}
2060+
2061+
/**
2062+
* FOR INTERNAL USE ONLY.
2063+
* <p>
2064+
* Converts {@link Iterable} of {@link Single} to array of {@link Single}.
2065+
*
2066+
* @param singlesIterable
2067+
* non null iterable of {@link Single}.
2068+
* @return array of {@link Single} with same length as passed iterable.
2069+
*/
2070+
@SuppressWarnings("unchecked")
2071+
static <T> Single<? extends T>[] iterableToArray(final Iterable<? extends Single<? extends T>> singlesIterable) {
2072+
final Single<? extends T>[] singlesArray;
2073+
int count;
2074+
2075+
if (singlesIterable instanceof Collection) {
2076+
Collection<? extends Single<? extends T>> list = (Collection<? extends Single<? extends T>>) singlesIterable;
2077+
count = list.size();
2078+
singlesArray = list.toArray(new Single[count]);
2079+
} else {
2080+
Single<? extends T>[] tempArray = new Single[8]; // Magic number used just to reduce number of allocations.
2081+
count = 0;
2082+
for (Single<? extends T> s : singlesIterable) {
2083+
if (count == tempArray.length) {
2084+
Single<? extends T>[] sb = new Single[count + (count >> 2)];
2085+
System.arraycopy(tempArray, 0, sb, 0, count);
2086+
tempArray = sb;
2087+
}
2088+
tempArray[count] = s;
2089+
count++;
2090+
}
2091+
2092+
if (tempArray.length == count) {
2093+
singlesArray = tempArray;
2094+
} else {
2095+
singlesArray = new Single[count];
2096+
System.arraycopy(tempArray, 0, singlesArray, 0, count);
2097+
}
2098+
}
2099+
2100+
return singlesArray;
2101+
}
20342102
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package rx.internal.operators;
2+
3+
import rx.Single;
4+
import rx.SingleSubscriber;
5+
import rx.exceptions.Exceptions;
6+
import rx.functions.FuncN;
7+
import rx.plugins.RxJavaPlugins;
8+
import rx.subscriptions.CompositeSubscription;
9+
10+
import java.util.concurrent.atomic.AtomicBoolean;
11+
import java.util.concurrent.atomic.AtomicInteger;
12+
13+
public class SingleOperatorZip {
14+
15+
public static <T, R> Single<R> zip(final Single<? extends T>[] singles, final FuncN<? extends R> zipper) {
16+
return Single.create(new Single.OnSubscribe<R>() {
17+
@Override
18+
public void call(final SingleSubscriber<? super R> subscriber) {
19+
final AtomicInteger wip = new AtomicInteger(singles.length);
20+
final AtomicBoolean once = new AtomicBoolean();
21+
final Object[] values = new Object[singles.length];
22+
23+
CompositeSubscription compositeSubscription = new CompositeSubscription();
24+
subscriber.add(compositeSubscription);
25+
26+
for (int i = 0; i < singles.length; i++) {
27+
if (compositeSubscription.isUnsubscribed() || once.get()) {
28+
break;
29+
}
30+
31+
final int j = i;
32+
SingleSubscriber<T> singleSubscriber = new SingleSubscriber<T>() {
33+
@Override
34+
public void onSuccess(T value) {
35+
values[j] = value;
36+
if (wip.decrementAndGet() == 0) {
37+
R r;
38+
39+
try {
40+
r = zipper.call(values);
41+
} catch (Throwable e) {
42+
Exceptions.throwIfFatal(e);
43+
onError(e);
44+
return;
45+
}
46+
47+
subscriber.onSuccess(r);
48+
}
49+
}
50+
51+
@Override
52+
public void onError(Throwable error) {
53+
if (once.compareAndSet(false, true)) {
54+
subscriber.onError(error);
55+
} else {
56+
RxJavaPlugins.getInstance().getErrorHandler().handleError(error);
57+
}
58+
}
59+
};
60+
61+
compositeSubscription.add(singleSubscriber);
62+
63+
if (compositeSubscription.isUnsubscribed() || once.get()) {
64+
break;
65+
}
66+
67+
singles[i].subscribe(singleSubscriber);
68+
}
69+
}
70+
});
71+
}
72+
}

src/test/java/rx/SingleTest.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@
2626
import static org.mockito.Mockito.when;
2727

2828
import java.util.Arrays;
29+
import java.util.Collections;
30+
import java.util.HashSet;
31+
import java.util.LinkedHashMap;
32+
import java.util.List;
33+
import java.util.Set;
2934
import java.util.concurrent.Callable;
3035
import java.util.concurrent.CountDownLatch;
3136
import java.util.concurrent.TimeUnit;
@@ -45,6 +50,7 @@
4550
import rx.functions.Action1;
4651
import rx.functions.Func1;
4752
import rx.functions.Func2;
53+
import rx.functions.FuncN;
4854
import rx.schedulers.TestScheduler;
4955
import rx.singles.BlockingSingle;
5056
import rx.observers.TestSubscriber;
@@ -113,6 +119,57 @@ public String call(String a, String b) {
113119
ts.assertReceivedOnNext(Arrays.asList("AB"));
114120
}
115121

122+
@Test
123+
public void zipIterableShouldZipListOfSingles() {
124+
TestSubscriber<String> ts = new TestSubscriber<String>();
125+
Iterable<Single<Integer>> singles = Arrays.asList(Single.just(1), Single.just(2), Single.just(3));
126+
127+
Single
128+
.zip(singles, new FuncN<String>() {
129+
@Override
130+
public String call(Object... args) {
131+
StringBuilder stringBuilder = new StringBuilder();
132+
for (Object arg : args) {
133+
stringBuilder.append(arg);
134+
}
135+
return stringBuilder.toString();
136+
}
137+
}).subscribe(ts);
138+
139+
ts.assertValue("123");
140+
ts.assertNoErrors();
141+
ts.assertCompleted();
142+
}
143+
144+
@Test
145+
public void zipIterableShouldZipSetOfSingles() {
146+
TestSubscriber<String> ts = new TestSubscriber<String>();
147+
Set<Single<String>> singlesSet = Collections.newSetFromMap(new LinkedHashMap<Single<String>, Boolean>(2));
148+
Single<String> s1 = Single.just("1");
149+
Single<String> s2 = Single.just("2");
150+
Single<String> s3 = Single.just("3");
151+
152+
singlesSet.add(s1);
153+
singlesSet.add(s2);
154+
singlesSet.add(s3);
155+
156+
Single
157+
.zip(singlesSet, new FuncN<String>() {
158+
@Override
159+
public String call(Object... args) {
160+
StringBuilder stringBuilder = new StringBuilder();
161+
for (Object arg : args) {
162+
stringBuilder.append(arg);
163+
}
164+
return stringBuilder.toString();
165+
}
166+
}).subscribe(ts);
167+
168+
ts.assertValue("123");
169+
ts.assertNoErrors();
170+
ts.assertCompleted();
171+
}
172+
116173
@Test
117174
public void testZipWith() {
118175
TestSubscriber<String> ts = new TestSubscriber<String>();
@@ -941,4 +998,35 @@ public void doAfterTerminateActionShouldNotBeInvokedUntilSubscriberSubscribes()
941998

942999
verifyZeroInteractions(action);
9431000
}
1001+
1002+
@Test(expected = NullPointerException.class)
1003+
public void iterableToArrayShouldThrowNullPointerExceptionIfIterableNull() {
1004+
Single.iterableToArray(null);
1005+
}
1006+
1007+
@Test
1008+
public void iterableToArrayShouldConvertList() {
1009+
List<Single<String>> singlesList = Arrays.asList(Single.just("1"), Single.just("2"));
1010+
1011+
Single<? extends String>[] singlesArray = Single.iterableToArray(singlesList);
1012+
assertEquals(2, singlesArray.length);
1013+
assertSame(singlesList.get(0), singlesArray[0]);
1014+
assertSame(singlesList.get(1), singlesArray[1]);
1015+
}
1016+
1017+
@Test
1018+
public void iterableToArrayShouldConvertSet() {
1019+
// Just to trigger different path of the code that handles non-list iterables.
1020+
Set<Single<String>> singlesSet = Collections.newSetFromMap(new LinkedHashMap<Single<String>, Boolean>(2));
1021+
Single<String> s1 = Single.just("1");
1022+
Single<String> s2 = Single.just("2");
1023+
1024+
singlesSet.add(s1);
1025+
singlesSet.add(s2);
1026+
1027+
Single<? extends String>[] singlesArray = Single.iterableToArray(singlesSet);
1028+
assertEquals(2, singlesArray.length);
1029+
assertSame(s1, singlesArray[0]);
1030+
assertSame(s2, singlesArray[1]);
1031+
}
9441032
}

0 commit comments

Comments
 (0)