From d57323126db1a3ebe080960092590500699c0d5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Wed, 25 May 2016 22:35:30 +0200 Subject: [PATCH] 1.x: observeOn + immediate scheduler to be a request rebatcher --- .../internal/operators/OperatorObserveOn.java | 5 +--- .../operators/OperatorObserveOnTest.java | 25 ++++++++++++++++++- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorObserveOn.java b/src/main/java/rx/internal/operators/OperatorObserveOn.java index f09a424020..636436f181 100644 --- a/src/main/java/rx/internal/operators/OperatorObserveOn.java +++ b/src/main/java/rx/internal/operators/OperatorObserveOn.java @@ -63,10 +63,7 @@ public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize @Override public Subscriber call(Subscriber child) { - if (scheduler instanceof ImmediateScheduler) { - // avoid overhead, execute directly - return child; - } else if (scheduler instanceof TrampolineScheduler) { + if (scheduler instanceof TrampolineScheduler) { // avoid overhead, execute directly return child; } else { diff --git a/src/test/java/rx/internal/operators/OperatorObserveOnTest.java b/src/test/java/rx/internal/operators/OperatorObserveOnTest.java index 6e066c19ad..d9e27fe2b3 100644 --- a/src/test/java/rx/internal/operators/OperatorObserveOnTest.java +++ b/src/test/java/rx/internal/operators/OperatorObserveOnTest.java @@ -23,7 +23,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; -import org.junit.Test; +import org.junit.*; import org.mockito.InOrder; import rx.*; @@ -933,4 +933,27 @@ public void bufferSizesWork() { ts.assertNoErrors(); } } + + @Test + public void synchronousRebatching() { + final List requests = new ArrayList(); + + TestSubscriber ts = new TestSubscriber(); + + Observable.range(1, 50) + .doOnRequest(new Action1() { + @Override + public void call(Long r) { + requests.add(r); + } + }) + .observeOn(Schedulers.immediate(), 20) + .subscribe(ts); + + ts.assertValueCount(50); + ts.assertNoErrors(); + ts.assertCompleted(); + + Assert.assertEquals(Arrays.asList(20L, 15L, 15L, 15L), requests); + } }