diff --git a/src/main/java/rx/subjects/PublishSubject.java b/src/main/java/rx/subjects/PublishSubject.java index 90fae91c00..5072a3d6e7 100644 --- a/src/main/java/rx/subjects/PublishSubject.java +++ b/src/main/java/rx/subjects/PublishSubject.java @@ -16,13 +16,13 @@ package rx.subjects; import java.util.*; +import java.util.concurrent.atomic.*; +import rx.*; import rx.Observer; import rx.annotations.Beta; -import rx.exceptions.Exceptions; -import rx.functions.Action1; -import rx.internal.operators.NotificationLite; -import rx.subjects.SubjectSubscriptionManager.SubjectObserver; +import rx.exceptions.*; +import rx.internal.operators.BackpressureUtils; /** * Subject that, once an {@link Observer} has subscribed, emits all subsequently observed items to the @@ -50,8 +50,8 @@ * the type of items observed and emitted by the Subject */ public final class PublishSubject extends Subject { - final SubjectSubscriptionManager state; - private final NotificationLite nl = NotificationLite.instance(); + + final PublishSubjectState state; /** * Creates and returns a new {@code PublishSubject}. @@ -60,63 +60,33 @@ public final class PublishSubject extends Subject { * @return the new {@code PublishSubject} */ public static PublishSubject create() { - final SubjectSubscriptionManager state = new SubjectSubscriptionManager(); - state.onTerminated = new Action1>() { - - @Override - public void call(SubjectObserver o) { - o.emitFirst(state.getLatest(), state.nl); - } - - }; - return new PublishSubject(state, state); + return new PublishSubject(new PublishSubjectState()); } - - protected PublishSubject(OnSubscribe onSubscribe, SubjectSubscriptionManager state) { - super(onSubscribe); + + protected PublishSubject(PublishSubjectState state) { + super(state); this.state = state; } @Override - public void onCompleted() { - if (state.active) { - Object n = nl.completed(); - for (SubjectObserver bo : state.terminate(n)) { - bo.emitNext(n, state.nl); - } - } - + public void onNext(T v) { + state.onNext(v); } @Override - public void onError(final Throwable e) { - if (state.active) { - Object n = nl.error(e); - List errors = null; - for (SubjectObserver bo : state.terminate(n)) { - try { - bo.emitNext(n, state.nl); - } catch (Throwable e2) { - if (errors == null) { - errors = new ArrayList(); - } - errors.add(e2); - } - } - Exceptions.throwIfAny(errors); - } + public void onError(Throwable e) { + state.onError(e); } @Override - public void onNext(T v) { - for (SubjectObserver bo : state.observers()) { - bo.onNext(v); - } + public void onCompleted() { + state.onCompleted(); } + @Override public boolean hasObservers() { - return state.observers().length > 0; + return state.get().length != 0; } /** @@ -125,8 +95,7 @@ public boolean hasObservers() { */ @Beta public boolean hasThrowable() { - Object o = state.getLatest(); - return nl.isError(o); + return state.get() == PublishSubjectState.TERMINATED && state.error != null; } /** * Check if the Subject has terminated normally. @@ -134,8 +103,7 @@ public boolean hasThrowable() { */ @Beta public boolean hasCompleted() { - Object o = state.getLatest(); - return o != null && !nl.isError(o); + return state.get() == PublishSubjectState.TERMINATED && state.error == null; } /** * Returns the Throwable that terminated the Subject. @@ -144,10 +112,216 @@ public boolean hasCompleted() { */ @Beta public Throwable getThrowable() { - Object o = state.getLatest(); - if (nl.isError(o)) { - return nl.getError(o); + if (state.get() == PublishSubjectState.TERMINATED) { + return state.error; } return null; } + + static final class PublishSubjectState + extends AtomicReference[]> + implements OnSubscribe, Observer { + + /** */ + private static final long serialVersionUID = -7568940796666027140L; + + @SuppressWarnings("rawtypes") + static final PublishSubjectProducer[] EMPTY = new PublishSubjectProducer[0]; + @SuppressWarnings("rawtypes") + static final PublishSubjectProducer[] TERMINATED = new PublishSubjectProducer[0]; + + Throwable error; + + @SuppressWarnings("unchecked") + public PublishSubjectState() { + lazySet(EMPTY); + } + + @Override + public void call(Subscriber t) { + PublishSubjectProducer pp = new PublishSubjectProducer(this, t); + t.add(pp); + t.setProducer(pp); + + if (add(pp)) { + if (pp.isUnsubscribed()) { + remove(pp); + } + } else { + Throwable ex = error; + if (ex != null) { + t.onError(ex); + } else { + t.onCompleted(); + } + } + } + + + boolean add(PublishSubjectProducer inner) { + for (;;) { + PublishSubjectProducer[] curr = get(); + if (curr == TERMINATED) { + return false; + } + + int n = curr.length; + + @SuppressWarnings("unchecked") + PublishSubjectProducer[] next = new PublishSubjectProducer[n + 1]; + System.arraycopy(curr, 0, next, 0, n); + + next[n] = inner; + if (compareAndSet(curr, next)) { + return true; + } + } + } + + @SuppressWarnings("unchecked") + void remove(PublishSubjectProducer inner) { + for (;;) { + PublishSubjectProducer[] curr = get(); + if (curr == TERMINATED || curr == EMPTY) { + return; + } + + int n = curr.length; + int j = -1; + for (int i = 0; i < n; i++) { + if (curr[i] == inner) { + j = i; + break; + } + } + + if (j < 0) { + return; + } + + PublishSubjectProducer[] next; + if (n == 1) { + next = EMPTY; + } else { + next = new PublishSubjectProducer[n - 1]; + System.arraycopy(curr, 0, next, 0, j); + System.arraycopy(curr, j + 1, next, j, n - j - 1); + } + + if (compareAndSet(curr, next)) { + return; + } + } + } + + @Override + public void onNext(T t) { + for (PublishSubjectProducer pp : get()) { + pp.onNext(t); + } + } + + @SuppressWarnings("unchecked") + @Override + public void onError(Throwable e) { + error = e; + List errors = null; + for (PublishSubjectProducer pp : getAndSet(TERMINATED)) { + try { + pp.onError(e); + } catch (Throwable ex) { + if (errors == null) { + errors = new ArrayList(1); + } + errors.add(ex); + } + } + + Exceptions.throwIfAny(errors); + } + + @SuppressWarnings("unchecked") + @Override + public void onCompleted() { + for (PublishSubjectProducer pp : getAndSet(TERMINATED)) { + pp.onCompleted(); + } + } + + } + + static final class PublishSubjectProducer + extends AtomicLong + implements Producer, Subscription, Observer { + /** */ + private static final long serialVersionUID = 6451806817170721536L; + + final PublishSubjectState parent; + + final Subscriber actual; + + long produced; + + public PublishSubjectProducer(PublishSubjectState parent, Subscriber actual) { + this.parent = parent; + this.actual = actual; + } + + @Override + public void request(long n) { + if (BackpressureUtils.validate(n)) { + for (;;) { + long r = get(); + if (r == Long.MIN_VALUE) { + return; + } + long u = BackpressureUtils.addCap(r, n); + if (compareAndSet(r, u)) { + return; + } + } + } + } + + @Override + public boolean isUnsubscribed() { + return get() == Long.MIN_VALUE; + } + + @Override + public void unsubscribe() { + if (getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) { + parent.remove(this); + } + } + + @Override + public void onNext(T t) { + long r = get(); + if (r != Long.MIN_VALUE) { + long p = produced; + if (r != p) { + produced = p + 1; + actual.onNext(t); + } else { + unsubscribe(); + actual.onError(new MissingBackpressureException("PublishSubject: could not emit value due to lack of requests")); + } + } + } + + @Override + public void onError(Throwable e) { + if (get() != Long.MIN_VALUE) { + actual.onError(e); + } + } + + @Override + public void onCompleted() { + if (get() != Long.MIN_VALUE) { + actual.onCompleted(); + } + } + } } diff --git a/src/test/java/rx/internal/operators/OperatorMapNotificationTest.java b/src/test/java/rx/internal/operators/OperatorMapNotificationTest.java index 3e94a20b8b..cd58bb2004 100644 --- a/src/test/java/rx/internal/operators/OperatorMapNotificationTest.java +++ b/src/test/java/rx/internal/operators/OperatorMapNotificationTest.java @@ -132,9 +132,16 @@ public Integer call() { ps.onNext(3); ps.onCompleted(); - ts.assertValues(2, 3, 4, 5); + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(1); + + ts.assertValue(0); ts.assertNoErrors(); ts.assertCompleted(); + } } diff --git a/src/test/java/rx/internal/operators/OperatorPublishFunctionTest.java b/src/test/java/rx/internal/operators/OperatorPublishFunctionTest.java index 0221c921ac..cbfa331977 100644 --- a/src/test/java/rx/internal/operators/OperatorPublishFunctionTest.java +++ b/src/test/java/rx/internal/operators/OperatorPublishFunctionTest.java @@ -222,7 +222,7 @@ public Observable call(Observable o) { ts.assertError(MissingBackpressureException.class); ts.assertNotCompleted(); - Assert.assertEquals("Queue full?!", ts.getOnErrorEvents().get(0).getMessage()); + Assert.assertEquals("PublishSubject: could not emit value due to lack of requests", ts.getOnErrorEvents().get(0).getMessage()); Assert.assertFalse("Source has subscribers?", ps.hasObservers()); } @@ -249,7 +249,7 @@ public Observable call(Observable o) { ts.assertError(MissingBackpressureException.class); ts.assertNotCompleted(); - Assert.assertEquals("Queue full?!", ts.getOnErrorEvents().get(0).getMessage()); + Assert.assertEquals("PublishSubject: could not emit value due to lack of requests", ts.getOnErrorEvents().get(0).getMessage()); Assert.assertFalse("Source has subscribers?", ps.hasObservers()); } } diff --git a/src/test/java/rx/internal/operators/OperatorTakeTest.java b/src/test/java/rx/internal/operators/OperatorTakeTest.java index df23a64150..efffdbccc1 100644 --- a/src/test/java/rx/internal/operators/OperatorTakeTest.java +++ b/src/test/java/rx/internal/operators/OperatorTakeTest.java @@ -425,7 +425,9 @@ public void testReentrantTake() { TestSubscriber ts = new TestSubscriber(); - source.take(1).doOnNext(new Action1() { + source + .rebatchRequests(2) // take(1) requests 1 + .take(1).doOnNext(new Action1() { @Override public void call(Integer v) { source.onNext(2); diff --git a/src/test/java/rx/subjects/PublishSubjectTest.java b/src/test/java/rx/subjects/PublishSubjectTest.java index 4dd974837a..e5f4ee647d 100644 --- a/src/test/java/rx/subjects/PublishSubjectTest.java +++ b/src/test/java/rx/subjects/PublishSubjectTest.java @@ -33,9 +33,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.exceptions.CompositeException; -import rx.exceptions.OnErrorNotImplementedException; -import rx.exceptions.TestException; +import rx.exceptions.*; import rx.functions.Action1; import rx.functions.Func1; import rx.observers.TestSubscriber; @@ -501,4 +499,64 @@ public void testPublishSubjectValueError() { assertTrue(async.hasThrowable()); assertSame(te, async.getThrowable()); } + + @Test + public void backpressureFailFast() { + PublishSubject ps = PublishSubject.create(); + + TestSubscriber ts = TestSubscriber.create(1); + + ps.subscribe(ts); + + ps.onNext(1); + + ts.assertValue(1); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ps.onNext(2); + + ts.assertValue(1); + ts.assertError(MissingBackpressureException.class); + ts.assertNotCompleted(); + + assertEquals("PublishSubject: could not emit value due to lack of requests", ts.getOnErrorEvents().get(0).getMessage()); + } + + @Test + public void crossUnsubscribe() { + PublishSubject ps = PublishSubject.create(); + + final TestSubscriber ts0 = TestSubscriber.create(); + + TestSubscriber ts1 = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 2) { + ts0.unsubscribe(); + } + } + }; + + ps.subscribe(ts1); + ps.subscribe(ts0); + + ps.onNext(1); + + ts0.assertValue(1); + ts1.assertValue(1); + + ps.onNext(2); + ps.onCompleted(); + + ts0.assertValue(1); + ts0.assertNoErrors(); + ts0.assertNotCompleted(); + ts0.assertUnsubscribed(); + + ts1.assertValues(1, 2); + ts1.assertNoErrors(); + ts1.assertCompleted(); + } }