Skip to content

Commit 1b41461

Browse files
Unit Tests and SuppressWarnings
Added tests while validating pull request. This fixes issue ReactiveX#387
1 parent 21e230c commit 1b41461

File tree

2 files changed

+116
-0
lines changed

2 files changed

+116
-0
lines changed

rxjava-core/src/main/java/rx/operators/OperationZip.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,37 +59,45 @@
5959
*/
6060
public final class OperationZip {
6161

62+
@SuppressWarnings("unchecked")
6263
public static <T1, T2, R> OnSubscribeFunc<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction) {
6364
return zip(Arrays.asList(o1, o2), Functions.fromFunc(zipFunction));
6465
}
6566

67+
@SuppressWarnings("unchecked")
6668
public static <T1, T2, T3, R> OnSubscribeFunc<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, final Func3<? super T1, ? super T2, ? super T3, ? extends R> zipFunction) {
6769
return zip(Arrays.asList(o1, o2, o3), Functions.fromFunc(zipFunction));
6870
}
6971

72+
@SuppressWarnings("unchecked")
7073
public static <T1, T2, T3, T4, R> OnSubscribeFunc<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, final Func4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> zipFunction) {
7174
return zip(Arrays.asList(o1, o2, o3, o4), Functions.fromFunc(zipFunction));
7275
}
7376

77+
@SuppressWarnings("unchecked")
7478
public static <T1, T2, T3, T4, T5, R> OnSubscribeFunc<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, final Func5<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? extends R> zipFunction) {
7579
return zip(Arrays.asList(o1, o2, o3, o4, o5), Functions.fromFunc(zipFunction));
7680
}
7781

82+
@SuppressWarnings("unchecked")
7883
public static <T1, T2, T3, T4, T5, T6, R> OnSubscribeFunc<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6,
7984
final Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> zipFunction) {
8085
return zip(Arrays.asList(o1, o2, o3, o4, o5, o6), Functions.fromFunc(zipFunction));
8186
}
8287

88+
@SuppressWarnings("unchecked")
8389
public static <T1, T2, T3, T4, T5, T6, T7, R> OnSubscribeFunc<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7,
8490
final Func7<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? extends R> zipFunction) {
8591
return zip(Arrays.asList(o1, o2, o3, o4, o5, o6, o7), Functions.fromFunc(zipFunction));
8692
}
8793

94+
@SuppressWarnings("unchecked")
8895
public static <T1, T2, T3, T4, T5, T6, T7, T8, R> OnSubscribeFunc<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
8996
final Func8<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? extends R> zipFunction) {
9097
return zip(Arrays.asList(o1, o2, o3, o4, o5, o6, o7, o8), Functions.fromFunc(zipFunction));
9198
}
9299

