Skip to content

Commit 5865f4a

Browse files
committed
1.x: range and flatMap overhead reduction
1 parent 9bc1987 commit 5865f4a

File tree

2 files changed

+70
-47
lines changed

2 files changed

+70
-47
lines changed

src/main/java/rx/internal/operators/OnSubscribeRange.java

Lines changed: 51 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -25,90 +25,96 @@
2525
*/
2626
public final class OnSubscribeRange implements OnSubscribe<Integer> {
2727

28-
private final int start;
29-
private final int end;
28+
private final int startIndex;
29+
private final int endIndex;
3030

31-
public OnSubscribeRange(int start, int end) {
32-
this.start = start;
33-
this.end = end;
31+
public OnSubscribeRange(int startIndex, int endIndex) {
32+
this.startIndex = startIndex;
33+
this.endIndex = endIndex;
3434
}
3535

3636
@Override
37-
public void call(final Subscriber<? super Integer> o) {
38-
o.setProducer(new RangeProducer(o, start, end));
37+
public void call(final Subscriber<? super Integer> childSubscriber) {
38+
childSubscriber.setProducer(new RangeProducer(childSubscriber, startIndex, endIndex));
3939
}
4040

4141
private static final class RangeProducer extends AtomicLong implements Producer {
4242
/** */
4343
private static final long serialVersionUID = 4114392207069098388L;
4444

45-
private final Subscriber<? super Integer> o;
46-
private final int end;
47-
private long index;
45+
private final Subscriber<? super Integer> childSubscriber;
46+
private final int endOfRange;
47+
private long currentIndex;
4848

49-
private RangeProducer(Subscriber<? super Integer> o, int start, int end) {
50-
this.o = o;
51-
this.index = start;
52-
this.end = end;
49+
private RangeProducer(Subscriber<? super Integer> childSubscriber, int startIndex, int endIndex) {
50+
this.childSubscriber = childSubscriber;
51+
this.currentIndex = startIndex;
52+
this.endOfRange = endIndex;
5353
}
5454

5555
@Override
56-
public void request(long n) {
56+
public void request(long requestedAmount) {
5757
if (get() == Long.MAX_VALUE) {
5858
// already started with fast-path
5959
return;
6060
}
61-
if (n == Long.MAX_VALUE && compareAndSet(0L, Long.MAX_VALUE)) {
61+
if (requestedAmount == Long.MAX_VALUE && compareAndSet(0L, Long.MAX_VALUE)) {
6262
// fast-path without backpressure
6363
fastpath();
64-
} else if (n > 0L) {
65-
long c = BackpressureUtils.getAndAddRequest(this, n);
64+
} else if (requestedAmount > 0L) {
65+
long c = BackpressureUtils.getAndAddRequest(this, requestedAmount);
6666
if (c == 0L) {
6767
// backpressure is requested
68-
slowpath(n);
68+
slowpath(requestedAmount);
6969
}
7070
}
7171
}
7272

7373
/**
7474
*
7575
*/
76-
void slowpath(long r) {
77-
long idx = index;
76+
void slowpath(long requestedAmount) {
77+
long positionInRange = currentIndex;
78+
long emitted = 0L;
7879
while (true) {
7980
/*
8081
* This complicated logic is done to avoid touching the volatile `index` and `requested` values
8182
* during the loop itself. If they are touched during the loop the performance is impacted significantly.
8283
*/
83-
long fs = end - idx + 1;
84-
long e = Math.min(fs, r);
85-
final boolean complete = fs <= r;
84+
long remainingAmountOrEndIndex = endOfRange - positionInRange + 1;
85+
final boolean complete = remainingAmountOrEndIndex <= requestedAmount;
8686

87-
fs = e + idx;
88-
final Subscriber<? super Integer> o = this.o;
87+
remainingAmountOrEndIndex = Math.min(remainingAmountOrEndIndex, requestedAmount) + positionInRange;
88+
final Subscriber<? super Integer> childSubscriber = this.childSubscriber;
8989

90-
for (long i = idx; i != fs; i++) {
91-
if (o.isUnsubscribed()) {
90+
for (long runningIndex = positionInRange; runningIndex != remainingAmountOrEndIndex; runningIndex++) {
91+
if (childSubscriber.isUnsubscribed()) {
9292
return;
9393
}
94-
o.onNext((int) i);
94+
childSubscriber.onNext((int) runningIndex);
9595
}
9696

9797
if (complete) {
98-
if (o.isUnsubscribed()) {
98+
if (childSubscriber.isUnsubscribed()) {
9999
return;
100100
}
101-
o.onCompleted();
101+
childSubscriber.onCompleted();
102102
return;
103103
}
104104

105-
idx = fs;
106-
index = fs;
105+
emitted -= remainingAmountOrEndIndex - positionInRange;
107106

108-
r = addAndGet(-e);
109-
if (r == 0L) {
110-
// we're done emitting the number requested so return
111-
return;
107+
positionInRange = remainingAmountOrEndIndex;
108+
109+
requestedAmount = get() + emitted;
110+
if (requestedAmount == 0L) {
111+
currentIndex = remainingAmountOrEndIndex;
112+
requestedAmount = addAndGet(emitted);
113+
if (requestedAmount == 0L) {
114+
// we're done emitting the number requested so return
115+
return;
116+
}
117+
emitted = 0L;
112118
}
113119
}
114120
}
@@ -117,18 +123,18 @@ void slowpath(long r) {
117123
*
118124
*/
119125
void fastpath() {
120-
final long end = this.end + 1L;
121-
final Subscriber<? super Integer> o = this.o;
122-
for (long i = index; i != end; i++) {
123-
if (o.isUnsubscribed()) {
126+
final long end = this.endOfRange + 1L;
127+
final Subscriber<? super Integer> childSubscriber = this.childSubscriber;
128+
for (long runningIndex = currentIndex; runningIndex != end; runningIndex++) {
129+
if (childSubscriber.isUnsubscribed()) {
124130
return;
125131
}
126-
o.onNext((int) i);
132+
childSubscriber.onNext((int) runningIndex);
127133
}
128-
if (!o.isUnsubscribed()) {
129-
o.onCompleted();
134+
if (!childSubscriber.isUnsubscribed()) {
135+
childSubscriber.onCompleted();
130136
}
131137
}
132138
}
133139

134-
}
140+
}

src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,14 +177,28 @@ static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>
177177
/** An empty array to avoid creating new empty arrays in removeInner. */
178178
static final InnerSubscriber<?>[] EMPTY = new InnerSubscriber<?>[0];
179179

180+
int scalarEmission;
181+
182+
final int scalarLimit;
183+
180184
public MergeSubscriber(Subscriber<? super T> child, boolean delayErrors, int maxConcurrent) {
181185
this.child = child;
182186
this.delayErrors = delayErrors;
183187
this.maxConcurrent = maxConcurrent;
184188
this.nl = NotificationLite.instance();
185189
this.innerGuard = new Object();
186190
this.innerSubscribers = EMPTY;
187-
request(maxConcurrent == Integer.MAX_VALUE ? Long.MAX_VALUE : maxConcurrent);
191+
long r;
192+
int lim;
193+
if (maxConcurrent == Integer.MAX_VALUE) {
194+
r = Long.MAX_VALUE;
195+
lim = Integer.MAX_VALUE;
196+
} else {
197+
r = maxConcurrent;
198+
lim = Math.max(maxConcurrent >> 1, 1);
199+
}
200+
this.scalarLimit = lim;
201+
request(r);
188202
}
189203

190204
Queue<Throwable> getOrCreateErrorQueue() {
@@ -488,7 +502,10 @@ protected void emitScalar(T value, long r) {
488502
if (r != Long.MAX_VALUE) {
489503
producer.produced(1);
490504
}
491-
this.requestMore(1);
505+
if (++scalarEmission == scalarLimit) {
506+
scalarEmission = 0;
507+
this.requestMore(scalarLimit);
508+
}
492509
// check if some state changed while emitting
493510
synchronized (this) {
494511
skipFinal = true;

0 commit comments

Comments
 (0)