Skip to content

Add Single.zip() for Iterable of Singles #3539

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 9, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 69 additions & 1 deletion src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package rx;

import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand All @@ -32,6 +33,7 @@
import rx.functions.Func7;
import rx.functions.Func8;
import rx.functions.Func9;
import rx.functions.FuncN;
import rx.annotations.Beta;
import rx.internal.operators.*;
import rx.internal.producers.SingleDelayedProducer;
Expand Down Expand Up @@ -1196,6 +1198,30 @@ public final static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Single<R> zip(Single
return just(new Observable<?>[] { asObservable(o1), asObservable(o2), asObservable(o3), asObservable(o4), asObservable(o5), asObservable(o6), asObservable(o7), asObservable(o8), asObservable(o9) }).lift(new OperatorZip<R>(zipFunction));
}

/**
* Returns a Single that emits the result of specified combiner function applied to combination of
* items emitted, in sequence, by an Iterable of other Singles.
* <p>
* {@code zip} applies this function in strict sequence.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/zip.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code zip} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param singles
* an Iterable of source Singles
* @param zipFunction
* a function that, when applied to an item emitted by each of the source Singles, results in
* an item that will be emitted by the resulting Single
* @return a Single that emits the zipped results
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
*/
public static <R> Single<R> zip(Iterable<? extends Single<?>> singles, FuncN<? extends R> zipFunction) {
return SingleOperatorZip.zip(iterableToArray(singles), zipFunction);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iterables and Observables are mainly for deferred data streaming. This will iterate at assembly time instead on Subscription time, losing the duality aspects. For example, an Observable source turned into an Iterable via toBlocking().getIterable() is no longer lazy when this zip is applied.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is compatible with the behavior of Observable.zip.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep currently Observable serializes all values in the iterable into an array before zipping them so you can't append after lifting. IMO this is probably intentional since appending to some iterables while iterating results in ConcurrentModificationExceptions. Today the Observable<Observable<T>> overload can receive new observables to zip but the zip function won't be called until the outer observable calls onCompleted(). This is slightly later than the Iterable overload.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, @akarnokd, @stealthcode let's use current implementation from this PR to be consistent with Observable.zip(Iterable<Observable>)?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay.

}

/**
* Returns an Observable that emits the item emitted by the source Single, then the item emitted by the
* specified Single.
Expand Down Expand Up @@ -1264,7 +1290,7 @@ public final <R> Observable<R> flatMapObservable(Func1<? super T, ? extends Obse
* <dt><b>Scheduler:</b></dt>
* <dd>{@code map} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
*
* @param func
* a function to apply to the item emitted by the Single
* @return a Single that emits the item from the source Single, transformed by the specified function
Expand Down Expand Up @@ -2031,4 +2057,46 @@ public final Single<T> doOnUnsubscribe(final Action0 action) {
public final Single<T> doAfterTerminate(Action0 action) {
return lift(new OperatorDoAfterTerminate<T>(action));
}

/**
* FOR INTERNAL USE ONLY.
* <p>
* Converts {@link Iterable} of {@link Single} to array of {@link Single}.
*
* @param singlesIterable
* non null iterable of {@link Single}.
* @return array of {@link Single} with same length as passed iterable.
*/
@SuppressWarnings("unchecked")
static <T> Single<? extends T>[] iterableToArray(final Iterable<? extends Single<? extends T>> singlesIterable) {
final Single<? extends T>[] singlesArray;
int count;

if (singlesIterable instanceof Collection) {
Collection<? extends Single<? extends T>> list = (Collection<? extends Single<? extends T>>) singlesIterable;
count = list.size();
singlesArray = list.toArray(new Single[count]);
} else {
Single<? extends T>[] tempArray = new Single[8]; // Magic number used just to reduce number of allocations.
count = 0;
for (Single<? extends T> s : singlesIterable) {
if (count == tempArray.length) {
Single<? extends T>[] sb = new Single[count + (count >> 2)];
System.arraycopy(tempArray, 0, sb, 0, count);
tempArray = sb;
}
tempArray[count] = s;
count++;
}

if (tempArray.length == count) {
singlesArray = tempArray;
} else {
singlesArray = new Single[count];
System.arraycopy(tempArray, 0, singlesArray, 0, count);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This copy can be avoided with my inlined version.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope! It's a bugfix. Otherwise, you can have an array with length != count and have a null pointer exception in a loop (for example in SingleOperatorZip). I faced it when I tried to pass Set as Iterable in one of the tests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is why mine run up to count and not array length. If you factor it out, you can't return both the array and the count.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, got it, so it was my bug when I split your code into two parts, sorry.

I can return Map.Entry from iterableToArray where one value will be the array and another — real count, but personally I'd prefer current variant.

If you afraid about performance — we can add more general check instanceof Collection and initialize the array with correct size even if Iterable is not a List. For real, I don't even remember when I used something that does not implement Collection but implements Iterable, I guess will cover a lot of cases with such check: List, Set, Queue and so on.

}
}

return singlesArray;
}
}
72 changes: 72 additions & 0 deletions src/main/java/rx/internal/operators/SingleOperatorZip.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package rx.internal.operators;

