Skip to content

Commit 92fe02d

Browse files
committed
Merge pull request #3455 from konmik/on-error-failed-exception-fix
OnErrorFailedException fix
2 parents cc28527 + 974b651 commit 92fe02d

File tree

3 files changed

+71
-16
lines changed

3 files changed

+71
-16
lines changed

src/main/java/rx/Observable.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,15 +164,11 @@ public void call(Subscriber<? super R> o) {
164164
// localized capture of errors rather than it skipping all operators
165165
// and ending up in the try/catch of the subscribe method which then
166166
// prevents onErrorResumeNext and other similar approaches to error handling
167-
if (e instanceof OnErrorNotImplementedException) {
168-
throw (OnErrorNotImplementedException) e;
169-
}
167+
Exceptions.throwIfFatal(e);
170168
st.onError(e);
171169
}
172170
} catch (Throwable e) {
173-
if (e instanceof OnErrorNotImplementedException) {
174-
throw (OnErrorNotImplementedException) e;
175-
}
171+
Exceptions.throwIfFatal(e);
176172
// if the lift function failed all we can do is pass the error to the final Subscriber
177173
// as we don't have the operator available to us
178174
o.onError(e);

src/main/java/rx/exceptions/Exceptions.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,7 @@ public static void throwIfFatal(Throwable t) {
7676
if (t instanceof OnErrorNotImplementedException) {
7777
throw (OnErrorNotImplementedException) t;
7878
} else if (t instanceof OnErrorFailedException) {
79-
Throwable cause = t.getCause();
80-
if (cause instanceof RuntimeException) {
81-
throw (RuntimeException) cause;
82-
} else {
83-
throw (OnErrorFailedException) t;
84-
}
79+
throw (OnErrorFailedException) t;
8580
}
8681
// values here derived from https://github.com/ReactiveX/RxJava/issues/748#issuecomment-32471495
8782
else if (t instanceof StackOverflowError) {

src/test/java/rx/exceptions/ExceptionsTest.java

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import rx.Observable;
2626
import rx.Observer;
2727
import rx.functions.Action1;
28+
import rx.functions.Func1;
29+
import rx.observables.GroupedObservable;
2830
import rx.subjects.PublishSubject;
2931

3032
public class ExceptionsTest {
@@ -45,7 +47,7 @@ public void call(Integer t1) {
4547
public void testStackOverflowWouldOccur() {
4648
final PublishSubject<Integer> a = PublishSubject.create();
4749
final PublishSubject<Integer> b = PublishSubject.create();
48-
final int MAX_STACK_DEPTH = 1000;
50+
final int MAX_STACK_DEPTH = 800;
4951
final AtomicInteger depth = new AtomicInteger();
5052

5153
a.subscribe(new Observer<Integer>() {
@@ -156,10 +158,72 @@ public void onNext(Object o) {
156158
}
157159
});
158160
fail("expecting an exception to be thrown");
159-
} catch (CompositeException t) {
160-
assertTrue(t.getExceptions().get(0) instanceof IllegalArgumentException);
161-
assertTrue(t.getExceptions().get(1) instanceof IllegalStateException);
161+
} catch (OnErrorFailedException t) {
162+
CompositeException cause = (CompositeException) t.getCause();
163+
assertTrue(cause.getExceptions().get(0) instanceof IllegalArgumentException);
164+
assertTrue(cause.getExceptions().get(1) instanceof IllegalStateException);
162165
}
163166
}
164167

168+
/**
169+
* https://github.com/ReactiveX/RxJava/issues/2998
170+
*/
171+
@Test(expected = OnErrorFailedException.class)
172+
public void testOnErrorExceptionIsThrownFromGroupBy() throws Exception {
173+
Observable
174+
.just(1)
175+
.groupBy(new Func1<Integer, Integer>() {
176+
@Override
177+
public Integer call(Integer integer) {
178+
throw new RuntimeException();
179+
}
180+
})
181+
.subscribe(new Observer<GroupedObservable<Integer, Integer>>() {
182+
@Override
183+
public void onCompleted() {
184+
185+
}
186+
187+
@Override
188+
public void onError(Throwable e) {
189+
throw new RuntimeException();
190+
}
191+
192+
@Override
193+
public void onNext(GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) {
194+
195+
}
196+
});
197+
}
198+
199+
/**
200+
* https://github.com/ReactiveX/RxJava/issues/2998
201+
*/
202+
@Test(expected = OnErrorFailedException.class)
203+
public void testOnErrorExceptionIsThrownFromOnNext() throws Exception {
204+
Observable
205+
.just(1)
206+
.doOnNext(new Action1<Integer>() {
207+
@Override
208+
public void call(Integer integer) {
209+
throw new RuntimeException();
210+
}
211+
})
212+
.subscribe(new Observer<Integer>() {
213+
@Override
214+
public void onCompleted() {
215+
216+
}
217+
218+
@Override
219+
public void onError(Throwable e) {
220+
throw new RuntimeException();
221+
}
222+
223+
@Override
224+
public void onNext(Integer integer) {
225+
226+
}
227+
});
228+
}
165229
}

0 commit comments

Comments
 (0)