From 7d8fdeed1e6bd1f723497d47ff4930129883c368 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Thu, 28 Apr 2016 21:56:43 +0200 Subject: [PATCH] 1.x: cast() should unsubscribe on crash eagerly --- .../rx/internal/operators/OperatorCast.java | 74 ++++++++++++++----- .../internal/operators/OperatorCastTest.java | 30 ++++++-- 2 files changed, 79 insertions(+), 25 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorCast.java b/src/main/java/rx/internal/operators/OperatorCast.java index 825847b5ce..02099f31ad 100644 --- a/src/main/java/rx/internal/operators/OperatorCast.java +++ b/src/main/java/rx/internal/operators/OperatorCast.java @@ -15,9 +15,10 @@ */ package rx.internal.operators; +import rx.*; import rx.Observable.Operator; import rx.exceptions.*; -import rx.Subscriber; +import rx.internal.util.RxJavaPluginUtils; /** * Converts the elements of an observable sequence to the specified type. @@ -32,26 +33,63 @@ public OperatorCast(Class castClass) { @Override public Subscriber call(final Subscriber o) { - return new Subscriber(o) { + CastSubscriber parent = new CastSubscriber(o, castClass); + o.add(parent); + return parent; + } + + static final class CastSubscriber extends Subscriber { + + final Subscriber actual; + + final Class castClass; - @Override - public void onCompleted() { - o.onCompleted(); + boolean done; + + public CastSubscriber(Subscriber actual, Class castClass) { + this.actual = actual; + this.castClass = castClass; + } + + @Override + public void onNext(T t) { + R result; + + try { + result = castClass.cast(t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + unsubscribe(); + onError(OnErrorThrowable.addValueAsLastCause(ex, t)); + return; } - - @Override - public void onError(Throwable e) { - o.onError(e); + + actual.onNext(result); + } + + @Override + public void onError(Throwable e) { + if (done) { + RxJavaPluginUtils.handleException(e); + return; } - - @Override - public void onNext(T t) { - try { - o.onNext(castClass.cast(t)); - } catch (Throwable e) { - Exceptions.throwOrReport(e, this, t); - } + done = true; + + actual.onError(e); + } + + + @Override + public void onCompleted() { + if (done) { + return; } - }; + actual.onCompleted(); + } + + @Override + public void setProducer(Producer p) { + actual.setProducer(p); + } } } diff --git a/src/test/java/rx/internal/operators/OperatorCastTest.java b/src/test/java/rx/internal/operators/OperatorCastTest.java index 1f18b592c2..d67c1abfa4 100644 --- a/src/test/java/rx/internal/operators/OperatorCastTest.java +++ b/src/test/java/rx/internal/operators/OperatorCastTest.java @@ -15,15 +15,13 @@ */ package rx.internal.operators; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; -import org.junit.Test; +import org.junit.*; -import rx.Observable; -import rx.Observer; +import rx.*; +import rx.observers.TestSubscriber; +import rx.subjects.PublishSubject; public class OperatorCastTest { @@ -53,4 +51,22 @@ public void testCastWithWrongType() { verify(observer, times(1)).onError( org.mockito.Matchers.any(ClassCastException.class)); } + + @Test + public void castCrashUnsubscribes() { + + PublishSubject ps = PublishSubject.create(); + + TestSubscriber ts = TestSubscriber.create(); + + ps.cast(String.class).unsafeSubscribe(ts); + + Assert.assertTrue("Not subscribed?", ps.hasObservers()); + + ps.onNext(1); + + Assert.assertFalse("Subscribed?", ps.hasObservers()); + + ts.assertError(ClassCastException.class); + } }