100+
@SuppressWarnings("unchecked")
93101
public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> OnSubscribeFunc<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
94102
Observable<? extends T9> o9, final Func9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R> zipFunction) {
95103
return zip(Arrays.asList(o1, o2, o3, o4, o5, o6, o7, o8, o9), Functions.fromFunc(zipFunction));
@@ -415,6 +423,7 @@ public ItemObserver(
415423
this.observer = observer;
416424
this.cancel = cancel;
417425
}
426+
@SuppressWarnings("unchecked")
418427
@Override
419428
public void onNext(T value) {
420429
rwLock.readLock().lock();

rxjava-core/src/test/java/rx/operators/OperationZipTest.java

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import rx.Subscription;
3131
import rx.operators.OperationZip.Aggregator;
3232
import rx.operators.OperationZip.ZipObserver;
33+
import rx.subjects.PublishSubject;
3334
import rx.subscriptions.Subscriptions;
3435
import rx.util.functions.Func2;
3536
import rx.util.functions.Func3;
@@ -494,6 +495,112 @@ public void testOnNextExceptionInvokesOnError() {
494495

495496
verify(aObserver, times(1)).onError(any(Throwable.class));
496497
}
498+
499+
@Test
500+
public void testOnFirstCompletion() {
501+
PublishSubject<String> oA = PublishSubject.create();
502+
PublishSubject<String> oB = PublishSubject.create();
503+
504+
@SuppressWarnings("unchecked")
505+
Observer<String> observer = mock(Observer.class);
506+
507+
Observable<String> o = Observable.create(zip(oA, oB, getConcat2Strings()));
508+
o.subscribe(observer);
509+
510+
InOrder inOrder = inOrder(observer);
511+
512+
oA.onNext("a1");
513+
inOrder.verify(observer, never()).onNext(anyString());
514+
oB.onNext("b1");
515+
inOrder.verify(observer, times(1)).onNext("a1-b1");
516+
oB.onNext("b2");
517+
inOrder.verify(observer, never()).onNext(anyString());
518+
oA.onNext("a2");
519+
inOrder.verify(observer, times(1)).onNext("a2-b2");
520+
521+
oA.onNext("a3");
522+
oA.onNext("a4");
523+
oA.onNext("a5");
524+
oA.onCompleted();
525+
526+
// SHOULD ONCOMPLETE BE EMITTED HERE INSTEAD OF WAITING
527+
// FOR B3, B4, B5 TO BE EMITTED?
528+
529+
oB.onNext("b3");
530+
oB.onNext("b4");
531+
oB.onNext("b5");
532+
533+
inOrder.verify(observer, times(1)).onNext("a3-b3");
534+
inOrder.verify(observer, times(1)).onNext("a4-b4");
535+
inOrder.verify(observer, times(1)).onNext("a5-b5");
536+
537+
// WE RECEIVE THE ONCOMPLETE HERE
538+
inOrder.verify(observer, times(1)).onCompleted();
539+
540+
oB.onNext("b6");
541+
oB.onNext("b7");
542+
oB.onNext("b8");
543+
oB.onNext("b9");
544+
// never completes (infinite stream for example)
545+
546+
// we should receive nothing else despite oB continuing after oA completed
547+
inOrder.verifyNoMoreInteractions();
548+
}
549+
550+
@Test
551+
public void testOnErrorTermination() {
552+
PublishSubject<String> oA = PublishSubject.create();
553+
PublishSubject<String> oB = PublishSubject.create();
554+
555+
@SuppressWarnings("unchecked")
556+
Observer<String> observer = mock(Observer.class);
557+
558+
Observable<String> o = Observable.create(zip(oA, oB, getConcat2Strings()));
559+
o.subscribe(observer);
560+
561+
InOrder inOrder = inOrder(observer);
562+
563+
oA.onNext("a1");
564+
inOrder.verify(observer, never()).onNext(anyString());
565+
oB.onNext("b1");
566+
inOrder.verify(observer, times(1)).onNext("a1-b1");
567+
oB.onNext("b2");
568+
inOrder.verify(observer, never()).onNext(anyString());
569+
oA.onNext("a2");
570+
inOrder.verify(observer, times(1)).onNext("a2-b2");
571+
572+
oA.onNext("a3");
573+
oA.onNext("a4");
574+
oA.onNext("a5");
575+
oA.onError(new RuntimeException("forced failure"));
576+
577+
// it should emit failure immediately
578+
inOrder.verify(observer, times(1)).onError(any(RuntimeException.class));
579+
580+
oB.onNext("b3");
581+
oB.onNext("b4");
582+
oB.onNext("b5");
583+
oB.onNext("b6");
584+
oB.onNext("b7");
585+
oB.onNext("b8");
586+
oB.onNext("b9");
587+
// never completes (infinite stream for example)
588+
589+
// we should receive nothing else despite oB continuing after oA completed
590+
inOrder.verifyNoMoreInteractions();
591+
}
592+
593+
private Func2<String, String, String> getConcat2Strings() {
594+
return new Func2<String, String, String>() {
595+
596+
@Override
597+
public String call(String t1, String t2) {
598+
return t1 + "-" + t2;
599+
}
600+
};
601+
}
602+
603+
497604

498605
private Func2<Integer, Integer, Integer> getDivideZipr() {
499606
Func2<Integer, Integer, Integer> zipr = new Func2<Integer, Integer, Integer>() {

0 commit comments

Comments
 (0)