Skip to content

Commit 6daf59e

Browse files
authored
2.x: fix Maybe.concat() subscribe-after-cancel, verify others (#5101)
1 parent 5a8be22 commit 6daf59e

File tree

8 files changed

+302
-6
lines changed

8 files changed

+302
-6
lines changed

src/main/java/io/reactivex/internal/operators/maybe/MaybeConcatArray.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,10 @@ void drain() {
114114

115115
AtomicReference<Object> c = current;
116116
Subscriber<? super T> a = actual;
117+
Disposable cancelled = disposables;
117118

118119
for (;;) {
119-
if (disposables.isDisposed()) {
120+
if (cancelled.isDisposed()) {
120121
c.lazySet(null);
121122
return;
122123
}
@@ -141,7 +142,7 @@ void drain() {
141142
c.lazySet(null);
142143
}
143144

144-
if (goNextSource) {
145+
if (goNextSource && !cancelled.isDisposed()) {
145146
int i = index;
146147
if (i == sources.length) {
147148
a.onComplete();

src/main/java/io/reactivex/internal/operators/maybe/MaybeConcatArrayDelayError.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,10 @@ void drain() {
124124

125125
AtomicReference<Object> c = current;
126126
Subscriber<? super T> a = actual;
127+
Disposable cancelled = disposables;
127128

128129
for (;;) {
129-
if (disposables.isDisposed()) {
130+
if (cancelled.isDisposed()) {
130131
c.lazySet(null);
131132
return;
132133
}
@@ -151,7 +152,7 @@ void drain() {
151152
c.lazySet(null);
152153
}
153154

154-
if (goNextSource) {
155+
if (goNextSource && !cancelled.isDisposed()) {
155156
int i = index;
156157
if (i == sources.length) {
157158
Throwable ex = errors.get();

src/main/java/io/reactivex/internal/operators/maybe/MaybeConcatIterable.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,10 @@ void drain() {
126126

127127
AtomicReference<Object> c = current;
128128
Subscriber<? super T> a = actual;
129+
Disposable cancelled = disposables;
129130

130131
for (;;) {
131-
if (disposables.isDisposed()) {
132+
if (cancelled.isDisposed()) {
132133
c.lazySet(null);
133134
return;
134135
}
@@ -153,7 +154,7 @@ void drain() {
153154
c.lazySet(null);
154155
}
155156

156-
if (goNextSource) {
157+
if (goNextSource && !cancelled.isDisposed()) {
157158
boolean b;
158159

159160
try {

src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatTest.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1542,4 +1542,88 @@ public Publisher<Object> apply(Integer v) throws Exception {
15421542
.test()
15431543
.assertFailure(TestException.class);
15441544
}
1545+
1546+
@SuppressWarnings("unchecked")
1547+
@Test
1548+
public void noSubsequentSubscription() {
1549+
final int[] calls = { 0 };
1550+
1551+
Flowable<Integer> source = Flowable.create(new FlowableOnSubscribe<Integer>() {
1552+
@Override
1553+
public void subscribe(FlowableEmitter<Integer> s) throws Exception {
1554+
calls[0]++;
1555+
s.onNext(1);
1556+
s.onComplete();
1557+
}
1558+
}, BackpressureStrategy.MISSING);
1559+
1560+
Flowable.concatArray(source, source).firstElement()
1561+
.test()
1562+
.assertResult(1);
1563+
1564+
assertEquals(1, calls[0]);
1565+
}
1566+
1567+
@SuppressWarnings("unchecked")
1568+
@Test
1569+
public void noSubsequentSubscriptionDelayError() {
1570+
final int[] calls = { 0 };
1571+
1572+
Flowable<Integer> source = Flowable.create(new FlowableOnSubscribe<Integer>() {
1573+
@Override
1574+
public void subscribe(FlowableEmitter<Integer> s) throws Exception {
1575+
calls[0]++;
1576+
s.onNext(1);
1577+
s.onComplete();
1578+
}
1579+
}, BackpressureStrategy.MISSING);
1580+
1581+
Flowable.concatArrayDelayError(source, source).firstElement()
1582+
.test()
1583+
.assertResult(1);
1584+
1585+
assertEquals(1, calls[0]);
1586+
}
1587+
1588+
@SuppressWarnings("unchecked")
1589+
@Test
1590+
public void noSubsequentSubscriptionIterable() {
1591+
final int[] calls = { 0 };
1592+
1593+
Flowable<Integer> source = Flowable.create(new FlowableOnSubscribe<Integer>() {
1594+
@Override
1595+
public void subscribe(FlowableEmitter<Integer> s) throws Exception {
1596+
calls[0]++;
1597+
s.onNext(1);
1598+
s.onComplete();
1599+
}
1600+
}, BackpressureStrategy.MISSING);
1601+
1602+
Flowable.concat(Arrays.asList(source, source)).firstElement()
1603+
.test()
1604+
.assertResult(1);
1605+
1606+
assertEquals(1, calls[0]);
1607+
}
1608+
1609+
@SuppressWarnings("unchecked")
1610+
@Test
1611+
public void noSubsequentSubscriptionDelayErrorIterable() {
1612+
final int[] calls = { 0 };
1613+
1614+
Flowable<Integer> source = Flowable.create(new FlowableOnSubscribe<Integer>() {
1615+
@Override
1616+
public void subscribe(FlowableEmitter<Integer> s) throws Exception {
1617+
calls[0]++;
1618+
s.onNext(1);
1619+
s.onComplete();
1620+
}
1621+
}, BackpressureStrategy.MISSING);
1622+
1623+
Flowable.concatDelayError(Arrays.asList(source, source)).firstElement()
1624+
.test()
1625+
.assertResult(1);
1626+
1627+
assertEquals(1, calls[0]);
1628+
}
15451629
}

src/test/java/io/reactivex/internal/operators/maybe/MaybeConcatArrayTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.io.IOException;
1717
import java.util.List;
1818

19+
import static org.junit.Assert.*;
1920
import org.junit.Test;
2021

2122
import io.reactivex.*;
@@ -156,4 +157,44 @@ protected void subscribeActual(MaybeObserver<? super Integer> observer) {
156157
RxJavaPlugins.reset();
157158
}
158159
}
160+
161+
@SuppressWarnings("unchecked")
162+
@Test
163+
public void noSubsequentSubscription() {
164+
final int[] calls = { 0 };
165+
166+
Maybe<Integer> source = Maybe.create(new MaybeOnSubscribe<Integer>() {
167+
@Override
168+
public void subscribe(MaybeEmitter<Integer> s) throws Exception {
169+
calls[0]++;
170+
s.onSuccess(1);
171+
}
172+
});
173+
174+
Maybe.concatArray(source, source).firstElement()
175+
.test()
176+
.assertResult(1);
177+
178+
assertEquals(1, calls[0]);
179+
}
180+
181+
@SuppressWarnings("unchecked")
182+
@Test
183+
public void noSubsequentSubscriptionDelayError() {
184+
final int[] calls = { 0 };
185+
186+
Maybe<Integer> source = Maybe.create(new MaybeOnSubscribe<Integer>() {
187+
@Override
188+
public void subscribe(MaybeEmitter<Integer> s) throws Exception {
189+
calls[0]++;
190+
s.onSuccess(1);
191+
}
192+
});
193+
194+
Maybe.concatArrayDelayError(source, source).firstElement()
195+
.test()
196+
.assertResult(1);
197+
198+
assertEquals(1, calls[0]);
199+
}
159200
}

src/test/java/io/reactivex/internal/operators/maybe/MaybeConcatIterableTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
package io.reactivex.internal.operators.maybe;
1515

16+
import static org.junit.Assert.assertEquals;
17+
1618
import java.util.*;
1719

1820
import org.junit.Test;
@@ -121,4 +123,44 @@ public Maybe<Integer> apply(Integer v) throws Exception {
121123
.test()
122124
.assertFailure(NullPointerException.class);
123125
}
126+
127+
@SuppressWarnings("unchecked")
128+
@Test
129+
public void noSubsequentSubscription() {
130+
final int[] calls = { 0 };
131+
132+
Maybe<Integer> source = Maybe.create(new MaybeOnSubscribe<Integer>() {
133+
@Override
134+
public void subscribe(MaybeEmitter<Integer> s) throws Exception {
135+
calls[0]++;
136+
s.onSuccess(1);
137+
}
138+
});
139+
140+
Maybe.concat(Arrays.asList(source, source)).firstElement()
141+
.test()
142+
.assertResult(1);
143+
144+
assertEquals(1, calls[0]);
145+
}
146+
147+
@SuppressWarnings("unchecked")
148+
@Test
149+
public void noSubsequentSubscriptionDelayError() {
150+
final int[] calls = { 0 };
151+
152+
Maybe<Integer> source = Maybe.create(new MaybeOnSubscribe<Integer>() {
153+
@Override
154+
public void subscribe(MaybeEmitter<Integer> s) throws Exception {
155+
calls[0]++;
156+
s.onSuccess(1);
157+
}
158+
});
159+
160+
Maybe.concatDelayError(Arrays.asList(source, source)).firstElement()
161+
.test()
162+
.assertResult(1);
163+
164+
assertEquals(1, calls[0]);
165+
}
124166
}

src/test/java/io/reactivex/internal/operators/observable/ObservableConcatTest.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -958,4 +958,87 @@ public ObservableSource<Integer> apply(Object v) throws Exception {
958958

959959
}
960960

961+
@SuppressWarnings("unchecked")
962+
@Test
963+
public void noSubsequentSubscription() {
964+
final int[] calls = { 0 };
965+
966+
Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() {
967+
@Override
968+
public void subscribe(ObservableEmitter<Integer> s) throws Exception {
969+
calls[0]++;
970+
s.onNext(1);
971+
s.onComplete();
972+
}
973+
});
974+
975+
Observable.concatArray(source, source).firstElement()
976+
.test()
977+
.assertResult(1);
978+
979+
assertEquals(1, calls[0]);
980+
}
981+
982+
@SuppressWarnings("unchecked")
983+
@Test
984+
public void noSubsequentSubscriptionDelayError() {
985+
final int[] calls = { 0 };
986+
987+
Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() {
988+
@Override
989+
public void subscribe(ObservableEmitter<Integer> s) throws Exception {
990+
calls[0]++;
991+
s.onNext(1);
992+
s.onComplete();
993+
}
994+
});
995+
996+
Observable.concatArrayDelayError(source, source).firstElement()
997+
.test()
998+
.assertResult(1);
999+
1000+
assertEquals(1, calls[0]);
1001+
}
1002+
1003+
@SuppressWarnings("unchecked")
1004+
@Test
1005+
public void noSubsequentSubscriptionIterable() {
1006+
final int[] calls = { 0 };
1007+
1008+
Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() {
1009+
@Override
1010+
public void subscribe(ObservableEmitter<Integer> s) throws Exception {
1011+
calls[0]++;
1012+
s.onNext(1);
1013+
s.onComplete();
1014+
}
1015+
});
1016+
1017+
Observable.concat(Arrays.asList(source, source)).firstElement()
1018+
.test()
1019+
.assertResult(1);
1020+
1021+
assertEquals(1, calls[0]);
1022+
}
1023+
1024+
@SuppressWarnings("unchecked")
1025+
@Test
1026+
public void noSubsequentSubscriptionDelayErrorIterable() {
1027+
final int[] calls = { 0 };
1028+
1029+
Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() {
1030+
@Override
1031+
public void subscribe(ObservableEmitter<Integer> s) throws Exception {
1032+
calls[0]++;
1033+
s.onNext(1);
1034+
s.onComplete();
1035+
}
1036+
});
1037+
1038+
Observable.concatDelayError(Arrays.asList(source, source)).firstElement()
1039+
.test()
1040+
.assertResult(1);
1041+
1042+
assertEquals(1, calls[0]);
1043+
}
9611044
}

0 commit comments

Comments
 (0)