diff --git a/README.md b/README.md index b56410f4..2193fa20 100644 --- a/README.md +++ b/README.md @@ -57,9 +57,12 @@ A *Publisher* is a provider of a potentially unbounded number of sequenced eleme In response to a call to `Publisher.subscribe(Subscriber)` the possible invocation sequences for methods on the `Subscriber` are given by the following protocol: ``` -onError | (onSubscribe onNext* (onError | onComplete)?) +onSubscribe onNext* (onError | onComplete)? ``` +This means that `onSubscribe` is always signalled, +followed by a possibly unbounded number of `onNext` signals (as requested by `Subscriber`) followed by an `onError` signal if there is a failure, or an `onComplete` signal when no more elements are available—all as long as the `Subscription` is not cancelled. + #### NOTES - The specifications below use binding words in capital letters from https://www.ietf.org/rfc/rfc2119.txt @@ -87,7 +90,7 @@ public interface Publisher { | 6 | If a `Publisher` signals either `onError` or `onComplete` on a `Subscriber`, that `Subscriber`’s `Subscription` MUST be considered cancelled. | | 7 | Once a terminal state has been signaled (`onError`, `onComplete`) it is REQUIRED that no further signals occur. | | 8 | If a `Subscription` is cancelled its `Subscriber` MUST eventually stop being signaled. | -| 9 | Calling `Publisher.subscribe` MUST return normally except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject a `Subscriber`) is via the `onError` method. | +| 9 | `Publisher.subscribe` MUST call `onSubscribe` on the provided `Subscriber` prior to any other signals to that `Subscriber` and MUST return normally, except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject the `Subscriber`) is by calling `onError` (after calling `onSubscribe`). | | 10 | `Publisher.subscribe` MAY be called as many times as wanted but MUST be with a different `Subscriber` each time [see [2.12](#2.12)]. | | 11 | A `Publisher` MAY support multiple `Subscriber`s and decides whether each `Subscription` is unicast or multicast. | | 12 | A `Publisher` MUST produce the same elements, starting with the oldest element still available, in the same sequence for all its subscribers and MAY produce the stream elements at (temporarily) differing rates to different subscribers. | @@ -147,7 +150,7 @@ public interface Subscription { | 11 | While the `Subscription` is not cancelled, `Subscription.request(long n)` MAY synchronously call `onComplete` or `onError` on this (or other) subscriber(s). | | 12 | While the `Subscription` is not cancelled, `Subscription.cancel()` MUST request the `Publisher` to eventually stop signaling its `Subscriber`. The operation is NOT REQUIRED to affect the `Subscription` immediately. | | 13 | While the `Subscription` is not cancelled, `Subscription.cancel()` MUST request the `Publisher` to eventually drop any references to the corresponding subscriber. Re-subscribing with the same `Subscriber` object is discouraged [see [2.12](#2.12)], but this specification does not mandate that it is disallowed since that would mean having to store previously cancelled subscriptions indefinitely. | -| 14 | While the `Subscription` is not cancelled, calling `Subscription.cancel` MAY cause the `Publisher`, if stateful, to transition into the `shut-down` state if no other `Subscription` exists at this point [see [1.12](#1.12)]. +| 14 | While the `Subscription` is not cancelled, calling `Subscription.cancel` MAY cause the `Publisher`, if stateful, to transition into the `shut-down` state if no other `Subscription` exists at this point [see [1.9](#1.9)]. | 15 | Calling `Subscription.cancel` MUST return normally. The only legal way to signal failure to a `Subscriber` is via the `onError` method. | | 16 | Calling `Subscription.request` MUST return normally. The only legal way to signal failure to a `Subscriber` is via the `onError` method. | | 17 | A `Subscription` MUST support an unbounded number of calls to request and MUST support a demand (sum requested - sum delivered) up to 2^63-1 (`java.lang.Long.MAX_VALUE`). A demand equal or greater than 2^63-1 (`java.lang.Long.MAX_VALUE`) MAY be considered by the `Publisher` as “effectively unbounded”[[1](#footnote-3-1)]. | diff --git a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java index c44e3b5b..b1e7caed 100644 --- a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java +++ b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java @@ -106,6 +106,10 @@ private void doSubscribe() { if (iterator == null) iterator = Collections.emptyList().iterator(); // So we can assume that `iterator` is never null } catch(final Throwable t) { + subscriber.onSubscribe(new Subscription() { // We need to make sure we signal onSubscribe before onError, obeying rule 1.9 + @Override public void cancel() {} + @Override public void request(long n) {} + }); terminateDueTo(t); // Here we send onError, obeying rule 1.09 } @@ -177,7 +181,7 @@ private void terminateDueTo(final Throwable t) { cancelled = true; // When we signal onError, the subscription must be considered as cancelled, as per rule 1.6 try { subscriber.onError(t); // Then we signal the error downstream, to the `Subscriber` - } catch(final Throwable t2) { // If `onError` throws an exception, this is a spec violation according to rule 1.12, and all we can do is to log it. + } catch(final Throwable t2) { // If `onError` throws an exception, this is a spec violation according to rule 1.9, and all we can do is to log it. (new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err); } } diff --git a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncSubscriber.java b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncSubscriber.java index 7fe0320e..321b23c3 100644 --- a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncSubscriber.java +++ b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncSubscriber.java @@ -101,8 +101,9 @@ private final void handleOnSubscribe(final Subscription s) { private final void handleOnNext(final T element) { if (!done) { // If we aren't already done - if(subscription == null) { // Check for spec violation of 2.1 - (new IllegalStateException("Someone violated the Reactive Streams rule 2.1 by signalling OnNext before `Subscription.request`. (no Subscription)")).printStackTrace(System.err); + if(subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec + // Check for spec violation of 2.1 and 1.09 + (new IllegalStateException("Someone violated the Reactive Streams rule 1.09 and 2.1 by signalling OnNext before `Subscription.request`. (no Subscription)")).printStackTrace(System.err); } else { try { if (whenNext(element)) { @@ -116,7 +117,7 @@ private final void handleOnNext(final T element) { done(); // This is legal according to rule 2.6 } } catch(final Throwable t) { - done(); + done(); try { onError(t); } catch(final Throwable t2) { @@ -130,14 +131,24 @@ private final void handleOnNext(final T element) { // Here it is important that we do not violate 2.2 and 2.3 by calling methods on the `Subscription` or `Publisher` private void handleOnComplete() { - done = true; // Obey rule 2.4 - whenComplete(); + if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec + // Publisher is not allowed to signal onComplete before onSubscribe according to rule 1.09 + (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")).printStackTrace(System.err); + } else { + done = true; // Obey rule 2.4 + whenComplete(); + } } // Here it is important that we do not violate 2.2 and 2.3 by calling methods on the `Subscription` or `Publisher` private void handleOnError(final Throwable error) { - done = true; // Obey rule 2.4 - whenError(error); + if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec + // Publisher is not allowed to signal onError before onSubscribe according to rule 1.09 + (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")).printStackTrace(System.err); + } else { + done = true; // Obey rule 2.4 + whenError(error); + } } // We implement the OnX methods on `Subscriber` to send Signals that we will process asycnhronously, but only one at a time @@ -145,6 +156,7 @@ private void handleOnError(final Throwable error) { @Override public final void onSubscribe(final Subscription s) { // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null` if (s == null) throw null; + signal(new OnSubscribe(s)); } @@ -180,7 +192,6 @@ private void handleOnError(final Throwable error) { try { final Signal s = inboundSignals.poll(); // We take a signal off the queue if (!done) { // If we're done, we shouldn't process any more signals, obeying rule 2.8 - // Below we simply unpack the `Signal`s and invoke the corresponding methods if (s instanceof OnNext) handleOnNext(((OnNext)s).next); diff --git a/examples/src/main/java/org/reactivestreams/example/unicast/SyncSubscriber.java b/examples/src/main/java/org/reactivestreams/example/unicast/SyncSubscriber.java index c4d57d23..94759f42 100644 --- a/examples/src/main/java/org/reactivestreams/example/unicast/SyncSubscriber.java +++ b/examples/src/main/java/org/reactivestreams/example/unicast/SyncSubscriber.java @@ -41,28 +41,32 @@ public abstract class SyncSubscriber implements Subscriber { } @Override public void onNext(final T element) { - // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null` - if (element == null) throw null; + if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec + (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onNext prior to onSubscribe.")).printStackTrace(System.err); + } else { + // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null` + if (element == null) throw null; - if (!done) { // If we aren't already done - try { - if (foreach(element)) { - try { - subscription.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time - } catch(final Throwable t) { - // Subscription.request is not allowed to throw according to rule 3.16 - (new IllegalStateException(subscription + " violated the Reactive Streams rule 3.16 by throwing an exception from cancel.", t)).printStackTrace(System.err); + if (!done) { // If we aren't already done + try { + if (foreach(element)) { + try { + subscription.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time + } catch (final Throwable t) { + // Subscription.request is not allowed to throw according to rule 3.16 + (new IllegalStateException(subscription + " violated the Reactive Streams rule 3.16 by throwing an exception from cancel.", t)).printStackTrace(System.err); + } + } else { + done(); } - } else { + } catch (final Throwable t) { done(); - } - } catch(final Throwable t) { - done(); - try { - onError(t); - } catch(final Throwable t2) { - //Subscriber.onError is not allowed to throw an exception, according to rule 2.13 - (new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err); + try { + onError(t); + } catch (final Throwable t2) { + //Subscriber.onError is not allowed to throw an exception, according to rule 2.13 + (new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err); + } } } } @@ -86,14 +90,22 @@ private void done() { protected abstract boolean foreach(final T element); @Override public void onError(final Throwable t) { - // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null` - if (t == null) throw null; - // Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3 - // And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4 + if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec + (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")).printStackTrace(System.err); + } else { + // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null` + if (t == null) throw null; + // Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3 + // And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4 + } } @Override public void onComplete() { - // Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3 - // And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4 + if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec + (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")).printStackTrace(System.err); + } else { + // Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3 + // And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4 + } } } \ No newline at end of file diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/IterablePublisherTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/IterablePublisherTest.java index 15d5ec9a..7fd9810f 100644 --- a/examples/src/test/java/org/reactivestreams/example/unicast/IterablePublisherTest.java +++ b/examples/src/test/java/org/reactivestreams/example/unicast/IterablePublisherTest.java @@ -1,5 +1,6 @@ package org.reactivestreams.example.unicast; +import java.lang.Override; import java.util.Collections; import java.util.Iterator; import org.reactivestreams.Publisher; @@ -30,7 +31,11 @@ public IterablePublisherTest() { } @Override public Publisher createErrorStatePublisher() { - return null; + return new AsyncIterablePublisher(new Iterable() { + @Override public Iterator iterator() { + throw new RuntimeException("Error state signal!"); + } + }, e); } @Override public long maxElementsFromPublisher() { diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/UnboundedIntegerIncrementPublisherTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/UnboundedIntegerIncrementPublisherTest.java index 40548933..f0091b52 100644 --- a/examples/src/test/java/org/reactivestreams/example/unicast/UnboundedIntegerIncrementPublisherTest.java +++ b/examples/src/test/java/org/reactivestreams/example/unicast/UnboundedIntegerIncrementPublisherTest.java @@ -8,6 +8,7 @@ import org.testng.annotations.AfterClass; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; +import java.util.Iterator; @Test // Must be here for TestNG to find and run this, do not remove public class UnboundedIntegerIncrementPublisherTest extends PublisherVerification { @@ -25,7 +26,11 @@ public UnboundedIntegerIncrementPublisherTest() { } @Override public Publisher createErrorStatePublisher() { - return null; + return new AsyncIterablePublisher(new Iterable() { + @Override public Iterator iterator() { + throw new RuntimeException("Error state signal!"); + } + }, e); } @Override public long maxElementsFromPublisher() { diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index cfa386a2..ca1e2297 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -277,8 +277,13 @@ public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwabl } @Override @Test - public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable { - publisherVerification.required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe(); + public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable { + publisherVerification.required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe(); + } + + @Override @Test + public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable { + publisherVerification.required_spec109_mustIssueOnSubscribeForNonNullSubscriber(); } @Override @Test diff --git a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java index 7ec36283..3cc9be9a 100644 --- a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java @@ -14,6 +14,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.lang.Override; import java.lang.ref.ReferenceQueue; import java.lang.ref.WeakReference; import java.util.ArrayList; @@ -344,16 +345,24 @@ public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable { whenHasErrorPublisherTest(new PublisherTestRun() { @Override public void run(final Publisher pub) throws InterruptedException { - final Latch latch = new Latch(env); + final Latch onErrorlatch = new Latch(env); + final Latch onSubscribeLatch = new Latch(env); pub.subscribe(new TestEnvironment.TestSubscriber(env) { + @Override + public void onSubscribe(Subscription subs) { + onSubscribeLatch.assertOpen("Only one onSubscribe call expected"); + onSubscribeLatch.close(); + } @Override public void onError(Throwable cause) { - latch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub)); - latch.close(); + onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always"); + onErrorlatch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub)); + onErrorlatch.close(); } }); - latch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", pub)); + onSubscribeLatch.expectClose("Should have received onSubscribe"); + onErrorlatch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", pub)); env.verifyNoAsyncErrors(env.defaultTimeoutMillis()); } @@ -432,6 +441,7 @@ public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnError notVerified(); // can we meaningfully test this? } + // Verifies rule: https://github.com/reactive-streams/reactive-streams#1.9 @Override @Test public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable { notVerified(); // can we meaningfully test this? @@ -440,40 +450,79 @@ public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws T // Verifies rule: https://github.com/reactive-streams/reactive-streams#1.9 @Override @Test public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable { - activePublisherTest(0, false, new PublisherTestRun() { - @Override - public void run(Publisher pub) throws Throwable { - try { - pub.subscribe(null); - env.flop(String.format("Publisher (%s) did not throw a NullPointerException when given a null Subscribe in subscribe", pub)); - } catch (NullPointerException npe) { - } - env.verifyNoAsyncErrors(); + activePublisherTest(0, false, new PublisherTestRun() { + @Override + public void run(Publisher pub) throws Throwable { + try { + pub.subscribe(null); + env.flop("Publisher did not throw a NullPointerException when given a null Subscribe in subscribe"); + } catch (NullPointerException npe) { } + env.verifyNoAsyncErrors(); + } }); } - // Verifies rule: https://github.com/reactive-streams/reactive-streams#1.09 + // Verifies rule: https://github.com/reactive-streams/reactive-streams#1.9 + @Override @Test + public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable { + activePublisherTest(0, false, new PublisherTestRun() { + @Override + public void run(Publisher pub) throws Throwable { + final Latch onSubscribeLatch = new Latch(env); + pub.subscribe(new Subscriber() { + @Override + public void onError(Throwable cause) { + onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always"); + } + + @Override + public void onSubscribe(Subscription subs) { + onSubscribeLatch.assertOpen("Only one onSubscribe call expected"); + onSubscribeLatch.close(); + } + + @Override + public void onNext(T elem) { + onSubscribeLatch.assertClosed("onSubscribe should be called prior to onNext always"); + } + + @Override + public void onComplete() { + onSubscribeLatch.assertClosed("onSubscribe should be called prior to onComplete always"); + } + }); + onSubscribeLatch.expectClose("Should have received onSubscribe"); + env.verifyNoAsyncErrors(); + } + }); + } + + // Verifies rule: https://github.com/reactive-streams/reactive-streams#1.9 @Override @Test - public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable { + public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable { whenHasErrorPublisherTest(new PublisherTestRun() { @Override public void run(Publisher pub) throws Throwable { final Latch onErrorLatch = new Latch(env); + final Latch onSubscribeLatch = new Latch(env); ManualSubscriberWithSubscriptionSupport sub = new ManualSubscriberWithSubscriptionSupport(env) { @Override public void onError(Throwable cause) { + onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always"); onErrorLatch.assertOpen("Only one onError call expected"); onErrorLatch.close(); } @Override public void onSubscribe(Subscription subs) { - env.flop("onSubscribe should not be called if Publisher is unable to subscribe a Subscriber"); + onSubscribeLatch.assertOpen("Only one onSubscribe call expected"); + onSubscribeLatch.close(); } }; pub.subscribe(sub); - onErrorLatch.assertClosed("Should have received onError"); + onSubscribeLatch.expectClose("Should have received onSubscribe"); + onErrorLatch.expectClose("Should have received onError"); env.verifyNoAsyncErrors(); } diff --git a/tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java b/tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java index 17d08b58..29adfd51 100644 --- a/tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java +++ b/tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java @@ -20,9 +20,10 @@ public interface PublisherVerificationRules { void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable; void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable; void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable; + void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable; void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable; void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable; - void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable; + void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable; void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable; void optional_spec111_maySupportMultiSubscribe() throws Throwable; void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable; diff --git a/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java index 0c4f67e9..54ba97b5 100644 --- a/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java @@ -251,31 +251,88 @@ public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSigna } @Test - public void optional_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe_actuallyPass() throws Throwable { + public void required_spec109_subscribeThrowNPEOnNullSubscriber_shouldFailIfDoesntThrowNPE() throws Throwable { + requireTestFailure(new ThrowingRunnable() { + @Override public void run() throws Throwable { + customPublisherVerification(new Publisher() { + @Override public void subscribe(final Subscriber s) { + + } + }).required_spec109_subscribeThrowNPEOnNullSubscriber(); + } + }, "Publisher did not throw a NullPointerException when given a null Subscribe in subscribe"); + } + + @Test + public void optional_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe_actuallyPass() throws Throwable { customPublisherVerification(SKIP, new Publisher() { @Override public void subscribe(Subscriber s) { + s.onSubscribe(new NoopSubscription()); s.onError(new RuntimeException("Sorry, I'm busy now. Call me later.")); } - }).required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe(); + }).required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe(); } @Test - public void optional_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe_shouldFail() throws Throwable { + public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber_mustFailIfOnCompleteHappensFirst() throws Throwable { requireTestFailure(new ThrowingRunnable() { @Override public void run() throws Throwable { - customPublisherVerification(SKIP, new Publisher() { + customPublisherVerification(new Publisher() { + @Override + public void subscribe(Subscriber s) { + s.onComplete(); + } + }).required_spec109_mustIssueOnSubscribeForNonNullSubscriber(); + } + }, "onSubscribe should be called prior to onComplete always"); + } + + @Test + public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber_mustFailIfOnNextHappensFirst() throws Throwable { + requireTestFailure(new ThrowingRunnable() { + @Override public void run() throws Throwable { + customPublisherVerification(new Publisher() { @Override public void subscribe(Subscriber s) { + s.onNext(1337); + } + }).required_spec109_mustIssueOnSubscribeForNonNullSubscriber(); + } + }, "onSubscribe should be called prior to onNext always"); + } + + @Test + public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber_mustFailIfOnErrorHappensFirst() throws Throwable { + requireTestFailure(new ThrowingRunnable() { + @Override public void run() throws Throwable { + customPublisherVerification(new Publisher() { + @Override public void subscribe(Subscriber s) { + s.onError(new TestException()); + } + }).required_spec109_mustIssueOnSubscribeForNonNullSubscriber(); + } + }, "onSubscribe should be called prior to onError always"); + } + + @Test + public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe_shouldFail() throws Throwable { + requireTestFailure(new ThrowingRunnable() { + @Override + public void run() throws Throwable { + customPublisherVerification(SKIP, new Publisher() { + @Override + public void subscribe(Subscriber s) { + s.onSubscribe(new NoopSubscription()); } - }).required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe(); + }).required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe(); } }, "Should have received onError"); } @Test - public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe_beSkippedForNoGivenErrorPublisher() throws Throwable { + public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe_beSkippedForNoGivenErrorPublisher() throws Throwable { requireTestSkip(new ThrowingRunnable() { @Override public void run() throws Throwable { - noopPublisherVerification().required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe(); + noopPublisherVerification().required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe(); } }, PublisherVerification.SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE); }