Skip to content

Commit f0aec07

Browse files
akarnokdakarnokd
authored andcommitted
Window operators now support backpressure in the inner Observable
1 parent 2e44d56 commit f0aec07

12 files changed

+1065
-22
lines changed
Lines changed: 348 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,348 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import java.util.Queue;
19+
import java.util.concurrent.atomic.*;
20+
21+
import rx.*;
22+
import rx.exceptions.*;
23+
import rx.functions.*;
24+
import rx.internal.util.atomic.*;
25+
import rx.internal.util.unsafe.*;
26+
import rx.subjects.Subject;
27+
import rx.subscriptions.Subscriptions;
28+
29+
/**
30+
* A Subject variant which buffers events until a single Subscriber arrives and replays them to it
31+
* and potentially switches to direct delivery once the Subscriber caught up and requested an unlimited
32+
* amount. In this case, the buffered values are no longer retained. If the Subscriber
33+
* requests a limited amount, queueing is involved and only those values are retained which
34+
* weren't requested by the Subscriber at that time.
35+
*/
36+
public final class BufferUntilSubscriberV2<T> extends Subject<T, T> {
37+
38+
/**
39+
* Constructs an empty BufferUntilSubscriber instance with the default capacity hint of 16 elements.
40+
*
41+
* @return the created BufferUntilSubscriber instance
42+
*/
43+
public static <T> BufferUntilSubscriberV2<T> create() {
44+
return create(16);
45+
}
46+
/**
47+
* Constructs an empty BufferUntilSubscriber instance with a capacity hint.
48+
* <p>The capacity hint determines the internal queue's island size: the larger
49+
* it is the less frequent allocation will happen if there is no subscriber
50+
* or the subscriber hasn't caught up.
51+
* @param capacityHint the capacity hint for the internal queue
52+
* @return the created BufferUntilSubscriber instance
53+
*/
54+
public static <T> BufferUntilSubscriberV2<T> create(int capacityHint) {
55+
State<T> state = new State<T>(capacityHint);
56+
OnSubscribeBUS<T> onSubscribe = new OnSubscribeBUS<T>(state);
57+
return new BufferUntilSubscriberV2<T>(onSubscribe, state);
58+
}
59+
60+
final State<T> state;
61+
62+
private BufferUntilSubscriberV2(OnSubscribe<T> onSubscribe, State<T> state) {
63+
super(onSubscribe);
64+
this.state = state;
65+
}
66+
67+
@Override
68+
public void onNext(T t) {
69+
state.onNext(t);
70+
}
71+
72+
@Override
73+
public void onError(Throwable e) {
74+
state.onError(e);
75+
}
76+
77+
@Override
78+
public void onCompleted() {
79+
state.onCompleted();
80+
}
81+
82+
@Override
83+
public boolean hasObservers() {
84+
return state.subscriber.get() != null;
85+
}
86+
87+
/**
88+
* The single-consumption replaying state.
89+
*
90+
* @param <T> the value type
91+
*/
92+
static final class State<T> extends AtomicLong implements Producer, Observer<T>, Action0 {
93+
/** */
94+
private static final long serialVersionUID = -9044104859202255786L;
95+
/** The single subscriber. */
96+
final AtomicReference<Subscriber<? super T>> subscriber;
97+
/** The queue holding values until the subscriber arrives and catches up. */
98+
final Queue<Object> queue;
99+
/** JCTools queues don't accept nulls. */
100+
final NotificationLite<T> nl;
101+
/** In case the source emitted an error. */
102+
Throwable error;
103+
/** Indicates the source has terminated. */
104+
volatile boolean done;
105+
/** Emitter loop: emitting indicator. Guarded by this. */
106+
boolean emitting;
107+
/** Emitter loop: missed emission indicator. Guarded by this. */
108+
boolean missed;
109+
/** Indicates the queue can be bypassed because the child has caught up with the replay. */
110+
volatile boolean caughtUp;
111+
/**
112+
* Constructor.
113+
* @param capacityHint indicates how large each island in the Spsc queue should be to
114+
* reduce allocation frequency
115+
*/
116+
public State(int capacityHint) {
117+
this.nl = NotificationLite.instance();
118+
this.subscriber = new AtomicReference<Subscriber<? super T>>();
119+
Queue<Object> q;
120+
if (capacityHint > 1) {
121+
q = UnsafeAccess.isUnsafeAvailable()
122+
? new SpscUnboundedArrayQueue<Object>(capacityHint)
123+
: new SpscUnboundedAtomicArrayQueue<Object>(capacityHint);
124+
} else {
125+
q = UnsafeAccess.isUnsafeAvailable()
126+
? new SpscLinkedQueue<Object>()
127+
: new SpscLinkedAtomicQueue<Object>();
128+
}
129+
this.queue = q;
130+
}
131+
132+
@Override
133+
public void onNext(T t) {
134+
if (!done) {
135+
if (!caughtUp) {
136+
boolean stillReplay = false;
137+
/*
138+
* We need to offer while holding the lock because
139+
* we have to atomically switch caughtUp to true
140+
* that can only happen if there isn't any concurrent
141+
* offer() happening while the emission is in replayLoop().
142+
*/
143+
synchronized (this) {
144+
if (!caughtUp) {
145+
queue.offer(nl.next(t));
146+
stillReplay = true;
147+
}
148+
}
149+
if (stillReplay) {
150+
replay();
151+
return;
152+
}
153+
}
154+
Subscriber<? super T> s = subscriber.get();
155+
try {
156+
s.onNext(t);
157+
} catch (Throwable ex) {
158+
Exceptions.throwIfFatal(ex);
159+
s.onError(OnErrorThrowable.addValueAsLastCause(ex, t));
160+
}
161+
}
162+
}
163+
@Override
164+
public void onError(Throwable e) {
165+
if (!done) {
166+
error = e;
167+
done = true;
168+
if (!caughtUp) {
169+
boolean stillReplay = false;
170+
synchronized (this) {
171+
stillReplay = !caughtUp;
172+
}
173+
if (stillReplay) {
174+
replay();
175+
return;
176+
}
177+
}
178+
subscriber.get().onError(e);
179+
}
180+
}
181+
@Override
182+
public void onCompleted() {
183+
if (!done) {
184+
done = true;
185+
if (!caughtUp) {
186+
boolean stillReplay = false;
187+
synchronized (this) {
188+
stillReplay = !caughtUp;
189+
}
190+
if (stillReplay) {
191+
replay();
192+
return;
193+
}
194+
}
195+
subscriber.get().onCompleted();
196+
}
197+
}
198+
199+
@Override
200+
public void request(long n) {
201+
if (n < 0L) {
202+
throw new IllegalArgumentException("n >= 0 required");
203+
} else
204+
if (n > 0L) {
205+
BackpressureUtils.getAndAddRequest(this, n);
206+
replay();
207+
} else
208+
if (done) { // terminal events can be delivered for zero requests
209+
replay();
210+
}
211+
}
212+
/**
213+
* Tries to set the given subscriber if not already set, sending an
214+
* IllegalStateException to the subscriber otherwise.
215+
* @param subscriber
216+
*/
217+
public void setSubscriber(Subscriber<? super T> subscriber) {
218+
if (this.subscriber.compareAndSet(null, subscriber)) {
219+
subscriber.add(Subscriptions.create(this));
220+
subscriber.setProducer(this);
221+
} else {
222+
subscriber.onError(new IllegalStateException("Only a single subscriber is allowed"));
223+
}
224+
}
225+
/**
226+
* Tries to replay the contents of the queue.
227+
*/
228+
void replay() {
229+
synchronized (this) {
230+
if (emitting) {
231+
missed = true;
232+
return;
233+
}
234+
emitting = true;
235+
}
236+
Queue<Object> q = queue;
237+
for (;;) {
238+
Subscriber<? super T> s = subscriber.get();
239+
boolean unlimited = false;
240+
if (s != null) {
241+
boolean d = done;
242+
boolean empty = q.isEmpty();
243+
244+
if (checkTerminated(d, empty, s)) {
245+
return;
246+
}
247+
long r = get();
248+
unlimited = r == Long.MAX_VALUE;
249+
long e = 0L;
250+
251+
while (r != 0) {
252+
d = done;
253+
Object v = q.poll();
254+
empty = v == null;
255+
if (checkTerminated(d, empty, s)) {
256+
return;
257+
}
258+
if (empty) {
259+
break;
260+
}
261+
T value = nl.getValue(v);
262+
try {
263+
s.onNext(value);
264+
} catch (Throwable ex) {
265+
q.clear();
266+
Exceptions.throwIfFatal(ex);
267+
s.onError(OnErrorThrowable.addValueAsLastCause(ex, value));
268+
return;
269+
}
270+
r--;
271+
e++;
272+
}
273+
if (!unlimited && e != 0L) {
274+
addAndGet(-e);
275+
}
276+
}
277+
278+
synchronized (this) {
279+
if (!missed) {
280+
if (unlimited && q.isEmpty()) {
281+
caughtUp = true;
282+
}
283+
emitting = false;
284+
return;
285+
}
286+
missed = false;
287+
}
288+
}
289+
}
290+
/**
291+
* Terminates the state by setting the done flag and tries to clear the queue.
292+
* Should be called only when the child unsubscribes
293+
*/
294+
@Override
295+
public void call() {
296+
done = true;
297+
synchronized (this) {
298+
if (emitting) {
299+
return;
300+
}
301+
emitting = true;
302+
}
303+
queue.clear();
304+
}
305+
/**
306+
* Checks if one of the terminal conditions have been met: child unsubscribed,
307+
* an error happened or the source terminated and the queue is empty
308+
* @param done
309+
* @param empty
310+
* @param s
311+
* @return
312+
*/
313+
boolean checkTerminated(boolean done, boolean empty, Subscriber<? super T> s) {
314+
if (s.isUnsubscribed()) {
315+
queue.clear();
316+
return true;
317+
}
318+
if (done) {
319+
Throwable e = error;
320+
if (e != null) {
321+
queue.clear();
322+
s.onError(e);
323+
return true;
324+
} else
325+
if (empty) {
326+
s.onCompleted();
327+
return true;
328+
}
329+
}
330+
return false;
331+
}
332+
}
333+
/**
334+
* The OnSubscribe implementation of the BufferUntilSubscriber.
335+
*
336+
* @param <T> the value type
337+
*/
338+
static final class OnSubscribeBUS<T> implements OnSubscribe<T> {
339+
final State<T> state;
340+
public OnSubscribeBUS(State<T> state) {
341+
this.state = state;
342+
}
343+
@Override
344+
public void call(Subscriber<? super T> child) {
345+
state.setSubscriber(child);
346+
}
347+
}
348+
}

