diff --git a/README.md b/README.md index d37e4c23..b56410f4 100644 --- a/README.md +++ b/README.md @@ -90,8 +90,7 @@ public interface Publisher { | 9 | Calling `Publisher.subscribe` MUST return normally except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject a `Subscriber`) is via the `onError` method. | | 10 | `Publisher.subscribe` MAY be called as many times as wanted but MUST be with a different `Subscriber` each time [see [2.12](#2.12)]. | | 11 | A `Publisher` MAY support multiple `Subscriber`s and decides whether each `Subscription` is unicast or multicast. | -| 12 | A `Publisher` MAY reject calls to its `subscribe` method if it is unable or unwilling to serve them [[1](#footnote-1-1)]. If rejecting it MUST do this by calling `onError` on the `Subscriber` passed to `Publisher.subscribe` instead of calling `onSubscribe`. | -| 13 | A `Publisher` MUST produce the same elements, starting with the oldest element still available, in the same sequence for all its subscribers and MAY produce the stream elements at (temporarily) differing rates to different subscribers. | +| 12 | A `Publisher` MUST produce the same elements, starting with the oldest element still available, in the same sequence for all its subscribers and MAY produce the stream elements at (temporarily) differing rates to different subscribers. | [1] : A stateful Publisher can be overwhelmed, bounded by a finite number of underlying resources, exhausted, shut-down or in a failed state. @@ -148,7 +147,7 @@ public interface Subscription { | 11 | While the `Subscription` is not cancelled, `Subscription.request(long n)` MAY synchronously call `onComplete` or `onError` on this (or other) subscriber(s). | | 12 | While the `Subscription` is not cancelled, `Subscription.cancel()` MUST request the `Publisher` to eventually stop signaling its `Subscriber`. The operation is NOT REQUIRED to affect the `Subscription` immediately. | | 13 | While the `Subscription` is not cancelled, `Subscription.cancel()` MUST request the `Publisher` to eventually drop any references to the corresponding subscriber. Re-subscribing with the same `Subscriber` object is discouraged [see [2.12](#2.12)], but this specification does not mandate that it is disallowed since that would mean having to store previously cancelled subscriptions indefinitely. | -| 14 | While the `Subscription` is not cancelled, calling `Subscription.cancel` MAY cause the `Publisher`, if stateful, to transition into the `shut-down` state if no other `Subscription` exists at this point [see [1.13](#1.13)]. +| 14 | While the `Subscription` is not cancelled, calling `Subscription.cancel` MAY cause the `Publisher`, if stateful, to transition into the `shut-down` state if no other `Subscription` exists at this point [see [1.12](#1.12)]. | 15 | Calling `Subscription.cancel` MUST return normally. The only legal way to signal failure to a `Subscriber` is via the `onError` method. | | 16 | Calling `Subscription.request` MUST return normally. The only legal way to signal failure to a `Subscriber` is via the `onError` method. | | 17 | A `Subscription` MUST support an unbounded number of calls to request and MUST support a demand (sum requested - sum delivered) up to 2^63-1 (`java.lang.Long.MAX_VALUE`). A demand equal or greater than 2^63-1 (`java.lang.Long.MAX_VALUE`) MAY be considered by the `Publisher` as “effectively unbounded”[[1](#footnote-3-1)]. | diff --git a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java index a0bb43f5..c44e3b5b 100644 --- a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java +++ b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java @@ -106,7 +106,7 @@ private void doSubscribe() { if (iterator == null) iterator = Collections.emptyList().iterator(); // So we can assume that `iterator` is never null } catch(final Throwable t) { - terminateDueTo(t); // Here we send onError, obeying rule 1.12 + terminateDueTo(t); // Here we send onError, obeying rule 1.09 } if (!cancelled) { @@ -177,7 +177,7 @@ private void terminateDueTo(final Throwable t) { cancelled = true; // When we signal onError, the subscription must be considered as cancelled, as per rule 1.6 try { subscriber.onError(t); // Then we signal the error downstream, to the `Subscriber` - } catch(final Throwable t2) { // If `onError` throws an exception, this is a spec violation according to rule 1.13, and all we can do is to log it. + } catch(final Throwable t2) { // If `onError` throws an exception, this is a spec violation according to rule 1.12, and all we can do is to log it. (new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err); } } diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index 054498e2..cfa386a2 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -17,7 +17,7 @@ import java.util.HashSet; import java.util.Set; -public abstract class IdentityProcessorVerification extends WithHelperPublisher +public abstract class IdentityProcessorVerification extends WithHelperPublisher implements SubscriberWhiteboxVerificationRules, PublisherVerificationRules { private final TestEnvironment env; @@ -276,6 +276,11 @@ public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwabl publisherVerification.required_spec109_subscribeThrowNPEOnNullSubscriber(); } + @Override @Test + public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable { + publisherVerification.required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe(); + } + @Override @Test public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable { publisherVerification.untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice(); @@ -287,23 +292,18 @@ public void optional_spec111_maySupportMultiSubscribe() throws Throwable { } @Override @Test - public void required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable { - publisherVerification.required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe(); + public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable { + publisherVerification.required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne(); } @Override @Test - public void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable { - publisherVerification.required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne(); + public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable { + publisherVerification.required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront(); } @Override @Test - public void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable { - publisherVerification.required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront(); - } - - @Override @Test - public void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable { - publisherVerification.required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected(); + public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable { + publisherVerification.required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected(); } @Override @Test @@ -373,7 +373,7 @@ public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() // Verifies rule: https://github.com/reactive-streams/reactive-streams#1.4 // for multiple subscribers - @Test + @Test public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Throwable { optionalMultipleSubscribersTest(2, new Function() { @Override diff --git a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java index b3857b57..7ec36283 100644 --- a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java @@ -453,29 +453,9 @@ public void run(Publisher pub) throws Throwable { }); } - // Verifies rule: https://github.com/reactive-streams/reactive-streams#1.10 - @Override @Test - public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable { - notVerified(); // can we meaningfully test this? - } - - // Verifies rule: https://github.com/reactive-streams/reactive-streams#1.11 + // Verifies rule: https://github.com/reactive-streams/reactive-streams#1.09 @Override @Test - public void optional_spec111_maySupportMultiSubscribe() throws Throwable { - optionalActivePublisherTest(1, false, new PublisherTestRun() { - @Override - public void run(Publisher pub) throws Throwable { - ManualSubscriber sub1 = env.newManualSubscriber(pub); - ManualSubscriber sub2 = env.newManualSubscriber(pub); - - env.verifyNoAsyncErrors(); - } - }); - } - - // Verifies rule: https://github.com/reactive-streams/reactive-streams#1.12 - @Override @Test - public void required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable { + public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable { whenHasErrorPublisherTest(new PublisherTestRun() { @Override public void run(Publisher pub) throws Throwable { @@ -496,13 +476,33 @@ public void onSubscribe(Subscription subs) { onErrorLatch.assertClosed("Should have received onError"); env.verifyNoAsyncErrors(); - } + } }); } - // Verifies rule: https://github.com/reactive-streams/reactive-streams#1.13 + // Verifies rule: https://github.com/reactive-streams/reactive-streams#1.10 + @Override @Test + public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable { + notVerified(); // can we meaningfully test this? + } + + // Verifies rule: https://github.com/reactive-streams/reactive-streams#1.11 @Override @Test - public void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable { + public void optional_spec111_maySupportMultiSubscribe() throws Throwable { + optionalActivePublisherTest(1, false, new PublisherTestRun() { + @Override + public void run(Publisher pub) throws Throwable { + ManualSubscriber sub1 = env.newManualSubscriber(pub); + ManualSubscriber sub2 = env.newManualSubscriber(pub); + + env.verifyNoAsyncErrors(); + } + }); + } + + // Verifies rule: https://github.com/reactive-streams/reactive-streams#1.12 + @Override @Test + public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable { optionalActivePublisherTest(5, true, new PublisherTestRun() { // This test is skipped if the publisher is unbounded (never sends onComplete) @Override public void run(Publisher pub) throws InterruptedException { @@ -551,9 +551,9 @@ public void run(Publisher pub) throws InterruptedException { }); } - // Verifies rule: https://github.com/reactive-streams/reactive-streams#1.13 + // Verifies rule: https://github.com/reactive-streams/reactive-streams#1.12 @Override @Test - public void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable { + public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable { optionalActivePublisherTest(3, false, new PublisherTestRun() { // This test is skipped if the publisher cannot produce enough elements @Override public void run(Publisher pub) throws Throwable { @@ -584,9 +584,9 @@ public void run(Publisher pub) throws Throwable { }); } - // Verifies rule: https://github.com/reactive-streams/reactive-streams#1.13 + // Verifies rule: https://github.com/reactive-streams/reactive-streams#1.12 @Override @Test - public void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable { + public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable { optionalActivePublisherTest(3, true, new PublisherTestRun() { // This test is skipped if the publisher is unbounded (never sends onComplete) @Override public void run(Publisher pub) throws Throwable { diff --git a/tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java b/tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java index db89afff..17d08b58 100644 --- a/tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java +++ b/tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java @@ -22,12 +22,12 @@ public interface PublisherVerificationRules { void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable; void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable; void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable; + void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable; void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable; void optional_spec111_maySupportMultiSubscribe() throws Throwable; - void required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable; - void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable; - void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable; - void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable; + void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable; + void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable; + void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable; void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable; void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable; void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception; diff --git a/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java index ba557cfe..0c4f67e9 100644 --- a/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java @@ -251,51 +251,51 @@ public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSigna } @Test - public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice_shouldFailBy_skippingSinceOptional() throws Throwable { - requireTestFailure(new ThrowingRunnable() { - @Override public void run() throws Throwable { - noopPublisherVerification().untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice(); + public void optional_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe_actuallyPass() throws Throwable { + customPublisherVerification(SKIP, new Publisher() { + @Override public void subscribe(Subscriber s) { + s.onError(new RuntimeException("Sorry, I'm busy now. Call me later.")); } - }, "Not verified by this TCK."); + }).required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe(); } @Test - public void optional_spec111_maySupportMultiSubscribe_shouldFailBy_actuallyPass() throws Throwable { - noopPublisherVerification().optional_spec111_maySupportMultiSubscribe(); - } - - @Test - public void optional_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe_shouldFail() throws Throwable { + public void optional_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe_shouldFail() throws Throwable { requireTestFailure(new ThrowingRunnable() { @Override public void run() throws Throwable { customPublisherVerification(SKIP, new Publisher() { @Override public void subscribe(Subscriber s) { } - }).required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe(); + }).required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe(); } }, "Should have received onError"); } @Test - public void optional_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe_actuallyPass() throws Throwable { - customPublisherVerification(SKIP, new Publisher() { - @Override public void subscribe(Subscriber s) { - s.onError(new RuntimeException("Sorry, I'm busy now. Call me later.")); + public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe_beSkippedForNoGivenErrorPublisher() throws Throwable { + requireTestSkip(new ThrowingRunnable() { + @Override public void run() throws Throwable { + noopPublisherVerification().required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe(); } - }).required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe(); + }, PublisherVerification.SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE); } @Test - public void required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe_beSkippedForNoGivenErrorPublisher() throws Throwable { - requireTestSkip(new ThrowingRunnable() { + public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice_shouldFailBy_skippingSinceOptional() throws Throwable { + requireTestFailure(new ThrowingRunnable() { @Override public void run() throws Throwable { - noopPublisherVerification().required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe(); + noopPublisherVerification().untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice(); } - }, PublisherVerification.SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE); + }, "Not verified by this TCK."); + } + + @Test + public void optional_spec111_maySupportMultiSubscribe_shouldFailBy_actuallyPass() throws Throwable { + noopPublisherVerification().optional_spec111_maySupportMultiSubscribe(); } @Test - public void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront_shouldFailBy_expectingOnError() throws Throwable { + public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront_shouldFailBy_expectingOnError() throws Throwable { requireTestFailure(new ThrowingRunnable() { @Override public void run() throws Throwable { customPublisherVerification(new Publisher() { @@ -313,7 +313,7 @@ public void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfI } }); } - }).required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront(); + }).required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront(); } }, "Expected elements to be signaled in the same sequence to 1st and 2nd subscribers: Lists differ at element "); }