Skip to content

Commit e44d6c4

Browse files
committed
fix OperatorObserveOn race condition where onComplete could be emitted despite onError being called
1 parent 2532484 commit e44d6c4

File tree

1 file changed

+37
-40
lines changed

1 file changed

+37
-40
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 37 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,15 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
7575
final NotificationLite<T> on = NotificationLite.instance();
7676

7777
final Queue<Object> queue;
78-
volatile boolean completed = false;
79-
volatile boolean failure = false;
78+
79+
// These are the states for the status field. The transitions are
80+
// ACTIVE -> COMPLETED and ACTIVE -> ERRORED
81+
private static final byte ACTIVE = 0;
82+
private static final byte COMPLETED = 1;
83+
private static final byte ERRORED = 2;
84+
85+
// the current status of the incoming stream, possible values listed above
86+
volatile byte status = ACTIVE;
8087

8188
volatile long requested = 0;
8289
@SuppressWarnings("rawtypes")
@@ -127,7 +134,7 @@ public void onStart() {
127134

128135
@Override
129136
public void onNext(final T t) {
130-
if (isUnsubscribed() || completed) {
137+
if (isUnsubscribed() || status !=ACTIVE) {
131138
return;
132139
}
133140
if (!queue.offer(on.next(t))) {
@@ -139,30 +146,23 @@ public void onNext(final T t) {
139146

140147
@Override
141148
public void onCompleted() {
142-
if (isUnsubscribed() || completed) {
143-
return;
144-
}
145-
if (error != null) {
149+
if (isUnsubscribed() || status != ACTIVE) {
146150
return;
147151
}
148-
completed = true;
152+
status = COMPLETED;
149153
schedule();
150154
}
151155

152156
@Override
153157
public void onError(final Throwable e) {
154-
if (isUnsubscribed() || completed) {
155-
return;
156-
}
157-
if (error != null) {
158+
if (isUnsubscribed() || status != ACTIVE) {
158159
return;
159160
}
160161
error = e;
161162
// unsubscribe eagerly since time will pass before the scheduled onError results in an unsubscribe event
162163
unsubscribe();
163164
// mark failure so the polling thread will skip onNext still in the queue
164-
completed = true;
165-
failure = true;
165+
status = ERRORED;
166166
schedule();
167167
}
168168

@@ -193,39 +193,36 @@ void pollQueue() {
193193

194194
// middle:
195195
while (!scheduledUnsubscribe.isUnsubscribed()) {
196-
if (failure) {
196+
if (status == ERRORED) {
197197
child.onError(error);
198198
return;
199-
} else {
200-
if (requested == 0 && completed && queue.isEmpty()) {
201-
child.onCompleted();
202-
return;
203-
}
204-
if (REQUESTED.getAndDecrement(this) != 0) {
205-
Object o = queue.poll();
206-
if (o == null) {
207-
if (completed) {
208-
if (failure) {
209-
child.onError(error);
210-
} else {
211-
child.onCompleted();
212-
}
213-
return;
214-
}
215-
// nothing in queue
199+
} else if (requested == 0 && status == COMPLETED && queue.isEmpty()) {
200+
child.onCompleted();
201+
return;
202+
} else if (REQUESTED.getAndDecrement(this) != 0) {
203+
Object o = queue.poll();
204+
if (o == null) {
205+
// nothing in queue
206+
if (status == ERRORED) {
207+
child.onError(error);
208+
return;
209+
} else if (status == COMPLETED) {
210+
child.onCompleted();
211+
return;
212+
} else {
216213
REQUESTED.incrementAndGet(this);
217214
break;
218-
} else {
219-
if (!on.accept(child, o)) {
220-
// non-terminal event so let's increment count
221-
emitted++;
222-
}
223215
}
224216
} else {
225-
// we hit the end ... so increment back to 0 again
226-
REQUESTED.incrementAndGet(this);
227-
break;
217+
if (!on.accept(child, o)) {
218+
// non-terminal event so let's increment count
219+
emitted++;
220+
}
228221
}
222+
} else {
223+
// we hit the end ... so increment back to 0 again
224+
REQUESTED.incrementAndGet(this);
225+
break;
229226
}
230227
}
231228
} while (COUNTER_UPDATER.decrementAndGet(this) > 0);

0 commit comments

Comments
 (0)