Skip to content

Commit 0c50f0a

Browse files
davidmotenakarnokd
authored andcommitted
3.x: constrain upstream requests from take (#6569) (#6650)
1 parent 852e052 commit 0c50f0a

15 files changed

+117
-276
lines changed

src/main/java/io/reactivex/rxjava3/core/Flowable.java

+22-51
Original file line numberDiff line numberDiff line change
@@ -11212,52 +11212,6 @@ public final <R> Flowable<R> lift(FlowableOperator<? extends R, ? super T> lifte
1121211212
return RxJavaPlugins.onAssembly(new FlowableLift<R, T>(this, lifter));
1121311213
}
1121411214

11215-
/**
11216-
* Limits both the number of upstream items (after which the sequence completes)
11217-
* and the total downstream request amount requested from the upstream to
11218-
* possibly prevent the creation of excess items by the upstream.
11219-
* <p>
11220-
* The operator requests at most the given {@code count} of items from upstream even
11221-
* if the downstream requests more than that. For example, given a {@code limit(5)},
11222-
* if the downstream requests 1, a request of 1 is submitted to the upstream
11223-
* and the operator remembers that only 4 items can be requested now on. A request
11224-
* of 5 at this point will request 4 from the upstream and any subsequent requests will
11225-
* be ignored.
11226-
* <p>
11227-
* Note that requests are negotiated on an operator boundary and {@code limit}'s amount
11228-
* may not be preserved further upstream. For example,
11229-
* {@code source.observeOn(Schedulers.computation()).limit(5)} will still request the
11230-
* default (128) elements from the given {@code source}.
11231-
* <p>
11232-
* The main use of this operator is with sources that are async boundaries that
11233-
* don't interfere with request amounts, such as certain {@code Flowable}-based
11234-
* network endpoints that relay downstream request amounts unchanged and are, therefore,
11235-
* prone to trigger excessive item creation/transmission over the network.
11236-
* <dl>
11237-
* <dt><b>Backpressure:</b></dt>
11238-
* <dd>The operator requests a total of the given {@code count} items from the upstream.</dd>
11239-
* <dt><b>Scheduler:</b></dt>
11240-
* <dd>{@code limit} does not operate by default on a particular {@link Scheduler}.</dd>
11241-
* </dl>
11242-
* <p>History: 2.1.6 - experimental
11243-
* @param count the maximum number of items and the total request amount, non-negative.
11244-
* Zero will immediately cancel the upstream on subscription and complete
11245-
* the downstream.
11246-
* @return the new Flowable instance
11247-
* @see #take(long)
11248-
* @see #rebatchRequests(int)
11249-
* @since 2.2
11250-
*/
11251-
@BackpressureSupport(BackpressureKind.SPECIAL)
11252-
@SchedulerSupport(SchedulerSupport.NONE)
11253-
@CheckReturnValue
11254-
public final Flowable<T> limit(long count) {
11255-
if (count < 0) {
11256-
throw new IllegalArgumentException("count >= 0 required but it was " + count);
11257-
}
11258-
return RxJavaPlugins.onAssembly(new FlowableLimit<T>(this, count));
11259-
}
11260-
1126111215
/**
1126211216
* Returns a Flowable that applies a specified function to each item emitted by the source Publisher and
1126311217
* emits the results of these function applications.
@@ -15372,6 +15326,7 @@ public final <R> Flowable<R> switchMapSingleDelayError(@NonNull Function<? super
1537215326
return RxJavaPlugins.onAssembly(new FlowableSwitchMapSingle<T, R>(this, mapper, true));
1537315327
}
1537415328

15329+
1537515330
/**
1537615331
* Returns a Flowable that emits only the first {@code count} items emitted by the source Publisher. If the source emits fewer than
1537715332
* {@code count} items then all of its items are emitted.
@@ -15381,23 +15336,39 @@ public final <R> Flowable<R> switchMapSingleDelayError(@NonNull Function<? super
1538115336
* This method returns a Publisher that will invoke a subscribing {@link Subscriber}'s
1538215337
* {@link Subscriber#onNext onNext} function a maximum of {@code count} times before invoking
1538315338
* {@link Subscriber#onComplete onComplete}.
15339+
* <p>
15340+
* Limits both the number of upstream items (after which the sequence completes)
15341+
* and the total downstream request amount requested from the upstream to
15342+
* possibly prevent the creation of excess items by the upstream.
15343+
* <p>
15344+
* The operator requests at most the given {@code count} of items from upstream even
15345+
* if the downstream requests more than that. For example, given a {@code limit(5)},
15346+
* if the downstream requests 1, a request of 1 is submitted to the upstream
15347+
* and the operator remembers that only 4 items can be requested now on. A request
15348+
* of 5 at this point will request 4 from the upstream and any subsequent requests will
15349+
* be ignored.
15350+
* <p>
15351+
* Note that requests are negotiated on an operator boundary and {@code limit}'s amount
15352+
* may not be preserved further upstream. For example,
15353+
* {@code source.observeOn(Schedulers.computation()).limit(5)} will still request the
15354+
* default (128) elements from the given {@code source}.
1538415355
* <dl>
1538515356
* <dt><b>Backpressure:</b></dt>
15386-
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
15387-
* behavior in case the first request is smaller than the {@code count}. Otherwise, the source {@code Publisher}
15388-
* is consumed in an unbounded manner (i.e., without applying backpressure to it).</dd>
15357+
* <dd>The source {@code Publisher} is consumed in a bounded manner.</dd>
1538915358
* <dt><b>Scheduler:</b></dt>
1539015359
* <dd>This version of {@code take} does not operate by default on a particular {@link Scheduler}.</dd>
1539115360
* </dl>
1539215361
*
1539315362
* @param count
15394-
* the maximum number of items to emit
15363+
* the maximum number of items and the total request amount, non-negative.
15364+
* Zero will immediately cancel the upstream on subscription and complete
15365+
* the downstream.
1539515366
* @return a Flowable that emits only the first {@code count} items emitted by the source Publisher, or
1539615367
* all of the items from the source Publisher if that Publisher emits fewer than {@code count} items
1539715368
* @see <a href="http://reactivex.io/documentation/operators/take.html">ReactiveX operators documentation: Take</a>
1539815369
*/
1539915370
@CheckReturnValue
15400-
@BackpressureSupport(BackpressureKind.SPECIAL) // may trigger UNBOUNDED_IN
15371+
@BackpressureSupport(BackpressureKind.FULL)
1540115372
@SchedulerSupport(SchedulerSupport.NONE)
1540215373
public final Flowable<T> take(long count) {
1540315374
if (count < 0) {

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableLimit.java

-135
This file was deleted.

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTake.java

+44-35
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
package io.reactivex.rxjava3.internal.operators.flowable;
1515

16-
import java.util.concurrent.atomic.AtomicBoolean;
16+
import java.util.concurrent.atomic.AtomicLong;
1717

1818
import org.reactivestreams.*;
1919

@@ -22,68 +22,67 @@
2222
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
2323

2424
public final class FlowableTake<T> extends AbstractFlowableWithUpstream<T, T> {
25-
final long limit;
26-
public FlowableTake(Flowable<T> source, long limit) {
25+
26+
final long n;
27+
28+
public FlowableTake(Flowable<T> source, long n) {
2729
super(source);
28-
this.limit = limit;
30+
this.n = n;
2931
}
3032

3133
@Override
3234
protected void subscribeActual(Subscriber<? super T> s) {
33-
source.subscribe(new TakeSubscriber<T>(s, limit));
35+
source.subscribe(new TakeSubscriber<T>(s, n));
3436
}
3537

36-
static final class TakeSubscriber<T> extends AtomicBoolean implements FlowableSubscriber<T>, Subscription {
38+
static final class TakeSubscriber<T>
39+
extends AtomicLong
40+
implements FlowableSubscriber<T>, Subscription {
3741

38-
private static final long serialVersionUID = -5636543848937116287L;
42+
private static final long serialVersionUID = 2288246011222124525L;
3943

4044
final Subscriber<? super T> downstream;
4145

42-
final long limit;
43-
44-
boolean done;
46+
long remaining;
4547

4648
Subscription upstream;
4749

48-
long remaining;
49-
50-
TakeSubscriber(Subscriber<? super T> actual, long limit) {
50+
TakeSubscriber(Subscriber<? super T> actual, long remaining) {
5151
this.downstream = actual;
52-
this.limit = limit;
53-
this.remaining = limit;
52+
this.remaining = remaining;
53+
lazySet(remaining);
5454
}
5555

5656
@Override
5757
public void onSubscribe(Subscription s) {
5858
if (SubscriptionHelper.validate(this.upstream, s)) {
59-
upstream = s;
60-
if (limit == 0L) {
59+
if (remaining == 0L) {
6160
s.cancel();
62-
done = true;
6361
EmptySubscription.complete(downstream);
6462
} else {
63+
this.upstream = s;
6564
downstream.onSubscribe(this);
6665
}
6766
}
6867
}
6968

7069
@Override
7170
public void onNext(T t) {
72-
if (!done && remaining-- > 0) {
73-
boolean stop = remaining == 0;
71+
long r = remaining;
72+
if (r > 0L) {
73+
remaining = --r;
7474
downstream.onNext(t);
75-
if (stop) {
75+
if (r == 0L) {
7676
upstream.cancel();
77-
onComplete();
77+
downstream.onComplete();
7878
}
7979
}
8080
}
8181

8282
@Override
8383
public void onError(Throwable t) {
84-
if (!done) {
85-
done = true;
86-
upstream.cancel();
84+
if (remaining > 0L) {
85+
remaining = 0L;
8786
downstream.onError(t);
8887
} else {
8988
RxJavaPlugins.onError(t);
@@ -92,29 +91,39 @@ public void onError(Throwable t) {
9291

9392
@Override
9493
public void onComplete() {
95-
if (!done) {
96-
done = true;
94+
if (remaining > 0L) {
95+
remaining = 0L;
9796
downstream.onComplete();
9897
}
9998
}
10099

101100
@Override
102101
public void request(long n) {
103-
if (!SubscriptionHelper.validate(n)) {
104-
return;
105-
}
106-
if (!get() && compareAndSet(false, true)) {
107-
if (n >= limit) {
108-
upstream.request(Long.MAX_VALUE);
109-
return;
102+
if (SubscriptionHelper.validate(n)) {
103+
for (;;) {
104+
long r = get();
105+
if (r == 0L) {
106+
break;
107+
}
108+
long toRequest;
109+
if (r <= n) {
110+
toRequest = r;
111+
} else {
112+
toRequest = n;
113+
}
114+
long u = r - toRequest;
115+
if (compareAndSet(r, u)) {
116+
upstream.request(toRequest);
117+
break;
118+
}
110119
}
111120
}
112-
upstream.request(n);
113121
}
114122

115123
@Override
116124
public void cancel() {
117125
upstream.cancel();
118126
}
127+
119128
}
120129
}

0 commit comments

Comments
 (0)