From 13ce7a9ab347161dd44752071d13e643141b3f0a Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 2 Feb 2016 09:43:01 +0100 Subject: [PATCH] 1.x: fix doOnRequest premature requesting --- .../operators/OperatorDoOnRequest.java | 1 + .../operators/OperatorDoOnRequestTest.java | 67 ++++++++++++++++--- 2 files changed, 58 insertions(+), 10 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorDoOnRequest.java b/src/main/java/rx/internal/operators/OperatorDoOnRequest.java index d68c3497aa..419eb7046c 100644 --- a/src/main/java/rx/internal/operators/OperatorDoOnRequest.java +++ b/src/main/java/rx/internal/operators/OperatorDoOnRequest.java @@ -57,6 +57,7 @@ private static final class ParentSubscriber extends Subscriber { ParentSubscriber(Subscriber child) { this.child = child; + this.request(0); } private void requestMore(long n) { diff --git a/src/test/java/rx/internal/operators/OperatorDoOnRequestTest.java b/src/test/java/rx/internal/operators/OperatorDoOnRequestTest.java index 34014094b6..80f5fd4fc8 100644 --- a/src/test/java/rx/internal/operators/OperatorDoOnRequestTest.java +++ b/src/test/java/rx/internal/operators/OperatorDoOnRequestTest.java @@ -1,19 +1,16 @@ package rx.internal.operators; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.*; +import java.util.concurrent.atomic.*; -import org.junit.Test; +import org.junit.*; +import rx.*; import rx.Observable; -import rx.Subscriber; -import rx.functions.Action0; -import rx.functions.Action1; +import rx.Observable.OnSubscribe; +import rx.functions.*; public class OperatorDoOnRequestTest { @@ -76,5 +73,55 @@ public void onNext(Integer t) { }); assertEquals(Arrays.asList(3L,1L,2L,3L,4L,5L), requests); } + + @Test + public void dontRequestIfDownstreamRequestsLate() { + final List requested = new ArrayList(); + + Action1 empty = Actions.empty(); + + final AtomicReference producer = new AtomicReference(); + + Observable.create(new OnSubscribe() { + @Override + public void call(Subscriber t) { + t.setProducer(new Producer() { + @Override + public void request(long n) { + requested.add(n); + } + }); + } + }).doOnRequest(empty).subscribe(new Subscriber() { + @Override + public void onNext(Object t) { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onCompleted() { + + } + + @Override + public void setProducer(Producer p) { + producer.set(p); + } + }); + + producer.get().request(1); + int s = requested.size(); + if (s == 1) { + // this allows for an implementation that itself doesn't request + Assert.assertEquals(Arrays.asList(1L), requested); + } else { + Assert.assertEquals(Arrays.asList(0L, 1L), requested); + } + } }