Skip to content

Commit 13ce7a9

Browse files
committed
1.x: fix doOnRequest premature requesting
1 parent c5a4902 commit 13ce7a9

File tree

2 files changed

+58
-10
lines changed

2 files changed

+58
-10
lines changed

src/main/java/rx/internal/operators/OperatorDoOnRequest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ private static final class ParentSubscriber<T> extends Subscriber<T> {
5757

5858
ParentSubscriber(Subscriber<? super T> child) {
5959
this.child = child;
60+
this.request(0);
6061
}
6162

6263
private void requestMore(long n) {

src/test/java/rx/internal/operators/OperatorDoOnRequestTest.java

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,16 @@
11
package rx.internal.operators;
22

3-
import static org.junit.Assert.assertEquals;
4-
import static org.junit.Assert.assertTrue;
3+
import static org.junit.Assert.*;
54

6-
import java.util.ArrayList;
7-
import java.util.Arrays;
8-
import java.util.List;
9-
import java.util.concurrent.atomic.AtomicBoolean;
5+
import java.util.*;
6+
import java.util.concurrent.atomic.*;
107

11-
import org.junit.Test;
8+
import org.junit.*;
129

10+
import rx.*;
1311
import rx.Observable;
14-
import rx.Subscriber;
15-
import rx.functions.Action0;
16-
import rx.functions.Action1;
12+
import rx.Observable.OnSubscribe;
13+
import rx.functions.*;
1714

1815
public class OperatorDoOnRequestTest {
1916

@@ -76,5 +73,55 @@ public void onNext(Integer t) {
7673
});
7774
assertEquals(Arrays.asList(3L,1L,2L,3L,4L,5L), requests);
7875
}
76+
77+
@Test
78+
public void dontRequestIfDownstreamRequestsLate() {
79+
final List<Long> requested = new ArrayList<Long>();
80+
81+
Action1<Long> empty = Actions.empty();
82+
83+
final AtomicReference<Producer> producer = new AtomicReference<Producer>();
84+
85+
Observable.create(new OnSubscribe<Integer>() {
86+
@Override
87+
public void call(Subscriber<? super Integer> t) {
88+
t.setProducer(new Producer() {
89+
@Override
90+
public void request(long n) {
91+
requested.add(n);
92+
}
93+
});
94+
}
95+
}).doOnRequest(empty).subscribe(new Subscriber<Object>() {
96+
@Override
97+
public void onNext(Object t) {
98+
99+
}
100+
101+
@Override
102+
public void onError(Throwable e) {
103+
104+
}
105+
106+
@Override
107+
public void onCompleted() {
108+
109+
}
110+
111+
@Override
112+
public void setProducer(Producer p) {
113+
producer.set(p);
114+
}
115+
});
116+
117+
producer.get().request(1);
79118

119+
int s = requested.size();
120+
if (s == 1) {
121+
// this allows for an implementation that itself doesn't request
122+
Assert.assertEquals(Arrays.asList(1L), requested);
123+
} else {
124+
Assert.assertEquals(Arrays.asList(0L, 1L), requested);
125+
}
126+
}
80127
}

0 commit comments

Comments
 (0)