From 5d8980fc3c9b496b457b389f7772d98c9f6de01b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Thu, 28 Apr 2016 22:25:12 +0200 Subject: [PATCH] 1.x: OperatorMapPair should unsubscribe on crash eagerly --- .../internal/operators/OperatorMapPair.java | 106 +++++++++++++----- .../operators/OperatorMapPairTest.java | 39 +++++++ 2 files changed, 117 insertions(+), 28 deletions(-) create mode 100644 src/test/java/rx/internal/operators/OperatorMapPairTest.java diff --git a/src/main/java/rx/internal/operators/OperatorMapPair.java b/src/main/java/rx/internal/operators/OperatorMapPair.java index b16a9b3c41..3cba7d6bf8 100644 --- a/src/main/java/rx/internal/operators/OperatorMapPair.java +++ b/src/main/java/rx/internal/operators/OperatorMapPair.java @@ -15,12 +15,11 @@ */ package rx.internal.operators; -import rx.Observable; +import rx.*; import rx.Observable.Operator; import rx.exceptions.*; -import rx.Subscriber; -import rx.functions.Func1; -import rx.functions.Func2; +import rx.functions.*; +import rx.internal.util.RxJavaPluginUtils; /** * An {@link Operator} that pairs up items emitted by a source {@link Observable} with the sequence of items @@ -45,6 +44,7 @@ public final class OperatorMapPair implements Operator Func1> convertSelector(final Func1> selector) { return new Func1>() { + @SuppressWarnings("cast") @Override public Observable call(T t1) { return (Observable)Observable.from(selector.call(t1)); @@ -62,34 +62,84 @@ public OperatorMapPair(final Func1> @Override public Subscriber call(final Subscriber> o) { - return new Subscriber(o) { + MapPairSubscriber parent = new MapPairSubscriber(o, collectionSelector, resultSelector); + o.add(parent); + return parent; + } + + static final class MapPairSubscriber extends Subscriber { + + final Subscriber> actual; + + final Func1> collectionSelector; + final Func2 resultSelector; - @Override - public void onCompleted() { - o.onCompleted(); + boolean done; + + public MapPairSubscriber(Subscriber> actual, + Func1> collectionSelector, + Func2 resultSelector) { + this.actual = actual; + this.collectionSelector = collectionSelector; + this.resultSelector = resultSelector; + } + + @Override + public void onNext(T outer) { + + Observable intermediate; + + try { + intermediate = collectionSelector.call(outer); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + unsubscribe(); + onError(OnErrorThrowable.addValueAsLastCause(ex, outer)); + return; } - - @Override - public void onError(Throwable e) { - o.onError(e); + + actual.onNext(intermediate.map(new OuterInnerMapper(outer, resultSelector))); + } + + @Override + public void onError(Throwable e) { + if (done) { + RxJavaPluginUtils.handleException(e); + return; } - - @Override - public void onNext(final T outer) { - try { - o.onNext(collectionSelector.call(outer).map(new Func1() { - - @Override - public R call(U inner) { - return resultSelector.call(outer, inner); - } - })); - } catch (Throwable e) { - Exceptions.throwOrReport(e, o, outer); - } + done = true; + + actual.onError(e); + } + + + @Override + public void onCompleted() { + if (done) { + return; } - - }; + actual.onCompleted(); + } + + @Override + public void setProducer(Producer p) { + actual.setProducer(p); + } } + static final class OuterInnerMapper implements Func1 { + final T outer; + final Func2 resultSelector; + + public OuterInnerMapper(T outer, Func2 resultSelector) { + this.outer = outer; + this.resultSelector = resultSelector; + } + + @Override + public R call(U inner) { + return resultSelector.call(outer, inner); + } + + } } \ No newline at end of file diff --git a/src/test/java/rx/internal/operators/OperatorMapPairTest.java b/src/test/java/rx/internal/operators/OperatorMapPairTest.java new file mode 100644 index 0000000000..2554b5e2f3 --- /dev/null +++ b/src/test/java/rx/internal/operators/OperatorMapPairTest.java @@ -0,0 +1,39 @@ +package rx.internal.operators; + +import org.junit.*; + +import rx.Observable; +import rx.exceptions.TestException; +import rx.functions.*; +import rx.observers.TestSubscriber; +import rx.subjects.PublishSubject; + +public class OperatorMapPairTest { + @Test + public void castCrashUnsubscribes() { + + PublishSubject ps = PublishSubject.create(); + + TestSubscriber ts = TestSubscriber.create(); + + ps.flatMap(new Func1>() { + @Override + public Observable call(Integer t) { + throw new TestException(); + } + }, new Func2() { + @Override + public Integer call(Integer t1, Integer t2) { + return t1; + } + }).unsafeSubscribe(ts); + + Assert.assertTrue("Not subscribed?", ps.hasObservers()); + + ps.onNext(1); + + Assert.assertFalse("Subscribed?", ps.hasObservers()); + + ts.assertError(TestException.class); + } +}