diff --git a/src/main/java/rx/internal/operators/OperatorDelay.java b/src/main/java/rx/internal/operators/OperatorDelay.java index 48b8454dc8..00ab5d1b49 100644 --- a/src/main/java/rx/internal/operators/OperatorDelay.java +++ b/src/main/java/rx/internal/operators/OperatorDelay.java @@ -49,22 +49,36 @@ public Subscriber call(final Subscriber child) { final Worker worker = scheduler.createWorker(); child.add(worker); return new Subscriber(child) { - + // indicates an error cut ahead + // accessed from the worker thread only + boolean done; @Override public void onCompleted() { worker.schedule(new Action0() { @Override public void call() { - child.onCompleted(); + if (!done) { + done = true; + child.onCompleted(); + } } }, delay, unit); } @Override - public void onError(Throwable e) { - child.onError(e); + public void onError(final Throwable e) { + worker.schedule(new Action0() { + @Override + public void call() { + if (!done) { + done = true; + child.onError(e); + worker.unsubscribe(); + } + } + }); } @Override @@ -73,7 +87,9 @@ public void onNext(final T t) { @Override public void call() { - child.onNext(t); + if (!done) { + child.onNext(t); + } } }, delay, unit); diff --git a/src/test/java/rx/internal/operators/OperatorDelayTest.java b/src/test/java/rx/internal/operators/OperatorDelayTest.java index 9f80f0dc73..e4db021eaf 100644 --- a/src/test/java/rx/internal/operators/OperatorDelayTest.java +++ b/src/test/java/rx/internal/operators/OperatorDelayTest.java @@ -798,4 +798,27 @@ public Integer call(Integer t) { ts.assertNoErrors(); assertEquals(RxRingBuffer.SIZE * 2, ts.getOnNextEvents().size()); } + + @Test + public void testErrorRunsBeforeOnNext() { + TestScheduler test = Schedulers.test(); + + PublishSubject ps = PublishSubject.create(); + + TestSubscriber ts = TestSubscriber.create(); + + ps.delay(1, TimeUnit.SECONDS, test).subscribe(ts); + + ps.onNext(1); + + test.advanceTimeBy(500, TimeUnit.MILLISECONDS); + + ps.onError(new TestException()); + + test.advanceTimeBy(1, TimeUnit.SECONDS); + + ts.assertNoValues(); + ts.assertError(TestException.class); + ts.assertNotCompleted(); + } }