Skip to content

Commit 7c7bb7a

Browse files
committed
1.x: overhead reduction for merge and flatMap
Changes to the scalar fast-path was inspired by the Project Reactor's flatMap which was in turn inspired by RxJava 2.x's implementation of flatMap.
1 parent ca7f862 commit 7c7bb7a

File tree

2 files changed

+22
-7
lines changed

2 files changed

+22
-7
lines changed

src/main/java/rx/internal/operators/OnSubscribeRange.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,16 +75,16 @@ public void request(long n) {
7575
*/
7676
void slowpath(long r) {
7777
long idx = index;
78+
long e = 0L;
7879
while (true) {
7980
/*
8081
* This complicated logic is done to avoid touching the volatile `index` and `requested` values
8182
* during the loop itself. If they are touched during the loop the performance is impacted significantly.
8283
*/
8384
long fs = end - idx + 1;
84-
long e = Math.min(fs, r);
8585
final boolean complete = fs <= r;
8686

87-
fs = e + idx;
87+
fs = Math.min(fs, r) + idx;
8888
final Subscriber<? super Integer> o = this.o;
8989

9090
for (long i = idx; i != fs; i++) {
@@ -102,13 +102,19 @@ void slowpath(long r) {
102102
return;
103103
}
104104

105+
e -= fs - idx;
106+
105107
idx = fs;
106-
index = fs;
107108

108-
r = addAndGet(-e);
109+
r = get() + e;
109110
if (r == 0L) {
110-
// we're done emitting the number requested so return
111-
return;
111+
index = fs;
112+
r = addAndGet(e);
113+
if (r == 0L) {
114+
// we're done emitting the number requested so return
115+
return;
116+
}
117+
e = 0L;
112118
}
113119
}
114120
}

src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,10 @@ static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>
175175
/** An empty array to avoid creating new empty arrays in removeInner. */
176176
static final InnerSubscriber<?>[] EMPTY = new InnerSubscriber<?>[0];
177177

178+
int scalarEmission;
179+
180+
final int scalarLimit;
181+
178182
public MergeSubscriber(Subscriber<? super T> child, boolean delayErrors, int maxConcurrent) {
179183
this.child = child;
180184
this.delayErrors = delayErrors;
@@ -183,6 +187,7 @@ public MergeSubscriber(Subscriber<? super T> child, boolean delayErrors, int max
183187
this.innerGuard = new Object();
184188
this.innerSubscribers = EMPTY;
185189
long r = Math.min(maxConcurrent, RxRingBuffer.SIZE);
190+
scalarLimit = Math.max((int)r >> 1, 1);
186191
request(r);
187192
}
188193

@@ -483,7 +488,11 @@ protected void emitScalar(T value, long r) {
483488
if (r != Long.MAX_VALUE) {
484489
producer.produced(1);
485490
}
486-
this.requestMore(1);
491+
492+
if (++scalarEmission == scalarLimit) {
493+
scalarEmission = 0;
494+
this.requestMore(scalarLimit);
495+
}
487496
// check if some state changed while emitting
488497
synchronized (this) {
489498
skipFinal = true;

0 commit comments

Comments
 (0)