From b22fe94b36dafe818be9de12b772827bd2af0464 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 18 Jun 2015 14:24:16 +0200 Subject: [PATCH] Operator requestBatching to change the in-flight event amounts --- src/main/java/rx/Observable.java | 18 + .../operators/OperatorRequestBatcher.java | 125 +++++++ .../operators/OperatorRequestBatcherTest.java | 318 ++++++++++++++++++ 3 files changed, 461 insertions(+) create mode 100644 src/main/java/rx/internal/operators/OperatorRequestBatcher.java create mode 100644 src/test/java/rx/internal/operators/OperatorRequestBatcherTest.java diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index c576ea59f9..2fab6cca6d 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -6612,6 +6612,24 @@ public Throwable call(Notification notification) { return OnSubscribeRedo. retry(this, dematerializedNotificationHandler, scheduler); } + /** + * Returns an Observable that batches the backpressure requests to the upstream + * if the downstream requests more than a specified batch size. + * @param batchSize the number of elements to request upfront and otherwise keep in-flight + * @param replenishLevel the lower bound for in-flight values that triggers a replenishment + * @return an Observable that batches the backpressure requests to the upstream + */ + @Experimental + public final Observable requestBatching(int batchSize, int replenishLevel) { + if (batchSize <= 0) { + throw new IllegalArgumentException("batchSize > 0 required"); + } + if (replenishLevel < 0) { + throw new IllegalArgumentException("replenishLevel >= 0 required"); + } + return lift(new OperatorRequestBatcher(batchSize, replenishLevel)); + } + /** * Returns an Observable that emits the most recently emitted item (if any) emitted by the source Observable * within periodic time intervals. diff --git a/src/main/java/rx/internal/operators/OperatorRequestBatcher.java b/src/main/java/rx/internal/operators/OperatorRequestBatcher.java new file mode 100644 index 0000000000..070f453fd7 --- /dev/null +++ b/src/main/java/rx/internal/operators/OperatorRequestBatcher.java @@ -0,0 +1,125 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import rx.Observable.Operator; +import rx.*; + +public final class OperatorRequestBatcher implements Operator { + final int batchSize; + final int replenishLevel; + + public OperatorRequestBatcher(int batchSize, int replenishLevel) { + this.batchSize = batchSize; + this.replenishLevel = replenishLevel; + } + @Override + public Subscriber call(Subscriber child) { + RequestBatcherSubscriber parent = new RequestBatcherSubscriber( + child, batchSize, replenishLevel); + RequestBatcherProducer producer = new RequestBatcherProducer(parent); + child.add(parent); + child.setProducer(producer); + return parent; + } + + static final class RequestBatcherSubscriber extends Subscriber { + final Subscriber child; + final int batchSize; + final int replenishLevel; + RequestBatcherProducer producer; + + public RequestBatcherSubscriber(Subscriber child, int batchSize, int replenishLevel) { + this.child = child; + this.batchSize = batchSize; + this.replenishLevel = replenishLevel; + this.request(0); + } + @Override + public void onNext(T t) { + child.onNext(t); + producer.produced(); + } + @Override + public void onError(Throwable e) { + child.onError(e); + } + @Override + public void onCompleted() { + child.onCompleted(); + } + public void requestMore(long n) { + request(n); + } + } + + static final class RequestBatcherProducer implements Producer { + + final RequestBatcherSubscriber parent; + long childRequested; + long upstreamRequested; + + public RequestBatcherProducer(RequestBatcherSubscriber parent) { + parent.producer = this; + this.parent = parent; + } + + @Override + public void request(long n) { + if (n < 0) { + throw new IllegalArgumentException("n >= 0 required"); + } + if (n > 0) { + long n0; + synchronized (this) { + long r = childRequested; + long u = r + n; + if (u < 0) { + u = Long.MAX_VALUE; + } + childRequested = u; + + if (r != 0) { + return; + } + long k = upstreamRequested; + n0 = Math.min(parent.batchSize, u) - k; + upstreamRequested = k + n0; + } + if (n0 != 0L) { + parent.requestMore(n0); + } + } + } + void produced() { + long n0; + synchronized (this) { + long c = --childRequested; + + long k = --upstreamRequested; + if (k > parent.replenishLevel) { + return; + } + + n0 = Math.min(parent.batchSize - k, c - k); + upstreamRequested = k + n0; + } + if (n0 != 0L) { + parent.requestMore(n0); + } + } + } +} diff --git a/src/test/java/rx/internal/operators/OperatorRequestBatcherTest.java b/src/test/java/rx/internal/operators/OperatorRequestBatcherTest.java new file mode 100644 index 0000000000..0f8a17b662 --- /dev/null +++ b/src/test/java/rx/internal/operators/OperatorRequestBatcherTest.java @@ -0,0 +1,318 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +import org.junit.*; + +import rx.Observable; +import rx.functions.Action1; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; + +public class OperatorRequestBatcherTest { + @Test + public void testMaxValueSubscriber() { + final List requests = new ArrayList(); + + Observable source = Observable.range(0, 100) + .doOnRequest(new Action1() { + @Override + public void call(Long t) { + System.out.println("testMaxValueSubscriber >> " + t); + requests.add(t); + } + }) + .requestBatching(20, 5); + + TestSubscriber ts = TestSubscriber.create(); + + source.subscribe(ts); + + ts.assertValueCount(100); + ts.assertCompleted(); + ts.assertNoErrors(); + + Assert.assertEquals(Arrays.asList(20L, 15L, 15L, 15L, 15L, 15L, 15L), requests); + } + @Test + public void testTotalGreaterValueSubscriber() { + final List requests = new ArrayList(); + + Observable source = Observable.range(0, 100) + .doOnRequest(new Action1() { + @Override + public void call(Long t) { + System.out.println("testTotalGreaterValueSubscriber >> " + t); + requests.add(t); + } + }) + .requestBatching(20, 5); + + TestSubscriber ts = TestSubscriber.create(200); + + source.subscribe(ts); + + ts.assertValueCount(100); + ts.assertCompleted(); + ts.assertNoErrors(); + + Assert.assertEquals(Arrays.asList(20L, 15L, 15L, 15L, 15L, 15L, 15L), requests); + } + + @Test + public void testLargerValueSubscriber() { + final List requests = new ArrayList(); + + Observable source = Observable.range(0, 100) + .doOnRequest(new Action1() { + @Override + public void call(Long t) { + System.out.println("testLargerValueSubscriber >> " + t); + requests.add(t); + } + }) + .requestBatching(20, 5); + + TestSubscriber ts = new TestSubscriber(30) { + int remaining = 30; + @Override + public void onNext(Integer t) { + super.onNext(t); + if (--remaining == 0) { + remaining = 30; + request(30); + } + } + @Override + public void onError(Throwable e) { + super.onError(e); + } + @Override + public void onCompleted() { + super.onCompleted(); + } + }; + + source.subscribe(ts); + + ts.assertValueCount(100); + ts.assertCompleted(); + ts.assertNoErrors(); + + Assert.assertEquals(Arrays.asList(20L, 10L, 20L, 10L, 20L, 10L, 20L), requests); + } + @Test + public void testSmallerValueSubscriber() { + final List requests = new ArrayList(); + + Observable source = Observable.range(0, 100) + .doOnRequest(new Action1() { + @Override + public void call(Long t) { + System.out.println("testSmallerValueSubscriber >> " + t); + requests.add(t); + } + }) + .requestBatching(20, 5); + + TestSubscriber ts = new TestSubscriber(30) { + int remaining = 10; + @Override + public void onNext(Integer t) { + super.onNext(t); + if (--remaining == 0) { + remaining = 10; + request(10); + } + } + @Override + public void onError(Throwable e) { + super.onError(e); + } + @Override + public void onCompleted() { + super.onCompleted(); + } + }; + + source.subscribe(ts); + + ts.assertValueCount(100); + ts.assertCompleted(); + ts.assertNoErrors(); + + Assert.assertEquals(Arrays.asList(20L, 15L, 15L, 15L, 15L, 15L, 15L), requests); + } + + @Test + public void testBackpressureHonored() { + final List requests = new ArrayList(); + Observable source = Observable.range(0, 100) + .doOnRequest(new Action1() { + @Override + public void call(Long t) { + System.out.println("testBackpressureHonored >> " + t); + requests.add(t); + } + }) + .requestBatching(20, 5) + .doOnRequest(new Action1() { + @Override + public void call(Long t) { + System.out.println("testBackpressureHonored 2 >> " + t); + } + }) + ; + + TestSubscriber ts = TestSubscriber.create(15); + + source.subscribe(ts); + + ts.assertValueCount(15); + + ts.requestMore(15); + + ts.assertValueCount(30); + + ts.requestMore(69); + + ts.assertValueCount(99); + + ts.assertNotCompleted(); + ts.assertNoErrors(); + } + @Test + public void testBackpressureHonoredModulo() { + final List requests = new ArrayList(); + Observable source = Observable.range(0, 120) + .doOnRequest(new Action1() { + @Override + public void call(Long t) { + System.out.println("testBackpressureHonored >> " + t); + requests.add(t); + } + }) + .requestBatching(20, 5) + .doOnRequest(new Action1() { + @Override + public void call(Long t) { + System.out.println("testBackpressureHonored 2 >> " + t); + } + }) + ; + + TestSubscriber ts = TestSubscriber.create(15); + + source.subscribe(ts); + int tally = 15; + for (int i = 0; i < 8; i++) { + ts.assertValueCount(tally); + tally += 15; + ts.requestMore(15); + } + + ts.assertValueCount(120); + ts.assertCompleted(); + ts.assertNoErrors(); + } + @Test + public void testBackpressureHonoredAsync() { + final List requests = new ArrayList(); + Observable source = Observable.range(0, 100) + .doOnRequest(new Action1() { + @Override + public void call(Long t) { + System.out.println("testBackpressureHonoredAsync >> " + t); + requests.add(t); + } + }) + .requestBatching(20, 5) + .doOnRequest(new Action1() { + @Override + public void call(Long t) { + System.out.println("testBackpressureHonoredAsync 2 >> " + t); + } + }) + .observeOn(Schedulers.computation()) + ; + + TestSubscriber ts = TestSubscriber.create(); + + source.subscribe(ts); + + ts.awaitTerminalEvent(2, TimeUnit.SECONDS); + ts.assertCompleted(); + ts.assertNoErrors(); + ts.assertValueCount(100); + } + + @Test + public void testZeroLevelValueSubscriber() { + final List requests = new ArrayList(); + + Observable source = Observable.range(0, 100) + .doOnRequest(new Action1() { + @Override + public void call(Long t) { + System.out.println("testZeroLevelValueSubscriber >> " + t); + requests.add(t); + } + }) + .requestBatching(20, 0); + + TestSubscriber ts = TestSubscriber.create(200); + + source.subscribe(ts); + + ts.assertValueCount(100); + ts.assertCompleted(); + ts.assertNoErrors(); + + Assert.assertEquals(Arrays.asList(20L, 20L, 20L, 20L, 20L, 20L), requests); + } + + @Test + public void testZeroLevelLargeRequestValueSubscriber() { + final List requests = new ArrayList(); + + Observable source = Observable.range(0, 100) + .doOnRequest(new Action1() { + @Override + public void call(Long t) { + System.out.println("testZeroLevelValueSubscriber >> " + t); + requests.add(t); + } + }) + .requestBatching(20, 0); + + TestSubscriber ts = TestSubscriber.create(50); + + source.subscribe(ts); + + ts.assertValueCount(50); + + ts.requestMore(50); + + ts.assertValueCount(100); + ts.assertCompleted(); + ts.assertNoErrors(); + + Assert.assertEquals(Arrays.asList(20L, 20L, 10L, 20L, 20L, 10L), requests); + } +}