Skip to content

Commit 4d5515a

Browse files
authored
1.x: merge/flatMap to keep scalar/inner element relative order (#4209)
* 1.x: merge/flatMap to keep scalar/inner element relative order * Read the queue references once
1 parent 65c7070 commit 4d5515a

File tree

2 files changed

+57
-4
lines changed

2 files changed

+57
-4
lines changed

src/main/java/rx/internal/operators/OperatorMerge.java

+16-4
Original file line numberDiff line numberDiff line change
@@ -352,9 +352,16 @@ void tryEmit(InnerSubscriber<T> subscriber, T value) {
352352
}
353353
}
354354
if (success) {
355-
emitScalar(subscriber, value, r);
355+
RxRingBuffer subscriberQueue = subscriber.queue;
356+
if (subscriberQueue == null || subscriberQueue.isEmpty()) {
357+
emitScalar(subscriber, value, r);
358+
} else {
359+
queueScalar(subscriber, value);
360+
emitLoop();
361+
}
356362
} else {
357363
queueScalar(subscriber, value);
364+
emit();
358365
}
359366
}
360367

@@ -383,7 +390,6 @@ protected void queueScalar(InnerSubscriber<T> subscriber, T value) {
383390
}
384391
return;
385392
}
386-
emit();
387393
}
388394

389395
protected void emitScalar(InnerSubscriber<T> subscriber, T value, long r) {
@@ -460,9 +466,16 @@ void tryEmit(T value) {
460466
}
461467
}
462468
if (success) {
463-
emitScalar(value, r);
469+
Queue<Object> mainQueue = queue;
470+
if (mainQueue == null || mainQueue.isEmpty()) {
471+
emitScalar(value, r);
472+
} else {
473+
queueScalar(value);
474+
emitLoop();
475+
}
464476
} else {
465477
queueScalar(value);
478+
emit();
466479
}
467480
}
468481

@@ -495,7 +508,6 @@ protected void queueScalar(T value) {
495508
onError(OnErrorThrowable.addValueAsLastCause(new MissingBackpressureException(), value));
496509
return;
497510
}
498-
emit();
499511
}
500512

501513
protected void emitScalar(T value, long r) {

src/test/java/rx/internal/operators/OperatorMergeTest.java

+41
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import rx.Observer;
3535
import rx.Scheduler.Worker;
3636
import rx.functions.*;
37+
import rx.internal.operators.OperatorMerge.*;
3738
import rx.internal.util.*;
3839
import rx.observers.TestSubscriber;
3940
import rx.schedulers.*;
@@ -1498,4 +1499,44 @@ public void flatMapMaxConcurrentJustRange() {
14981499
ts.assertNoErrors();
14991500
ts.assertCompleted();
15001501
}
1502+
1503+
@Test
1504+
public void noInnerReordering() {
1505+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
1506+
MergeSubscriber<Integer> ms = new MergeSubscriber<Integer>(ts, false, 128);
1507+
ms.producer = new MergeProducer<Integer>(ms);
1508+
ts.setProducer(ms.producer);
1509+
1510+
PublishSubject<Integer> ps = PublishSubject.create();
1511+
1512+
ms.onNext(ps);
1513+
1514+
ps.onNext(1);
1515+
1516+
BackpressureUtils.getAndAddRequest(ms.producer, 2);
1517+
1518+
ps.onNext(2);
1519+
1520+
ms.emit();
1521+
1522+
ts.assertValues(1, 2);
1523+
}
1524+
1525+
@Test
1526+
public void noOuterScalarReordering() {
1527+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
1528+
MergeSubscriber<Integer> ms = new MergeSubscriber<Integer>(ts, false, 128);
1529+
ms.producer = new MergeProducer<Integer>(ms);
1530+
ts.setProducer(ms.producer);
1531+
1532+
ms.onNext(Observable.just(1));
1533+
1534+
BackpressureUtils.getAndAddRequest(ms.producer, 2);
1535+
1536+
ms.onNext(Observable.just(2));
1537+
1538+
ms.emit();
1539+
1540+
ts.assertValues(1, 2);
1541+
}
15011542
}

0 commit comments

Comments
 (0)