src/main/java/rx/internal/operators/OperatorWindowWithObservable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ void replaceSubject() {
154154
child.onNext(producer);
155155
}
156156
void createNewWindow() {
157-
BufferUntilSubscriber<T> bus = BufferUntilSubscriber.create();
157+
BufferUntilSubscriberV2<T> bus = BufferUntilSubscriberV2.create();
158158
consumer = bus;
159159
producer = bus;
160160
}

src/main/java/rx/internal/operators/OperatorWindowWithObservableFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ void replaceSubject() {
160160
child.onNext(producer);
161161
}
162162
void createNewWindow() {
163-
BufferUntilSubscriber<T> bus = BufferUntilSubscriber.create();
163+
BufferUntilSubscriberV2<T> bus = BufferUntilSubscriberV2.create();
164164
consumer = bus;
165165
producer = bus;
166166
Observable<? extends U> other;

src/main/java/rx/internal/operators/OperatorWindowWithSize.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public Subscriber<? super T> call(Subscriber<? super Observable<T>> child) {
6060
final class ExactSubscriber extends Subscriber<T> {
6161
final Subscriber<? super Observable<T>> child;
6262
int count;
63-
BufferUntilSubscriber<T> window;
63+
BufferUntilSubscriberV2<T> window;
6464
volatile boolean noWindow = true;
6565
public ExactSubscriber(Subscriber<? super Observable<T>> child) {
6666
/**
@@ -107,7 +107,7 @@ void requestMore(long n) {
107107
public void onNext(T t) {
108108
if (window == null) {
109109
noWindow = false;
110-
window = BufferUntilSubscriber.create();
110+
window = BufferUntilSubscriberV2.create();
111111
child.onNext(window);
112112
}
113113
window.onNext(t);
@@ -242,7 +242,7 @@ public void onCompleted() {
242242
}
243243

244244
CountedSubject<T> createCountedSubject() {
245-
final BufferUntilSubscriber<T> bus = BufferUntilSubscriber.create();
245+
final BufferUntilSubscriberV2<T> bus = BufferUntilSubscriberV2.create();
246246
return new CountedSubject<T>(bus, bus);
247247
}
248248
}

0 commit comments

Comments
 (0)