Skip to content

Commit 300ab1e

Browse files
committed
fix error handling in OperatorDistinctUntilChanged
1 parent 64956e7 commit 300ab1e

File tree

2 files changed

+37
-4
lines changed

2 files changed

+37
-4
lines changed

src/main/java/rx/internal/operators/OperatorDistinctUntilChanged.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import rx.Observable.Operator;
1919
import rx.Subscriber;
20+
import rx.exceptions.Exceptions;
2021
import rx.functions.Func1;
2122
import rx.internal.util.UtilityFunctions;
2223

@@ -56,7 +57,13 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
5657
@Override
5758
public void onNext(T t) {
5859
U currentKey = previousKey;
59-
U key = keySelector.call(t);
60+
final U key;
61+
try {
62+
key = keySelector.call(t);
63+
} catch (Throwable e) {
64+
Exceptions.throwOrReport(e, child, t);
65+
return;
66+
}
6067
previousKey = key;
6168

6269
if (hasPrevious) {

src/test/java/rx/internal/operators/OperatorDistinctUntilChangedTest.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertFalse;
1819
import static org.mockito.Matchers.any;
1920
import static org.mockito.Matchers.anyString;
2021
import static org.mockito.Mockito.inOrder;
@@ -23,6 +24,7 @@
2324
import static org.mockito.Mockito.verify;
2425
import static org.mockito.MockitoAnnotations.initMocks;
2526

27+
import java.util.concurrent.atomic.AtomicBoolean;
2628

2729
import org.junit.Before;
2830
import org.junit.Test;
@@ -31,17 +33,18 @@
3133

3234
import rx.Observable;
3335
import rx.Observer;
36+
import rx.functions.Action1;
3437
import rx.functions.Func1;
3538

3639
public class OperatorDistinctUntilChangedTest {
3740

3841
@Mock
39-
Observer<String> w;
42+
private Observer<String> w;
4043
@Mock
41-
Observer<String> w2;
44+
private Observer<String> w2;
4245

4346
// nulls lead to exceptions
44-
final Func1<String, String> TO_UPPER_WITH_EXCEPTION = new Func1<String, String>() {
47+
private final static Func1<String, String> TO_UPPER_WITH_EXCEPTION = new Func1<String, String>() {
4548
@Override
4649
public String call(String s) {
4750
if (s.equals("x")) {
@@ -50,6 +53,13 @@ public String call(String s) {
5053
return s.toUpperCase();
5154
}
5255
};
56+
57+
private final static Func1<String, String> THROWS_NON_FATAL = new Func1<String, String>() {
58+
@Override
59+
public String call(String s) {
60+
throw new RuntimeException();
61+
}
62+
};
5363

5464
@Before
5565
public void before() {
@@ -138,4 +148,20 @@ public void testDistinctUntilChangedOfSourceWithExceptionsFromKeySelector() {
138148
inOrder.verify(w, never()).onNext(anyString());
139149
inOrder.verify(w, never()).onCompleted();
140150
}
151+
152+
@Test
153+
public void testDistinctUntilChangedWhenNonFatalExceptionThrownByKeySelectorIsNotReportedByUpstream() {
154+
Observable<String> src = Observable.just("a", "b", null, "c");
155+
final AtomicBoolean errorOccurred = new AtomicBoolean(false);
156+
src
157+
.doOnError(new Action1<Throwable>() {
158+
@Override
159+
public void call(Throwable t) {
160+
errorOccurred.set(true);
161+
}
162+
})
163+
.distinctUntilChanged(THROWS_NON_FATAL)
164+
.subscribe(w);
165+
assertFalse(errorOccurred.get());
166+
}
141167
}

0 commit comments

Comments
 (0)