Skip to content

Commit bc83db0

Browse files
committed
1.x: request rebatch operator (#3971)
* 1.x: request rebatch operator * Add argument test * Fix javadoc, add experimental
1 parent ab6dbc1 commit bc83db0

File tree

3 files changed

+71
-0
lines changed

3 files changed

+71
-0
lines changed

src/main/java/rx/Observable.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6698,6 +6698,32 @@ public final <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Ob
66986698
return OperatorPublish.create(this, selector);
66996699
}
67006700

6701+
/**
6702+
* Requests {@code n} initially from the upstream and then 75% of {@code n} subsequently
6703+
* after 75% of {@code n} values have been emitted to the downstream.
6704+
*
6705+
* <p>This operator allows preventing the downstream to trigger unbounded mode via {@code request(Long.MAX_VALUE)}
6706+
* or compensate for the per-item overhead of small and frequent requests.
6707+
*
6708+
* <dl>
6709+
* <dt><b>Backpressure:</b></dt>
6710+
* <dd>The operator expects backpressure from upstream and honors backpressure from downstream.</dd>
6711+
* <dt><b>Scheduler:</b></dt>
6712+
* <dd>{@code rebatchRequests} does not operate by default on a particular {@link Scheduler}.</dd>
6713+
* </dl>
6714+
*
6715+
* @param n the initial request amount, further request will happen after 75% of this value
6716+
* @return the Observable that rebatches request amounts from downstream
6717+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
6718+
*/
6719+
@Experimental
6720+
public final Observable<T> rebatchRequests(int n) {
6721+
if (n <= 0) {
6722+
throw new IllegalArgumentException("n > 0 required but it was " + n);
6723+
}
6724+
return lift(OperatorObserveOn.<T>rebatch(n));
6725+
}
6726+
67016727
/**
67026728
* Returns an Observable that applies a specified accumulator function to the first item emitted by a source
67036729
* Observable, then feeds the result of that function along with the second item emitted by the source

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import rx.internal.util.atomic.SpscAtomicArrayQueue;
2828
import rx.internal.util.unsafe.*;
2929
import rx.plugins.RxJavaPlugins;
30+
import rx.schedulers.Schedulers;
3031

3132
/**
3233
* Delivers events on the specified {@code Scheduler} asynchronously via an unbounded buffer.
@@ -75,6 +76,17 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
7576
return parent;
7677
}
7778
}
79+
80+
public static <T> Operator<T, T> rebatch(final int n) {
81+
return new Operator<T, T>() {
82+
@Override
83+
public Subscriber<? super T> call(Subscriber<? super T> child) {
84+
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(Schedulers.immediate(), child, false, n);
85+
parent.init();
86+
return parent;
87+
}
88+
};
89+
}
7890

7991
/** Observe through individual queue per observer. */
8092
private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {

src/test/java/rx/internal/operators/OperatorObserveOnTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -933,4 +933,37 @@ public void bufferSizesWork() {
933933
ts.assertNoErrors();
934934
}
935935
}
936+
937+
@Test
938+
public void synchronousRebatching() {
939+
final List<Long> requests = new ArrayList<Long>();
940+
941+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
942+
943+
Observable.range(1, 50)
944+
.doOnRequest(new Action1<Long>() {
945+
@Override
946+
public void call(Long r) {
947+
requests.add(r);
948+
}
949+
})
950+
.rebatchRequests(20)
951+
.subscribe(ts);
952+
953+
ts.assertValueCount(50);
954+
ts.assertNoErrors();
955+
ts.assertCompleted();
956+
957+
assertEquals(Arrays.asList(20L, 15L, 15L, 15L), requests);
958+
}
959+
960+
@Test
961+
public void rebatchRequestsArgumentCheck() {
962+
try {
963+
Observable.never().rebatchRequests(-99);
964+
fail("Didn't throw IAE");
965+
} catch (IllegalArgumentException ex) {
966+
assertEquals("n > 0 required but it was -99", ex.getMessage());
967+
}
968+
}
936969
}

0 commit comments

Comments
 (0)