import rx.Single;
import rx.SingleSubscriber;
import rx.exceptions.Exceptions;
import rx.functions.FuncN;
import rx.plugins.RxJavaPlugins;
import rx.subscriptions.CompositeSubscription;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class SingleOperatorZip {

public static <T, R> Single<R> zip(final Single<? extends T>[] singles, final FuncN<? extends R> zipper) {
return Single.create(new Single.OnSubscribe<R>() {
@Override
public void call(final SingleSubscriber<? super R> subscriber) {
final AtomicInteger wip = new AtomicInteger(singles.length);
final AtomicBoolean once = new AtomicBoolean();
final Object[] values = new Object[singles.length];

CompositeSubscription compositeSubscription = new CompositeSubscription();
subscriber.add(compositeSubscription);

for (int i = 0; i < singles.length; i++) {
if (compositeSubscription.isUnsubscribed() || once.get()) {
break;
}

final int j = i;
SingleSubscriber<T> singleSubscriber = new SingleSubscriber<T>() {
@Override
public void onSuccess(T value) {
values[j] = value;
if (wip.decrementAndGet() == 0) {
R r;

try {
r = zipper.call(values);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}

subscriber.onSuccess(r);
}
}

@Override
public void onError(Throwable error) {
if (once.compareAndSet(false, true)) {
subscriber.onError(error);
} else {
RxJavaPlugins.getInstance().getErrorHandler().handleError(error);
}
}
};

compositeSubscription.add(singleSubscriber);

if (compositeSubscription.isUnsubscribed() || once.get()) {
break;
}

singles[i].subscribe(singleSubscriber);
}
}
});
}
}
88 changes: 88 additions & 0 deletions src/test/java/rx/SingleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
import static org.mockito.Mockito.when;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -45,6 +50,7 @@
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.functions.FuncN;
import rx.schedulers.TestScheduler;
import rx.singles.BlockingSingle;
import rx.observers.TestSubscriber;
Expand Down Expand Up @@ -113,6 +119,57 @@ public String call(String a, String b) {
ts.assertReceivedOnNext(Arrays.asList("AB"));
}

@Test
public void zipIterableShouldZipListOfSingles() {
TestSubscriber<String> ts = new TestSubscriber<String>();
Iterable<Single<Integer>> singles = Arrays.asList(Single.just(1), Single.just(2), Single.just(3));

Single
.zip(singles, new FuncN<String>() {
@Override
public String call(Object... args) {
StringBuilder stringBuilder = new StringBuilder();
for (Object arg : args) {
stringBuilder.append(arg);
}
return stringBuilder.toString();
}
}).subscribe(ts);

ts.assertValue("123");
ts.assertNoErrors();
ts.assertCompleted();
}

@Test
public void zipIterableShouldZipSetOfSingles() {
TestSubscriber<String> ts = new TestSubscriber<String>();
Set<Single<String>> singlesSet = Collections.newSetFromMap(new LinkedHashMap<Single<String>, Boolean>(2));
Single<String> s1 = Single.just("1");
Single<String> s2 = Single.just("2");
Single<String> s3 = Single.just("3");

singlesSet.add(s1);
singlesSet.add(s2);
singlesSet.add(s3);

Single
.zip(singlesSet, new FuncN<String>() {
@Override
public String call(Object... args) {
StringBuilder stringBuilder = new StringBuilder();
for (Object arg : args) {
stringBuilder.append(arg);
}
return stringBuilder.toString();
}
}).subscribe(ts);

ts.assertValue("123");
ts.assertNoErrors();
ts.assertCompleted();
}

@Test
public void testZipWith() {
TestSubscriber<String> ts = new TestSubscriber<String>();
Expand Down Expand Up @@ -941,4 +998,35 @@ public void doAfterTerminateActionShouldNotBeInvokedUntilSubscriberSubscribes()

verifyZeroInteractions(action);
}

@Test(expected = NullPointerException.class)
public void iterableToArrayShouldThrowNullPointerExceptionIfIterableNull() {
Single.iterableToArray(null);
}

@Test
public void iterableToArrayShouldConvertList() {
List<Single<String>> singlesList = Arrays.asList(Single.just("1"), Single.just("2"));

Single<? extends String>[] singlesArray = Single.iterableToArray(singlesList);
assertEquals(2, singlesArray.length);
assertSame(singlesList.get(0), singlesArray[0]);
assertSame(singlesList.get(1), singlesArray[1]);
}

@Test
public void iterableToArrayShouldConvertSet() {
// Just to trigger different path of the code that handles non-list iterables.
Set<Single<String>> singlesSet = Collections.newSetFromMap(new LinkedHashMap<Single<String>, Boolean>(2));
Single<String> s1 = Single.just("1");
Single<String> s2 = Single.just("2");

singlesSet.add(s1);
singlesSet.add(s2);

Single<? extends String>[] singlesArray = Single.iterableToArray(singlesSet);
assertEquals(2, singlesArray.length);
assertSame(s1, singlesArray[0]);
assertSame(s2, singlesArray[1]);
}
}