Skip to content

Commit 47b098c

Browse files
Merge pull request #2929 from davidmoten/observe-on-race
OperatorObserveOn onComplete can be emitted despite onError being called
2 parents ecc2014 + c9d9324 commit 47b098c

File tree

2 files changed

+59
-51
lines changed

2 files changed

+59
-51
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 38 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,19 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
7575
final NotificationLite<T> on = NotificationLite.instance();
7676

7777
final Queue<Object> queue;
78-
volatile boolean completed = false;
79-
volatile boolean failure = false;
78+
79+
// the status of the current stream
80+
volatile boolean finished = false;
8081

82+
@SuppressWarnings("unused")
8183
volatile long requested = 0;
84+
8285
@SuppressWarnings("rawtypes")
8386
static final AtomicLongFieldUpdater<ObserveOnSubscriber> REQUESTED = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "requested");
8487

8588
@SuppressWarnings("unused")
8689
volatile long counter;
90+
8791
@SuppressWarnings("rawtypes")
8892
static final AtomicLongFieldUpdater<ObserveOnSubscriber> COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "counter");
8993

@@ -127,7 +131,7 @@ public void onStart() {
127131

128132
@Override
129133
public void onNext(final T t) {
130-
if (isUnsubscribed() || completed) {
134+
if (isUnsubscribed()) {
131135
return;
132136
}
133137
if (!queue.offer(on.next(t))) {
@@ -139,30 +143,23 @@ public void onNext(final T t) {
139143

140144
@Override
141145
public void onCompleted() {
142-
if (isUnsubscribed() || completed) {
146+
if (isUnsubscribed() || finished) {
143147
return;
144148
}
145-
if (error != null) {
146-
return;
147-
}
148-
completed = true;
149+
finished = true;
149150
schedule();
150151
}
151152

152153
@Override
153154
public void onError(final Throwable e) {
154-
if (isUnsubscribed() || completed) {
155-
return;
156-
}
157-
if (error != null) {
155+
if (isUnsubscribed() || finished) {
158156
return;
159157
}
160158
error = e;
161159
// unsubscribe eagerly since time will pass before the scheduled onError results in an unsubscribe event
162160
unsubscribe();
163-
// mark failure so the polling thread will skip onNext still in the queue
164-
completed = true;
165-
failure = true;
161+
finished = true;
162+
// polling thread should skip any onNext still in the queue
166163
schedule();
167164
}
168165

@@ -185,52 +182,42 @@ protected void schedule() {
185182
void pollQueue() {
186183
int emitted = 0;
187184
do {
188-
/*
189-
* Set to 1 otherwise it could have grown very large while in the last poll loop
190-
* and then we can end up looping all those times again here before exiting even once we've drained
191-
*/
192185
counter = 1;
193-
194-
// middle:
195-
while (!scheduledUnsubscribe.isUnsubscribed()) {
196-
if (failure) {
197-
child.onError(error);
198-
return;
199-
} else {
200-
if (requested == 0 && completed && queue.isEmpty()) {
186+
long produced = 0;
187+
long r = requested;
188+
while (!child.isUnsubscribed()) {
189+
Throwable error;
190+
if (finished) {
191+
if ((error = this.error) != null) {
192+
// errors shortcut the queue so
193+
// release the elements in the queue for gc
194+
queue.clear();
195+
child.onError(error);
196+
return;
197+
} else
198+
if (queue.isEmpty()) {
201199
child.onCompleted();
202200
return;
203201
}
204-
if (REQUESTED.getAndDecrement(this) != 0) {
205-
Object o = queue.poll();
206-
if (o == null) {
207-
if (completed) {
208-
if (failure) {
209-
child.onError(error);
210-
} else {
211-
child.onCompleted();
212-
}
213-
return;
214-
}
215-
// nothing in queue
216-
REQUESTED.incrementAndGet(this);
217-
break;
218-
} else {
219-
if (!on.accept(child, o)) {
220-
// non-terminal event so let's increment count
221-
emitted++;
222-
}
223-
}
202+
}
203+
if (r > 0) {
204+
Object o = queue.poll();
205+
if (o != null) {
206+
child.onNext(on.getValue(o));
207+
r--;
208+
emitted++;
209+
produced++;
224210
} else {
225-
// we hit the end ... so increment back to 0 again
226-
REQUESTED.incrementAndGet(this);
227211
break;
228212
}
213+
} else {
214+
break;
229215
}
230216
}
217+
if (produced > 0 && requested != Long.MAX_VALUE) {
218+
REQUESTED.addAndGet(this, -produced);
219+
}
231220
} while (COUNTER_UPDATER.decrementAndGet(this) > 0);
232-
233-
// request the number of items that we emitted in this poll loop
234221
if (emitted > 0) {
235222
request(emitted);
236223
}

src/perf/java/rx/operators/OperatorObserveOnPerf.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,5 +66,26 @@ public void observeOnImmediate(Input input) throws InterruptedException {
6666
input.observable.observeOn(Schedulers.immediate()).subscribe(o);
6767
o.latch.await();
6868
}
69+
70+
@Benchmark
71+
public void observeOnComputationSubscribedOnComputation(Input input) throws InterruptedException {
72+
LatchedObserver<Integer> o = input.newLatchedObserver();
73+
input.observable.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation()).subscribe(o);
74+
o.latch.await();
75+
}
76+
77+
@Benchmark
78+
public void observeOnNewThreadSubscribedOnComputation(Input input) throws InterruptedException {
79+
LatchedObserver<Integer> o = input.newLatchedObserver();
80+
input.observable.subscribeOn(Schedulers.computation()).observeOn(Schedulers.newThread()).subscribe(o);
81+
o.latch.await();
82+
}
83+
84+
@Benchmark
85+
public void observeOnImmediateSubscribedOnComputation(Input input) throws InterruptedException {
86+
LatchedObserver<Integer> o = input.newLatchedObserver();
87+
input.observable.subscribeOn(Schedulers.computation()).observeOn(Schedulers.immediate()).subscribe(o);
88+
o.latch.await();
89+
}
6990

7091
}

0 commit comments

Comments
 (0)