Skip to content

Commit d9e2df9

Browse files
authored
2.x: fix doOnSubscribe signalling Undeliv.Exception instead of onError (#5103)
1 parent 61af0c0 commit d9e2df9

File tree

8 files changed

+272
-70
lines changed

8 files changed

+272
-70
lines changed

src/main/java/io/reactivex/internal/observers/DisposableLambdaObserver.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,7 @@ public void onSubscribe(Disposable s) {
4343
} catch (Throwable e) {
4444
Exceptions.throwIfFatal(e);
4545
s.dispose();
46-
RxJavaPlugins.onError(e);
47-
46+
this.s = DisposableHelper.DISPOSED;
4847
EmptyDisposable.error(e, actual);
4948
return;
5049
}
@@ -61,12 +60,18 @@ public void onNext(T t) {
6160

6261
@Override
6362
public void onError(Throwable t) {
64-
actual.onError(t);
63+
if (s != DisposableHelper.DISPOSED) {
64+
actual.onError(t);
65+
} else {
66+
RxJavaPlugins.onError(t);
67+
}
6568
}
6669

6770
@Override
6871
public void onComplete() {
69-
actual.onComplete();
72+
if (s != DisposableHelper.DISPOSED) {
73+
actual.onComplete();
74+
}
7075
}
7176

7277

src/main/java/io/reactivex/internal/operators/completable/CompletablePeek.java

Lines changed: 82 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414
package io.reactivex.internal.operators.completable;
1515

1616
import io.reactivex.*;
17-
import io.reactivex.disposables.*;
17+
import io.reactivex.disposables.Disposable;
1818
import io.reactivex.exceptions.*;
1919
import io.reactivex.functions.*;
20-
import io.reactivex.internal.disposables.EmptyDisposable;
20+
import io.reactivex.internal.disposables.*;
2121
import io.reactivex.plugins.RxJavaPlugins;
2222

2323
public final class CompletablePeek extends Completable {
@@ -48,77 +48,99 @@ public CompletablePeek(CompletableSource source, Consumer<? super Disposable> on
4848
@Override
4949
protected void subscribeActual(final CompletableObserver s) {
5050

51-
source.subscribe(new CompletableObserver() {
51+
source.subscribe(new CompletableObserverImplementation(s));
52+
}
5253

53-
@Override
54-
public void onComplete() {
55-
try {
56-
onComplete.run();
57-
onTerminate.run();
58-
} catch (Throwable e) {
59-
Exceptions.throwIfFatal(e);
60-
s.onError(e);
61-
return;
62-
}
54+
final class CompletableObserverImplementation implements CompletableObserver, Disposable {
6355

64-
s.onComplete();
56+
final CompletableObserver actual;
6557

66-
doAfter();
67-
}
58+
Disposable d;
6859

69-
@Override
70-
public void onError(Throwable e) {
71-
try {
72-
onError.accept(e);
73-
onTerminate.run();
74-
} catch (Throwable ex) {
75-
Exceptions.throwIfFatal(ex);
76-
e = new CompositeException(e, ex);
77-
}
60+
private CompletableObserverImplementation(CompletableObserver actual) {
61+
this.actual = actual;
62+
}
7863

79-
s.onError(e);
8064

81-
doAfter();
65+
@Override
66+
public void onSubscribe(final Disposable d) {
67+
try {
68+
onSubscribe.accept(d);
69+
} catch (Throwable ex) {
70+
Exceptions.throwIfFatal(ex);
71+
d.dispose();
72+
this.d = DisposableHelper.DISPOSED;
73+
EmptyDisposable.error(ex, actual);
74+
return;
75+
}
76+
if (DisposableHelper.validate(this.d, d)) {
77+
this.d = d;
78+
actual.onSubscribe(this);
8279
}
80+
}
8381

84-
@Override
85-
public void onSubscribe(final Disposable d) {
86-
87-
try {
88-
onSubscribe.accept(d);
89-
} catch (Throwable ex) {
90-
Exceptions.throwIfFatal(ex);
91-
d.dispose();
92-
EmptyDisposable.error(ex, s);
93-
return;
94-
}
95-
96-
s.onSubscribe(Disposables.fromRunnable(new Runnable() {
97-
@Override
98-
public void run() {
99-
try {
100-
onDispose.run();
101-
} catch (Throwable e) {
102-
Exceptions.throwIfFatal(e);
103-
RxJavaPlugins.onError(e);
104-
}
105-
d.dispose();
106-
}
107-
}));
82+
@Override
83+
public void onError(Throwable e) {
84+
if (d == DisposableHelper.DISPOSED) {
85+
RxJavaPlugins.onError(e);
86+
return;
87+
}
88+
try {
89+
onError.accept(e);
90+
onTerminate.run();
91+
} catch (Throwable ex) {
92+
Exceptions.throwIfFatal(ex);
93+
e = new CompositeException(e, ex);
10894
}
10995

110-
void doAfter() {
96+
actual.onError(e);
11197

112-
try {
113-
onAfterTerminate.run();
114-
} catch (Throwable ex) {
115-
Exceptions.throwIfFatal(ex);
116-
RxJavaPlugins.onError(ex);
117-
}
98+
doAfter();
99+
}
118100

101+
@Override
102+
public void onComplete() {
103+
if (d == DisposableHelper.DISPOSED) {
104+
return;
119105
}
120-
});
121-
}
122106

107+
try {
108+
onComplete.run();
109+
onTerminate.run();
110+
} catch (Throwable e) {
111+
Exceptions.throwIfFatal(e);
112+
actual.onError(e);
113+
return;
114+
}
115+
116+
actual.onComplete();
117+
118+
doAfter();
119+
}
120+
121+
void doAfter() {
122+
try {
123+
onAfterTerminate.run();
124+
} catch (Throwable ex) {
125+
Exceptions.throwIfFatal(ex);
126+
RxJavaPlugins.onError(ex);
127+
}
128+
}
129+
130+
@Override
131+
public void dispose() {
132+
try {
133+
onDispose.run();
134+
} catch (Throwable e) {
135+
Exceptions.throwIfFatal(e);
136+
RxJavaPlugins.onError(e);
137+
}
138+
d.dispose();
139+
}
123140

141+
@Override
142+
public boolean isDisposed() {
143+
return d.isDisposed();
144+
}
145+
}
124146
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycle.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public void onSubscribe(Subscription s) {
6464
} catch (Throwable e) {
6565
Exceptions.throwIfFatal(e);
6666
s.cancel();
67-
RxJavaPlugins.onError(e);
67+
this.s = SubscriptionHelper.CANCELLED;
6868
EmptySubscription.error(e, actual);
6969
return;
7070
}
@@ -81,12 +81,18 @@ public void onNext(T t) {
8181

8282
@Override
8383
public void onError(Throwable t) {
84-
actual.onError(t);
84+
if (s != SubscriptionHelper.CANCELLED) {
85+
actual.onError(t);
86+
} else {
87+
RxJavaPlugins.onError(t);
88+
}
8589
}
8690

8791
@Override
8892
public void onComplete() {
89-
actual.onComplete();
93+
if (s != SubscriptionHelper.CANCELLED) {
94+
actual.onComplete();
95+
}
9096
}
9197

9298
@Override

src/test/java/io/reactivex/internal/operators/completable/CompletableDoOnTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
import org.junit.*;
2121

2222
import io.reactivex.*;
23+
import io.reactivex.disposables.*;
2324
import io.reactivex.exceptions.*;
2425
import io.reactivex.functions.*;
2526
import io.reactivex.observers.TestObserver;
27+
import io.reactivex.plugins.RxJavaPlugins;
2628

2729
public class CompletableDoOnTest {
2830

@@ -74,4 +76,35 @@ public void run() throws Exception {
7476

7577
assertTrue(atomicBoolean.get());
7678
}
79+
80+
@Test
81+
public void onSubscribeCrash() {
82+
List<Throwable> errors = TestHelper.trackPluginErrors();
83+
try {
84+
final Disposable bs = Disposables.empty();
85+
86+
new Completable() {
87+
@Override
88+
protected void subscribeActual(CompletableObserver s) {
89+
s.onSubscribe(bs);
90+
s.onError(new TestException("Second"));
91+
s.onComplete();
92+
}
93+
}
94+
.doOnSubscribe(new Consumer<Disposable>() {
95+
@Override
96+
public void accept(Disposable s) throws Exception {
97+
throw new TestException("First");
98+
}
99+
})
100+
.test()
101+
.assertFailureAndMessage(TestException.class, "First");
102+
103+
assertTrue(bs.isDisposed());
104+
105+
TestHelper.assertUndeliverable(errors, 0, TestException.class, "Second");
106+
} finally {
107+
RxJavaPlugins.reset();
108+
}
109+
}
77110
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycleTest.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,18 @@
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16+
import static org.junit.Assert.*;
17+
1618
import java.util.List;
1719

18-
import static org.junit.Assert.*;
1920
import org.junit.Test;
2021
import org.reactivestreams.*;
2122

2223
import io.reactivex.*;
2324
import io.reactivex.exceptions.TestException;
2425
import io.reactivex.functions.*;
2526
import io.reactivex.internal.functions.Functions;
27+
import io.reactivex.internal.subscriptions.BooleanSubscription;
2628
import io.reactivex.plugins.RxJavaPlugins;
2729

2830
public class FlowableDoOnLifecycleTest {
@@ -132,4 +134,35 @@ public void run() throws Exception {
132134
RxJavaPlugins.reset();
133135
}
134136
}
137+
138+
@Test
139+
public void onSubscribeCrash() {
140+
List<Throwable> errors = TestHelper.trackPluginErrors();
141+
try {
142+
final BooleanSubscription bs = new BooleanSubscription();
143+
144+
new Flowable<Integer>() {
145+
@Override
146+
protected void subscribeActual(Subscriber<? super Integer> s) {
147+
s.onSubscribe(bs);
148+
s.onError(new TestException("Second"));
149+
s.onComplete();
150+
}
151+
}
152+
.doOnSubscribe(new Consumer<Subscription>() {
153+
@Override
154+
public void accept(Subscription s) throws Exception {
155+
throw new TestException("First");
156+
}
157+
})
158+
.test()
159+
.assertFailureAndMessage(TestException.class, "First");
160+
161+
assertTrue(bs.isCancelled());
162+
163+
TestHelper.assertUndeliverable(errors, 0, TestException.class, "Second");
164+
} finally {
165+
RxJavaPlugins.reset();
166+
}
167+
}
135168
}

src/test/java/io/reactivex/internal/operators/maybe/MaybeDoOnEventTest.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,17 @@
1313

1414
package io.reactivex.internal.operators.maybe;
1515

16+
import static org.junit.Assert.assertTrue;
17+
18+
import java.util.List;
19+
1620
import org.junit.Test;
1721

1822
import io.reactivex.*;
23+
import io.reactivex.disposables.*;
24+
import io.reactivex.exceptions.TestException;
1925
import io.reactivex.functions.*;
26+
import io.reactivex.plugins.RxJavaPlugins;
2027
import io.reactivex.subjects.PublishSubject;
2128

2229
public class MaybeDoOnEventTest {
@@ -45,4 +52,36 @@ public void accept(Integer v, Throwable e) throws Exception {
4552
}
4653
});
4754
}
55+
56+
@Test
57+
public void onSubscribeCrash() {
58+
List<Throwable> errors = TestHelper.trackPluginErrors();
59+
try {
60+
final Disposable bs = Disposables.empty();
61+
62+
new Maybe<Integer>() {
63+
@Override
64+
protected void subscribeActual(MaybeObserver<? super Integer> s) {
65+
s.onSubscribe(bs);
66+
s.onError(new TestException("Second"));
67+
s.onComplete();
68+
s.onSuccess(1);
69+
}
70+
}
71+
.doOnSubscribe(new Consumer<Disposable>() {
72+
@Override
73+
public void accept(Disposable s) throws Exception {
74+
throw new TestException("First");
75+
}
76+
})
77+
.test()
78+
.assertFailureAndMessage(TestException.class, "First");
79+
80+
assertTrue(bs.isDisposed());
81+
82+
TestHelper.assertUndeliverable(errors, 0, TestException.class, "Second");
83+
} finally {
84+
RxJavaPlugins.reset();
85+
}
86+
}
4887
}

0 commit comments

Comments
 (0)