Skip to content

Commit a1f9988

Browse files
Merge pull request #572 from akarnokd/ObserveOn3
ObserveOn fix for observing the same source on the same scheduler by two...
2 parents 53787f2 + dab56f7 commit a1f9988

File tree

2 files changed

+119
-52
lines changed

2 files changed

+119
-52
lines changed

rxjava-core/src/main/java/rx/operators/OperationObserveOn.java

Lines changed: 45 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import rx.concurrency.CurrentThreadScheduler;
2828
import rx.concurrency.ImmediateScheduler;
2929
import rx.subscriptions.CompositeSubscription;
30+
import rx.subscriptions.SerialSubscription;
3031
import rx.subscriptions.Subscriptions;
3132
import rx.util.functions.Action0;
3233
import rx.util.functions.Action1;
@@ -46,10 +47,6 @@ public static <T> OnSubscribeFunc<T> observeOn(Observable<? extends T> source, S
4647
private static class ObserveOn<T> implements OnSubscribeFunc<T> {
4748
private final Observable<? extends T> source;
4849
private final Scheduler scheduler;
49-
private volatile Scheduler recursiveScheduler;
50-
51-
final ConcurrentLinkedQueue<Notification<? extends T>> queue = new ConcurrentLinkedQueue<Notification<? extends T>>();
52-
final AtomicInteger counter = new AtomicInteger(0);
5350

5451
public ObserveOn(Observable<? extends T> source, Scheduler scheduler) {
5552
this.source = source;
@@ -65,71 +62,67 @@ public Subscription onSubscribe(final Observer<? super T> observer) {
6562
// do nothing if we request CurrentThreadScheduler so we don't invoke overhead
6663
return source.subscribe(observer);
6764
} else {
68-
return observeOn(observer, scheduler);
65+
return new Observation(observer).init();
6966
}
7067
}
68+
/** Observe through individual queue per observer. */
69+
private class Observation implements Action1<Notification<? extends T>> {
70+
final Observer<? super T> observer;
71+
final CompositeSubscription s;
72+
final ConcurrentLinkedQueue<Notification<? extends T>> queue;
73+
final AtomicInteger counter;
74+
private volatile Scheduler recursiveScheduler;
75+
public Observation(Observer<? super T> observer) {
76+
this.observer = observer;
77+
this.queue = new ConcurrentLinkedQueue<Notification<? extends T>>();
78+
this.counter = new AtomicInteger(0);
79+
this.s = new CompositeSubscription();
80+
}
81+
public Subscription init() {
82+
s.add(source.materialize().subscribe(this));
83+
return s;
84+
}
7185

72-
public Subscription observeOn(final Observer<? super T> observer, final Scheduler scheduler) {
73-
final CompositeSubscription s = new CompositeSubscription();
74-
75-
s.add(source.materialize().subscribe(new Action1<Notification<? extends T>>() {
76-
77-
@Override
78-
public void call(Notification<? extends T> e) {
79-
// this must happen before 'counter' is used to provide synchronization between threads
80-
queue.offer(e);
81-
82-
// we now use counter to atomically determine if we need to start processing or not
83-
// it will be 0 if it's the first notification or the scheduler has finished processing work
84-
// and we need to start doing it again
85-
if (counter.getAndIncrement() == 0) {
86-
if (recursiveScheduler == null) {
87-
s.add(scheduler.schedule(null, new Func2<Scheduler, T, Subscription>() {
88-
86+
@Override
87+
public void call(Notification<? extends T> e) {
88+
queue.offer(e);
89+
if (counter.getAndIncrement() == 0) {
90+
if (recursiveScheduler == null) {
91+
s.add(scheduler.schedule(null, new Func2<Scheduler, T, Subscription>() {
8992
@Override
9093
public Subscription call(Scheduler innerScheduler, T state) {
9194
// record innerScheduler so 'processQueue' can use it for all subsequent executions
9295
recursiveScheduler = innerScheduler;
9396

94-
processQueue(s, observer);
97+
processQueue();
9598

9699
return Subscriptions.empty();
97100
}
98101
}));
99-
} else {
100-
processQueue(s, observer);
101-
}
102+
} else {
103+
processQueue();
102104
}
103-
104105
}
105-
}));
106-
107-
return s;
108-
}
109-
110-
/**
111-
* This uses 'recursiveScheduler' NOT 'scheduler' as it should reuse the same scheduler each time it processes.
112-
* This means it must first get the recursiveScheduler when it first executes.
113-
*/
114-
private void processQueue(final CompositeSubscription s, final Observer<? super T> observer) {
106+
}
107+
void processQueue() {
108+
s.add(recursiveScheduler.schedule(new Action1<Action0>() {
109+
@Override
110+
public void call(Action0 self) {
111+
Notification<? extends T> not = queue.poll();
112+
if (not != null) {
113+
not.accept(observer);
114+
}
115115

116-
s.add(recursiveScheduler.schedule(new Action1<Action0>() {
117-
@Override
118-
public void call(Action0 self) {
119-
Notification<? extends T> not = queue.poll();
120-
if (not != null) {
121-
not.accept(observer);
122-
}
116+
// decrement count and if we still have work to do
117+
// recursively schedule ourselves to process again
118+
if (counter.decrementAndGet() > 0) {
119+
self.call();
120+
}
123121

124-
// decrement count and if we still have work to do
125-
// recursively schedule ourselves to process again
126-
if (counter.decrementAndGet() > 0) {
127-
self.call();
128122
}
129-
130-
}
131-
}));
123+
}));
124+
}
132125
}
133126
}
134127

135-
}
128+
}

rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import rx.Observable;
3232
import rx.Observer;
3333
import rx.concurrency.Schedulers;
34+
import rx.concurrency.TestScheduler;
3435
import rx.util.functions.Action1;
3536

3637
public class OperationObserveOnTest {
@@ -132,4 +133,77 @@ public void call(String t1) {
132133

133134
inOrder.verify(observer, times(1)).onCompleted();
134135
}
136+
@Test
137+
public void observeOnTheSameSchedulerTwice() {
138+
TestScheduler scheduler = new TestScheduler();
139+
140+
Observable<Integer> o = Observable.from(1, 2, 3);
141+
Observable<Integer> o2 = o.observeOn(scheduler);
142+
143+
@SuppressWarnings("unchecked")
144+
Observer<Object> observer1 = mock(Observer.class);
145+
@SuppressWarnings("unchecked")
146+
Observer<Object> observer2 = mock(Observer.class);
147+
148+
InOrder inOrder1 = inOrder(observer1);
149+
InOrder inOrder2 = inOrder(observer2);
150+
151+
o2.subscribe(observer1);
152+
o2.subscribe(observer2);
153+
154+
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
155+
156+
inOrder1.verify(observer1, times(1)).onNext(1);
157+
inOrder1.verify(observer1, times(1)).onNext(2);
158+
inOrder1.verify(observer1, times(1)).onNext(3);
159+
inOrder1.verify(observer1, times(1)).onCompleted();
160+
verify(observer1, never()).onError(any(Throwable.class));
161+
inOrder1.verifyNoMoreInteractions();
162+
163+
inOrder2.verify(observer2, times(1)).onNext(1);
164+
inOrder2.verify(observer2, times(1)).onNext(2);
165+
inOrder2.verify(observer2, times(1)).onNext(3);
166+
inOrder2.verify(observer2, times(1)).onCompleted();
167+
verify(observer2, never()).onError(any(Throwable.class));
168+
inOrder2.verifyNoMoreInteractions();
169+
170+
}
171+
@Test
172+
public void observeSameOnMultipleSchedulers() {
173+
TestScheduler scheduler1 = new TestScheduler();
174+
TestScheduler scheduler2 = new TestScheduler();
175+
176+
Observable<Integer> o = Observable.from(1, 2, 3);
177+
Observable<Integer> o1 = o.observeOn(scheduler1);
178+
Observable<Integer> o2 = o.observeOn(scheduler2);
179+
180+
@SuppressWarnings("unchecked")
181+
Observer<Object> observer1 = mock(Observer.class);
182+
@SuppressWarnings("unchecked")
183+
Observer<Object> observer2 = mock(Observer.class);
184+
185+
InOrder inOrder1 = inOrder(observer1);
186+
InOrder inOrder2 = inOrder(observer2);
187+
188+
o1.subscribe(observer1);
189+
o2.subscribe(observer2);
190+
191+
scheduler1.advanceTimeBy(1, TimeUnit.SECONDS);
192+
scheduler2.advanceTimeBy(1, TimeUnit.SECONDS);
193+
194+
inOrder1.verify(observer1, times(1)).onNext(1);
195+
inOrder1.verify(observer1, times(1)).onNext(2);
196+
inOrder1.verify(observer1, times(1)).onNext(3);
197+
inOrder1.verify(observer1, times(1)).onCompleted();
198+
verify(observer1, never()).onError(any(Throwable.class));
199+
inOrder1.verifyNoMoreInteractions();
200+
201+
inOrder2.verify(observer2, times(1)).onNext(1);
202+
inOrder2.verify(observer2, times(1)).onNext(2);
203+
inOrder2.verify(observer2, times(1)).onNext(3);
204+
inOrder2.verify(observer2, times(1)).onCompleted();
205+
verify(observer2, never()).onError(any(Throwable.class));
206+
inOrder2.verifyNoMoreInteractions();
207+
208+
}
135209
}

0 commit comments

Comments
 (0)