From 77b84791d0bdb9caaa8baba3ecb463dbde21e06c Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 28 Apr 2016 14:21:06 +0200 Subject: [PATCH] 1.x: map() and filter() should unsubscribe on crash eagerly --- .../rx/internal/operators/OperatorFilter.java | 86 +++++++++++++------ .../rx/internal/operators/OperatorMap.java | 80 ++++++++++++----- .../operators/OperatorFilterTest.java | 27 ++++++ .../internal/operators/OperatorMapTest.java | 46 ++++++---- 4 files changed, 179 insertions(+), 60 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorFilter.java b/src/main/java/rx/internal/operators/OperatorFilter.java index 3704dbc4a3..15f1a48ec9 100644 --- a/src/main/java/rx/internal/operators/OperatorFilter.java +++ b/src/main/java/rx/internal/operators/OperatorFilter.java @@ -15,15 +15,17 @@ */ package rx.internal.operators; +import rx.*; import rx.Observable.Operator; -import rx.Subscriber; import rx.exceptions.*; import rx.functions.Func1; +import rx.internal.util.RxJavaPluginUtils; /** * Filters an Observable by discarding any items it emits that do not meet some test. *

* + * @param the value type */ public final class OperatorFilter implements Operator { @@ -35,33 +37,67 @@ public OperatorFilter(Func1 predicate) { @Override public Subscriber call(final Subscriber child) { - return new Subscriber(child) { + FilterSubscriber parent = new FilterSubscriber(child, predicate); + child.add(parent); + return parent; + } - @Override - public void onCompleted() { - child.onCompleted(); - } + static final class FilterSubscriber extends Subscriber { + + final Subscriber actual; + + final Func1 predicate; - @Override - public void onError(Throwable e) { - child.onError(e); + boolean done; + + public FilterSubscriber(Subscriber actual, Func1 predicate) { + this.actual = actual; + this.predicate = predicate; + } + + @Override + public void onNext(T t) { + boolean result; + + try { + result = predicate.call(t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + unsubscribe(); + onError(OnErrorThrowable.addValueAsLastCause(ex, t)); + return; } - - @Override - public void onNext(T t) { - try { - if (predicate.call(t)) { - child.onNext(t); - } else { - // TODO consider a more complicated version that batches these - request(1); - } - } catch (Throwable e) { - Exceptions.throwOrReport(e, child, t); - } + + if (result) { + actual.onNext(t); + } else { + request(1); } - - }; + } + + @Override + public void onError(Throwable e) { + if (done) { + RxJavaPluginUtils.handleException(e); + return; + } + done = true; + + actual.onError(e); + } + + + @Override + public void onCompleted() { + if (done) { + return; + } + actual.onCompleted(); + } + @Override + public void setProducer(Producer p) { + super.setProducer(p); + actual.setProducer(p); + } } - } diff --git a/src/main/java/rx/internal/operators/OperatorMap.java b/src/main/java/rx/internal/operators/OperatorMap.java index 90925c2764..a8a33178ca 100644 --- a/src/main/java/rx/internal/operators/OperatorMap.java +++ b/src/main/java/rx/internal/operators/OperatorMap.java @@ -15,16 +15,20 @@ */ package rx.internal.operators; +import rx.*; import rx.Observable.Operator; -import rx.Subscriber; -import rx.exceptions.Exceptions; +import rx.exceptions.*; import rx.functions.Func1; +import rx.internal.util.RxJavaPluginUtils; /** * Applies a function of your choosing to every item emitted by an {@code Observable}, and emits the results of * this transformation as a new {@code Observable}. *

* + * + * @param the input value type + * @param the return value type */ public final class OperatorMap implements Operator { @@ -36,28 +40,64 @@ public OperatorMap(Func1 transformer) { @Override public Subscriber call(final Subscriber o) { - return new Subscriber(o) { + MapSubscriber parent = new MapSubscriber(o, transformer); + o.add(parent); + return parent; + } + + static final class MapSubscriber extends Subscriber { + + final Subscriber actual; + + final Func1 mapper; - @Override - public void onCompleted() { - o.onCompleted(); + boolean done; + + public MapSubscriber(Subscriber actual, Func1 mapper) { + this.actual = actual; + this.mapper = mapper; + } + + @Override + public void onNext(T t) { + R result; + + try { + result = mapper.call(t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + unsubscribe(); + onError(OnErrorThrowable.addValueAsLastCause(ex, t)); + return; } - - @Override - public void onError(Throwable e) { - o.onError(e); + + actual.onNext(result); + } + + @Override + public void onError(Throwable e) { + if (done) { + RxJavaPluginUtils.handleException(e); + return; } - - @Override - public void onNext(T t) { - try { - o.onNext(transformer.call(t)); - } catch (Throwable e) { - Exceptions.throwOrReport(e, this, t); - } + done = true; + + actual.onError(e); + } + + + @Override + public void onCompleted() { + if (done) { + return; } - - }; + actual.onCompleted(); + } + + @Override + public void setProducer(Producer p) { + actual.setProducer(p); + } } } diff --git a/src/test/java/rx/internal/operators/OperatorFilterTest.java b/src/test/java/rx/internal/operators/OperatorFilterTest.java index f1f086666c..543dc06cf7 100644 --- a/src/test/java/rx/internal/operators/OperatorFilterTest.java +++ b/src/test/java/rx/internal/operators/OperatorFilterTest.java @@ -28,6 +28,7 @@ import rx.functions.*; import rx.internal.util.RxRingBuffer; import rx.observers.TestSubscriber; +import rx.subjects.PublishSubject; public class OperatorFilterTest { @@ -54,6 +55,7 @@ public Boolean call(String t1) { /** * Make sure we are adjusting subscriber.request() for filtered items + * @throws InterruptedException on interrupt */ @Test(timeout = 500) public void testWithBackpressure() throws InterruptedException { @@ -100,6 +102,7 @@ public void onNext(String t) { /** * Make sure we are adjusting subscriber.request() for filtered items + * @throws InterruptedException on interrupt */ @Test(timeout = 500000) public void testWithBackpressure2() throws InterruptedException { @@ -167,4 +170,28 @@ public void call(Integer t) { } } } + + @Test + public void functionCrashUnsubscribes() { + + PublishSubject ps = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(); + + ps.filter(new Func1() { + @Override + public Boolean call(Integer v) { + throw new TestException(); + } + }).unsafeSubscribe(ts); + + Assert.assertTrue("Not subscribed?", ps.hasObservers()); + + ps.onNext(1); + + Assert.assertFalse("Subscribed?", ps.hasObservers()); + + ts.assertError(TestException.class); + } + } diff --git a/src/test/java/rx/internal/operators/OperatorMapTest.java b/src/test/java/rx/internal/operators/OperatorMapTest.java index d79d5863b6..18e3e523e3 100644 --- a/src/test/java/rx/internal/operators/OperatorMapTest.java +++ b/src/test/java/rx/internal/operators/OperatorMapTest.java @@ -16,28 +16,21 @@ package rx.internal.operators; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; -import java.util.HashMap; -import java.util.Map; -import java.util.NoSuchElementException; +import java.util.*; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; +import org.junit.*; +import org.mockito.*; import rx.Observable; import rx.Observer; import rx.Subscriber; -import rx.exceptions.OnErrorNotImplementedException; -import rx.functions.Action1; -import rx.functions.Func1; -import rx.functions.Func2; -import rx.internal.operators.OperatorMap; +import rx.exceptions.*; +import rx.functions.*; +import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; public class OperatorMapTest { @@ -339,4 +332,27 @@ public void call(String s) { } }); } + + @Test + public void functionCrashUnsubscribes() { + + PublishSubject ps = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(); + + ps.map(new Func1() { + @Override + public Integer call(Integer v) { + throw new TestException(); + } + }).unsafeSubscribe(ts); + + Assert.assertTrue("Not subscribed?", ps.hasObservers()); + + ps.onNext(1); + + Assert.assertFalse("Subscribed?", ps.hasObservers()); + + ts.assertError(TestException.class); + } }