Skip to content

Commit fd20891

Browse files
committed
Add an operator to throttle data via controlling the requests going upstream.
1 parent 271c83b commit fd20891

File tree

3 files changed

+429
-0
lines changed

3 files changed

+429
-0
lines changed

src/main/java/rx/Observable.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7664,6 +7664,26 @@ public final <U> Observable<T> sample(Observable<U> sampler) {
76647664
return lift(new OperatorSampleWithObservable<T, U>(sampler));
76657665
}
76667666

7667+
/**
7668+
* Allow the an external signal control the amount of data being set through this Observable chain.
7669+
* When the control Observable emits false (closes the valve) requests upstream are stopped and any
7670+
* requests from downstream for more data are buffered until the control Observable emits a true
7671+
* (opens the valve). Should the control Observable error or complete while closed (last control
7672+
* emition was a false) an error is sent down the data stream. The granularity breaks up large requests
7673+
* from downstream to limit the number of onNexts that are possible after the control valve has closed.
7674+
* The smaller the number the tighter the control on the flow but the more overhead there will be in
7675+
* managing the requests.
7676+
*
7677+
* @param control
7678+
* an Observable that dictates if request signals propagate upstream
7679+
* @param granularity
7680+
* the maximum number of outstanding requests.
7681+
* @returns an Observable that mostly stops emiting after the control Observable emits a false.
7682+
*/
7683+
public final Observable<T> pressureValve(Observable<Boolean> control, long granularity) {
7684+
return lift(new OperatorValve<T>(control, granularity));
7685+
}
7686+
76677687
/**
76687688
* Returns an Observable that applies a specified accumulator function to the first item emitted by a source
76697689
* Observable, then feeds the result of that function along with the second item emitted by the source
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import java.util.concurrent.atomic.AtomicBoolean;
19+
20+
import rx.Observable;
21+
import rx.Observable.Operator;
22+
import rx.Producer;
23+
import rx.Subscriber;
24+
25+
/**
26+
* An {@code Observable} that emits the first {@code num} items emitted by the source {@code Observable}.
27+
* <p>
28+
* <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/take.png" alt="" />
29+
* <p>
30+
* You can choose to pay attention only to the first {@code num} items emitted by an {@code Observable} by using
31+
* the {@code take} operator. This operator returns an {@code Observable} that will invoke a subscriber's
32+
* {@link Subscriber#onNext onNext} function a maximum of {@code num} times before invoking
33+
* {@link Subscriber#onCompleted onCompleted}.
34+
*/
35+
public final class OperatorValve<T> implements Operator<T, T> {
36+
private final Observable<Boolean> onByDefault;
37+
private final long _granularity;
38+
39+
public OperatorValve(Observable<Boolean> onByDefault, long granularity) {
40+
this.onByDefault = onByDefault;
41+
this._granularity = granularity;
42+
}
43+
44+
@Override
45+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
46+
return new Subscriber<T>(child) {
47+
private final long granularity = _granularity;
48+
private Producer p;
49+
private long backlog;// synchronized access on Producer p
50+
private long outstanding;// synchronized access on Producer p
51+
private boolean isOpen = true;// synchronized access on Producer p
52+
private AtomicBoolean terminated = new AtomicBoolean();
53+
54+
@Override
55+
public void onCompleted() {
56+
if (terminated.compareAndSet(false, true))
57+
child.onCompleted();
58+
}
59+
60+
@Override
61+
public void onError(Throwable e) {
62+
if (terminated.compareAndSet(false, true))
63+
child.onError(e);
64+
}
65+
66+
@Override
67+
public void onNext(T t) {
68+
child.onNext(t);
69+
final long requestUp;
70+
synchronized (this) {
71+
if (--outstanding == 0 && isOpen) {
72+
// all out and still open; check to see if there is a backlog.
73+
if (backlog > granularity) {
74+
// don't request too much at once
75+
requestUp = granularity;
76+
} else if (backlog > 0) {
77+
// the backlog isn't too big
78+
requestUp = backlog;
79+
} else {
80+
// no backlog
81+
requestUp = 0;
82+
}
83+
} else {
84+
// expecting more or closed
85+
requestUp = 0;
86+
}
87+
if (requestUp > 0) {
88+
// do the last of the accounting inside the synchronized block
89+
backlog -= requestUp;
90+
outstanding += requestUp;
91+
}
92+
}
93+
// do the request work outside the synchronized block
94+
if (requestUp != 0)
95+
p.request(requestUp);
96+
}
97+
98+
@Override
99+
public void setProducer(final Producer p) {
100+
this.p = p;
101+
102+
onByDefault.unsafeSubscribe(new Subscriber<Boolean>() {
103+
@Override
104+
public void onCompleted() {
105+
boolean _isOpen;
106+
synchronized (this) {
107+
// make sure to get the latest value of isOpen
108+
_isOpen = isOpen;
109+
}
110+
if (!_isOpen) {
111+
if (terminated.compareAndSet(false, true)) {
112+
child.onError(new IllegalStateException("control signal terminated while valve was closed"));
113+
}
114+
}
115+
unsubscribe();
116+
}
117+
118+
@Override
119+
public void onError(Throwable e) {
120+
if (terminated.compareAndSet(false, true))
121+
child.onError(e);
122+
unsubscribe();
123+
}
124+
125+
@Override
126+
public void onNext(Boolean open) {
127+
if (open) {
128+
final long requestUp;
129+
synchronized (this) {
130+
if (!isOpen) {
131+
// opening, check backlog.
132+
if (backlog > granularity) {
133+
// don't request too much at once
134+
requestUp = granularity;
135+
} else if (backlog > 0) {
136+
// the backlog isn't too big
137+
requestUp = backlog;
138+
} else {
139+
// no backlog
140+
requestUp = 0;
141+
}
142+
isOpen = true;
143+
} else {
144+
// was already open
145+
requestUp = 0;
146+
}
147+
if (requestUp > 0) {
148+
// do the last of the accounting inside the synchronized block
149+
backlog -= requestUp;
150+
outstanding += requestUp;
151+
}
152+
}
153+
// do the request work outside the synchronized block
154+
if (requestUp > 0)
155+
p.request(requestUp);
156+
} else {
157+
synchronized (this) {
158+
// closing
159+
isOpen = false;
160+
}
161+
}
162+
}
163+
});
164+
165+
super.setProducer(new Producer() {
166+
@Override
167+
public void request(long n) {
168+
if (n < 0)
169+
throw new IllegalArgumentException("n >= 0 required but it was " + n);
170+
final long requestUp;
171+
synchronized (this) {
172+
// increase backlog
173+
backlog += n;
174+
// now figure out if what is going to happen to it.
175+
if (!isOpen) {
176+
// closed; don't send
177+
requestUp = 0;
178+
} else {
179+
if (backlog > granularity) {
180+
// don't request too much at once
181+
requestUp = granularity;
182+
} else if (backlog > 0) {
183+
// the backlog isn't too big
184+
requestUp = backlog;
185+
} else {
186+
// no backlog
187+
requestUp = 0;
188+
}
189+
}
190+
if (requestUp > 0) {
191+
// do the last of the accounting inside the synchronized block
192+
backlog -= requestUp;
193+
outstanding += requestUp;
194+
}
195+
}
196+
// do the request work outside the synchronized block
197+
if (requestUp != 0)
198+
p.request(requestUp);
199+
}
200+
});
201+
}
202+
};
203+
}
204+
}

0 commit comments

Comments
 (0)