Skip to content

Commit dec05b2

Browse files
committed
1.x: map() and filter() should unsubscribe on crash eagerly (#3890)
1 parent 35b51b3 commit dec05b2

File tree

4 files changed

+179
-60
lines changed

4 files changed

+179
-60
lines changed

src/main/java/rx/internal/operators/OperatorFilter.java

Lines changed: 61 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,17 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import rx.*;
1819
import rx.Observable.Operator;
19-
import rx.Subscriber;
2020
import rx.exceptions.*;
2121
import rx.functions.Func1;
22+
import rx.internal.util.RxJavaPluginUtils;
2223

2324
/**
2425
* Filters an Observable by discarding any items it emits that do not meet some test.
2526
* <p>
2627
* <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/filter.png" alt="">
28+
* @param <T> the value type
2729
*/
2830
public final class OperatorFilter<T> implements Operator<T, T> {
2931

@@ -35,33 +37,67 @@ public OperatorFilter(Func1<? super T, Boolean> predicate) {
3537

3638
@Override
3739
public Subscriber<? super T> call(final Subscriber<? super T> child) {
38-
return new Subscriber<T>(child) {
40+
FilterSubscriber<T> parent = new FilterSubscriber<T>(child, predicate);
41+
child.add(parent);
42+
return parent;
43+
}
3944

40-
@Override
41-
public void onCompleted() {
42-
child.onCompleted();
43-
}
45+
static final class FilterSubscriber<T> extends Subscriber<T> {
46+
47+
final Subscriber<? super T> actual;
48+
49+
final Func1<? super T, Boolean> predicate;
4450

45-
@Override
46-
public void onError(Throwable e) {
47-
child.onError(e);
51+
boolean done;
52+
53+
public FilterSubscriber(Subscriber<? super T> actual, Func1<? super T, Boolean> predicate) {
54+
this.actual = actual;
55+
this.predicate = predicate;
56+
}
57+
58+
@Override
59+
public void onNext(T t) {
60+
boolean result;
61+
62+
try {
63+
result = predicate.call(t);
64+
} catch (Throwable ex) {
65+
Exceptions.throwIfFatal(ex);
66+
unsubscribe();
67+
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
68+
return;
4869
}
49-
50-
@Override
51-
public void onNext(T t) {
52-
try {
53-
if (predicate.call(t)) {
54-
child.onNext(t);
55-
} else {
56-
// TODO consider a more complicated version that batches these
57-
request(1);
58-
}
59-
} catch (Throwable e) {
60-
Exceptions.throwOrReport(e, child, t);
61-
}
70+
71+
if (result) {
72+
actual.onNext(t);
73+
} else {
74+
request(1);
6275
}
63-
64-
};
76+
}
77+
78+
@Override
79+
public void onError(Throwable e) {
80+
if (done) {
81+
RxJavaPluginUtils.handleException(e);
82+
return;
83+
}
84+
done = true;
85+
86+
actual.onError(e);
87+
}
88+
89+
90+
@Override
91+
public void onCompleted() {
92+
if (done) {
93+
return;
94+
}
95+
actual.onCompleted();
96+
}
97+
@Override
98+
public void setProducer(Producer p) {
99+
super.setProducer(p);
100+
actual.setProducer(p);
101+
}
65102
}
66-
67103
}

src/main/java/rx/internal/operators/OperatorMap.java

Lines changed: 60 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,20 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import rx.*;
1819
import rx.Observable.Operator;
19-
import rx.Subscriber;
20-
import rx.exceptions.Exceptions;
20+
import rx.exceptions.*;
2121
import rx.functions.Func1;
22+
import rx.internal.util.RxJavaPluginUtils;
2223

2324
/**
2425
* Applies a function of your choosing to every item emitted by an {@code Observable}, and emits the results of
2526
* this transformation as a new {@code Observable}.
2627
* <p>
2728
* <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/map.png" alt="">
29+
*
30+
* @param <T> the input value type
31+
* @param <R> the return value type
2832
*/
2933
public final class OperatorMap<T, R> implements Operator<R, T> {
3034

@@ -36,28 +40,64 @@ public OperatorMap(Func1<? super T, ? extends R> transformer) {
3640

3741
@Override
3842
public Subscriber<? super T> call(final Subscriber<? super R> o) {
39-
return new Subscriber<T>(o) {
43+
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
44+
o.add(parent);
45+
return parent;
46+
}
47+
48+
static final class MapSubscriber<T, R> extends Subscriber<T> {
49+
50+
final Subscriber<? super R> actual;
51+
52+
final Func1<? super T, ? extends R> mapper;
4053

41-
@Override
42-
public void onCompleted() {
43-
o.onCompleted();
54+
boolean done;
55+
56+
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
57+
this.actual = actual;
58+
this.mapper = mapper;
59+
}
60+
61+
@Override
62+
public void onNext(T t) {
63+
R result;
64+
65+
try {
66+
result = mapper.call(t);
67+
} catch (Throwable ex) {
68+
Exceptions.throwIfFatal(ex);
69+
unsubscribe();
70+
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
71+
return;
4472
}
45-
46-
@Override
47-
public void onError(Throwable e) {
48-
o.onError(e);
73+
74+
actual.onNext(result);
75+
}
76+
77+
@Override
78+
public void onError(Throwable e) {
79+
if (done) {
80+
RxJavaPluginUtils.handleException(e);
81+
return;
4982
}
50-
51-
@Override
52-
public void onNext(T t) {
53-
try {
54-
o.onNext(transformer.call(t));
55-
} catch (Throwable e) {
56-
Exceptions.throwOrReport(e, this, t);
57-
}
83+
done = true;
84+
85+
actual.onError(e);
86+
}
87+
88+
89+
@Override
90+
public void onCompleted() {
91+
if (done) {
92+
return;
5893
}
59-
60-
};
94+
actual.onCompleted();
95+
}
96+
97+
@Override
98+
public void setProducer(Producer p) {
99+
actual.setProducer(p);
100+
}
61101
}
62102

63103
}

src/test/java/rx/internal/operators/OperatorFilterTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import rx.functions.*;
2929
import rx.internal.util.RxRingBuffer;
3030
import rx.observers.TestSubscriber;
31+
import rx.subjects.PublishSubject;
3132

3233
public class OperatorFilterTest {
3334

@@ -54,6 +55,7 @@ public Boolean call(String t1) {
5455

5556
/**
5657
* Make sure we are adjusting subscriber.request() for filtered items
58+
* @throws InterruptedException on interrupt
5759
*/
5860
@Test(timeout = 500)
5961
public void testWithBackpressure() throws InterruptedException {
@@ -100,6 +102,7 @@ public void onNext(String t) {
100102

101103
/**
102104
* Make sure we are adjusting subscriber.request() for filtered items
105+
* @throws InterruptedException on interrupt
103106
*/
104107
@Test(timeout = 500000)
105108
public void testWithBackpressure2() throws InterruptedException {
@@ -167,4 +170,28 @@ public void call(Integer t) {
167170
}
168171
}
169172
}
173+
174+
@Test
175+
public void functionCrashUnsubscribes() {
176+
177+
PublishSubject<Integer> ps = PublishSubject.create();
178+
179+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
180+
181+
ps.filter(new Func1<Integer, Boolean>() {
182+
@Override
183+
public Boolean call(Integer v) {
184+
throw new TestException();
185+
}
186+
}).unsafeSubscribe(ts);
187+
188+
Assert.assertTrue("Not subscribed?", ps.hasObservers());
189+
190+
ps.onNext(1);
191+
192+
Assert.assertFalse("Subscribed?", ps.hasObservers());
193+
194+
ts.assertError(TestException.class);
195+
}
196+
170197
}

src/test/java/rx/internal/operators/OperatorMapTest.java

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,21 @@
1616
package rx.internal.operators;
1717

1818
import static org.mockito.Matchers.any;
19-
import static org.mockito.Mockito.never;
20-
import static org.mockito.Mockito.times;
21-
import static org.mockito.Mockito.verify;
19+
import static org.mockito.Mockito.*;
2220

23-
import java.util.HashMap;
24-
import java.util.Map;
25-
import java.util.NoSuchElementException;
21+
import java.util.*;
2622

27-
import org.junit.Before;
28-
import org.junit.Test;
29-
import org.mockito.Mock;
30-
import org.mockito.MockitoAnnotations;
23+
import org.junit.*;
24+
import org.mockito.*;
3125

3226
import rx.Observable;
3327
import rx.Observer;
3428
import rx.Subscriber;
35-
import rx.exceptions.OnErrorNotImplementedException;
36-
import rx.functions.Action1;
37-
import rx.functions.Func1;
38-
import rx.functions.Func2;
39-
import rx.internal.operators.OperatorMap;
29+
import rx.exceptions.*;
30+
import rx.functions.*;
31+
import rx.observers.TestSubscriber;
4032
import rx.schedulers.Schedulers;
33+
import rx.subjects.PublishSubject;
4134

4235
public class OperatorMapTest {
4336

@@ -339,4 +332,27 @@ public void call(String s) {
339332
}
340333
});
341334
}
335+
336+
@Test
337+
public void functionCrashUnsubscribes() {
338+
339+
PublishSubject<Integer> ps = PublishSubject.create();
340+
341+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
342+
343+
ps.map(new Func1<Integer, Integer>() {
344+
@Override
345+
public Integer call(Integer v) {
346+
throw new TestException();
347+
}
348+
}).unsafeSubscribe(ts);
349+
350+
Assert.assertTrue("Not subscribed?", ps.hasObservers());
351+
352+
ps.onNext(1);
353+
354+
Assert.assertFalse("Subscribed?", ps.hasObservers());
355+
356+
ts.assertError(TestException.class);
357+
}
342358
}

0 commit comments

Comments
 (0)