Skip to content

Commit b0250bb

Browse files
Merge pull request #2923 from akarnokd/OnBackpressureLatest
OnBackpressureLatest: Non-blocking version of the toBlocking().latest() operator.
2 parents 03dd808 + 0ddd75f commit b0250bb

File tree

3 files changed

+393
-0
lines changed

3 files changed

+393
-0
lines changed

src/main/java/rx/Observable.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5366,6 +5366,27 @@ public final Observable<T> onBackpressureBlock() {
53665366
return onBackpressureBlock(rx.internal.util.RxRingBuffer.SIZE);
53675367
}
53685368

5369+
/**
5370+
* Instructs an Observable that is emitting items faster than its observer can consume them to
5371+
* hold onto the latest value and emit that on request.
5372+
* <p>
5373+
* Its behavior is logically equivalent to toBlocking().latest() with the exception that
5374+
* the downstream is not blocking while requesting more values.
5375+
* <p>
5376+
* Note that if the upstream Observable does support backpressure, this operator ignores that capability
5377+
* and doesn't propagate any backpressure requests from downstream.
5378+
* <p>
5379+
* Note that due to the nature of how backpressure requests are propagated through subscribeOn/observeOn,
5380+
* requesting more than 1 from downstream doesn't guarantee a continuous delivery of onNext events.
5381+
* @return
5382+
* @Experimental The behavior of this can change at any time.
5383+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
5384+
*/
5385+
@Experimental
5386+
public final Observable<T> onBackpressureLatest() {
5387+
return lift(OperatorOnBackpressureLatest.<T>instance());
5388+
}
5389+
53695390
/**
53705391
* Instructs an Observable to pass control to another Observable rather than invoking
53715392
* {@link Observer#onError onError} if it encounters an error.
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import java.util.concurrent.atomic.*;
19+
20+
import rx.Observable.Operator;
21+
import rx.*;
22+
23+
/**
24+
* An operator which drops all but the last received value in case the downstream
25+
* doesn't request more.
26+
*/
27+
public final class OperatorOnBackpressureLatest<T> implements Operator<T, T> {
28+
/** Holds a singleton instance initialized on class-loading. */
29+
static final class Holder {
30+
static final OperatorOnBackpressureLatest<Object> INSTANCE = new OperatorOnBackpressureLatest<Object>();
31+
}
32+
33+
/**
34+
* Returns a singleton instance of the OnBackpressureLatest operator since it is stateless.
35+
* @return the single instanceof OperatorOnBackpressureLatest
36+
*/
37+
@SuppressWarnings("unchecked")
38+
public static <T> OperatorOnBackpressureLatest<T> instance() {
39+
return (OperatorOnBackpressureLatest<T>)Holder.INSTANCE;
40+
}
41+
42+
@Override
43+
public Subscriber<? super T> call(Subscriber<? super T> child) {
44+
final LatestEmitter<T> producer = new LatestEmitter<T>(child);
45+
LatestSubscriber<T> parent = new LatestSubscriber<T>(producer);
46+
producer.parent = parent;
47+
child.add(parent);
48+
child.add(producer);
49+
child.setProducer(producer);
50+
return parent;
51+
}
52+
/**
53+
* A terminatable producer which emits the latest items on request.
54+
* @param <T>
55+
*/
56+
static final class LatestEmitter<T> extends AtomicLong implements Producer, Subscription, Observer<T> {
57+
/** */
58+
private static final long serialVersionUID = -1364393685005146274L;
59+
final Subscriber<? super T> child;
60+
LatestSubscriber<? super T> parent;
61+
final AtomicReference<Object> value;
62+
/** Written before done, read after done. */
63+
Throwable terminal;
64+
volatile boolean done;
65+
/** Guarded by this. */
66+
boolean emitting;
67+
/** Guarded by this. */
68+
boolean missed;
69+
static final Object EMPTY = new Object();
70+
static final long NOT_REQUESTED = Long.MIN_VALUE / 2;
71+
public LatestEmitter(Subscriber<? super T> child) {
72+
this.child = child;
73+
this.value = new AtomicReference<Object>(EMPTY);
74+
this.lazySet(NOT_REQUESTED); // not
75+
}
76+
@Override
77+
public void request(long n) {
78+
if (n >= 0) {
79+
for (;;) {
80+
long r = get();
81+
if (r == Long.MIN_VALUE) {
82+
return;
83+
}
84+
long u;
85+
if (r == NOT_REQUESTED) {
86+
u = n;
87+
} else {
88+
u = r + n;
89+
if (u < 0) {
90+
u = Long.MAX_VALUE;
91+
}
92+
}
93+
if (compareAndSet(r, u)) {
94+
if (r == NOT_REQUESTED) {
95+
parent.requestMore(Long.MAX_VALUE);
96+
}
97+
emit();
98+
return;
99+
}
100+
}
101+
}
102+
}
103+
long produced(long n) {
104+
for (;;) {
105+
long r = get();
106+
if (r < 0) {
107+
return r;
108+
}
109+
long u = r - n;
110+
if (compareAndSet(r, u)) {
111+
return u;
112+
}
113+
}
114+
}
115+
@Override
116+
public boolean isUnsubscribed() {
117+
return get() == Long.MIN_VALUE;
118+
}
119+
@Override
120+
public void unsubscribe() {
121+
if (get() >= 0) {
122+
getAndSet(Long.MIN_VALUE);
123+
}
124+
}
125+
126+
@Override
127+
public void onNext(T t) {
128+
value.lazySet(t); // emit's synchronized block does a full release
129+
emit();
130+
}
131+
@Override
132+
public void onError(Throwable e) {
133+
terminal = e;
134+
done = true;
135+
emit();
136+
}
137+
@Override
138+
public void onCompleted() {
139+
done = true;
140+
emit();
141+
}
142+
void emit() {
143+
synchronized (this) {
144+
if (emitting) {
145+
missed = true;
146+
return;
147+
}
148+
emitting = true;
149+
missed = false;
150+
}
151+
boolean skipFinal = false;
152+
try {
153+
for (;;) {
154+
long r = get();
155+
if (r == Long.MIN_VALUE) {
156+
skipFinal = true;
157+
break;
158+
}
159+
Object v = value.get();
160+
if (r > 0 && v != EMPTY) {
161+
@SuppressWarnings("unchecked")
162+
T v2 = (T)v;
163+
child.onNext(v2);
164+
value.compareAndSet(v, EMPTY);
165+
produced(1);
166+
v = EMPTY;
167+
}
168+
if (v == EMPTY && done) {
169+
Throwable e = terminal;
170+
if (e != null) {
171+
child.onError(e);
172+
} else {
173+
child.onCompleted();
174+
}
175+
}
176+
synchronized (this) {
177+
if (!missed) {
178+
emitting = false;
179+
skipFinal = true;
180+
break;
181+
}
182+
missed = false;
183+
}
184+
}
185+
} finally {
186+
if (!skipFinal) {
187+
synchronized (this) {
188+
emitting = false;
189+
}
190+
}
191+
}
192+
}
193+
}
194+
static final class LatestSubscriber<T> extends Subscriber<T> {
195+
private final LatestEmitter<T> producer;
196+
197+
private LatestSubscriber(LatestEmitter<T> producer) {
198+
this.producer = producer;
199+
}
200+
201+
@Override
202+
public void onStart() {
203+
// don't run until the child actually requested to avoid synchronous problems
204+
request(0);
205+
}
206+
207+
@Override
208+
public void onNext(T t) {
209+
producer.onNext(t);
210+
}
211+
212+
@Override
213+
public void onError(Throwable e) {
214+
producer.onError(e);
215+
}
216+
217+
@Override
218+
public void onCompleted() {
219+
producer.onCompleted();
220+
}
221+
void requestMore(long n) {
222+
request(n);
223+
}
224+
}
225+
}

0 commit comments

Comments
 (0)