Skip to content

Commit c68dd0a

Browse files
committed
Merge pull request #3702 from akarnokd/MapNotificationFix1x
1.x: fix mapNotification's last item backpressure handling
2 parents 2366c54 + a6f35a5 commit c68dd0a

File tree

2 files changed

+208
-163
lines changed

2 files changed

+208
-163
lines changed

src/main/java/rx/internal/operators/OperatorMapNotification.java

Lines changed: 123 additions & 163 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,12 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.Queue;
19-
import java.util.concurrent.ConcurrentLinkedQueue;
20-
import java.util.concurrent.atomic.AtomicLong;
18+
import java.util.concurrent.atomic.*;
2119

2220
import rx.*;
2321
import rx.Observable.Operator;
24-
import rx.exceptions.*;
22+
import rx.exceptions.Exceptions;
2523
import rx.functions.*;
26-
import rx.internal.producers.ProducerArbiter;
27-
import rx.internal.util.unsafe.*;
2824

2925
/**
3026
* Applies a function of your choosing to every item emitted by an {@code Observable}, and emits the results of
@@ -45,203 +41,167 @@ public OperatorMapNotification(Func1<? super T, ? extends R> onNext, Func1<? sup
4541
}
4642

4743
@Override
48-
public Subscriber<? super T> call(final Subscriber<? super R> o) {
49-
final ProducerArbiter pa = new ProducerArbiter();
50-
51-
MapNotificationSubscriber subscriber = new MapNotificationSubscriber(pa, o);
52-
o.add(subscriber);
53-
subscriber.init();
54-
return subscriber;
44+
public Subscriber<? super T> call(final Subscriber<? super R> child) {
45+
final MapNotificationSubscriber<T, R> parent = new MapNotificationSubscriber<T, R>(child, onNext, onError, onCompleted);
46+
child.add(parent);
47+
child.setProducer(new Producer() {
48+
@Override
49+
public void request(long n) {
50+
parent.requestInner(n);
51+
}
52+
});
53+
return parent;
5554
}
5655

57-
final class MapNotificationSubscriber extends Subscriber<T> {
58-
private final Subscriber<? super R> o;
59-
private final ProducerArbiter pa;
60-
final SingleEmitter<R> emitter;
61-
62-
MapNotificationSubscriber(ProducerArbiter pa, Subscriber<? super R> o) {
63-
this.pa = pa;
64-
this.o = o;
65-
this.emitter = new SingleEmitter<R>(o, pa, this);
66-
}
56+
static final class MapNotificationSubscriber<T, R> extends Subscriber<T> {
6757

68-
void init() {
69-
o.setProducer(emitter);
70-
}
58+
final Subscriber<? super R> actual;
59+
60+
final Func1<? super T, ? extends R> onNext;
61+
62+
final Func1<? super Throwable, ? extends R> onError;
63+
64+
final Func0<? extends R> onCompleted;
65+
66+
final AtomicLong requested;
7167

72-
@Override
73-
public void setProducer(Producer producer) {
74-
pa.setProducer(producer);
68+
final AtomicLong missedRequested;
69+
70+
final AtomicReference<Producer> producer;
71+
72+
long produced;
73+
74+
R value;
75+
76+
static final long COMPLETED_FLAG = Long.MIN_VALUE;
77+
static final long REQUESTED_MASK = Long.MAX_VALUE;
78+
79+
public MapNotificationSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> onNext,
80+
Func1<? super Throwable, ? extends R> onError, Func0<? extends R> onCompleted) {
81+
this.actual = actual;
82+
this.onNext = onNext;
83+
this.onError = onError;
84+
this.onCompleted = onCompleted;
85+
this.requested = new AtomicLong();
86+
this.missedRequested = new AtomicLong();
87+
this.producer = new AtomicReference<Producer>();
7588
}
7689

7790
@Override
78-
public void onCompleted() {
91+
public void onNext(T t) {
7992
try {
80-
emitter.offerAndComplete(onCompleted.call());
81-
} catch (Throwable e) {
82-
Exceptions.throwOrReport(e, o);
93+
produced++;
94+
actual.onNext(onNext.call(t));
95+
} catch (Throwable ex) {
96+
Exceptions.throwOrReport(ex, actual, t);
8397
}
8498
}
85-
99+
86100
@Override
87101
public void onError(Throwable e) {
102+
accountProduced();
88103
try {
89-
emitter.offerAndComplete(onError.call(e));
90-
} catch (Throwable e2) {
91-
Exceptions.throwOrReport(e2, o);
104+
value = onError.call(e);
105+
} catch (Throwable ex) {
106+
Exceptions.throwOrReport(ex, actual, e);
92107
}
108+
tryEmit();
93109
}
94-
110+
95111
@Override
96-
public void onNext(T t) {
112+
public void onCompleted() {
113+
accountProduced();
97114
try {
98-
emitter.offer(onNext.call(t));
99-
} catch (Throwable e) {
100-
Exceptions.throwOrReport(e, o, t);
115+
value = onCompleted.call();
116+
} catch (Throwable ex) {
117+
Exceptions.throwOrReport(ex, actual);
101118
}
119+
tryEmit();
102120
}
103-
}
104-
static final class SingleEmitter<T> extends AtomicLong implements Producer, Subscription {
105-
/** */
106-
private static final long serialVersionUID = -249869671366010660L;
107-
final NotificationLite<T> nl;
108-
final Subscriber<? super T> child;
109-
final Producer producer;
110-
final Subscription cancel;
111-
final Queue<Object> queue;
112-
volatile boolean complete;
113-
/** Guarded by this. */
114-
boolean emitting;
115-
/** Guarded by this. */
116-
boolean missed;
117121

