diff --git a/src/main/java/io/reactivex/processors/MulticastProcessor.java b/src/main/java/io/reactivex/processors/MulticastProcessor.java new file mode 100644 index 0000000000..fdc1652dc2 --- /dev/null +++ b/src/main/java/io/reactivex/processors/MulticastProcessor.java @@ -0,0 +1,640 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.processors; + +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; + +import io.reactivex.annotations.*; +import io.reactivex.exceptions.*; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.fuseable.*; +import io.reactivex.internal.queue.*; +import io.reactivex.internal.subscriptions.*; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * A {@link FlowableProcessor} implementation that coordinates downstream requests through + * a front-buffer and stable-prefetching, optionally canceling the upstream if all + * subscribers have cancelled. + *

+ * + *

+ * This processor does not have a public constructor by design; a new empty instance of this + * {@code MulticastProcessor} can be created via the following {@code create} methods that + * allow configuring it: + *

+ *

+ * When the reference counting behavior is enabled, the {@code MulticastProcessor} cancels its + * upstream when all {@link Subscriber}s have cancelled. Late {@code Subscriber}s will then be + * immediately completed. + *

+ * Because {@code MulticastProcessor} implements the {@link Subscriber} interface, calling + * {@code onSubscribe} is mandatory (Rule 2.12). + * If {@code MulticastProcessor} should run standalone, i.e., without subscribing the {@code MulticastProcessor} to another {@link Publisher}, + * use {@link #start()} or {@link #startUnbounded()} methods to initialize the internal buffer. + * Failing to do so will lead to a {@link NullPointerException} at runtime. + *

+ * Use {@link #offer(Object)} to try and offer/emit items but don't fail if the + * internal buffer is full. + *

+ * A {@code MulticastProcessor} is a {@link Processor} type in the Reactive Streams specification, + * {@code null}s are not allowed (Rule 2.13) as + * parameters to {@link #onSubscribe(Subscription)}, {@link #offer(Object)}, {@link #onNext(Object)} and {@link #onError(Throwable)}. + * Such calls will result in a {@link NullPointerException} being thrown and the processor's state is not changed. + *

+ * Since a {@code MulticastProcessor} is a {@link io.reactivex.Flowable}, it supports backpressure. + * The backpressure from the currently subscribed {@link Subscriber}s are coordinated by emitting upstream + * items only if all of those {@code Subscriber}s have requested at least one item. This behavior + * is also called lockstep-mode because even if some {@code Subscriber}s can take any number + * of items, other {@code Subscriber}s requesting less or infrequently will slow down the overall + * throughput of the flow. + *

+ * Calling {@link #onNext(Object)}, {@link #offer(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} + * is required to be serialized (called from the same thread or called non-overlappingly from different threads + * through external means of serialization). The {@link #toSerialized()} method available to all {@link FlowableProcessor}s + * provides such serialization and also protects against reentrance (i.e., when a downstream {@code Subscriber} + * consuming this processor also wants to call {@link #onNext(Object)} on this processor recursively). + *

+ * This {@code MulticastProcessor} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()}, + * {@link #getThrowable()} and {@link #hasSubscribers()}. This processor doesn't allow peeking into its buffer. + *

+ * When this {@code MulticastProcessor} is terminated via {@link #onError(Throwable)} or {@link #onComplete()}, + * all previously signaled but not yet consumed items will be still available to {@code Subscriber}s and the respective + * terminal even is only emitted when all previous items have been successfully delivered to {@code Subscriber}s. + * If there are no {@code Subscriber}s, the remaining items will be buffered indefinitely. + *

+ * The {@code MulticastProcessor} does not support clearing its cached events (to appear empty again). + *

+ *
Backpressure:
+ *
The backpressure from the currently subscribed {@code Subscriber}s are coordinated by emitting upstream + * items only if all of those {@code Subscriber}s have requested at least one item. This behavior + * is also called lockstep-mode because even if some {@code Subscriber}s can take any number + * of items, other {@code Subscriber}s requesting less or infrequently will slow down the overall + * throughput of the flow.
+ *
Scheduler:
+ *
{@code MulticastProcessor} does not operate by default on a particular {@link io.reactivex.Scheduler} and + * the {@code Subscriber}s get notified on an arbitrary thread in a serialized fashion.
+ *
+ *

+ * Example: + *


+    MulticastProcessor<Integer> mp = Flowable.range(1, 10)
+    .subscribeWith(MulticastProcessor.create());
+
+    mp.test().assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+
+    // --------------------
+
+    MulticastProcessor<Integer> mp2 = MulticastProcessor.create(4);
+    mp2.start();
+
+    assertTrue(mp2.offer(1));
+    assertTrue(mp2.offer(2));
+    assertTrue(mp2.offer(3));
+    assertTrue(mp2.offer(4));
+
+    assertFalse(mp2.offer(5));
+
+    mp2.onComplete();
+
+    mp2.test().assertResult(1, 2, 3, 4);
+ * 
+ * @param the input and output value type + * @since 2.1.14 - experimental + */ +@Experimental +@BackpressureSupport(BackpressureKind.FULL) +@SchedulerSupport(SchedulerSupport.NONE) +public final class MulticastProcessor extends FlowableProcessor { + + final AtomicInteger wip; + + final AtomicReference upstream; + + final AtomicReference[]> subscribers; + + final AtomicBoolean once; + + final int bufferSize; + + final int limit; + + final boolean refcount; + + volatile SimpleQueue queue; + + volatile boolean done; + volatile Throwable error; + + int consumed; + + int fusionMode; + + @SuppressWarnings("rawtypes") + static final MulticastSubscription[] EMPTY = new MulticastSubscription[0]; + + @SuppressWarnings("rawtypes") + static final MulticastSubscription[] TERMINATED = new MulticastSubscription[0]; + + /** + * Constructs a fresh instance with the default Flowable.bufferSize() prefetch + * amount and no refCount-behavior. + * @param the input and output value type + * @return the new MulticastProcessor instance + */ + @CheckReturnValue + @NonNull + public static MulticastProcessor create() { + return new MulticastProcessor(bufferSize(), false); + } + + /** + * Constructs a fresh instance with the default Flowable.bufferSize() prefetch + * amount and no refCount-behavior. + * @param the input and output value type + * @param refCount if true and if all Subscribers have canceled, the upstream + * is cancelled + * @return the new MulticastProcessor instance + */ + @CheckReturnValue + @NonNull + public static MulticastProcessor create(boolean refCount) { + return new MulticastProcessor(bufferSize(), refCount); + } + + /** + * Constructs a fresh instance with the given prefetch amount and no refCount behavior. + * @param bufferSize the prefetch amount + * @param the input and output value type + * @return the new MulticastProcessor instance + */ + @CheckReturnValue + @NonNull + public static MulticastProcessor create(int bufferSize) { + return new MulticastProcessor(bufferSize, false); + } + + /** + * Constructs a fresh instance with the given prefetch amount and the optional + * refCount-behavior. + * @param bufferSize the prefetch amount + * @param refCount if true and if all Subscribers have canceled, the upstream + * is cancelled + * @param the input and output value type + * @return the new MulticastProcessor instance + */ + @CheckReturnValue + @NonNull + public static MulticastProcessor create(int bufferSize, boolean refCount) { + return new MulticastProcessor(bufferSize, refCount); + } + + /** + * Constructs a fresh instance with the given prefetch amount and the optional + * refCount-behavior. + * @param bufferSize the prefetch amount + * @param refCount if true and if all Subscribers have canceled, the upstream + * is cancelled + */ + @SuppressWarnings("unchecked") + MulticastProcessor(int bufferSize, boolean refCount) { + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); + this.bufferSize = bufferSize; + this.limit = bufferSize - (bufferSize >> 2); + this.wip = new AtomicInteger(); + this.subscribers = new AtomicReference[]>(EMPTY); + this.upstream = new AtomicReference(); + this.refcount = refCount; + this.once = new AtomicBoolean(); + } + + /** + * Initializes this Processor by setting an upstream Subscription that + * ignores request amounts, uses a fixed buffer + * and allows using the onXXX and offer methods + * afterwards. + */ + public void start() { + if (SubscriptionHelper.setOnce(upstream, EmptySubscription.INSTANCE)) { + queue = new SpscArrayQueue(bufferSize); + } + } + + /** + * Initializes this Processor by setting an upstream Subscription that + * ignores request amounts, uses an unbounded buffer + * and allows using the onXXX and offer methods + * afterwards. + */ + public void startUnbounded() { + if (SubscriptionHelper.setOnce(upstream, EmptySubscription.INSTANCE)) { + queue = new SpscLinkedArrayQueue(bufferSize); + } + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.setOnce(upstream, s)) { + if (s instanceof QueueSubscription) { + @SuppressWarnings("unchecked") + QueueSubscription qs = (QueueSubscription)s; + + int m = qs.requestFusion(QueueSubscription.ANY); + if (m == QueueSubscription.SYNC) { + fusionMode = m; + queue = qs; + done = true; + drain(); + return; + } + if (m == QueueSubscription.ASYNC) { + fusionMode = m; + queue = qs; + + s.request(bufferSize); + return; + } + } + + queue = new SpscArrayQueue(bufferSize); + + s.request(bufferSize); + } + } + + @Override + public void onNext(T t) { + if (once.get()) { + return; + } + if (fusionMode == QueueSubscription.NONE) { + ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources."); + if (!queue.offer(t)) { + SubscriptionHelper.cancel(upstream); + onError(new MissingBackpressureException()); + return; + } + } + drain(); + } + + /** + * Tries to offer an item into the internal queue and returns false + * if the queue is full. + * @param t the item to offer, not null + * @return true if successful, false if the queue is full + */ + public boolean offer(T t) { + if (once.get()) { + return false; + } + ObjectHelper.requireNonNull(t, "offer called with null. Null values are generally not allowed in 2.x operators and sources."); + if (fusionMode == QueueSubscription.NONE) { + if (queue.offer(t)) { + drain(); + return true; + } + } + return false; + } + + @Override + public void onError(Throwable t) { + ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources."); + if (once.compareAndSet(false, true)) { + error = t; + done = true; + drain(); + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + if (once.compareAndSet(false, true)) { + done = true; + drain(); + } + } + + @Override + public boolean hasSubscribers() { + return subscribers.get().length != 0; + } + + @Override + public boolean hasThrowable() { + return once.get() && error != null; + } + + @Override + public boolean hasComplete() { + return once.get() && error == null; + } + + @Override + public Throwable getThrowable() { + return once.get() ? error : null; + } + + @Override + protected void subscribeActual(Subscriber s) { + MulticastSubscription ms = new MulticastSubscription(s, this); + s.onSubscribe(ms); + if (add(ms)) { + if (ms.get() == Long.MIN_VALUE) { + remove(ms); + } else { + drain(); + } + } else { + if (once.get() || !refcount) { + Throwable ex = error; + if (ex != null) { + s.onError(ex); + return; + } + } + s.onComplete(); + } + } + + boolean add(MulticastSubscription inner) { + for (;;) { + MulticastSubscription[] a = subscribers.get(); + if (a == TERMINATED) { + return false; + } + int n = a.length; + @SuppressWarnings("unchecked") + MulticastSubscription[] b = new MulticastSubscription[n + 1]; + System.arraycopy(a, 0, b, 0, n); + b[n] = inner; + if (subscribers.compareAndSet(a, b)) { + return true; + } + } + } + + @SuppressWarnings("unchecked") + void remove(MulticastSubscription inner) { + for (;;) { + MulticastSubscription[] a = subscribers.get(); + int n = a.length; + if (n == 0) { + return; + } + + int j = -1; + for (int i = 0; i < n; i++) { + if (a[i] == inner) { + j = i; + break; + } + } + + if (j < 0) { + break; + } + + if (n == 1) { + if (refcount) { + if (subscribers.compareAndSet(a, TERMINATED)) { + SubscriptionHelper.cancel(upstream); + once.set(true); + break; + } + } else { + if (subscribers.compareAndSet(a, EMPTY)) { + break; + } + } + } else { + MulticastSubscription[] b = new MulticastSubscription[n - 1]; + System.arraycopy(a, 0, b, 0, j); + System.arraycopy(a, j + 1, b, j, n - j - 1); + if (subscribers.compareAndSet(a, b)) { + break; + } + } + } + } + + @SuppressWarnings("unchecked") + void drain() { + if (wip.getAndIncrement() != 0) { + return; + } + + int missed = 1; + AtomicReference[]> subs = subscribers; + int c = consumed; + int lim = limit; + int fm = fusionMode; + + outer: + for (;;) { + + SimpleQueue q = queue; + + if (q != null) { + MulticastSubscription[] as = subs.get(); + int n = as.length; + + if (n != 0) { + long r = -1L; + + for (MulticastSubscription a : as) { + long ra = a.get(); + if (ra >= 0L) { + if (r == -1L) { + r = ra - a.emitted; + } else { + r = Math.min(r, ra - a.emitted); + } + } + } + + while (r > 0L) { + MulticastSubscription[] bs = subs.get(); + + if (bs == TERMINATED) { + q.clear(); + return; + } + + if (as != bs) { + continue outer; + } + + boolean d = done; + + T v; + + try { + v = q.poll(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + SubscriptionHelper.cancel(upstream); + d = true; + v = null; + error = ex; + done = true; + } + boolean empty = v == null; + + if (d && empty) { + Throwable ex = error; + if (ex != null) { + for (MulticastSubscription inner : subs.getAndSet(TERMINATED)) { + inner.onError(ex); + } + } else { + for (MulticastSubscription inner : subs.getAndSet(TERMINATED)) { + inner.onComplete(); + } + } + return; + } + + if (empty) { + break; + } + + for (MulticastSubscription inner : as) { + inner.onNext(v); + } + + r--; + + if (fm != QueueSubscription.SYNC) { + if (++c == lim) { + c = 0; + upstream.get().request(lim); + } + } + } + + if (r == 0) { + MulticastSubscription[] bs = subs.get(); + + if (bs == TERMINATED) { + q.clear(); + return; + } + + if (as != bs) { + continue outer; + } + + if (done && q.isEmpty()) { + Throwable ex = error; + if (ex != null) { + for (MulticastSubscription inner : subs.getAndSet(TERMINATED)) { + inner.onError(ex); + } + } else { + for (MulticastSubscription inner : subs.getAndSet(TERMINATED)) { + inner.onComplete(); + } + } + return; + } + } + } + } + + missed = wip.addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + + static final class MulticastSubscription extends AtomicLong implements Subscription { + + private static final long serialVersionUID = -363282618957264509L; + + final Subscriber actual; + + final MulticastProcessor parent; + + long emitted; + + MulticastSubscription(Subscriber actual, MulticastProcessor parent) { + this.actual = actual; + this.parent = parent; + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validate(n)) { + for (;;) { + long r = get(); + if (r == Long.MIN_VALUE || r == Long.MAX_VALUE) { + break; + } + long u = r + n; + if (u < 0L) { + u = Long.MAX_VALUE; + } + if (compareAndSet(r, u)) { + parent.drain(); + break; + } + } + } + } + + @Override + public void cancel() { + if (getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) { + parent.remove(this); + } + } + + void onNext(T t) { + if (get() != Long.MIN_VALUE) { + emitted++; + actual.onNext(t); + } + } + + void onError(Throwable t) { + if (get() != Long.MIN_VALUE) { + actual.onError(t); + } + } + + void onComplete() { + if (get() != Long.MIN_VALUE) { + actual.onComplete(); + } + } + } +} diff --git a/src/test/java/io/reactivex/processors/MulticastProcessorTest.java b/src/test/java/io/reactivex/processors/MulticastProcessorTest.java new file mode 100644 index 0000000000..c85665eacd --- /dev/null +++ b/src/test/java/io/reactivex/processors/MulticastProcessorTest.java @@ -0,0 +1,789 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.processors; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.reactivestreams.Subscription; + +import io.reactivex.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Function; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subscribers.TestSubscriber; + +public class MulticastProcessorTest { + + @Test + public void complete() { + MulticastProcessor mp = MulticastProcessor.create(); + mp.start(); + + assertFalse(mp.hasSubscribers()); + assertFalse(mp.hasComplete()); + assertFalse(mp.hasThrowable()); + assertNull(mp.getThrowable()); + + TestSubscriber ts = mp.test(); + + assertTrue(mp.hasSubscribers()); + assertFalse(mp.hasComplete()); + assertFalse(mp.hasThrowable()); + assertNull(mp.getThrowable()); + + mp.onNext(1); + mp.onComplete(); + + ts.assertResult(1); + + assertFalse(mp.hasSubscribers()); + assertTrue(mp.hasComplete()); + assertFalse(mp.hasThrowable()); + assertNull(mp.getThrowable()); + + mp.test().assertResult(); + } + + @Test + public void error() { + MulticastProcessor mp = MulticastProcessor.create(); + mp.start(); + + assertFalse(mp.hasSubscribers()); + assertFalse(mp.hasComplete()); + assertFalse(mp.hasThrowable()); + assertNull(mp.getThrowable()); + + TestSubscriber ts = mp.test(); + + assertTrue(mp.hasSubscribers()); + assertFalse(mp.hasComplete()); + assertFalse(mp.hasThrowable()); + assertNull(mp.getThrowable()); + + mp.onNext(1); + mp.onError(new IOException()); + + ts.assertFailure(IOException.class, 1); + + assertFalse(mp.hasSubscribers()); + assertFalse(mp.hasComplete()); + assertTrue(mp.hasThrowable()); + assertNotNull(mp.getThrowable()); + assertTrue("" + mp.getThrowable(), mp.getThrowable() instanceof IOException); + + mp.test().assertFailure(IOException.class); + } + + @Test + public void overflow() { + MulticastProcessor mp = MulticastProcessor.create(1); + mp.start(); + + TestSubscriber ts = mp.test(0); + + assertTrue(mp.offer(1)); + assertFalse(mp.offer(2)); + + mp.onNext(3); + + ts.assertEmpty(); + + ts.request(1); + + ts.assertFailure(MissingBackpressureException.class, 1); + + mp.test().assertFailure(MissingBackpressureException.class); + } + + @Test + public void backpressure() { + MulticastProcessor mp = MulticastProcessor.create(16, false); + mp.start(); + + for (int i = 0; i < 10; i++) { + mp.onNext(i); + } + mp.onComplete(); + + mp.test(0) + .assertEmpty() + .requestMore(1) + .assertValuesOnly(0) + .requestMore(2) + .assertValuesOnly(0, 1, 2) + .requestMore(3) + .assertValuesOnly(0, 1, 2, 3, 4, 5) + .requestMore(4) + .assertResult(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + } + + @Test + public void refCounted() { + MulticastProcessor mp = MulticastProcessor.create(true); + BooleanSubscription bs = new BooleanSubscription(); + + mp.onSubscribe(bs); + + assertFalse(bs.isCancelled()); + + mp.test().cancel(); + + assertTrue(bs.isCancelled()); + + assertFalse(mp.hasSubscribers()); + assertTrue(mp.hasComplete()); + assertFalse(mp.hasThrowable()); + assertNull(mp.getThrowable()); + + mp.test().assertResult(); + } + + @Test + public void refCounted2() { + MulticastProcessor mp = MulticastProcessor.create(16, true); + BooleanSubscription bs = new BooleanSubscription(); + + mp.onSubscribe(bs); + + assertFalse(bs.isCancelled()); + + mp.test(1, true); + + assertTrue(bs.isCancelled()); + + assertFalse(mp.hasSubscribers()); + assertTrue(mp.hasComplete()); + assertFalse(mp.hasThrowable()); + assertNull(mp.getThrowable()); + + mp.test().assertResult(); + } + + @Test + public void longRunning() { + MulticastProcessor mp = MulticastProcessor.create(16); + Flowable.range(1, 1000).subscribe(mp); + + mp.test().assertValueCount(1000).assertNoErrors().assertComplete(); + } + + + @Test + public void oneByOne() { + MulticastProcessor mp = MulticastProcessor.create(16); + Flowable.range(1, 1000).subscribe(mp); + + mp + .rebatchRequests(1) + .test() + .assertValueCount(1000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void take() { + MulticastProcessor mp = MulticastProcessor.create(16); + Flowable.range(1, 1000).subscribe(mp); + + mp.take(10).test().assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void takeRefCount() { + MulticastProcessor mp = MulticastProcessor.create(16, true); + Flowable.range(1, 1000).subscribe(mp); + + mp.take(10).test().assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void takeRefCountExact() { + MulticastProcessor mp = MulticastProcessor.create(16, true); + Flowable.range(1, 10).subscribe(mp); + + mp + .rebatchRequests(10) + .take(10) + .test().assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void crossCancel() { + + final TestSubscriber ts1 = new TestSubscriber(); + + TestSubscriber ts2 = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + ts1.cancel(); + ts1.onComplete(); + } + }; + + MulticastProcessor mp = MulticastProcessor.create(false); + + mp.subscribe(ts2); + mp.subscribe(ts1); + + mp.start(); + + mp.onNext(1); + mp.onComplete(); + + ts1.assertResult(); + ts2.assertResult(1); + } + + @Test + public void crossCancelError() { + + final TestSubscriber ts1 = new TestSubscriber(); + + TestSubscriber ts2 = new TestSubscriber() { + @Override + public void onError(Throwable t) { + super.onError(t); + ts1.cancel(); + ts1.onComplete(); + } + }; + + MulticastProcessor mp = MulticastProcessor.create(false); + + mp.subscribe(ts2); + mp.subscribe(ts1); + + mp.start(); + + mp.onNext(1); + mp.onError(new IOException()); + + ts1.assertResult(1); + ts2.assertFailure(IOException.class, 1); + } + + @Test + public void crossCancelComplete() { + + final TestSubscriber ts1 = new TestSubscriber(); + + TestSubscriber ts2 = new TestSubscriber() { + @Override + public void onComplete() { + super.onComplete(); + ts1.cancel(); + ts1.onNext(2); + ts1.onComplete(); + } + }; + + MulticastProcessor mp = MulticastProcessor.create(false); + + mp.subscribe(ts2); + mp.subscribe(ts1); + + mp.start(); + + mp.onNext(1); + mp.onComplete(); + + ts1.assertResult(1, 2); + ts2.assertResult(1); + } + + @Test + public void crossCancel1() { + + final TestSubscriber ts1 = new TestSubscriber(1); + + TestSubscriber ts2 = new TestSubscriber(1) { + @Override + public void onNext(Integer t) { + super.onNext(t); + ts1.cancel(); + ts1.onComplete(); + } + }; + + MulticastProcessor mp = MulticastProcessor.create(false); + + mp.subscribe(ts2); + mp.subscribe(ts1); + + mp.start(); + + mp.onNext(1); + mp.onComplete(); + + ts1.assertResult(); + ts2.assertResult(1); + } + + @Test + public void requestCancel() { + List errors = TestHelper.trackPluginErrors(); + try { + MulticastProcessor mp = MulticastProcessor.create(false); + + mp.subscribe(new FlowableSubscriber() { + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + + @Override + public void onSubscribe(Subscription t) { + t.request(-1); + t.request(1); + t.request(Long.MAX_VALUE); + t.request(Long.MAX_VALUE); + t.cancel(); + t.cancel(); + t.request(2); + } + }); + + TestHelper.assertError(errors, 0, IllegalArgumentException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void unbounded() { + MulticastProcessor mp = MulticastProcessor.create(4, false); + mp.startUnbounded(); + + for (int i = 0; i < 10; i++) { + assertTrue(mp.offer(i)); + } + mp.onComplete(); + + mp.test().assertResult(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + } + + @Test + public void multiStart() { + List errors = TestHelper.trackPluginErrors(); + try { + MulticastProcessor mp = MulticastProcessor.create(4, false); + + mp.start(); + mp.start(); + mp.startUnbounded(); + BooleanSubscription bs = new BooleanSubscription(); + mp.onSubscribe(bs); + + assertTrue(bs.isCancelled()); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + TestHelper.assertError(errors, 1, ProtocolViolationException.class); + TestHelper.assertError(errors, 2, ProtocolViolationException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test(expected = NullPointerException.class) + public void onNextNull() { + MulticastProcessor mp = MulticastProcessor.create(4, false); + mp.start(); + mp.onNext(null); + } + + + @Test(expected = NullPointerException.class) + public void onOfferNull() { + MulticastProcessor mp = MulticastProcessor.create(4, false); + mp.start(); + mp.offer(null); + } + + @Test(expected = NullPointerException.class) + public void onErrorNull() { + MulticastProcessor mp = MulticastProcessor.create(4, false); + mp.start(); + mp.onError(null); + } + + @Test + public void afterTerminated() { + List errors = TestHelper.trackPluginErrors(); + try { + MulticastProcessor mp = MulticastProcessor.create(); + mp.start(); + mp.onComplete(); + mp.onComplete(); + mp.onError(new IOException()); + mp.onNext(1); + mp.offer(1); + + mp.test().assertResult(); + + TestHelper.assertUndeliverable(errors, 0, IOException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void asyncFused() { + UnicastProcessor up = UnicastProcessor.create(); + MulticastProcessor mp = MulticastProcessor.create(4); + + up.subscribe(mp); + + TestSubscriber ts = mp.test(); + + for (int i = 0; i < 10; i++) { + up.onNext(i); + } + + assertFalse(mp.offer(10)); + + up.onComplete(); + + ts.assertResult(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + } + + @Test + public void fusionCrash() { + MulticastProcessor mp = Flowable.range(1, 5) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + throw new IOException(); + } + }) + .subscribeWith(MulticastProcessor.create()); + + mp.test().assertFailure(IOException.class); + } + + @Test + public void lockstep() { + MulticastProcessor mp = MulticastProcessor.create(); + + TestSubscriber ts1 = mp.test(); + mp.start(); + + mp.onNext(1); + mp.onNext(2); + + ts1.assertValues(1, 2); + + TestSubscriber ts2 = mp.test(0); + + ts2.assertEmpty(); + + mp.onNext(3); + + ts1.assertValues(1, 2); + ts2.assertEmpty(); + + mp.onComplete(); + + ts1.assertValues(1, 2); + ts2.assertEmpty(); + + ts2.request(1); + + ts1.assertResult(1, 2, 3); + ts2.assertResult(3); + } + + @Test + public void rejectedFusion() { + + MulticastProcessor mp = MulticastProcessor.create(); + + TestHelper.rejectFlowableFusion() + .subscribe(mp); + + mp.test().assertEmpty(); + } + + @Test + public void addRemoveRaceNoRefCount() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final MulticastProcessor mp = MulticastProcessor.create(); + + final TestSubscriber ts = mp.test(); + final TestSubscriber ts2 = new TestSubscriber(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + mp.subscribe(ts2); + } + }; + + TestHelper.race(r1, r2); + + assertTrue(mp.hasSubscribers()); + } + } + + @Test + public void addRemoveRaceNoRefCountNonEmpty() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final MulticastProcessor mp = MulticastProcessor.create(); + + mp.test(); + final TestSubscriber ts = mp.test(); + final TestSubscriber ts2 = new TestSubscriber(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + mp.subscribe(ts2); + } + }; + + TestHelper.race(r1, r2); + + assertTrue(mp.hasSubscribers()); + } + } + + @Test + public void addRemoveRaceWitRefCount() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final MulticastProcessor mp = MulticastProcessor.create(true); + + final TestSubscriber ts = mp.test(); + final TestSubscriber ts2 = new TestSubscriber(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + mp.subscribe(ts2); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void cancelUpfront() { + MulticastProcessor mp = MulticastProcessor.create(); + + mp.test(0, true).assertEmpty(); + + assertFalse(mp.hasSubscribers()); + } + + + @Test + public void cancelUpfrontOtherConsumersPresent() { + MulticastProcessor mp = MulticastProcessor.create(); + + mp.test(); + + mp.test(0, true).assertEmpty(); + + assertTrue(mp.hasSubscribers()); + } + + @Test + public void consumerRequestRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final MulticastProcessor mp = MulticastProcessor.create(true); + mp.startUnbounded(); + mp.onNext(1); + mp.onNext(2); + + final TestSubscriber ts = mp.test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts.request(1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.request(1); + } + }; + + TestHelper.race(r1, r2); + + ts.assertValuesOnly(1, 2); + } + } + + @Test + public void consumerUpstreamRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final MulticastProcessor mp = MulticastProcessor.create(true); + + final Flowable source = Flowable.range(1, 5); + + final TestSubscriber ts = mp.test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts.request(5); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + source.subscribe(mp); + } + }; + + TestHelper.race(r1, r2); + + ts + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5); + } + } + + @Test + public void emitCancelRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final MulticastProcessor mp = MulticastProcessor.create(true); + mp.startUnbounded(); + + final TestSubscriber ts = mp.test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + mp.onNext(1); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void cancelCancelDrain() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final MulticastProcessor mp = MulticastProcessor.create(true); + + final TestSubscriber ts1 = mp.test(); + final TestSubscriber ts2 = mp.test(); + + mp.test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts1.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts2.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void requestCancelRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final MulticastProcessor mp = MulticastProcessor.create(true); + + final TestSubscriber ts1 = mp.test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts1.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts1.request(1); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void noUpstream() { + MulticastProcessor mp = MulticastProcessor.create(); + + TestSubscriber ts = mp.test(0); + + ts.request(1); + + assertTrue(mp.hasSubscribers()); + } + +}