From e55dcd46ecfe301e78427d9b0eccd25752c1f364 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Sun, 4 Mar 2018 14:05:06 +0100 Subject: [PATCH] 2.x: Upgrade the algo of Observable.timeout(time|selector) operators --- .../disposables/ObserverFullArbiter.java | 180 -------- .../observers/FullArbiterObserver.java | 56 --- .../observable/ObservableTimeout.java | 404 ++++++++++-------- .../observable/ObservableTimeoutTimed.java | 297 ++++++------- .../disposables/ObserverFullArbiterTest.java | 128 ------ .../observers/FullArbiterObserverTest.java | 48 --- .../FlowableTimeoutWithSelectorTest.java | 170 ++++++-- .../observable/ObservableTimeoutTests.java | 91 +++- .../ObservableTimeoutWithSelectorTest.java | 314 +++++++++++++- 9 files changed, 911 insertions(+), 777 deletions(-) delete mode 100644 src/main/java/io/reactivex/internal/disposables/ObserverFullArbiter.java delete mode 100644 src/main/java/io/reactivex/internal/observers/FullArbiterObserver.java delete mode 100644 src/test/java/io/reactivex/internal/disposables/ObserverFullArbiterTest.java delete mode 100644 src/test/java/io/reactivex/internal/observers/FullArbiterObserverTest.java diff --git a/src/main/java/io/reactivex/internal/disposables/ObserverFullArbiter.java b/src/main/java/io/reactivex/internal/disposables/ObserverFullArbiter.java deleted file mode 100644 index dd85b6b473..0000000000 --- a/src/main/java/io/reactivex/internal/disposables/ObserverFullArbiter.java +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Copyright (c) 2016-present, RxJava Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.internal.disposables; - -import java.util.concurrent.atomic.AtomicInteger; - -import io.reactivex.Observer; -import io.reactivex.disposables.Disposable; -import io.reactivex.internal.queue.SpscLinkedArrayQueue; -import io.reactivex.internal.util.NotificationLite; -import io.reactivex.plugins.RxJavaPlugins; - -/** - * Performs full arbitration of Subscriber events with strict drain (i.e., old emissions of another - * subscriber are dropped). - * - * @param the value type - */ -public final class ObserverFullArbiter extends FullArbiterPad1 implements Disposable { - final Observer actual; - final SpscLinkedArrayQueue queue; - - volatile Disposable s; - - Disposable resource; - - volatile boolean cancelled; - - public ObserverFullArbiter(Observer actual, Disposable resource, int capacity) { - this.actual = actual; - this.resource = resource; - this.queue = new SpscLinkedArrayQueue(capacity); - this.s = EmptyDisposable.INSTANCE; - } - - @Override - public void dispose() { - if (!cancelled) { - cancelled = true; - disposeResource(); - } - } - - @Override - public boolean isDisposed() { - Disposable d = resource; - return d != null ? d.isDisposed() : cancelled; - } - - void disposeResource() { - Disposable d = resource; - resource = null; - if (d != null) { - d.dispose(); - } - } - - public boolean setDisposable(Disposable s) { - if (cancelled) { - return false; - } - - queue.offer(this.s, NotificationLite.disposable(s)); - drain(); - return true; - } - - public boolean onNext(T value, Disposable s) { - if (cancelled) { - return false; - } - - queue.offer(s, NotificationLite.next(value)); - drain(); - return true; - } - - public void onError(Throwable value, Disposable s) { - if (cancelled) { - RxJavaPlugins.onError(value); - return; - } - queue.offer(s, NotificationLite.error(value)); - drain(); - } - - public void onComplete(Disposable s) { - queue.offer(s, NotificationLite.complete()); - drain(); - } - - void drain() { - if (wip.getAndIncrement() != 0) { - return; - } - - int missed = 1; - - final SpscLinkedArrayQueue q = queue; - final Observer a = actual; - - for (;;) { - - for (;;) { - Object o = q.poll(); - if (o == null) { - break; - } - - Object v = q.poll(); - - if (o == s) { - if (NotificationLite.isDisposable(v)) { - Disposable next = NotificationLite.getDisposable(v); - s.dispose(); - if (!cancelled) { - s = next; - } else { - next.dispose(); - } - } else if (NotificationLite.isError(v)) { - q.clear(); - disposeResource(); - - Throwable ex = NotificationLite.getError(v); - if (!cancelled) { - cancelled = true; - a.onError(ex); - } else { - RxJavaPlugins.onError(ex); - } - } else if (NotificationLite.isComplete(v)) { - q.clear(); - disposeResource(); - - if (!cancelled) { - cancelled = true; - a.onComplete(); - } - } else { - a.onNext(NotificationLite.getValue(v)); - } - } - } - - missed = wip.addAndGet(-missed); - if (missed == 0) { - break; - } - } - } -} - -/** Pads the object header away. */ -class FullArbiterPad0 { - volatile long p1a, p2a, p3a, p4a, p5a, p6a, p7a; - volatile long p8a, p9a, p10a, p11a, p12a, p13a, p14a, p15a; -} - -/** The work-in-progress counter. */ -class FullArbiterWip extends FullArbiterPad0 { - final AtomicInteger wip = new AtomicInteger(); -} - -/** Pads the wip counter away. */ -class FullArbiterPad1 extends FullArbiterWip { - volatile long p1b, p2b, p3b, p4b, p5b, p6b, p7b; - volatile long p8b, p9b, p10b, p11b, p12b, p13b, p14b, p15b; -} diff --git a/src/main/java/io/reactivex/internal/observers/FullArbiterObserver.java b/src/main/java/io/reactivex/internal/observers/FullArbiterObserver.java deleted file mode 100644 index 79b1bd723d..0000000000 --- a/src/main/java/io/reactivex/internal/observers/FullArbiterObserver.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Copyright (c) 2016-present, RxJava Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.internal.observers; - -import io.reactivex.Observer; -import io.reactivex.disposables.Disposable; -import io.reactivex.internal.disposables.*; - -/** - * Subscriber that communicates with a FullArbiter. - * - * @param the value type - */ -public final class FullArbiterObserver implements Observer { - final ObserverFullArbiter arbiter; - - Disposable s; - - public FullArbiterObserver(ObserverFullArbiter arbiter) { - this.arbiter = arbiter; - } - - @Override - public void onSubscribe(Disposable s) { - if (DisposableHelper.validate(this.s, s)) { - this.s = s; - arbiter.setDisposable(s); - } - } - - @Override - public void onNext(T t) { - arbiter.onNext(t, s); - } - - @Override - public void onError(Throwable t) { - arbiter.onError(t, s); - } - - @Override - public void onComplete() { - arbiter.onComplete(s); - } -} diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeout.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeout.java index e3b76736e6..5cfcb620ba 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeout.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeout.java @@ -13,17 +13,16 @@ package io.reactivex.internal.operators.observable; -import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.*; import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; import io.reactivex.internal.disposables.*; -import io.reactivex.internal.observers.FullArbiterObserver; -import io.reactivex.observers.*; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.operators.observable.ObservableTimeoutTimed.TimeoutSupport; import io.reactivex.plugins.RxJavaPlugins; public final class ObservableTimeout extends AbstractObservableWithUpstream { @@ -32,10 +31,10 @@ public final class ObservableTimeout extends AbstractObservableWithUpst final ObservableSource other; public ObservableTimeout( - ObservableSource source, + Observable source, ObservableSource firstTimeoutIndicator, Function> itemTimeoutIndicator, - ObservableSource other) { + ObservableSource other) { super(source); this.firstTimeoutIndicator = firstTimeoutIndicator; this.itemTimeoutIndicator = itemTimeoutIndicator; @@ -43,306 +42,337 @@ public ObservableTimeout( } @Override - public void subscribeActual(Observer t) { + protected void subscribeActual(Observer s) { if (other == null) { - source.subscribe(new TimeoutObserver( - new SerializedObserver(t), - firstTimeoutIndicator, itemTimeoutIndicator)); + TimeoutObserver parent = new TimeoutObserver(s, itemTimeoutIndicator); + s.onSubscribe(parent); + parent.startFirstTimeout(firstTimeoutIndicator); + source.subscribe(parent); } else { - source.subscribe(new TimeoutOtherObserver( - t, firstTimeoutIndicator, itemTimeoutIndicator, other)); + TimeoutFallbackObserver parent = new TimeoutFallbackObserver(s, itemTimeoutIndicator, other); + s.onSubscribe(parent); + parent.startFirstTimeout(firstTimeoutIndicator); + source.subscribe(parent); } } - static final class TimeoutObserver - extends AtomicReference - implements Observer, Disposable, OnTimeout { + interface TimeoutSelectorSupport extends TimeoutSupport { + void onTimeoutError(long idx, Throwable ex); + } + + static final class TimeoutObserver extends AtomicLong + implements Observer, Disposable, TimeoutSelectorSupport { + + private static final long serialVersionUID = 3764492702657003550L; - private static final long serialVersionUID = 2672739326310051084L; final Observer actual; - final ObservableSource firstTimeoutIndicator; - final Function> itemTimeoutIndicator; - Disposable s; + final Function> itemTimeoutIndicator; - volatile long index; + final SequentialDisposable task; - TimeoutObserver(Observer actual, - ObservableSource firstTimeoutIndicator, - Function> itemTimeoutIndicator) { + final AtomicReference upstream; + + TimeoutObserver(Observer actual, Function> itemTimeoutIndicator) { this.actual = actual; - this.firstTimeoutIndicator = firstTimeoutIndicator; this.itemTimeoutIndicator = itemTimeoutIndicator; + this.task = new SequentialDisposable(); + this.upstream = new AtomicReference(); } @Override public void onSubscribe(Disposable s) { - if (DisposableHelper.validate(this.s, s)) { - this.s = s; - - Observer a = actual; - - ObservableSource p = firstTimeoutIndicator; - - if (p != null) { - TimeoutInnerObserver tis = new TimeoutInnerObserver(this, 0); - - if (compareAndSet(null, tis)) { - a.onSubscribe(this); - p.subscribe(tis); - } - } else { - a.onSubscribe(this); - } - } + DisposableHelper.setOnce(upstream, s); } @Override public void onNext(T t) { - long idx = index + 1; - index = idx; - - actual.onNext(t); + long idx = get(); + if (idx == Long.MAX_VALUE || !compareAndSet(idx, idx + 1)) { + return; + } - Disposable d = get(); + Disposable d = task.get(); if (d != null) { d.dispose(); } - ObservableSource p; + actual.onNext(t); + + ObservableSource itemTimeoutObservableSource; try { - p = ObjectHelper.requireNonNull(itemTimeoutIndicator.apply(t), "The ObservableSource returned is null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - dispose(); - actual.onError(e); + itemTimeoutObservableSource = ObjectHelper.requireNonNull( + itemTimeoutIndicator.apply(t), + "The itemTimeoutIndicator returned a null ObservableSource."); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.get().dispose(); + getAndSet(Long.MAX_VALUE); + actual.onError(ex); return; } - TimeoutInnerObserver tis = new TimeoutInnerObserver(this, idx); + TimeoutConsumer consumer = new TimeoutConsumer(idx + 1, this); + if (task.replace(consumer)) { + itemTimeoutObservableSource.subscribe(consumer); + } + } - if (compareAndSet(d, tis)) { - p.subscribe(tis); + void startFirstTimeout(ObservableSource firstTimeoutIndicator) { + if (firstTimeoutIndicator != null) { + TimeoutConsumer consumer = new TimeoutConsumer(0L, this); + if (task.replace(consumer)) { + firstTimeoutIndicator.subscribe(consumer); + } } } @Override public void onError(Throwable t) { - DisposableHelper.dispose(this); - actual.onError(t); + if (getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) { + task.dispose(); + + actual.onError(t); + } else { + RxJavaPlugins.onError(t); + } } @Override public void onComplete() { - DisposableHelper.dispose(this); - actual.onComplete(); - } + if (getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) { + task.dispose(); - @Override - public void dispose() { - if (DisposableHelper.dispose(this)) { - s.dispose(); + actual.onComplete(); } } @Override - public boolean isDisposed() { - return s.isDisposed(); - } + public void onTimeout(long idx) { + if (compareAndSet(idx, Long.MAX_VALUE)) { + DisposableHelper.dispose(upstream); - @Override - public void timeout(long idx) { - if (idx == index) { - dispose(); actual.onError(new TimeoutException()); } } @Override - public void innerError(Throwable e) { - s.dispose(); - actual.onError(e); - } - } - - interface OnTimeout { - void timeout(long index); - - void innerError(Throwable e); - } - - static final class TimeoutInnerObserver extends DisposableObserver { - final OnTimeout parent; - final long index; + public void onTimeoutError(long idx, Throwable ex) { + if (compareAndSet(idx, Long.MAX_VALUE)) { + DisposableHelper.dispose(upstream); - boolean done; - - TimeoutInnerObserver(OnTimeout parent, final long index) { - this.parent = parent; - this.index = index; - } - - @Override - public void onNext(Object t) { - if (done) { - return; + actual.onError(ex); + } else { + RxJavaPlugins.onError(ex); } - done = true; - dispose(); - parent.timeout(index); } @Override - public void onError(Throwable t) { - if (done) { - RxJavaPlugins.onError(t); - return; - } - done = true; - parent.innerError(t); + public void dispose() { + DisposableHelper.dispose(upstream); + task.dispose(); } @Override - public void onComplete() { - if (done) { - return; - } - done = true; - parent.timeout(index); + public boolean isDisposed() { + return DisposableHelper.isDisposed(upstream.get()); } } - static final class TimeoutOtherObserver + static final class TimeoutFallbackObserver extends AtomicReference - implements Observer, Disposable, OnTimeout { + implements Observer, Disposable, TimeoutSelectorSupport { + + private static final long serialVersionUID = -7508389464265974549L; - private static final long serialVersionUID = -1957813281749686898L; final Observer actual; - final ObservableSource firstTimeoutIndicator; - final Function> itemTimeoutIndicator; - final ObservableSource other; - final ObserverFullArbiter arbiter; - Disposable s; + final Function> itemTimeoutIndicator; + + final SequentialDisposable task; - boolean done; + final AtomicLong index; - volatile long index; + final AtomicReference upstream; - TimeoutOtherObserver(Observer actual, - ObservableSource firstTimeoutIndicator, - Function> itemTimeoutIndicator, ObservableSource other) { + ObservableSource fallback; + + TimeoutFallbackObserver(Observer actual, + Function> itemTimeoutIndicator, + ObservableSource fallback) { this.actual = actual; - this.firstTimeoutIndicator = firstTimeoutIndicator; this.itemTimeoutIndicator = itemTimeoutIndicator; - this.other = other; - this.arbiter = new ObserverFullArbiter(actual, this, 8); + this.task = new SequentialDisposable(); + this.fallback = fallback; + this.index = new AtomicLong(); + this.upstream = new AtomicReference(); } @Override public void onSubscribe(Disposable s) { - if (DisposableHelper.validate(this.s, s)) { - this.s = s; - - arbiter.setDisposable(s); - - Observer a = actual; - - ObservableSource p = firstTimeoutIndicator; - - if (p != null) { - TimeoutInnerObserver tis = new TimeoutInnerObserver(this, 0); - - if (compareAndSet(null, tis)) { - a.onSubscribe(arbiter); - p.subscribe(tis); - } - } else { - a.onSubscribe(arbiter); - } - } + DisposableHelper.setOnce(upstream, s); } @Override public void onNext(T t) { - if (done) { - return; - } - long idx = index + 1; - index = idx; - - if (!arbiter.onNext(t, s)) { + long idx = index.get(); + if (idx == Long.MAX_VALUE || !index.compareAndSet(idx, idx + 1)) { return; } - Disposable d = get(); + Disposable d = task.get(); if (d != null) { d.dispose(); } - ObservableSource p; + actual.onNext(t); + + ObservableSource itemTimeoutObservableSource; try { - p = ObjectHelper.requireNonNull(itemTimeoutIndicator.apply(t), "The ObservableSource returned is null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - actual.onError(e); + itemTimeoutObservableSource = ObjectHelper.requireNonNull( + itemTimeoutIndicator.apply(t), + "The itemTimeoutIndicator returned a null ObservableSource."); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.get().dispose(); + index.getAndSet(Long.MAX_VALUE); + actual.onError(ex); return; } - TimeoutInnerObserver tis = new TimeoutInnerObserver(this, idx); + TimeoutConsumer consumer = new TimeoutConsumer(idx + 1, this); + if (task.replace(consumer)) { + itemTimeoutObservableSource.subscribe(consumer); + } + } - if (compareAndSet(d, tis)) { - p.subscribe(tis); + void startFirstTimeout(ObservableSource firstTimeoutIndicator) { + if (firstTimeoutIndicator != null) { + TimeoutConsumer consumer = new TimeoutConsumer(0L, this); + if (task.replace(consumer)) { + firstTimeoutIndicator.subscribe(consumer); + } } } @Override public void onError(Throwable t) { - if (done) { + if (index.getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) { + task.dispose(); + + actual.onError(t); + + task.dispose(); + } else { RxJavaPlugins.onError(t); - return; } - done = true; - dispose(); - arbiter.onError(t, s); } @Override public void onComplete() { - if (done) { - return; + if (index.getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) { + task.dispose(); + + actual.onComplete(); + + task.dispose(); } - done = true; - dispose(); - arbiter.onComplete(s); } @Override - public void dispose() { - if (DisposableHelper.dispose(this)) { - s.dispose(); + public void onTimeout(long idx) { + if (index.compareAndSet(idx, Long.MAX_VALUE)) { + DisposableHelper.dispose(upstream); + + ObservableSource f = fallback; + fallback = null; + + f.subscribe(new ObservableTimeoutTimed.FallbackObserver(actual, this)); } } + @Override + public void onTimeoutError(long idx, Throwable ex) { + if (index.compareAndSet(idx, Long.MAX_VALUE)) { + DisposableHelper.dispose(this); + + actual.onError(ex); + } else { + RxJavaPlugins.onError(ex); + } + } + + @Override + public void dispose() { + DisposableHelper.dispose(upstream); + DisposableHelper.dispose(this); + task.dispose(); + } + @Override public boolean isDisposed() { - return s.isDisposed(); + return DisposableHelper.isDisposed(get()); + } + } + + static final class TimeoutConsumer extends AtomicReference + implements Observer, Disposable { + + private static final long serialVersionUID = 8708641127342403073L; + + final TimeoutSelectorSupport parent; + + final long idx; + + TimeoutConsumer(long idx, TimeoutSelectorSupport parent) { + this.idx = idx; + this.parent = parent; + } + + @Override + public void onSubscribe(Disposable s) { + DisposableHelper.setOnce(this, s); + } + + @Override + public void onNext(Object t) { + Disposable upstream = get(); + if (upstream != DisposableHelper.DISPOSED) { + upstream.dispose(); + lazySet(DisposableHelper.DISPOSED); + parent.onTimeout(idx); + } } @Override - public void timeout(long idx) { - if (idx == index) { - dispose(); - other.subscribe(new FullArbiterObserver(arbiter)); + public void onError(Throwable t) { + if (get() != DisposableHelper.DISPOSED) { + lazySet(DisposableHelper.DISPOSED); + parent.onTimeoutError(idx, t); + } else { + RxJavaPlugins.onError(t); } } @Override - public void innerError(Throwable e) { - s.dispose(); - actual.onError(e); + public void onComplete() { + if (get() != DisposableHelper.DISPOSED) { + lazySet(DisposableHelper.DISPOSED); + parent.onTimeout(idx); + } + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(this.get()); } } + } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java index 0c152df34d..fdca2d3882 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java @@ -14,14 +14,11 @@ package io.reactivex.internal.operators.observable; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.*; import io.reactivex.*; -import io.reactivex.Scheduler.Worker; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.*; -import io.reactivex.internal.observers.FullArbiterObserver; -import io.reactivex.observers.SerializedObserver; import io.reactivex.plugins.RxJavaPlugins; public final class ObservableTimeoutTimed extends AbstractObservableWithUpstream { @@ -30,10 +27,7 @@ public final class ObservableTimeoutTimed extends AbstractObservableWithUpstr final Scheduler scheduler; final ObservableSource other; - - static final Disposable NEW_TIMER = new EmptyDisposable(); - - public ObservableTimeoutTimed(ObservableSource source, + public ObservableTimeoutTimed(Observable source, long timeout, TimeUnit unit, Scheduler scheduler, ObservableSource other) { super(source); this.timeout = timeout; @@ -43,265 +37,276 @@ public ObservableTimeoutTimed(ObservableSource source, } @Override - public void subscribeActual(Observer t) { + protected void subscribeActual(Observer s) { if (other == null) { - source.subscribe(new TimeoutTimedObserver( - new SerializedObserver(t), // because errors can race - timeout, unit, scheduler.createWorker())); + TimeoutObserver parent = new TimeoutObserver(s, timeout, unit, scheduler.createWorker()); + s.onSubscribe(parent); + parent.startTimeout(0L); + source.subscribe(parent); } else { - source.subscribe(new TimeoutTimedOtherObserver( - t, // the FullArbiter serializes - timeout, unit, scheduler.createWorker(), other)); + TimeoutFallbackObserver parent = new TimeoutFallbackObserver(s, timeout, unit, scheduler.createWorker(), other); + s.onSubscribe(parent); + parent.startTimeout(0L); + source.subscribe(parent); } } - static final class TimeoutTimedOtherObserver - extends AtomicReference implements Observer, Disposable { - private static final long serialVersionUID = -4619702551964128179L; + static final class TimeoutObserver extends AtomicLong + implements Observer, Disposable, TimeoutSupport { + + private static final long serialVersionUID = 3764492702657003550L; final Observer actual; + final long timeout; - final TimeUnit unit; - final Scheduler.Worker worker; - final ObservableSource other; - Disposable s; + final TimeUnit unit; - final ObserverFullArbiter arbiter; + final Scheduler.Worker worker; - volatile long index; + final SequentialDisposable task; - volatile boolean done; + final AtomicReference upstream; - TimeoutTimedOtherObserver(Observer actual, long timeout, TimeUnit unit, Worker worker, - ObservableSource other) { + TimeoutObserver(Observer actual, long timeout, TimeUnit unit, Scheduler.Worker worker) { this.actual = actual; this.timeout = timeout; this.unit = unit; this.worker = worker; - this.other = other; - this.arbiter = new ObserverFullArbiter(actual, this, 8); + this.task = new SequentialDisposable(); + this.upstream = new AtomicReference(); } @Override public void onSubscribe(Disposable s) { - if (DisposableHelper.validate(this.s, s)) { - this.s = s; - if (arbiter.setDisposable(s)) { - actual.onSubscribe(arbiter); - - scheduleTimeout(0L); - } - } - + DisposableHelper.setOnce(upstream, s); } @Override public void onNext(T t) { - if (done) { + long idx = get(); + if (idx == Long.MAX_VALUE || !compareAndSet(idx, idx + 1)) { return; } - long idx = index + 1; - index = idx; - if (arbiter.onNext(t, s)) { - scheduleTimeout(idx); - } - } - - void scheduleTimeout(final long idx) { - Disposable d = get(); - if (d != null) { - d.dispose(); - } + task.get().dispose(); - if (compareAndSet(d, NEW_TIMER)) { - d = worker.schedule(new SubscribeNext(idx), timeout, unit); + actual.onNext(t); - DisposableHelper.replace(this, d); - } + startTimeout(idx + 1); } - void subscribeNext() { - other.subscribe(new FullArbiterObserver(arbiter)); + void startTimeout(long nextIndex) { + task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit)); } @Override public void onError(Throwable t) { - if (done) { + if (getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) { + task.dispose(); + + actual.onError(t); + + worker.dispose(); + } else { RxJavaPlugins.onError(t); - return; } - done = true; - arbiter.onError(t, s); - worker.dispose(); } @Override public void onComplete() { - if (done) { - return; + if (getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) { + task.dispose(); + + actual.onComplete(); + + worker.dispose(); + } + } + + @Override + public void onTimeout(long idx) { + if (compareAndSet(idx, Long.MAX_VALUE)) { + DisposableHelper.dispose(upstream); + + actual.onError(new TimeoutException()); + + worker.dispose(); } - done = true; - arbiter.onComplete(s); - worker.dispose(); } @Override public void dispose() { - s.dispose(); + DisposableHelper.dispose(upstream); worker.dispose(); } @Override public boolean isDisposed() { - return worker.isDisposed(); + return DisposableHelper.isDisposed(upstream.get()); } + } - final class SubscribeNext implements Runnable { - private final long idx; - SubscribeNext(long idx) { - this.idx = idx; - } + static final class TimeoutTask implements Runnable { - @Override - public void run() { - if (idx == index) { - done = true; - s.dispose(); - DisposableHelper.dispose(TimeoutTimedOtherObserver.this); + final TimeoutSupport parent; - subscribeNext(); + final long idx; - worker.dispose(); - } - } + TimeoutTask(long idx, TimeoutSupport parent) { + this.idx = idx; + this.parent = parent; + } + + @Override + public void run() { + parent.onTimeout(idx); } } - static final class TimeoutTimedObserver - extends AtomicReference - implements Observer, Disposable { - private static final long serialVersionUID = -8387234228317808253L; + static final class TimeoutFallbackObserver extends AtomicReference + implements Observer, Disposable, TimeoutSupport { + + private static final long serialVersionUID = 3764492702657003550L; final Observer actual; + final long timeout; + final TimeUnit unit; + final Scheduler.Worker worker; - Disposable s; + final SequentialDisposable task; + + final AtomicLong index; - volatile long index; + final AtomicReference upstream; - volatile boolean done; + ObservableSource fallback; - TimeoutTimedObserver(Observer actual, long timeout, TimeUnit unit, Worker worker) { + TimeoutFallbackObserver(Observer actual, long timeout, TimeUnit unit, + Scheduler.Worker worker, ObservableSource fallback) { this.actual = actual; this.timeout = timeout; this.unit = unit; this.worker = worker; + this.fallback = fallback; + this.task = new SequentialDisposable(); + this.index = new AtomicLong(); + this.upstream = new AtomicReference(); } @Override public void onSubscribe(Disposable s) { - if (DisposableHelper.validate(this.s, s)) { - this.s = s; - actual.onSubscribe(this); - scheduleTimeout(0L); - } - + DisposableHelper.setOnce(upstream, s); } @Override public void onNext(T t) { - if (done) { + long idx = index.get(); + if (idx == Long.MAX_VALUE || !index.compareAndSet(idx, idx + 1)) { return; } - long idx = index + 1; - index = idx; + + task.get().dispose(); actual.onNext(t); - scheduleTimeout(idx); + startTimeout(idx + 1); } - void scheduleTimeout(final long idx) { - Disposable d = get(); - if (d != null) { - d.dispose(); - } - - if (compareAndSet(d, NEW_TIMER)) { - d = worker.schedule(new TimeoutTask(idx), timeout, unit); - - DisposableHelper.replace(this, d); - } + void startTimeout(long nextIndex) { + task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit)); } @Override public void onError(Throwable t) { - if (done) { + if (index.getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) { + task.dispose(); + + actual.onError(t); + + worker.dispose(); + } else { RxJavaPlugins.onError(t); - return; } - done = true; - - actual.onError(t); - dispose(); } @Override public void onComplete() { - if (done) { - return; + if (index.getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) { + task.dispose(); + + actual.onComplete(); + + worker.dispose(); } - done = true; + } - actual.onComplete(); - dispose(); + @Override + public void onTimeout(long idx) { + if (index.compareAndSet(idx, Long.MAX_VALUE)) { + DisposableHelper.dispose(upstream); + + ObservableSource f = fallback; + fallback = null; + + f.subscribe(new FallbackObserver(actual, this)); + + worker.dispose(); + } } @Override public void dispose() { - s.dispose(); + DisposableHelper.dispose(upstream); + DisposableHelper.dispose(this); worker.dispose(); } @Override public boolean isDisposed() { - return worker.isDisposed(); + return DisposableHelper.isDisposed(get()); } + } - final class TimeoutTask implements Runnable { - private final long idx; + static final class FallbackObserver implements Observer { - TimeoutTask(long idx) { - this.idx = idx; - } + final Observer actual; - @Override - public void run() { - if (idx == index) { - done = true; - s.dispose(); - DisposableHelper.dispose(TimeoutTimedObserver.this); + final AtomicReference arbiter; - actual.onError(new TimeoutException()); + FallbackObserver(Observer actual, AtomicReference arbiter) { + this.actual = actual; + this.arbiter = arbiter; + } - worker.dispose(); - } - } + @Override + public void onSubscribe(Disposable s) { + DisposableHelper.replace(arbiter, s); } - } - static final class EmptyDisposable implements Disposable { @Override - public void dispose() { } + public void onNext(T t) { + actual.onNext(t); + } @Override - public boolean isDisposed() { - return true; + public void onError(Throwable t) { + actual.onError(t); } + + @Override + public void onComplete() { + actual.onComplete(); + } + } + + interface TimeoutSupport { + + void onTimeout(long idx); + } } diff --git a/src/test/java/io/reactivex/internal/disposables/ObserverFullArbiterTest.java b/src/test/java/io/reactivex/internal/disposables/ObserverFullArbiterTest.java deleted file mode 100644 index 48d3507d7c..0000000000 --- a/src/test/java/io/reactivex/internal/disposables/ObserverFullArbiterTest.java +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Copyright (c) 2016-present, RxJava Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.internal.disposables; - -import static org.junit.Assert.*; - -import java.util.List; - -import org.junit.Test; - -import io.reactivex.TestHelper; -import io.reactivex.disposables.*; -import io.reactivex.exceptions.TestException; -import io.reactivex.internal.disposables.ObserverFullArbiter; -import io.reactivex.internal.util.NotificationLite; -import io.reactivex.observers.TestObserver; -import io.reactivex.plugins.RxJavaPlugins; - -public class ObserverFullArbiterTest { - - @Test - public void setSubscriptionAfterCancel() { - ObserverFullArbiter fa = new ObserverFullArbiter(new TestObserver(), null, 128); - - fa.dispose(); - - Disposable bs = Disposables.empty(); - - assertFalse(fa.setDisposable(bs)); - - assertFalse(fa.setDisposable(null)); - } - - @Test - public void cancelAfterPoll() { - ObserverFullArbiter fa = new ObserverFullArbiter(new TestObserver(), null, 128); - - Disposable bs = Disposables.empty(); - - fa.queue.offer(fa.s, NotificationLite.disposable(bs)); - - assertFalse(fa.isDisposed()); - - fa.dispose(); - - assertTrue(fa.isDisposed()); - - fa.drain(); - - assertTrue(bs.isDisposed()); - } - - @Test - public void errorAfterCancel() { - ObserverFullArbiter fa = new ObserverFullArbiter(new TestObserver(), null, 128); - - Disposable bs = Disposables.empty(); - - fa.dispose(); - - List errors = TestHelper.trackPluginErrors(); - try { - fa.onError(new TestException(), bs); - - TestHelper.assertUndeliverable(errors, 0, TestException.class); - } finally { - RxJavaPlugins.reset(); - } - } - - @Test - public void cancelAfterError() { - ObserverFullArbiter fa = new ObserverFullArbiter(new TestObserver(), null, 128); - - List errors = TestHelper.trackPluginErrors(); - try { - fa.queue.offer(fa.s, NotificationLite.error(new TestException())); - - fa.dispose(); - - fa.drain(); - TestHelper.assertUndeliverable(errors, 0, TestException.class); - } finally { - RxJavaPlugins.reset(); - } - } - - @Test - public void offerDifferentSubscription() { - TestObserver ts = new TestObserver(); - - ObserverFullArbiter fa = new ObserverFullArbiter(ts, null, 128); - - Disposable bs = Disposables.empty(); - - fa.queue.offer(bs, NotificationLite.next(1)); - - fa.drain(); - - ts.assertNoValues(); - } - - @Test - public void dontEnterDrain() { - TestObserver ts = new TestObserver(); - - ObserverFullArbiter fa = new ObserverFullArbiter(ts, null, 128); - - fa.queue.offer(fa.s, NotificationLite.next(1)); - - fa.wip.getAndIncrement(); - - fa.drain(); - - ts.assertNoValues(); - } -} diff --git a/src/test/java/io/reactivex/internal/observers/FullArbiterObserverTest.java b/src/test/java/io/reactivex/internal/observers/FullArbiterObserverTest.java deleted file mode 100644 index c2b4c64e76..0000000000 --- a/src/test/java/io/reactivex/internal/observers/FullArbiterObserverTest.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Copyright (c) 2016-present, RxJava Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.internal.observers; - -import org.junit.Test; - -import io.reactivex.TestHelper; -import io.reactivex.disposables.Disposables; -import io.reactivex.exceptions.TestException; -import io.reactivex.internal.disposables.ObserverFullArbiter; -import io.reactivex.observers.TestObserver; - -public class FullArbiterObserverTest { - - @Test - public void doubleOnSubscribe() { - TestObserver to = new TestObserver(); - ObserverFullArbiter fa = new ObserverFullArbiter(to, null, 16); - FullArbiterObserver fo = new FullArbiterObserver(fa); - to.onSubscribe(fa); - - TestHelper.doubleOnSubscribe(fo); - } - - @Test - public void error() { - TestObserver to = new TestObserver(); - ObserverFullArbiter fa = new ObserverFullArbiter(to, null, 16); - FullArbiterObserver fo = new FullArbiterObserver(fa); - to.onSubscribe(fa); - - fo.onSubscribe(Disposables.empty()); - fo.onError(new TestException()); - - to.assertFailure(TestException.class); - } -} diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutWithSelectorTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutWithSelectorTest.java index bad1090d85..3641de632c 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutWithSelectorTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutWithSelectorTest.java @@ -19,7 +19,7 @@ import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.*; import org.junit.Test; import org.mockito.InOrder; @@ -29,12 +29,12 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.TestException; -import io.reactivex.functions.Function; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.processors.PublishProcessor; +import io.reactivex.processors.*; import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; @@ -414,13 +414,13 @@ public void error() { @Test public void emptyInner() { - PublishProcessor ps = PublishProcessor.create(); + PublishProcessor pp = PublishProcessor.create(); - TestSubscriber to = ps + TestSubscriber to = pp .timeout(Functions.justFunction(Flowable.empty())) .test(); - ps.onNext(1); + pp.onNext(1); to.assertFailure(TimeoutException.class, 1); } @@ -429,9 +429,9 @@ public void emptyInner() { public void badInnerSource() { List errors = TestHelper.trackPluginErrors(); try { - PublishProcessor ps = PublishProcessor.create(); + PublishProcessor pp = PublishProcessor.create(); - TestSubscriber to = ps + TestSubscriber to = pp .timeout(Functions.justFunction(new Flowable() { @Override protected void subscribeActual(Subscriber observer) { @@ -444,7 +444,7 @@ protected void subscribeActual(Subscriber observer) { })) .test(); - ps.onNext(1); + pp.onNext(1); to.assertFailureAndMessage(TestException.class, "First", 1); @@ -458,9 +458,9 @@ protected void subscribeActual(Subscriber observer) { public void badInnerSourceOther() { List errors = TestHelper.trackPluginErrors(); try { - PublishProcessor ps = PublishProcessor.create(); + PublishProcessor pp = PublishProcessor.create(); - TestSubscriber to = ps + TestSubscriber to = pp .timeout(Functions.justFunction(new Flowable() { @Override protected void subscribeActual(Subscriber observer) { @@ -473,7 +473,7 @@ protected void subscribeActual(Subscriber observer) { }), Flowable.just(2)) .test(); - ps.onNext(1); + pp.onNext(1); to.assertFailureAndMessage(TestException.class, "First", 1); @@ -513,36 +513,36 @@ protected void subscribeActual(Subscriber observer) { @Test public void selectorTake() { - PublishProcessor ps = PublishProcessor.create(); + PublishProcessor pp = PublishProcessor.create(); - TestSubscriber to = ps + TestSubscriber to = pp .timeout(Functions.justFunction(Flowable.never())) .take(1) .test(); - assertTrue(ps.hasSubscribers()); + assertTrue(pp.hasSubscribers()); - ps.onNext(1); + pp.onNext(1); - assertFalse(ps.hasSubscribers()); + assertFalse(pp.hasSubscribers()); to.assertResult(1); } @Test public void selectorFallbackTake() { - PublishProcessor ps = PublishProcessor.create(); + PublishProcessor pp = PublishProcessor.create(); - TestSubscriber to = ps + TestSubscriber to = pp .timeout(Functions.justFunction(Flowable.never()), Flowable.just(2)) .take(1) .test(); - assertTrue(ps.hasSubscribers()); + assertTrue(pp.hasSubscribers()); - ps.onNext(1); + pp.onNext(1); - assertFalse(ps.hasSubscribers()); + assertFalse(pp.hasSubscribers()); to.assertResult(1); } @@ -712,7 +712,7 @@ public void run() { } @Test - public void onECompleteOnTimeoutRace() { + public void onCompleteOnTimeoutRace() { for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { List errors = TestHelper.trackPluginErrors(); try { @@ -763,4 +763,126 @@ public void run() { } } } + + @Test + public void onCompleteOnTimeoutRaceFallback() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishProcessor pp = PublishProcessor.create(); + + final Subscriber[] sub = { null, null }; + + final Flowable pp2 = new Flowable() { + + int count; + + @Override + protected void subscribeActual( + Subscriber s) { + assertFalse(((Disposable)s).isDisposed()); + s.onSubscribe(new BooleanSubscription()); + sub[count++] = s; + } + }; + + TestSubscriber ts = pp.timeout(Functions.justFunction(pp2), Flowable.never()).test(); + + pp.onNext(0); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + sub[0].onComplete(); + } + }; + + TestHelper.race(r1, r2); + + ts.assertValueAt(0, 0); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void disposedUpfront() { + PublishProcessor pp = PublishProcessor.create(); + final AtomicInteger counter = new AtomicInteger(); + + Flowable timeoutAndFallback = Flowable.never().doOnSubscribe(new Consumer() { + @Override + public void accept(Subscription s) throws Exception { + counter.incrementAndGet(); + } + }); + + pp + .timeout(timeoutAndFallback, Functions.justFunction(timeoutAndFallback)) + .test(1, true) + .assertEmpty(); + + assertEquals(0, counter.get()); + } + + @Test + public void disposedUpfrontFallback() { + PublishProcessor pp = PublishProcessor.create(); + final AtomicInteger counter = new AtomicInteger(); + + Flowable timeoutAndFallback = Flowable.never().doOnSubscribe(new Consumer() { + @Override + public void accept(Subscription s) throws Exception { + counter.incrementAndGet(); + } + }); + + pp + .timeout(timeoutAndFallback, Functions.justFunction(timeoutAndFallback), timeoutAndFallback) + .test(1, true) + .assertEmpty(); + + assertEquals(0, counter.get()); + } + + @Test + public void timeoutConsumerDoubleOnSubscribe() { + List errors = TestHelper.trackPluginErrors(); + try { + BehaviorProcessor.createDefault(1) + .timeout(Functions.justFunction(new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + BooleanSubscription bs1 = new BooleanSubscription(); + s.onSubscribe(bs1); + + BooleanSubscription bs2 = new BooleanSubscription(); + s.onSubscribe(bs2); + + assertFalse(bs1.isCancelled()); + assertTrue(bs2.isCancelled()); + + s.onComplete(); + } + })) + .test() + .assertFailure(TimeoutException.class, 1); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutTests.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutTests.java index b32903164d..3c244964c9 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutTests.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutTests.java @@ -424,12 +424,6 @@ public void timedEmpty() { .assertResult(); } - @Test - public void newTimer() { - ObservableTimeoutTimed.NEW_TIMER.dispose(); - assertTrue(ObservableTimeoutTimed.NEW_TIMER.isDisposed()); - } - @Test public void badSource() { List errors = TestHelper.trackPluginErrors(); @@ -515,4 +509,89 @@ public void timedFallbackTake() { to.assertResult(1); } + + @Test + public void fallbackErrors() { + Observable.never() + .timeout(1, TimeUnit.MILLISECONDS, Observable.error(new TestException())) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + + @Test + public void onNextOnTimeoutRace() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + final TestScheduler sch = new TestScheduler(); + + final PublishSubject pp = PublishSubject.create(); + + TestObserver ts = pp.timeout(1, TimeUnit.SECONDS, sch).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + sch.advanceTimeBy(1, TimeUnit.SECONDS); + } + }; + + TestHelper.race(r1, r2); + + if (ts.valueCount() != 0) { + if (ts.errorCount() != 0) { + ts.assertFailure(TimeoutException.class, 1); + } else { + ts.assertValuesOnly(1); + } + } else { + ts.assertFailure(TimeoutException.class); + } + } + } + + @Test + public void onNextOnTimeoutRaceFallback() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + final TestScheduler sch = new TestScheduler(); + + final PublishSubject pp = PublishSubject.create(); + + TestObserver ts = pp.timeout(1, TimeUnit.SECONDS, sch, Observable.just(2)).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + sch.advanceTimeBy(1, TimeUnit.SECONDS); + } + }; + + TestHelper.race(r1, r2); + + if (ts.isTerminated()) { + int c = ts.valueCount(); + if (c == 1) { + int v = ts.values().get(0); + assertTrue("" + v, v == 1 || v == 2); + } else { + ts.assertResult(1, 2); + } + } else { + ts.assertValuesOnly(1); + } + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutWithSelectorTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutWithSelectorTest.java index a76affd787..276971e5c9 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutWithSelectorTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutWithSelectorTest.java @@ -19,7 +19,7 @@ import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.*; import org.junit.Test; import org.mockito.InOrder; @@ -31,7 +31,7 @@ import io.reactivex.Observer; import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; -import io.reactivex.functions.Function; +import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; @@ -546,4 +546,314 @@ public void selectorFallbackTake() { to.assertResult(1); } + + @Test + public void lateOnTimeoutError() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishSubject ps = PublishSubject.create(); + + final Observer[] sub = { null, null }; + + final Observable pp2 = new Observable() { + + int count; + + @Override + protected void subscribeActual( + Observer s) { + s.onSubscribe(Disposables.empty()); + sub[count++] = s; + } + }; + + TestObserver ts = ps.timeout(Functions.justFunction(pp2)).test(); + + ps.onNext(0); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onNext(1); + } + }; + + final Throwable ex = new TestException(); + + Runnable r2 = new Runnable() { + @Override + public void run() { + sub[0].onError(ex); + } + }; + + TestHelper.race(r1, r2); + + ts.assertValueAt(0, 0); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void lateOnTimeoutFallbackRace() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishSubject ps = PublishSubject.create(); + + final Observer[] sub = { null, null }; + + final Observable pp2 = new Observable() { + + int count; + + @Override + protected void subscribeActual( + Observer s) { + assertFalse(((Disposable)s).isDisposed()); + s.onSubscribe(Disposables.empty()); + sub[count++] = s; + } + }; + + TestObserver ts = ps.timeout(Functions.justFunction(pp2), Observable.never()).test(); + + ps.onNext(0); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onNext(1); + } + }; + + final Throwable ex = new TestException(); + + Runnable r2 = new Runnable() { + @Override + public void run() { + sub[0].onError(ex); + } + }; + + TestHelper.race(r1, r2); + + ts.assertValueAt(0, 0); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void onErrorOnTimeoutRace() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishSubject ps = PublishSubject.create(); + + final Observer[] sub = { null, null }; + + final Observable pp2 = new Observable() { + + int count; + + @Override + protected void subscribeActual( + Observer s) { + assertFalse(((Disposable)s).isDisposed()); + s.onSubscribe(Disposables.empty()); + sub[count++] = s; + } + }; + + TestObserver ts = ps.timeout(Functions.justFunction(pp2)).test(); + + ps.onNext(0); + + final Throwable ex = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onError(ex); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + sub[0].onComplete(); + } + }; + + TestHelper.race(r1, r2); + + ts.assertValueAt(0, 0); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void onCompleteOnTimeoutRace() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishSubject ps = PublishSubject.create(); + + final Observer[] sub = { null, null }; + + final Observable pp2 = new Observable() { + + int count; + + @Override + protected void subscribeActual( + Observer s) { + assertFalse(((Disposable)s).isDisposed()); + s.onSubscribe(Disposables.empty()); + sub[count++] = s; + } + }; + + TestObserver ts = ps.timeout(Functions.justFunction(pp2)).test(); + + ps.onNext(0); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + sub[0].onComplete(); + } + }; + + TestHelper.race(r1, r2); + + ts.assertValueAt(0, 0); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void onCompleteOnTimeoutRaceFallback() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishSubject ps = PublishSubject.create(); + + final Observer[] sub = { null, null }; + + final Observable pp2 = new Observable() { + + int count; + + @Override + protected void subscribeActual( + Observer s) { + assertFalse(((Disposable)s).isDisposed()); + s.onSubscribe(Disposables.empty()); + sub[count++] = s; + } + }; + + TestObserver ts = ps.timeout(Functions.justFunction(pp2), Observable.never()).test(); + + ps.onNext(0); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + sub[0].onComplete(); + } + }; + + TestHelper.race(r1, r2); + + ts.assertValueAt(0, 0); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void disposedUpfront() { + PublishSubject ps = PublishSubject.create(); + final AtomicInteger counter = new AtomicInteger(); + + Observable timeoutAndFallback = Observable.never().doOnSubscribe(new Consumer() { + @Override + public void accept(Disposable s) throws Exception { + counter.incrementAndGet(); + } + }); + + ps + .timeout(timeoutAndFallback, Functions.justFunction(timeoutAndFallback)) + .test(true) + .assertEmpty(); + + assertEquals(0, counter.get()); + } + + @Test + public void disposedUpfrontFallback() { + PublishSubject ps = PublishSubject.create(); + final AtomicInteger counter = new AtomicInteger(); + + Observable timeoutAndFallback = Observable.never().doOnSubscribe(new Consumer() { + @Override + public void accept(Disposable s) throws Exception { + counter.incrementAndGet(); + } + }); + + ps + .timeout(timeoutAndFallback, Functions.justFunction(timeoutAndFallback), timeoutAndFallback) + .test(true) + .assertEmpty(); + + assertEquals(0, counter.get()); + } }