Skip to content

Commit 40cd517

Browse files
committed
fix error handling in OperatorDistinctUntilChanged
1 parent 64956e7 commit 40cd517

File tree

2 files changed

+36
-1
lines changed

2 files changed

+36
-1
lines changed

src/main/java/rx/internal/operators/OperatorDistinctUntilChanged.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import rx.Observable.Operator;
1919
import rx.Subscriber;
20+
import rx.exceptions.Exceptions;
2021
import rx.functions.Func1;
2122
import rx.internal.util.UtilityFunctions;
2223

@@ -56,7 +57,13 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
5657
@Override
5758
public void onNext(T t) {
5859
U currentKey = previousKey;
59-
U key = keySelector.call(t);
60+
final U key;
61+
try {
62+
key = keySelector.call(t);
63+
} catch (Throwable e) {
64+
Exceptions.throwOrReport(e, child, t);
65+
return;
66+
}
6067
previousKey = key;
6168

6269
if (hasPrevious) {

src/test/java/rx/internal/operators/OperatorDistinctUntilChangedTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertFalse;
1819
import static org.mockito.Matchers.any;
1920
import static org.mockito.Matchers.anyString;
2021
import static org.mockito.Mockito.inOrder;
@@ -23,6 +24,7 @@
2324
import static org.mockito.Mockito.verify;
2425
import static org.mockito.MockitoAnnotations.initMocks;
2526

27+
import java.util.concurrent.atomic.AtomicBoolean;
2628

2729
import org.junit.Before;
2830
import org.junit.Test;
@@ -31,6 +33,7 @@
3133

3234
import rx.Observable;
3335
import rx.Observer;
36+
import rx.functions.Action1;
3437
import rx.functions.Func1;
3538

3639
public class OperatorDistinctUntilChangedTest {
@@ -50,6 +53,15 @@ public String call(String s) {
5053
return s.toUpperCase();
5154
}
5255
};
56+
57+
final Func1<String, String> THROWS_NON_FATAL = new Func1<String, String>() {
58+
@Override
59+
public String call(String s) {
60+
throw new RuntimeException();
61+
}
62+
};
63+
64+
5365

5466
@Before
5567
public void before() {
@@ -138,4 +150,20 @@ public void testDistinctUntilChangedOfSourceWithExceptionsFromKeySelector() {
138150
inOrder.verify(w, never()).onNext(anyString());
139151
inOrder.verify(w, never()).onCompleted();
140152
}
153+
154+
@Test
155+
public void testDistinctUntilChangedWhenNonFatalExceptionThrownByKeySelectorIsNotReportedByUpstream() {
156+
Observable<String> src = Observable.just("a", "b", null, "c");
157+
final AtomicBoolean errorOccurred = new AtomicBoolean(false);
158+
src
159+
.doOnError(new Action1<Throwable>() {
160+
@Override
161+
public void call(Throwable t) {
162+
errorOccurred.set(true);
163+
}
164+
})
165+
.distinctUntilChanged(THROWS_NON_FATAL)
166+
.subscribe(w);
167+
assertFalse(errorOccurred.get());
168+
}
141169
}

0 commit comments

Comments
 (0)