diff --git a/src/main/java/rx/internal/operators/OperatorSkipWhile.java b/src/main/java/rx/internal/operators/OperatorSkipWhile.java index 7936901a0e..deea70afcd 100644 --- a/src/main/java/rx/internal/operators/OperatorSkipWhile.java +++ b/src/main/java/rx/internal/operators/OperatorSkipWhile.java @@ -17,6 +17,7 @@ import rx.Observable.Operator; import rx.Subscriber; +import rx.exceptions.Exceptions; import rx.functions.Func1; import rx.functions.Func2; @@ -40,7 +41,14 @@ public void onNext(T t) { if (!skipping) { child.onNext(t); } else { - if (!predicate.call(t, index++)) { + final boolean skip; + try { + skip = predicate.call(t, index++); + } catch (Throwable e) { + Exceptions.throwOrReport(e, child, t); + return; + } + if (!skip) { skipping = false; child.onNext(t); } else { diff --git a/src/test/java/rx/internal/operators/OperatorSkipWhileTest.java b/src/test/java/rx/internal/operators/OperatorSkipWhileTest.java index 38a93bd5fb..d0d8f6960b 100644 --- a/src/test/java/rx/internal/operators/OperatorSkipWhileTest.java +++ b/src/test/java/rx/internal/operators/OperatorSkipWhileTest.java @@ -15,6 +15,7 @@ */ package rx.internal.operators; +import static org.junit.Assert.assertFalse; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.inOrder; @@ -23,12 +24,17 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.util.concurrent.atomic.AtomicBoolean; + import org.junit.Test; import org.mockito.InOrder; import rx.Observable; import rx.Observer; +import rx.functions.Action1; import rx.functions.Func1; +import rx.observers.Subscribers; +import rx.observers.TestSubscriber; public class OperatorSkipWhileTest { @@ -51,6 +57,20 @@ public Boolean call(Integer value) { return index++ < 3; } }; + + private static final Func1 THROWS_NON_FATAL = new Func1() { + @Override + public Boolean call(Integer values) { + throw new RuntimeException(); + } + }; + + private static final Func1 THROWS_FATAL = new Func1() { + @Override + public Boolean call(Integer values) { + throw new OutOfMemoryError(); + } + }; @Test public void testSkipWithIndex() { @@ -120,6 +140,33 @@ public void testSkipError() { inOrder.verify(w, times(1)).onError(any(RuntimeException.class)); } + @Test + public void testPredicateRuntimeError() { + Observable.just(1).skipWhile(THROWS_NON_FATAL).subscribe(w); + InOrder inOrder = inOrder(w); + inOrder.verify(w, never()).onNext(anyInt()); + inOrder.verify(w, never()).onCompleted(); + inOrder.verify(w, times(1)).onError(any(RuntimeException.class)); + } + + @Test(expected = OutOfMemoryError.class) + public void testPredicateFatalError() { + Observable.just(1).skipWhile(THROWS_FATAL).unsafeSubscribe(Subscribers.empty()); + } + + @Test + public void testPredicateRuntimeErrorDoesNotGoUpstreamFirst() { + final AtomicBoolean errorOccurred = new AtomicBoolean(false); + TestSubscriber ts = TestSubscriber.create(); + Observable.just(1).doOnError(new Action1() { + @Override + public void call(Throwable t) { + errorOccurred.set(true); + } + }).skipWhile(THROWS_NON_FATAL).subscribe(ts); + assertFalse(errorOccurred.get()); + } + @Test public void testSkipManySubscribers() { Observable src = Observable.range(1, 10).skipWhile(LESS_THAN_FIVE);