Skip to content

Commit fd2da39

Browse files
committed
Merge pull request #3678 from akarnokd/BufferWindowRequestFix1x
1.x: fix counted buffer and window backpressure
2 parents df963fa + be8d144 commit fd2da39

File tree

8 files changed

+1082
-328
lines changed

8 files changed

+1082
-328
lines changed

src/main/java/rx/Observable.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10131,8 +10131,8 @@ public final <TClosing> Observable<Observable<T>> window(Func0<? extends Observa
1013110131
* <img width="640" height="400" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window3.png" alt="">
1013210132
* <dl>
1013310133
* <dt><b>Backpressure Support:</b></dt>
10134-
* <dd>The operator honors backpressure on its outer subscriber, ignores backpressure in its inner Observables
10135-
* but each of them will emit at most {@code count} elements.</dd>
10134+
* <dd>The operator honors backpressure of its inner and outer subscribers, however, the inner Observable uses an
10135+
* unbounded buffer that may hold at most {@code count} elements.</dd>
1013610136
* <dt><b>Scheduler:</b></dt>
1013710137
* <dd>This version of {@code window} does not operate by default on a particular {@link Scheduler}.</dd>
1013810138
* </dl>
@@ -10141,6 +10141,7 @@ public final <TClosing> Observable<Observable<T>> window(Func0<? extends Observa
1014110141
* the maximum size of each window before it should be emitted
1014210142
* @return an Observable that emits connected, non-overlapping windows, each containing at most
1014310143
* {@code count} items from the source Observable
10144+
* @throws IllegalArgumentException if either count is non-positive
1014410145
* @see <a href="http://reactivex.io/documentation/operators/window.html">ReactiveX operators documentation: Window</a>
1014510146
*/
1014610147
public final Observable<Observable<T>> window(int count) {
@@ -10156,8 +10157,8 @@ public final Observable<Observable<T>> window(int count) {
1015610157
* <img width="640" height="365" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window4.png" alt="">
1015710158
* <dl>
1015810159
* <dt><b>Backpressure Support:</b></dt>
10159-
* <dd>The operator honors backpressure on its outer subscriber, ignores backpressure in its inner Observables
10160-
* but each of them will emit at most {@code count} elements.</dd>
10160+
* <dd>The operator honors backpressure of its inner and outer subscribers, however, the inner Observable uses an
10161+
* unbounded buffer that may hold at most {@code count} elements.</dd>
1016110162
* <dt><b>Scheduler:</b></dt>
1016210163
* <dd>This version of {@code window} does not operate by default on a particular {@link Scheduler}.</dd>
1016310164
* </dl>
@@ -10169,9 +10170,16 @@ public final Observable<Observable<T>> window(int count) {
1016910170
* {@code count} are equal this is the same operation as {@link #window(int)}.
1017010171
* @return an Observable that emits windows every {@code skip} items containing at most {@code count} items
1017110172
* from the source Observable
10173+
* @throws IllegalArgumentException if either count or skip is non-positive
1017210174
* @see <a href="http://reactivex.io/documentation/operators/window.html">ReactiveX operators documentation: Window</a>
1017310175
*/
1017410176
public final Observable<Observable<T>> window(int count, int skip) {
10177+
if (count <= 0) {
10178+
throw new IllegalArgumentException("count > 0 required but it was " + count);
10179+
}
10180+
if (skip <= 0) {
10181+
throw new IllegalArgumentException("skip > 0 required but it was " + skip);
10182+
}
1017510183
return lift(new OperatorWindowWithSize<T>(count, skip));
1017610184
}
1017710185

src/main/java/rx/internal/operators/BackpressureUtils.java

Lines changed: 208 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.concurrent.atomic.AtomicLong;
19-
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
18+
import java.util.Queue;
19+
import java.util.concurrent.atomic.*;
20+
21+
import rx.Subscriber;
2022

2123
/**
2224
* Utility functions for use with backpressure.
@@ -32,6 +34,8 @@ private BackpressureUtils() {
3234
* addition once the addition is successful (uses CAS semantics). If
3335
* overflows then sets {@code requested} field to {@code Long.MAX_VALUE}.
3436
*
37+
* @param <T> the type of the target object on which the field updater operates
38+
*
3539
* @param requested
3640
* atomic field updater for a request count
3741
* @param object
@@ -103,6 +107,208 @@ public static long addCap(long a, long b) {
103107
return u;
104108
}
105109

110+
/**
111+
* Masks the most significant bit, i.e., 0x8000_0000_0000_0000L.
112+
*/
113+
static final long COMPLETED_MASK = Long.MIN_VALUE;
114+
/**
115+
* Masks the request amount bits, i.e., 0x7FFF_FFFF_FFFF_FFFF.
116+
*/
117+
static final long REQUESTED_MASK = Long.MAX_VALUE;
118+
119+
/**
120+
* Signals the completion of the main sequence and switches to post-completion replay mode.
121+
*
122+
* <p>
123+
* Don't modify the queue after calling this method!
124+
*
125+
* <p>
126+
* Post-completion backpressure handles the case when a source produces values based on
127+
* requests when it is active but more values are available even after its completion.
128+
* In this case, the onCompleted() can't just emit the contents of the queue but has to
129+
* coordinate with the requested amounts. This requires two distinct modes: active and
130+
* completed. In active mode, requests flow through and the queue is not accessed but
131+
* in completed mode, requests no-longer reach the upstream but help in draining the queue.
132+
* <p>
133+
* The algorithm utilizes the most significant bit (bit 63) of a long value (AtomicLong) since
134+
* request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't
135+
* allowed.
136+
*
137+
* @param <T> the value type to emit
138+
* @param requested the holder of current requested amount
139+
* @param queue the queue holding values to be emitted after completion
140+
* @param actual the subscriber to receive the values
141+
*/
142+
public static <T> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<? super T> actual) {
143+
for (;;) {
144+
long r = requested.get();
145+
146+
// switch to completed mode only once
147+
if ((r & COMPLETED_MASK) != 0L) {
148+
return;
149+
}
150+
151+
//
152+
long u = r | COMPLETED_MASK;
153+
154+
if (requested.compareAndSet(r, u)) {
155+
// if we successfully switched to post-complete mode and there
156+
// are requests available start draining the queue
157+
if (r != 0L) {
158+
// if the switch happened when there was outstanding requests, start draining
159+
postCompleteDrain(requested, queue, actual);
160+
}
161+
return;
162+
}
163+
}
164+
}
165+
166+
/**
167+
* Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests.
168+
*
169+
* <p>
170+
* Post-completion backpressure handles the case when a source produces values based on
171+
* requests when it is active but more values are available even after its completion.
172+
* In this case, the onCompleted() can't just emit the contents of the queue but has to
173+
* coordinate with the requested amounts. This requires two distinct modes: active and
174+
* completed. In active mode, requests flow through and the queue is not accessed but
175+
* in completed mode, requests no-longer reach the upstream but help in draining the queue.
176+
*
177+
* @param <T> the value type to emit
178+
* @param requested the holder of current requested amount
179+
* @param n the value requested;
180+
* @param queue the queue holding values to be emitted after completion
181+
* @param actual the subscriber to receive the values
182+
* @return true if in the active mode and the request amount of n can be relayed to upstream, false if
183+
* in the post-completed mode and the queue is draining.
184+
*/
185+
public static <T> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<? super T> actual) {
186+
if (n < 0L) {
187+
throw new IllegalArgumentException("n >= 0 required but it was " + n);
188+
}
189+
if (n == 0) {
190+
return (requested.get() & COMPLETED_MASK) == 0;
191+
}
192+
193+
for (;;) {
194+
long r = requested.get();
195+
196+
// mask of the completed flag
197+
long c = r & COMPLETED_MASK;
198+
// mask of the requested amount
199+
long u = r & REQUESTED_MASK;
200+
201+
// add the current requested amount and the new requested amount
202+
// cap at Long.MAX_VALUE;
203+
long v = addCap(u, n);
204+
205+
// restore the completed flag
206+
v |= c;
207+
208+
if (requested.compareAndSet(r, v)) {
209+
// if there was no outstanding request before and in
210+
// the post-completed state, start draining
211+
if (r == COMPLETED_MASK) {
212+
postCompleteDrain(requested, queue, actual);
213+
return false;
214+
}
215+
// returns true for active mode and false if the completed flag was set
216+
return c == 0L;
217+
}
218+
}
219+
}
220+
221+
/**
222+
* Drains the queue based on the outstanding requests in post-completed mode (only!).
223+
*
224+
* @param <T> the value type to emit
225+
* @param requested the holder of current requested amount
226+
* @param queue the queue holding values to be emitted after completion
227+
* @param actual the subscriber to receive the values
228+
*/
229+
static <T> void postCompleteDrain(AtomicLong requested, Queue<T> queue, Subscriber<? super T> subscriber) {
230+
231+
long r = requested.get();
232+
/*
233+
* Since we are supposed to be in the post-complete state,
234+
* requested will have its top bit set.
235+
* To allow direct comparison, we start with an emission value which has also
236+
* this flag set, then increment it as usual.
237+
* Since COMPLETED_MASK is essentially Long.MIN_VALUE,
238+
* there won't be any overflow or sign flip.
239+
*/
240+
long e = COMPLETED_MASK;
241+
242+
for (;;) {
243+
244+
/*
245+
* This is an improved queue-drain algorithm with a specialization
246+
* in which we know the queue won't change anymore (i.e., done is always true
247+
* when looking at the classical algorithm and there is no error).
248+
*
249+
* Note that we don't check for cancellation or emptyness upfront for two reasons:
250+
* 1) if e != r, the loop will do this and we quit appropriately
251+
* 2) if e == r, then either there was no outstanding requests or we emitted the requested amount
252+
* and the execution simply falls to the e == r check below which checks for emptyness anyway.
253+
*/
254+
255+
while (e != r) {
256+
if (subscriber.isUnsubscribed()) {
257+
return;
258+
}
259+
260+
T v = queue.poll();
261+
262+
if (v == null) {
263+
subscriber.onCompleted();
264+
return;
265+
}
266+
267+
subscriber.onNext(v);
268+
269+
e++;
270+
}
271+
272+
/*
273+
* If the emission count reaches the requested amount the same time the queue becomes empty
274+
* this will make sure the subscriber is completed immediately instead of on the next request.
275+
* This is also true if there are no outstanding requests (this the while loop doesn't run)
276+
* and the queue is empty from the start.
277+
*/
278+
if (e == r) {
279+
if (subscriber.isUnsubscribed()) {
280+
return;
281+
}
282+
if (queue.isEmpty()) {
283+
subscriber.onCompleted();
284+
return;
285+
}
286+
}
287+
288+
/*
289+
* Fast flow: see if more requests have arrived in the meantime.
290+
* This avoids an atomic add (~40 cycles) and resumes the emission immediately.
291+
*/
292+
r = requested.get();
293+
294+
if (r == e) {
295+
/*
296+
* Atomically decrement the requested amount by the emission amount.
297+
* We can't use the full emission value because of the completed flag,
298+
* however, due to two's complement representation, the flag on requested
299+
* is preserved.
300+
*/
301+
r = requested.addAndGet(-(e & REQUESTED_MASK));
302+
// The requested amount actually reached zero, quit
303+
if (r == COMPLETED_MASK) {
304+
return;
305+
}
306+
// reset the emission count
307+
e = COMPLETED_MASK;
308+
}
309+
}
310+
}
311+
106312
/**
107313
* Atomically subtracts a value from the requested amount unless it's at Long.MAX_VALUE.
108314
* @param requested the requested amount holder

0 commit comments

Comments
 (0)