diff --git a/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java b/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java index 4aff6fc162..9ab8f82869 100644 --- a/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java +++ b/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java @@ -22,6 +22,7 @@ import rx.Observable.Operator; import rx.Producer; import rx.Subscriber; +import rx.exceptions.Exceptions; import rx.exceptions.MissingBackpressureException; import rx.functions.Action0; import rx.internal.util.BackpressureDrainManager; @@ -156,7 +157,15 @@ private boolean assertCapacity() { "Overflowed buffer of " + baseCapacity)); if (onOverflow != null) { - onOverflow.call(); + try { + onOverflow.call(); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + manager.terminateAndDrain(e); + // this line not strictly necessary but nice for clarity + // and in case of future changes to code after this catch block + return false; + } } } return false; diff --git a/src/test/java/rx/internal/operators/OperatorOnBackpressureBufferTest.java b/src/test/java/rx/internal/operators/OperatorOnBackpressureBufferTest.java index 004764dd0b..48fa099735 100644 --- a/src/test/java/rx/internal/operators/OperatorOnBackpressureBufferTest.java +++ b/src/test/java/rx/internal/operators/OperatorOnBackpressureBufferTest.java @@ -15,10 +15,17 @@ */ package rx.internal.operators; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; import rx.Observable; import rx.Observable.OnSubscribe; @@ -27,12 +34,10 @@ import rx.Subscription; import rx.exceptions.MissingBackpressureException; import rx.functions.Action0; +import rx.functions.Action1; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - public class OperatorOnBackpressureBufferTest { @Test @@ -147,5 +152,30 @@ public void call(Subscriber s) { } }); + + private static final Action0 THROWS_NON_FATAL = new Action0() { + + @Override + public void call() { + throw new RuntimeException(); + }}; + + @Test + public void testNonFatalExceptionThrownByOnOverflowIsNotReportedByUpstream() { + final AtomicBoolean errorOccurred = new AtomicBoolean(false); + TestSubscriber ts = TestSubscriber.create(0); + infinite + .subscribeOn(Schedulers.computation()) + .doOnError(new Action1() { + @Override + public void call(Throwable t) { + errorOccurred.set(true); + } + }) + .onBackpressureBuffer(1, THROWS_NON_FATAL) + .subscribe(ts); + ts.awaitTerminalEvent(); + assertFalse(errorOccurred.get()); + } }