Skip to content

Commit f699993

Browse files
author
Aaron Tull
committed
Implemented the AsyncOnSubscribe
1 parent 689e73f commit f699993

File tree

2 files changed

+387
-0
lines changed

2 files changed

+387
-0
lines changed
Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.observables;
18+
19+
import java.util.HashSet;
20+
import java.util.Set;
21+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
22+
import java.util.concurrent.atomic.AtomicLong;
23+
import java.util.concurrent.atomic.AtomicReference;
24+
25+
import rx.Observable;
26+
import rx.Observable.OnSubscribe;
27+
import rx.Observer;
28+
import rx.Producer;
29+
import rx.Subscriber;
30+
import rx.Subscription;
31+
import rx.functions.Action0;
32+
import rx.internal.operators.BackpressureUtils;
33+
import rx.internal.operators.BufferUntilSubscriber;
34+
import rx.observers.SerializedObserver;
35+
import rx.plugins.RxJavaPlugins;
36+
import rx.subscriptions.BooleanSubscription;
37+
38+
public abstract class AsyncOnSubscribe<S, T> implements OnSubscribe<T> {
39+
40+
/**
41+
* Executed once when subscribed to by a subscriber (via {@link OnSubscribe#call(Subscriber)})
42+
* to produce a state value. This value is passed into {@link #next(Object, Observer) next(S
43+
* state, Observer <T> observer)} on the first iteration. Subsequent iterations of {@code next}
44+
* will receive the state returned by the previous invocation of {@code next}.
45+
*
46+
* @return the initial state value
47+
*/
48+
protected abstract S generateState();
49+
50+
/**
51+
* Called to produce data to the downstream subscribers. To emit data to a downstream subscriber
52+
* call {@code observer.onNext(t)}. To signal an error condition call
53+
* {@code observer.onError(throwable)} or throw an Exception. To signal the end of a data stream
54+
* call {@code
55+
* observer.onCompleted()}. Implementations of this method must follow the following rules.
56+
*
57+
* <ul>
58+
* <li>Must not call {@code observer.onNext(t)} more than 1 time per invocation.</li>
59+
* <li>Must not call {@code observer.onNext(t)} concurrently.</li>
60+
* </ul>
61+
*
62+
* The value returned from an invocation of this method will be passed in as the {@code state}
63+
* argument of the next invocation of this method.
64+
*
65+
* @param state
66+
* the state value (from {@link #generateState()} on the first invocation or the
67+
* previous invocation of this method.
68+
* @param observer
69+
* the observer of data emitted by
70+
* @return the next iteration's state value
71+
*/
72+
protected abstract S next(S state, long requested, Observer<Observable<T>> observer);
73+
74+
/**
75+
* Clean up behavior that is executed after the downstream subscriber's subscription is
76+
* unsubscribed. This method will be invoked exactly once.
77+
*
78+
* @param state
79+
* the last state value prior from {@link #generateState()} or
80+
* {@link #next(Object, Observer)} before unsubscribe.
81+
*/
82+
protected void onUnsubscribe(S state) {
83+
84+
}
85+
86+
@Override
87+
public final void call(Subscriber<? super T> actualSubscriber) {
88+
S state = generateState();
89+
UnicastSubject<Observable<T>> unicast = UnicastSubject.<Observable<T>>create();
90+
Observable<T> concat = Observable.concat(unicast);
91+
AsyncOuterSubscriber<S, T> outerSubscriberProducer = new AsyncOuterSubscriber<S, T>(this, state, unicast);
92+
actualSubscriber.add(outerSubscriberProducer);
93+
concat.subscribe(actualSubscriber);
94+
actualSubscriber.setProducer(outerSubscriberProducer);
95+
}
96+
97+
private static class AsyncOuterSubscriber<S, T>
98+
extends AtomicLong
99+
implements Producer, Subscription, Observer<Observable<T>> {
100+
/** */
101+
private static final long serialVersionUID = -7884904861928856832L;
102+
103+
private volatile int isUnsubscribed;
104+
@SuppressWarnings("rawtypes")
105+
private static final AtomicIntegerFieldUpdater<AsyncOuterSubscriber> IS_UNSUBSCRIBED =
106+
AtomicIntegerFieldUpdater.newUpdater(AsyncOuterSubscriber.class, "isUnsubscribed");
107+
108+
private final AsyncOnSubscribe<S, T> parent;
109+
private final SerializedObserver<Observable<T>> serializedSubscriber;
110+
private final Set<Subscription> subscriptions = new HashSet<Subscription>();
111+
112+
private boolean hasTerminated = false;
113+
private boolean onNextCalled = false;
114+
115+
private S state;
116+
117+
private final UnicastSubject<Observable<T>> merger;
118+
119+
public AsyncOuterSubscriber(AsyncOnSubscribe<S, T> parent, S initialState, UnicastSubject<Observable<T>> merger) {
120+
this.parent = parent;
121+
this.serializedSubscriber = new SerializedObserver<Observable<T>>(this);
122+
this.state = initialState;
123+
this.merger = merger;
124+
}
125+
126+
@Override
127+
public void unsubscribe() {
128+
System.out.println("unsub main " + isUnsubscribed);
129+
if (IS_UNSUBSCRIBED.compareAndSet(this, 0, 1)) {
130+
// it's safe to process terminal behavior
131+
System.out.println("subscriptions: " + subscriptions);
132+
parent.onUnsubscribe(state);
133+
for(Subscription s : subscriptions) {
134+
System.out.println("unsubing: " + s);
135+
if (!s.isUnsubscribed()) {
136+
s.unsubscribe();
137+
}
138+
}
139+
}
140+
}
141+
142+
@Override
143+
public boolean isUnsubscribed() {
144+
return isUnsubscribed != 0;
145+
}
146+
147+
public S nextIteration(long n) {
148+
return parent.next(state, n, serializedSubscriber);
149+
}
150+
151+
@Override
152+
public void request(long n) {
153+
long previousRequestAmount = BackpressureUtils.getAndAddRequest(this, n);
154+
if (n > 0 && previousRequestAmount == 0L) {
155+
do {
156+
// check if unsubscribed before doing any work
157+
if (isUnsubscribed()) {
158+
unsubscribe();
159+
return;
160+
}
161+
// otherwise try one iteration for a request of `numRequested` elements
162+
try {
163+
onNextCalled = false;
164+
state = nextIteration(n);
165+
} catch (Throwable ex) {
166+
handleThrownError(parent, state, ex);
167+
return;
168+
}
169+
} while (!hasTerminated);
170+
}
171+
}
172+
173+
private void handleThrownError(final AsyncOnSubscribe<S, T> p, S st, Throwable ex) {
174+
if (hasTerminated) {
175+
RxJavaPlugins.getInstance().getErrorHandler().handleError(ex);
176+
} else {
177+
hasTerminated = true;
178+
merger.onError(ex);
179+
unsubscribe();
180+
}
181+
}
182+
183+
@Override
184+
public void onCompleted() {
185+
if (hasTerminated) {
186+
throw new IllegalStateException("Terminal event already emitted.");
187+
}
188+
hasTerminated = true;
189+
merger.onCompleted();
190+
}
191+
192+
@Override
193+
public void onError(Throwable e) {
194+
if (hasTerminated) {
195+
throw new IllegalStateException("Terminal event already emitted.");
196+
}
197+
hasTerminated = true;
198+
merger.onError(e);
199+
}
200+
201+
private static Subscription SUBSCRIPTION_SENTINAL = new BooleanSubscription();
202+
203+
@Override
204+
public void onNext(final Observable<T> t) {
205+
if (onNextCalled) {
206+
throw new IllegalStateException("onNext called multiple times!");
207+
}
208+
onNextCalled = true;
209+
if (hasTerminated)
210+
return;
211+
BufferUntilSubscriber<T> buffer = BufferUntilSubscriber.<T>create();
212+
final AtomicReference<Subscription> holder = new AtomicReference<Subscription>(SUBSCRIPTION_SENTINAL);
213+
final Subscription innerSubscription = t.doOnTerminate(new Action0(){
214+
@Override
215+
public void call() {
216+
if (!holder.compareAndSet(SUBSCRIPTION_SENTINAL, null)) {
217+
Subscription h = holder.get();
218+
System.out.println("Removing subscription " + h);
219+
subscriptions.remove(h);
220+
}
221+
}
222+
}).subscribe(buffer);
223+
if (holder.compareAndSet(SUBSCRIPTION_SENTINAL, innerSubscription)) {
224+
System.out.println("t: " + t + " s: "+ innerSubscription);
225+
subscriptions.add(innerSubscription);
226+
}
227+
merger.onNext(buffer);
228+
}
229+
}
230+
231+
private static final class UnicastSubject<T> extends Observable<T> implements Observer<T> {
232+
public static <T> UnicastSubject<T> create() {
233+
return new UnicastSubject<T>(new State<T>());
234+
}
235+
236+
private State<T> state;
237+
238+
protected UnicastSubject(final State<T> state) {
239+
super(new OnSubscribe<T>() {
240+
@Override
241+
public void call(Subscriber<? super T> s) {
242+
if (state.subscriber != null) {
243+
s.onError(new IllegalStateException("Only 1 Subscriber permitted"));
244+
} else {
245+
state.subscriber = s;
246+
}
247+
}});
248+
this.state = state;
249+
}
250+
251+
@Override
252+
public void onCompleted() {
253+
state.subscriber.onCompleted();
254+
}
255+
256+
@Override
257+
public void onError(Throwable e) {
258+
state.subscriber.onError(e);
259+
}
260+
261+
@Override
262+
public void onNext(T t) {
263+
state.subscriber.onNext(t);
264+
}
265+
266+
private static class State<T> {
267+
private Subscriber<? super T> subscriber;
268+
}
269+
}
270+
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package rx.observables;
2+
3+
import java.util.concurrent.CountDownLatch;
4+
import java.util.concurrent.TimeUnit;
5+
import java.util.concurrent.atomic.AtomicBoolean;
6+
import java.util.concurrent.atomic.AtomicReference;
7+
8+
import org.junit.Assert;
9+
import org.junit.Test;
10+
11+
import rx.Observable;
12+
import rx.Observable.OnSubscribe;
13+
import rx.Observer;
14+
import rx.functions.Action0;
15+
16+
public class AsyncOnSubscribeTest {
17+
18+
@Test
19+
public void test() throws InterruptedException {
20+
final AtomicBoolean unsubbedFrom1 = new AtomicBoolean(false);
21+
final AtomicBoolean unsubbedFrom2 = new AtomicBoolean(false);
22+
final AtomicBoolean unsubbedFrom3 = new AtomicBoolean(false);
23+
final AtomicBoolean unsubbedFrom4 = new AtomicBoolean(false);
24+
final CountDownLatch l1 = new CountDownLatch(1);
25+
final CountDownLatch l2 = new CountDownLatch(1);
26+
final CountDownLatch l3 = new CountDownLatch(1);
27+
final CountDownLatch l4 = new CountDownLatch(1);
28+
final AtomicReference<String> failed1 = new AtomicReference<String>();
29+
final AtomicReference<String> failed2 = new AtomicReference<String>();
30+
final AtomicReference<String> failed3 = new AtomicReference<String>();
31+
final AtomicReference<String> failed4 = new AtomicReference<String>();
32+
OnSubscribe<Integer> os = new AsyncOnSubscribe<Integer, Integer>() {
33+
@Override
34+
protected Integer generateState() {
35+
return 1;
36+
}
37+
38+
@Override
39+
protected Integer next(Integer state, long requested, Observer<Observable<Integer>> observer) {
40+
if (state == 1) {
41+
Observable<Integer> o1 = Observable.just(1)
42+
.delay(100, TimeUnit.MILLISECONDS)
43+
.doOnUnsubscribe(new Action0(){
44+
@Override
45+
public void call() {
46+
unsubbedFrom1.set(true);
47+
if (l1.getCount() <= 0)
48+
failed1.set("1st Observable that emits terminal unsubscribed twice");
49+
l1.countDown();
50+
}});
51+
observer.onNext(o1);
52+
} else if (state == 2) {
53+
Observable<Integer> o = Observable.just(2)
54+
.delay(50, TimeUnit.MILLISECONDS)
55+
.doOnUnsubscribe(new Action0(){
56+
@Override
57+
public void call() {
58+
unsubbedFrom2.set(true);
59+
if (l2.getCount() <= 0)
60+
failed2.set("2nd Observable that emits terminal unsubscribed twice");
61+
l2.countDown();
62+
}});
63+
observer.onNext(o);
64+
} else if (state == 3) {
65+
Observable<Integer> o = Observable.<Integer>never()
66+
.doOnUnsubscribe(new Action0(){
67+
@Override
68+
public void call() {
69+
unsubbedFrom3.set(true);
70+
if (l3.getCount() <= 0)
71+
failed3.set("Observable.never() unsubscribed twice");
72+
l3.countDown();
73+
}});
74+
observer.onNext(o);
75+
} else if (state == 4) {
76+
Observable<Integer> o = Observable.just(4)
77+
.delay(2000, TimeUnit.MILLISECONDS)
78+
.doOnUnsubscribe(new Action0(){
79+
@Override
80+
public void call() {
81+
unsubbedFrom4.set(true);
82+
if (l4.getCount() <= 0)
83+
failed4.set("Observable delayed until after outer unsubscribe was unsubscribed twice");
84+
l4.countDown();
85+
}});
86+
observer.onNext(o);
87+
}
88+
else
89+
observer.onCompleted();
90+
return state + 1;
91+
}
92+
};
93+
Observable.create(os).take(2).toBlocking().last();
94+
95+
latch(l1);
96+
latch(l2);
97+
latch(l3);
98+
latch(l4);
99+
assertNullOrMessage(failed1.get());
100+
assertNullOrMessage(failed2.get());
101+
assertNullOrMessage(failed3.get());
102+
assertNullOrMessage(failed4.get());
103+
Assert.assertTrue("did not unsub from first observable after terminal", unsubbedFrom1.get());
104+
Assert.assertTrue("did not unsub from second observable after terminal", unsubbedFrom2.get());
105+
Assert.assertTrue("did not unsub from Observable.never() inner obs", unsubbedFrom3.get());
106+
Assert.assertTrue("did not unsub from observable unsubscribed from outer", unsubbedFrom4.get());
107+
}
108+
109+
private void assertNullOrMessage(String msg) {
110+
Assert.assertEquals(msg, null, msg);
111+
}
112+
113+
private void latch(CountDownLatch latch) throws InterruptedException {
114+
if (!latch.await(2, TimeUnit.SECONDS))
115+
Assert.fail("count down didn't succeed");
116+
}
117+
}

0 commit comments

Comments
 (0)