From 1a99ffddd1daa3cbf0d866f45fd4fafed72a0465 Mon Sep 17 00:00:00 2001 From: Artem Zinnatullin Date: Tue, 24 Nov 2015 16:26:29 +0300 Subject: [PATCH] Add Single.zip() for Iterable of Singles --- src/main/java/rx/Single.java | 70 ++++++++++++++- .../internal/operators/SingleOperatorZip.java | 72 +++++++++++++++ src/test/java/rx/SingleTest.java | 88 +++++++++++++++++++ 3 files changed, 229 insertions(+), 1 deletion(-) create mode 100644 src/main/java/rx/internal/operators/SingleOperatorZip.java diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index 7b116355a5..60ef33b949 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -12,6 +12,7 @@ */ package rx; +import java.util.Collection; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -32,6 +33,7 @@ import rx.functions.Func7; import rx.functions.Func8; import rx.functions.Func9; +import rx.functions.FuncN; import rx.annotations.Beta; import rx.internal.operators.*; import rx.internal.producers.SingleDelayedProducer; @@ -1196,6 +1198,30 @@ public final static Single zip(Single return just(new Observable[] { asObservable(o1), asObservable(o2), asObservable(o3), asObservable(o4), asObservable(o5), asObservable(o6), asObservable(o7), asObservable(o8), asObservable(o9) }).lift(new OperatorZip(zipFunction)); } + /** + * Returns a Single that emits the result of specified combiner function applied to combination of + * items emitted, in sequence, by an Iterable of other Singles. + *

+ * {@code zip} applies this function in strict sequence. + *

+ * + *

+ *
Scheduler:
+ *
{@code zip} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param singles + * an Iterable of source Singles + * @param zipFunction + * a function that, when applied to an item emitted by each of the source Singles, results in + * an item that will be emitted by the resulting Single + * @return a Single that emits the zipped results + * @see ReactiveX operators documentation: Zip + */ + public static Single zip(Iterable> singles, FuncN zipFunction) { + return SingleOperatorZip.zip(iterableToArray(singles), zipFunction); + } + /** * Returns an Observable that emits the item emitted by the source Single, then the item emitted by the * specified Single. @@ -1264,7 +1290,7 @@ public final Observable flatMapObservable(Func1Scheduler: *
{@code map} does not operate by default on a particular {@link Scheduler}.
* - * + * * @param func * a function to apply to the item emitted by the Single * @return a Single that emits the item from the source Single, transformed by the specified function @@ -2031,4 +2057,46 @@ public final Single doOnUnsubscribe(final Action0 action) { public final Single doAfterTerminate(Action0 action) { return lift(new OperatorDoAfterTerminate(action)); } + + /** + * FOR INTERNAL USE ONLY. + *

+ * Converts {@link Iterable} of {@link Single} to array of {@link Single}. + * + * @param singlesIterable + * non null iterable of {@link Single}. + * @return array of {@link Single} with same length as passed iterable. + */ + @SuppressWarnings("unchecked") + static Single[] iterableToArray(final Iterable> singlesIterable) { + final Single[] singlesArray; + int count; + + if (singlesIterable instanceof Collection) { + Collection> list = (Collection>) singlesIterable; + count = list.size(); + singlesArray = list.toArray(new Single[count]); + } else { + Single[] tempArray = new Single[8]; // Magic number used just to reduce number of allocations. + count = 0; + for (Single s : singlesIterable) { + if (count == tempArray.length) { + Single[] sb = new Single[count + (count >> 2)]; + System.arraycopy(tempArray, 0, sb, 0, count); + tempArray = sb; + } + tempArray[count] = s; + count++; + } + + if (tempArray.length == count) { + singlesArray = tempArray; + } else { + singlesArray = new Single[count]; + System.arraycopy(tempArray, 0, singlesArray, 0, count); + } + } + + return singlesArray; + } } diff --git a/src/main/java/rx/internal/operators/SingleOperatorZip.java b/src/main/java/rx/internal/operators/SingleOperatorZip.java new file mode 100644 index 0000000000..936750941f --- /dev/null +++ b/src/main/java/rx/internal/operators/SingleOperatorZip.java @@ -0,0 +1,72 @@ +package rx.internal.operators; + +import rx.Single; +import rx.SingleSubscriber; +import rx.exceptions.Exceptions; +import rx.functions.FuncN; +import rx.plugins.RxJavaPlugins; +import rx.subscriptions.CompositeSubscription; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class SingleOperatorZip { + + public static Single zip(final Single[] singles, final FuncN zipper) { + return Single.create(new Single.OnSubscribe() { + @Override + public void call(final SingleSubscriber subscriber) { + final AtomicInteger wip = new AtomicInteger(singles.length); + final AtomicBoolean once = new AtomicBoolean(); + final Object[] values = new Object[singles.length]; + + CompositeSubscription compositeSubscription = new CompositeSubscription(); + subscriber.add(compositeSubscription); + + for (int i = 0; i < singles.length; i++) { + if (compositeSubscription.isUnsubscribed() || once.get()) { + break; + } + + final int j = i; + SingleSubscriber singleSubscriber = new SingleSubscriber() { + @Override + public void onSuccess(T value) { + values[j] = value; + if (wip.decrementAndGet() == 0) { + R r; + + try { + r = zipper.call(values); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + onError(e); + return; + } + + subscriber.onSuccess(r); + } + } + + @Override + public void onError(Throwable error) { + if (once.compareAndSet(false, true)) { + subscriber.onError(error); + } else { + RxJavaPlugins.getInstance().getErrorHandler().handleError(error); + } + } + }; + + compositeSubscription.add(singleSubscriber); + + if (compositeSubscription.isUnsubscribed() || once.get()) { + break; + } + + singles[i].subscribe(singleSubscriber); + } + } + }); + } +} diff --git a/src/test/java/rx/SingleTest.java b/src/test/java/rx/SingleTest.java index 17e3367835..2871450708 100644 --- a/src/test/java/rx/SingleTest.java +++ b/src/test/java/rx/SingleTest.java @@ -26,6 +26,11 @@ import static org.mockito.Mockito.when; import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -45,6 +50,7 @@ import rx.functions.Action1; import rx.functions.Func1; import rx.functions.Func2; +import rx.functions.FuncN; import rx.schedulers.TestScheduler; import rx.singles.BlockingSingle; import rx.observers.TestSubscriber; @@ -113,6 +119,57 @@ public String call(String a, String b) { ts.assertReceivedOnNext(Arrays.asList("AB")); } + @Test + public void zipIterableShouldZipListOfSingles() { + TestSubscriber ts = new TestSubscriber(); + Iterable> singles = Arrays.asList(Single.just(1), Single.just(2), Single.just(3)); + + Single + .zip(singles, new FuncN() { + @Override + public String call(Object... args) { + StringBuilder stringBuilder = new StringBuilder(); + for (Object arg : args) { + stringBuilder.append(arg); + } + return stringBuilder.toString(); + } + }).subscribe(ts); + + ts.assertValue("123"); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void zipIterableShouldZipSetOfSingles() { + TestSubscriber ts = new TestSubscriber(); + Set> singlesSet = Collections.newSetFromMap(new LinkedHashMap, Boolean>(2)); + Single s1 = Single.just("1"); + Single s2 = Single.just("2"); + Single s3 = Single.just("3"); + + singlesSet.add(s1); + singlesSet.add(s2); + singlesSet.add(s3); + + Single + .zip(singlesSet, new FuncN() { + @Override + public String call(Object... args) { + StringBuilder stringBuilder = new StringBuilder(); + for (Object arg : args) { + stringBuilder.append(arg); + } + return stringBuilder.toString(); + } + }).subscribe(ts); + + ts.assertValue("123"); + ts.assertNoErrors(); + ts.assertCompleted(); + } + @Test public void testZipWith() { TestSubscriber ts = new TestSubscriber(); @@ -941,4 +998,35 @@ public void doAfterTerminateActionShouldNotBeInvokedUntilSubscriberSubscribes() verifyZeroInteractions(action); } + + @Test(expected = NullPointerException.class) + public void iterableToArrayShouldThrowNullPointerExceptionIfIterableNull() { + Single.iterableToArray(null); + } + + @Test + public void iterableToArrayShouldConvertList() { + List> singlesList = Arrays.asList(Single.just("1"), Single.just("2")); + + Single[] singlesArray = Single.iterableToArray(singlesList); + assertEquals(2, singlesArray.length); + assertSame(singlesList.get(0), singlesArray[0]); + assertSame(singlesList.get(1), singlesArray[1]); + } + + @Test + public void iterableToArrayShouldConvertSet() { + // Just to trigger different path of the code that handles non-list iterables. + Set> singlesSet = Collections.newSetFromMap(new LinkedHashMap, Boolean>(2)); + Single s1 = Single.just("1"); + Single s2 = Single.just("2"); + + singlesSet.add(s1); + singlesSet.add(s2); + + Single[] singlesArray = Single.iterableToArray(singlesSet); + assertEquals(2, singlesArray.length); + assertSame(s1, singlesArray[0]); + assertSame(s2, singlesArray[1]); + } }