diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java index 147f9ecc11..9bc0d63b65 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java @@ -148,7 +148,7 @@ static final class PublishSubscriber final int bufferSize; /** Tracks the subscribed InnerSubscribers. */ - final AtomicReference subscribers; + final AtomicReference[]> subscribers; /** * Atomically changed from false to true by connect to make sure the * connection is only performed by one thread. @@ -165,8 +165,9 @@ static final class PublishSubscriber /** Holds notifications from upstream. */ volatile SimpleQueue queue; + @SuppressWarnings("unchecked") PublishSubscriber(AtomicReference> current, int bufferSize) { - this.subscribers = new AtomicReference(EMPTY); + this.subscribers = new AtomicReference[]>(EMPTY); this.current = current; this.shouldConnect = new AtomicBoolean(); this.bufferSize = bufferSize; @@ -175,6 +176,7 @@ static final class PublishSubscriber @Override public void dispose() { if (subscribers.get() != TERMINATED) { + @SuppressWarnings("unchecked") InnerSubscriber[] ps = subscribers.getAndSet(TERMINATED); if (ps != TERMINATED) { current.compareAndSet(PublishSubscriber.this, null); @@ -263,7 +265,7 @@ boolean add(InnerSubscriber producer) { // the state can change so we do a CAS loop to achieve atomicity for (;;) { // get the current producer array - InnerSubscriber[] c = subscribers.get(); + InnerSubscriber[] c = subscribers.get(); // if this subscriber-to-source reached a terminal state by receiving // an onError or onComplete, just refuse to add the new producer if (c == TERMINATED) { @@ -271,7 +273,8 @@ boolean add(InnerSubscriber producer) { } // we perform a copy-on-write logic int len = c.length; - InnerSubscriber[] u = new InnerSubscriber[len + 1]; + @SuppressWarnings("unchecked") + InnerSubscriber[] u = new InnerSubscriber[len + 1]; System.arraycopy(c, 0, u, 0, len); u[len] = producer; // try setting the subscribers array @@ -287,11 +290,12 @@ boolean add(InnerSubscriber producer) { * Atomically removes the given InnerSubscriber from the subscribers array. * @param producer the producer to remove */ + @SuppressWarnings("unchecked") void remove(InnerSubscriber producer) { // the state can change so we do a CAS loop to achieve atomicity for (;;) { // let's read the current subscribers array - InnerSubscriber[] c = subscribers.get(); + InnerSubscriber[] c = subscribers.get(); int len = c.length; // if it is either empty or terminated, there is nothing to remove so we quit if (len == 0) { @@ -311,7 +315,7 @@ void remove(InnerSubscriber producer) { return; } // we do copy-on-write logic here - InnerSubscriber[] u; + InnerSubscriber[] u; // we don't create a new empty array if producer was the single inhabitant // but rather reuse an empty array if (len == 1) { @@ -340,6 +344,7 @@ void remove(InnerSubscriber producer) { * @param empty set to true if the queue is empty * @return true if there is indeed a terminal condition */ + @SuppressWarnings("unchecked") boolean checkTerminated(Object term, boolean empty) { // first of all, check if there is actually a terminal event if (term != null) { @@ -404,6 +409,17 @@ void dispatch() { return; } int missed = 1; + + // saving a local copy because this will be accessed after every item + // delivered to detect changes in the subscribers due to an onNext + // and thus not dropping items + AtomicReference[]> subscribers = this.subscribers; + + // We take a snapshot of the current child subscribers. + // Concurrent subscribers may miss this iteration, but it is to be expected + InnerSubscriber[] ps = subscribers.get(); + + outer: for (;;) { /* * We need to read terminalEvent before checking the queue for emptiness because @@ -434,10 +450,6 @@ void dispatch() { // this loop is the only one which can turn a non-empty queue into an empty one // and as such, no need to ask the queue itself again for that. if (!empty) { - // We take a snapshot of the current child subscribers. - // Concurrent subscribers may miss this iteration, but it is to be expected - @SuppressWarnings("unchecked") - InnerSubscriber[] ps = subscribers.get(); int len = ps.length; // Let's assume everyone requested the maximum value. @@ -452,14 +464,11 @@ void dispatch() { long r = ip.get(); // if there is one child subscriber that hasn't requested yet // we can't emit anything to anyone - if (r >= 0L) { - maxRequested = Math.min(maxRequested, r); - } else - // cancellation is indicated by a special value - if (r == CANCELLED) { + if (r != CANCELLED) { + maxRequested = Math.min(maxRequested, r - ip.emitted); + } else { cancelled++; } - // we ignore those with NOT_REQUESTED as if they aren't even there } // it may happen everyone has cancelled between here and subscribers.get() @@ -518,20 +527,36 @@ void dispatch() { } // we need to unwrap potential nulls T value = NotificationLite.getValue(v); + + boolean subscribersChanged = false; + // let's emit this value to all child subscribers for (InnerSubscriber ip : ps) { // if ip.get() is negative, the child has either cancelled in the // meantime or hasn't requested anything yet // this eager behavior will skip cancelled children in case // multiple values are available in the queue - if (ip.get() > 0L) { + long ipr = ip.get(); + if (ipr != CANCELLED) { + if (ipr != Long.MAX_VALUE) { + // indicate this child has received 1 element + ip.emitted++; + } ip.child.onNext(value); - // indicate this child has received 1 element - ip.produced(1); + } else { + subscribersChanged = true; } } // indicate we emitted one element d++; + + // see if the array of subscribers changed as a consequence + // of emission or concurrent activity + InnerSubscriber[] freshArray = subscribers.get(); + if (subscribersChanged || freshArray != ps) { + ps = freshArray; + continue outer; + } } // if we did emit at least one element, request more to replenish the queue @@ -552,6 +577,9 @@ void dispatch() { if (missed == 0) { break; } + + // get a fresh copy of the current subscribers + ps = subscribers.get(); } } } @@ -571,6 +599,9 @@ static final class InnerSubscriber extends AtomicLong implements Subscription */ volatile PublishSubscriber parent; + /** Track the number of emitted items (avoids decrementing the request counter). */ + long emitted; + InnerSubscriber(Subscriber child) { this.child = child; } @@ -586,15 +617,6 @@ public void request(long n) { } } - /** - * Indicate that values have been emitted to this child subscriber by the dispatch() method. - * @param n the number of items emitted - * @return the updated request value (may indicate how much can be produced or a terminal state) - */ - public long produced(long n) { - return BackpressureHelper.producedCancel(this, n); - } - @Override public void cancel() { long r = get(); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishMulticast.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishMulticast.java index d7cd85d716..079ef019c6 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishMulticast.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishMulticast.java @@ -153,8 +153,6 @@ static final class MulticastProcessor extends Flowable implements Flowable int consumed; - long emitted; - @SuppressWarnings("unchecked") MulticastProcessor(int prefetch, boolean delayError) { this.prefetch = prefetch; @@ -325,10 +323,12 @@ void drain() { int upstreamConsumed = consumed; int localLimit = limit; boolean canRequest = sourceMode != QueueSubscription.SYNC; - long e = emitted; + AtomicReference[]> subs = subscribers; + + MulticastSubscription[] array = subs.get(); + outer: for (;;) { - MulticastSubscription[] array = subscribers.get(); int n = array.length; @@ -336,7 +336,7 @@ void drain() { long r = Long.MAX_VALUE; for (MulticastSubscription ms : array) { - long u = ms.get(); + long u = ms.get() - ms.emitted; if (u != Long.MIN_VALUE) { if (r > u) { r = u; @@ -347,10 +347,10 @@ void drain() { } if (n == 0) { - r = e; + r = 0; } - while (e != r) { + while (r != 0) { if (isDisposed()) { q.clear(); return; @@ -393,21 +393,35 @@ void drain() { break; } + boolean subscribersChange = false; + for (MulticastSubscription ms : array) { - if (ms.get() != Long.MIN_VALUE) { + long msr = ms.get(); + if (msr != Long.MIN_VALUE) { + if (msr != Long.MAX_VALUE) { + ms.emitted++; + } ms.actual.onNext(v); + } else { + subscribersChange = true; } } - e++; + r--; if (canRequest && ++upstreamConsumed == localLimit) { upstreamConsumed = 0; s.get().request(localLimit); } + + MulticastSubscription[] freshArray = subs.get(); + if (subscribersChange || freshArray != array) { + array = freshArray; + continue outer; + } } - if (e == r) { + if (r == 0) { if (isDisposed()) { q.clear(); return; @@ -435,7 +449,6 @@ void drain() { } } - emitted = e; consumed = upstreamConsumed; missed = wip.addAndGet(-missed); if (missed == 0) { @@ -444,6 +457,7 @@ void drain() { if (q == null) { q = queue; } + array = subs.get(); } } @@ -476,6 +490,8 @@ static final class MulticastSubscription final MulticastProcessor parent; + long emitted; + MulticastSubscription(Subscriber actual, MulticastProcessor parent) { this.actual = actual; this.parent = parent; diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishTest.java index 432f5231c4..866e8de378 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishTest.java @@ -19,7 +19,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; -import org.junit.Test; +import org.junit.*; import org.reactivestreams.*; import io.reactivex.*; @@ -941,4 +941,365 @@ protected void subscribeActual(Subscriber s) { ref.get().add(new InnerSubscriber(new TestSubscriber())); ref.get().remove(null); } + + @Test + @Ignore("publish() keeps consuming the upstream if there are no subscribers, 3.x should change this") + public void subscriberSwap() { + final ConnectableFlowable co = Flowable.range(1, 5).publish(); + + co.connect(); + + TestSubscriber ts1 = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + cancel(); + onComplete(); + } + }; + + co.subscribe(ts1); + + ts1.assertResult(1); + + TestSubscriber ts2 = new TestSubscriber(0); + co.subscribe(ts2); + + ts2 + .assertEmpty() + .requestMore(4) + .assertResult(2, 3, 4, 5); + } + + @Test + public void subscriberLiveSwap() { + final ConnectableFlowable co = Flowable.range(1, 5).publish(); + + final TestSubscriber ts2 = new TestSubscriber(0); + + TestSubscriber ts1 = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + cancel(); + onComplete(); + co.subscribe(ts2); + } + }; + + co.subscribe(ts1); + + co.connect(); + + ts1.assertResult(1); + + ts2 + .assertEmpty() + .requestMore(4) + .assertResult(2, 3, 4, 5); + } + + @Test + public void selectorSubscriberSwap() { + final AtomicReference> ref = new AtomicReference>(); + + Flowable.range(1, 5).publish(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) throws Exception { + ref.set(f); + return Flowable.never(); + } + }).test(); + + ref.get().take(2).test().assertResult(1, 2); + + ref.get() + .test(0) + .assertEmpty() + .requestMore(2) + .assertValuesOnly(3, 4) + .requestMore(1) + .assertResult(3, 4, 5); + } + + @Test + public void leavingSubscriberOverrequests() { + final AtomicReference> ref = new AtomicReference>(); + + PublishProcessor pp = PublishProcessor.create(); + + pp.publish(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) throws Exception { + ref.set(f); + return Flowable.never(); + } + }).test(); + + TestSubscriber ts1 = ref.get().take(2).test(); + + pp.onNext(1); + pp.onNext(2); + + ts1.assertResult(1, 2); + + pp.onNext(3); + pp.onNext(4); + + TestSubscriber ts2 = ref.get().test(0L); + + ts2.assertEmpty(); + + ts2.requestMore(2); + + ts2.assertValuesOnly(3, 4); + } + + // call a transformer only if the input is non-empty + @Test + public void composeIfNotEmpty() { + final FlowableTransformer transformer = new FlowableTransformer() { + @Override + public Publisher apply(Flowable g) { + return g.map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + return v + 1; + } + }); + } + }; + + final AtomicInteger calls = new AtomicInteger(); + Flowable.range(1, 5) + .publish(new Function, Publisher>() { + @Override + public Publisher apply(final Flowable shared) + throws Exception { + return shared.take(1).concatMap(new Function>() { + @Override + public Publisher apply(Integer first) + throws Exception { + calls.incrementAndGet(); + return transformer.apply(Flowable.just(first).concatWith(shared)); + } + }); + } + }) + .test() + .assertResult(2, 3, 4, 5, 6); + + assertEquals(1, calls.get()); + } + + // call a transformer only if the input is non-empty + @Test + public void composeIfNotEmptyNotFused() { + final FlowableTransformer transformer = new FlowableTransformer() { + @Override + public Publisher apply(Flowable g) { + return g.map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + return v + 1; + } + }); + } + }; + + final AtomicInteger calls = new AtomicInteger(); + Flowable.range(1, 5).hide() + .publish(new Function, Publisher>() { + @Override + public Publisher apply(final Flowable shared) + throws Exception { + return shared.take(1).concatMap(new Function>() { + @Override + public Publisher apply(Integer first) + throws Exception { + calls.incrementAndGet(); + return transformer.apply(Flowable.just(first).concatWith(shared)); + } + }); + } + }) + .test() + .assertResult(2, 3, 4, 5, 6); + + assertEquals(1, calls.get()); + } + + // call a transformer only if the input is non-empty + @Test + public void composeIfNotEmptyIsEmpty() { + final FlowableTransformer transformer = new FlowableTransformer() { + @Override + public Publisher apply(Flowable g) { + return g.map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + return v + 1; + } + }); + } + }; + + final AtomicInteger calls = new AtomicInteger(); + Flowable.empty().hide() + .publish(new Function, Publisher>() { + @Override + public Publisher apply(final Flowable shared) + throws Exception { + return shared.take(1).concatMap(new Function>() { + @Override + public Publisher apply(Integer first) + throws Exception { + calls.incrementAndGet(); + return transformer.apply(Flowable.just(first).concatWith(shared)); + } + }); + } + }) + .test() + .assertResult(); + + assertEquals(0, calls.get()); + } + + @Test + public void publishFunctionCancelOuterAfterOneInner() { + final AtomicReference> ref = new AtomicReference>(); + + PublishProcessor pp = PublishProcessor.create(); + + final TestSubscriber ts = pp.publish(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) throws Exception { + ref.set(f); + return Flowable.never(); + } + }).test(); + + ref.get().subscribe(new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + onComplete(); + ts.cancel(); + } + }); + + pp.onNext(1); + } + + @Test + public void publishFunctionCancelOuterAfterOneInnerBackpressured() { + final AtomicReference> ref = new AtomicReference>(); + + PublishProcessor pp = PublishProcessor.create(); + + final TestSubscriber ts = pp.publish(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) throws Exception { + ref.set(f); + return Flowable.never(); + } + }).test(); + + ref.get().subscribe(new TestSubscriber(1L) { + @Override + public void onNext(Integer t) { + super.onNext(t); + onComplete(); + ts.cancel(); + } + }); + + pp.onNext(1); + } + + @Test + public void publishCancelOneAsync() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + final PublishProcessor pp = PublishProcessor.create(); + + final AtomicReference> ref = new AtomicReference>(); + + pp.publish(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) throws Exception { + ref.set(f); + return Flowable.never(); + } + }).test(); + + final TestSubscriber ts1 = ref.get().test(); + TestSubscriber ts2 = ref.get().test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts1.cancel(); + } + }; + + TestHelper.race(r1, r2); + + ts2.assertValuesOnly(1); + } + } + + @Test + public void publishCancelOneAsync2() { + final PublishProcessor pp = PublishProcessor.create(); + + ConnectableFlowable co = pp.publish(); + + final TestSubscriber ts1 = new TestSubscriber(); + + final AtomicReference> ref = new AtomicReference>(); + + co.subscribe(new FlowableSubscriber() { + @SuppressWarnings("unchecked") + @Override + public void onSubscribe(Subscription s) { + ts1.onSubscribe(new BooleanSubscription()); + // pretend to be cancelled without removing it from the subscriber list + ref.set((InnerSubscriber)s); + } + + @Override + public void onNext(Integer t) { + ts1.onNext(t); + } + + @Override + public void onError(Throwable t) { + ts1.onError(t); + } + + @Override + public void onComplete() { + ts1.onComplete(); + } + }); + TestSubscriber ts2 = co.test(); + + co.connect(); + + ref.get().set(Long.MIN_VALUE); + + pp.onNext(1); + + ts1.assertEmpty(); + ts2.assertValuesOnly(1); + } }