Skip to content

Commit 9b09750

Browse files
committed
1.x: fix doOnRequest premature requesting.
When the operator is assembled and it sets the producer on the child, if the child doesn't request until the assembly is complete, doOnRequest requested the default Long.MAX_VALUE.
1 parent c5a4902 commit 9b09750

File tree

2 files changed

+54
-10
lines changed

2 files changed

+54
-10
lines changed

src/main/java/rx/internal/operators/OperatorDoOnRequest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ private static final class ParentSubscriber<T> extends Subscriber<T> {
5757

5858
ParentSubscriber(Subscriber<? super T> child) {
5959
this.child = child;
60+
this.request(0);
6061
}
6162

6263
private void requestMore(long n) {

src/test/java/rx/internal/operators/OperatorDoOnRequestTest.java

Lines changed: 53 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,16 @@
11
package rx.internal.operators;
22

3-
import static org.junit.Assert.assertEquals;
4-
import static org.junit.Assert.assertTrue;
3+
import static org.junit.Assert.*;
54

6-
import java.util.ArrayList;
7-
import java.util.Arrays;
8-
import java.util.List;
9-
import java.util.concurrent.atomic.AtomicBoolean;
5+
import java.util.*;
6+
import java.util.concurrent.atomic.*;
107

11-
import org.junit.Test;
8+
import org.junit.*;
129

10+
import rx.*;
1311
import rx.Observable;
14-
import rx.Subscriber;
15-
import rx.functions.Action0;
16-
import rx.functions.Action1;
12+
import rx.Observable.OnSubscribe;
13+
import rx.functions.*;
1714

1815
public class OperatorDoOnRequestTest {
1916

@@ -76,5 +73,51 @@ public void onNext(Integer t) {
7673
});
7774
assertEquals(Arrays.asList(3L,1L,2L,3L,4L,5L), requests);
7875
}
76+
77+
@Test
78+
public void dontRequestIfDownstreamRequestsLate() {
79+
final List<Long> requested = new ArrayList<Long>();
7980

81+
Action1<Long> empty = Actions.empty();
82+
83+
final AtomicReference<Producer> producer = new AtomicReference<Producer>();
84+
85+
Observable.create(new OnSubscribe<Integer>() {
86+
@Override
87+
public void call(Subscriber<? super Integer> t) {
88+
t.setProducer(new Producer() {
89+
@Override
90+
public void request(long n) {
91+
requested.add(n);
92+
}
93+
});
94+
}
95+
}).doOnRequest(empty).subscribe(new Subscriber<Object>() {
96+
@Override
97+
public void onNext(Object t) {
98+
99+
}
100+
101+
@Override
102+
public void onError(Throwable e) {
103+
104+
}
105+
106+
@Override
107+
public void onCompleted() {
108+
109+
}
110+
111+
@Override
112+
public void setProducer(Producer p) {
113+
producer.set(p);
114+
}
115+
});
116+
117+
producer.get().request(1);
118+
119+
System.out.println(requested);
120+
Assert.assertEquals(2, requested.size());
121+
Assert.assertEquals(Arrays.asList(0L, 1L), requested);
122+
}
80123
}

0 commit comments

Comments
 (0)