Skip to content

Commit d3ebd70

Browse files
committed
Merge pull request #3476 from akarnokd/FlatMapRangePerfFix1x
1.x: overhead reduction for merge and flatMap
2 parents 84ac53e + 3b93232 commit d3ebd70

File tree

2 files changed

+76
-56
lines changed

2 files changed

+76
-56
lines changed

src/main/java/rx/internal/operators/OnSubscribeRange.java

Lines changed: 56 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -25,108 +25,110 @@
2525
*/
2626
public final class OnSubscribeRange implements OnSubscribe<Integer> {
2727

28-
private final int start;
29-
private final int end;
28+
private final int startIndex;
29+
private final int endIndex;
3030

3131
public OnSubscribeRange(int start, int end) {
32-
this.start = start;
33-
this.end = end;
32+
this.startIndex = start;
33+
this.endIndex = end;
3434
}
3535

3636
@Override
37-
public void call(final Subscriber<? super Integer> o) {
38-
o.setProducer(new RangeProducer(o, start, end));
37+
public void call(final Subscriber<? super Integer> childSubscriber) {
38+
childSubscriber.setProducer(new RangeProducer(childSubscriber, startIndex, endIndex));
3939
}
4040

4141
private static final class RangeProducer extends AtomicLong implements Producer {
4242
/** */
4343
private static final long serialVersionUID = 4114392207069098388L;
4444

45-
private final Subscriber<? super Integer> o;
46-
private final int end;
47-
private long index;
45+
private final Subscriber<? super Integer> childSubscriber;
46+
private final int endOfRange;
47+
private long currentIndex;
4848

49-
RangeProducer(Subscriber<? super Integer> o, int start, int end) {
50-
this.o = o;
51-
this.index = start;
52-
this.end = end;
49+
RangeProducer(Subscriber<? super Integer> childSubscriber, int startIndex, int endIndex) {
50+
this.childSubscriber = childSubscriber;
51+
this.currentIndex = startIndex;
52+
this.endOfRange = endIndex;
5353
}
5454

5555
@Override
56-
public void request(long n) {
56+
public void request(long requestedAmount) {
5757
if (get() == Long.MAX_VALUE) {
5858
// already started with fast-path
5959
return;
6060
}
61-
if (n == Long.MAX_VALUE && compareAndSet(0L, Long.MAX_VALUE)) {
61+
if (requestedAmount == Long.MAX_VALUE && compareAndSet(0L, Long.MAX_VALUE)) {
6262
// fast-path without backpressure
6363
fastpath();
64-
} else if (n > 0L) {
65-
long c = BackpressureUtils.getAndAddRequest(this, n);
64+
} else if (requestedAmount > 0L) {
65+
long c = BackpressureUtils.getAndAddRequest(this, requestedAmount);
6666
if (c == 0L) {
6767
// backpressure is requested
68-
slowpath(n);
68+
slowpath(requestedAmount);
6969
}
7070
}
7171
}
7272

7373
/**
74-
*
74+
* Emits as many values as requested or remaining from the range, whichever is smaller.
7575
*/
76-
void slowpath(long r) {
77-
long idx = index;
78-
while (true) {
79-
/*
80-
* This complicated logic is done to avoid touching the volatile `index` and `requested` values
81-
* during the loop itself. If they are touched during the loop the performance is impacted significantly.
82-
*/
83-
long fs = end - idx + 1;
84-
long e = Math.min(fs, r);
85-
final boolean complete = fs <= r;
86-
87-
fs = e + idx;
88-
final Subscriber<? super Integer> o = this.o;
76+
void slowpath(long requestedAmount) {
77+
long emitted = 0L;
78+
long endIndex = endOfRange + 1L;
79+
long index = currentIndex;
80+
81+
final Subscriber<? super Integer> childSubscriber = this.childSubscriber;
82+
83+
for (;;) {
8984

90-
for (long i = idx; i != fs; i++) {
91-
if (o.isUnsubscribed()) {
85+
while (emitted != requestedAmount && index != endIndex) {
86+
if (childSubscriber.isUnsubscribed()) {
9287
return;
9388
}
94-
o.onNext((int) i);
89+
90+
childSubscriber.onNext((int)index);
91+
92+
index++;
93+
emitted++;
9594
}
9695

97-
if (complete) {
98-
if (o.isUnsubscribed()) {
99-
return;
100-
}
101-
o.onCompleted();
96+
if (childSubscriber.isUnsubscribed()) {
10297
return;
10398
}
10499

105-
idx = fs;
106-
index = fs;
107-
108-
r = addAndGet(-e);
109-
if (r == 0L) {
110-
// we're done emitting the number requested so return
100+
if (index == endIndex) {
101+
childSubscriber.onCompleted();
111102
return;
112103
}
104+
105+
requestedAmount = get();
106+
107+
if (requestedAmount == emitted) {
108+
currentIndex = index;
109+
requestedAmount = addAndGet(-emitted);
110+
if (requestedAmount == 0L) {
111+
break;
112+
}
113+
emitted = 0L;
114+
}
113115
}
114116
}
115117

116118
/**
117-
*
119+
* Emits all remaining values without decrementing the requested amount.
118120
*/
119121
void fastpath() {
120-
final long end = this.end + 1L;
121-
final Subscriber<? super Integer> o = this.o;
122-
for (long i = index; i != end; i++) {
123-
if (o.isUnsubscribed()) {
122+
final long endIndex = this.endOfRange + 1L;
123+
final Subscriber<? super Integer> childSubscriber = this.childSubscriber;
124+
for (long index = currentIndex; index != endIndex; index++) {
125+
if (childSubscriber.isUnsubscribed()) {
124126
return;
125127
}
126-
o.onNext((int) i);
128+
childSubscriber.onNext((int) index);
127129
}
128-
if (!o.isUnsubscribed()) {
129-
o.onCompleted();
130+
if (!childSubscriber.isUnsubscribed()) {
131+
childSubscriber.onCompleted();
130132
}
131133
}
132134
}

src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,14 +180,24 @@ static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>
180180
/** An empty array to avoid creating new empty arrays in removeInner. */
181181
static final InnerSubscriber<?>[] EMPTY = new InnerSubscriber<?>[0];
182182

183+
final int scalarEmissionLimit;
184+
185+
int scalarEmissionCount;
186+
183187
public MergeSubscriber(Subscriber<? super T> child, boolean delayErrors, int maxConcurrent) {
184188
this.child = child;
185189
this.delayErrors = delayErrors;
186190
this.maxConcurrent = maxConcurrent;
187191
this.nl = NotificationLite.instance();
188192
this.innerGuard = new Object();
189193
this.innerSubscribers = EMPTY;
190-
request(maxConcurrent == Integer.MAX_VALUE ? Long.MAX_VALUE : maxConcurrent);
194+
if (maxConcurrent == Integer.MAX_VALUE) {
195+
scalarEmissionLimit = Integer.MAX_VALUE;
196+
request(Long.MAX_VALUE);
197+
} else {
198+
scalarEmissionLimit = Math.max(1, maxConcurrent >> 1);
199+
request(maxConcurrent);
200+
}
191201
}
192202

193203
Queue<Throwable> getOrCreateErrorQueue() {
@@ -491,7 +501,15 @@ protected void emitScalar(T value, long r) {
491501
if (r != Long.MAX_VALUE) {
492502
producer.produced(1);
493503
}
494-
this.requestMore(1);
504+
505+
int produced = scalarEmissionCount + 1;
506+
if (produced == scalarEmissionLimit) {
507+
scalarEmissionCount = 0;
508+
this.requestMore(produced);
509+
} else {
510+
scalarEmissionCount = produced;
511+
}
512+
495513
// check if some state changed while emitting
496514
synchronized (this) {
497515
skipFinal = true;

0 commit comments

Comments
 (0)