118-
public SingleEmitter(Subscriber<? super T> child, Producer producer, Subscription cancel) {
119-
this.child = child;
120-
this.producer = producer;
121-
this.cancel = cancel;
122-
this.queue = UnsafeAccess.isUnsafeAvailable()
123-
? new SpscArrayQueue<Object>(2)
124-
: new ConcurrentLinkedQueue<Object>();
125-
126-
this.nl = NotificationLite.instance();
122+
void accountProduced() {
123+
long p = produced;
124+
if (p != 0L && producer.get() != null) {
125+
BackpressureUtils.produced(requested, p);
126+
}
127127
}
128+
128129
@Override
129-
public void request(long n) {
130-
for (;;) {
131-
long r = get();
132-
if (r < 0) {
133-
return;
134-
}
135-
long u = r + n;
136-
if (u < 0) {
137-
u = Long.MAX_VALUE;
138-
}
139-
if (compareAndSet(r, u)) {
140-
producer.request(n);
141-
drain();
142-
return;
130+
public void setProducer(Producer p) {
131+
if (producer.compareAndSet(null, p)) {
132+
long r = missedRequested.getAndSet(0L);
133+
if (r != 0L) {
134+
p.request(r);
143135
}
136+
} else {
137+
throw new IllegalStateException("Producer already set!");
144138
}
145139
}
146140

147-
void produced(long n) {
141+
void tryEmit() {
148142
for (;;) {
149-
long r = get();
150-
if (r < 0) {
151-
return;
152-
}
153-
long u = r - n;
154-
if (u < 0) {
155-
throw new IllegalStateException("More produced (" + n + ") than requested (" + r + ")");
143+
long r = requested.get();
144+
if ((r & COMPLETED_FLAG) != 0) {
145+
break;
156146
}
157-
if (compareAndSet(r, u)) {
147+
if (requested.compareAndSet(r, r | COMPLETED_FLAG)) {
148+
if (r != 0 || producer.get() == null) {
149+
if (!actual.isUnsubscribed()) {
150+
actual.onNext(value);
151+
}
152+
if (!actual.isUnsubscribed()) {
153+
actual.onCompleted();
154+
}
155+
}
158156
return;
159157
}
160158
}
161159
}
162160

163-
public void offer(T value) {
164-
if (!queue.offer(value)) {
165-
child.onError(new MissingBackpressureException());
166-
unsubscribe();
167-
} else {
168-
drain();
161+
void requestInner(long n) {
162+
if (n < 0L) {
163+
throw new IllegalArgumentException("n >= 0 required but it was " + n);
169164
}
170-
}
171-
public void offerAndComplete(T value) {
172-
if (!this.queue.offer(value)) {
173-
child.onError(new MissingBackpressureException());
174-
unsubscribe();
175-
} else {
176-
this.complete = true;
177-
drain();
178-
}
179-
}
180-
181-
void drain() {
182-
synchronized (this) {
183-
if (emitting) {
184-
missed = true;
185-
return;
186-
}
187-
emitting = true;
188-
missed = false;
165+
if (n == 0L) {
166+
return;
189167
}
190-
boolean skipFinal = false;
191-
try {
192-
for (;;) {
193-
194-
long r = get();
195-
boolean c = complete;
196-
boolean empty = queue.isEmpty();
197-
198-
if (c && empty) {
199-
child.onCompleted();
200-
skipFinal = true;
201-
return;
202-
} else
203-
if (r > 0) {
204-
Object v = queue.poll();
205-
if (v != null) {
206-
child.onNext(nl.getValue(v));
207-
produced(1);
208-
} else
209-
if (c) {
210-
child.onCompleted();
211-
skipFinal = true;
212-
return;
213-
}
214-
}
215-
216-
synchronized (this) {
217-
if (!missed) {
218-
skipFinal = true;
219-
emitting = false;
220-
return;
168+
for (;;) {
169+
long r = requested.get();
170+
171+
if ((r & COMPLETED_FLAG) != 0L) {
172+
long v = r & REQUESTED_MASK;
173+
long u = BackpressureUtils.addCap(v, n) | COMPLETED_FLAG;
174+
if (requested.compareAndSet(r, u)) {
175+
if (v == 0L) {
176+
if (!actual.isUnsubscribed()) {
177+
actual.onNext(value);
178+
}
179+
if (!actual.isUnsubscribed()) {
180+
actual.onCompleted();
181+
}
221182
}
222-
missed = false;
183+
return;
223184
}
224-
}
225-
} finally {
226-
if (!skipFinal) {
227-
synchronized (this) {
228-
emitting = false;
185+
} else {
186+
long u = BackpressureUtils.addCap(r, n);
187+
if (requested.compareAndSet(r, u)) {
188+
break;
229189
}
230190
}
231191
}
232-
}
233-
234-
@Override
235-
public boolean isUnsubscribed() {
236-
return get() < 0;
237-
}
238-
@Override
239-
public void unsubscribe() {
240-
long r = get();
241-
if (r != Long.MIN_VALUE) {
242-
r = getAndSet(Long.MIN_VALUE);
243-
if (r != Long.MIN_VALUE) {
244-
cancel.unsubscribe();
192+
193+
AtomicReference<Producer> localProducer = producer;
194+
Producer actualProducer = localProducer.get();
195+
if (actualProducer != null) {
196+
actualProducer.request(n);
197+
} else {
198+
BackpressureUtils.getAndAddRequest(missedRequested, n);
199+
actualProducer = localProducer.get();
200+
if (actualProducer != null) {
201+
long r = missedRequested.getAndSet(0L);
202+
if (r != 0L) {
203+
actualProducer.request(r);
204+
}
245205
}
246206
}
247207
}

0 commit comments

Comments
 (0)