Skip to content

Fixes #210 by removing 1.12 and repurposing its TCK checks for 1.09 #214

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 13, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ public interface Publisher<T> {
| <a name="1.9">9</a> | Calling `Publisher.subscribe` MUST return normally except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject a `Subscriber`) is via the `onError` method. |
| <a name="1.10">10</a> | `Publisher.subscribe` MAY be called as many times as wanted but MUST be with a different `Subscriber` each time [see [2.12](#2.12)]. |
| <a name="1.11">11</a> | A `Publisher` MAY support multiple `Subscriber`s and decides whether each `Subscription` is unicast or multicast. |
| <a name="1.12">12</a> | A `Publisher` MAY reject calls to its `subscribe` method if it is unable or unwilling to serve them [[1](#footnote-1-1)]. If rejecting it MUST do this by calling `onError` on the `Subscriber` passed to `Publisher.subscribe` instead of calling `onSubscribe`. |
| <a name="1.13">13</a> | A `Publisher` MUST produce the same elements, starting with the oldest element still available, in the same sequence for all its subscribers and MAY produce the stream elements at (temporarily) differing rates to different subscribers. |
| <a name="1.12">12</a> | A `Publisher` MUST produce the same elements, starting with the oldest element still available, in the same sequence for all its subscribers and MAY produce the stream elements at (temporarily) differing rates to different subscribers. |

[<a name="footnote-1-1">1</a>] : A stateful Publisher can be overwhelmed, bounded by a finite number of underlying resources, exhausted, shut-down or in a failed state.

Expand Down Expand Up @@ -148,7 +147,7 @@ public interface Subscription {
| <a name="3.11">11</a> | While the `Subscription` is not cancelled, `Subscription.request(long n)` MAY synchronously call `onComplete` or `onError` on this (or other) subscriber(s). |
| <a name="3.12">12</a> | While the `Subscription` is not cancelled, `Subscription.cancel()` MUST request the `Publisher` to eventually stop signaling its `Subscriber`. The operation is NOT REQUIRED to affect the `Subscription` immediately. |
| <a name="3.13">13</a> | While the `Subscription` is not cancelled, `Subscription.cancel()` MUST request the `Publisher` to eventually drop any references to the corresponding subscriber. Re-subscribing with the same `Subscriber` object is discouraged [see [2.12](#2.12)], but this specification does not mandate that it is disallowed since that would mean having to store previously cancelled subscriptions indefinitely. |
| <a name="3.14">14</a> | While the `Subscription` is not cancelled, calling `Subscription.cancel` MAY cause the `Publisher`, if stateful, to transition into the `shut-down` state if no other `Subscription` exists at this point [see [1.13](#1.13)].
| <a name="3.14">14</a> | While the `Subscription` is not cancelled, calling `Subscription.cancel` MAY cause the `Publisher`, if stateful, to transition into the `shut-down` state if no other `Subscription` exists at this point [see [1.12](#1.12)].
| <a name="3.15">15</a> | Calling `Subscription.cancel` MUST return normally. The only legal way to signal failure to a `Subscriber` is via the `onError` method. |
| <a name="3.16">16</a> | Calling `Subscription.request` MUST return normally. The only legal way to signal failure to a `Subscriber` is via the `onError` method. |
| <a name="3.17">17</a> | A `Subscription` MUST support an unbounded number of calls to request and MUST support a demand (sum requested - sum delivered) up to 2^63-1 (`java.lang.Long.MAX_VALUE`). A demand equal or greater than 2^63-1 (`java.lang.Long.MAX_VALUE`) MAY be considered by the `Publisher` as “effectively unbounded”[[1](#footnote-3-1)]. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private void doSubscribe() {
if (iterator == null)
iterator = Collections.<T>emptyList().iterator(); // So we can assume that `iterator` is never null
} catch(final Throwable t) {
terminateDueTo(t); // Here we send onError, obeying rule 1.12
terminateDueTo(t); // Here we send onError, obeying rule 1.09
}

if (!cancelled) {
Expand Down Expand Up @@ -177,7 +177,7 @@ private void terminateDueTo(final Throwable t) {
cancelled = true; // When we signal onError, the subscription must be considered as cancelled, as per rule 1.6
try {
subscriber.onError(t); // Then we signal the error downstream, to the `Subscriber`
} catch(final Throwable t2) { // If `onError` throws an exception, this is a spec violation according to rule 1.13, and all we can do is to log it.
} catch(final Throwable t2) { // If `onError` throws an exception, this is a spec violation according to rule 1.12, and all we can do is to log it.
(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.HashSet;
import java.util.Set;

public abstract class IdentityProcessorVerification<T> extends WithHelperPublisher<T>
public abstract class IdentityProcessorVerification<T> extends WithHelperPublisher<T>
implements SubscriberWhiteboxVerificationRules, PublisherVerificationRules {

private final TestEnvironment env;
Expand Down Expand Up @@ -276,6 +276,11 @@ public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwabl
publisherVerification.required_spec109_subscribeThrowNPEOnNullSubscriber();
}

@Override @Test
public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable {
publisherVerification.required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe();
}

@Override @Test
public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
publisherVerification.untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice();
Expand All @@ -287,23 +292,18 @@ public void optional_spec111_maySupportMultiSubscribe() throws Throwable {
}

@Override @Test
public void required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable {
publisherVerification.required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe();
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
publisherVerification.required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne();
}

@Override @Test
public void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
publisherVerification.required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne();
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
publisherVerification.required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront();
}

@Override @Test
public void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
publisherVerification.required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront();
}

@Override @Test
public void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
publisherVerification.required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected();
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
publisherVerification.required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected();
}

@Override @Test
Expand Down Expand Up @@ -373,7 +373,7 @@ public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue()

// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.4
// for multiple subscribers
@Test
@Test
public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Throwable {
optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,29 +453,9 @@ public void run(Publisher<T> pub) throws Throwable {
});
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.10
@Override @Test
public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
notVerified(); // can we meaningfully test this?
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.11
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.09
@Override @Test
public void optional_spec111_maySupportMultiSubscribe() throws Throwable {
optionalActivePublisherTest(1, false, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);

env.verifyNoAsyncErrors();
}
});
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.12
@Override @Test
public void required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable {
public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable {
whenHasErrorPublisherTest(new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
Expand All @@ -496,13 +476,33 @@ public void onSubscribe(Subscription subs) {
onErrorLatch.assertClosed("Should have received onError");

env.verifyNoAsyncErrors();
}
}
});
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.13
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.10
@Override @Test
public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable {
notVerified(); // can we meaningfully test this?
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.11
@Override @Test
public void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
public void optional_spec111_maySupportMultiSubscribe() throws Throwable {
optionalActivePublisherTest(1, false, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);

env.verifyNoAsyncErrors();
}
});
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.12
@Override @Test
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
optionalActivePublisherTest(5, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete)
@Override
public void run(Publisher<T> pub) throws InterruptedException {
Expand Down Expand Up @@ -551,9 +551,9 @@ public void run(Publisher<T> pub) throws InterruptedException {
});
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.13
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.12
@Override @Test
public void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
optionalActivePublisherTest(3, false, new PublisherTestRun<T>() { // This test is skipped if the publisher cannot produce enough elements
@Override
public void run(Publisher<T> pub) throws Throwable {
Expand Down Expand Up @@ -584,9 +584,9 @@ public void run(Publisher<T> pub) throws Throwable {
});
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.13
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.12
@Override @Test
public void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
optionalActivePublisherTest(3, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete)
@Override
public void run(Publisher<T> pub) throws Throwable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ public interface PublisherVerificationRules {
void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable;
void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable;
void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable;
void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable;
void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable;
void optional_spec111_maySupportMultiSubscribe() throws Throwable;
void required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable;
void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable;
void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable;
void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable;
void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable;
void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable;
void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable;
void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable;
void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable;
void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception;
Expand Down
Loading