Skip to content

Attempt to clarify the signalling sequence in the spec #212

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 13, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,12 @@ A *Publisher* is a provider of a potentially unbounded number of sequenced eleme
In response to a call to `Publisher.subscribe(Subscriber)` the possible invocation sequences for methods on the `Subscriber` are given by the following protocol:

```
onError | (onSubscribe onNext* (onError | onComplete)?)
onSubscribe onNext* (onError | onComplete)?
```

This means that `onSubscribe` is always signalled,
followed by a possibly unbounded number of `onNext` signals (as requested by `Subscriber`) followed by an `onError` signal if there is a failure, or an `onComplete` signal when no more elements are available—all as long as the `Subscription` is not cancelled.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@drewhk WDYT about the updated text?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks fine!


#### NOTES

- The specifications below use binding words in capital letters from https://www.ietf.org/rfc/rfc2119.txt
Expand Down Expand Up @@ -87,7 +90,7 @@ public interface Publisher<T> {
| <a name="1.6">6</a> | If a `Publisher` signals either `onError` or `onComplete` on a `Subscriber`, that `Subscriber`’s `Subscription` MUST be considered cancelled. |
| <a name="1.7">7</a> | Once a terminal state has been signaled (`onError`, `onComplete`) it is REQUIRED that no further signals occur. |
| <a name="1.8">8</a> | If a `Subscription` is cancelled its `Subscriber` MUST eventually stop being signaled. |
| <a name="1.9">9</a> | Calling `Publisher.subscribe` MUST return normally except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject a `Subscriber`) is via the `onError` method. |
| <a name="1.9">9</a> | `Publisher.subscribe` MUST call `onSubscribe` on the provided `Subscriber` prior to any other signals to that `Subscriber` and MUST return normally, except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject the `Subscriber`) is by calling `onError` (after calling `onSubscribe`). |
| <a name="1.10">10</a> | `Publisher.subscribe` MAY be called as many times as wanted but MUST be with a different `Subscriber` each time [see [2.12](#2.12)]. |
| <a name="1.11">11</a> | A `Publisher` MAY support multiple `Subscriber`s and decides whether each `Subscription` is unicast or multicast. |
| <a name="1.12">12</a> | A `Publisher` MUST produce the same elements, starting with the oldest element still available, in the same sequence for all its subscribers and MAY produce the stream elements at (temporarily) differing rates to different subscribers. |
Expand Down Expand Up @@ -147,7 +150,7 @@ public interface Subscription {
| <a name="3.11">11</a> | While the `Subscription` is not cancelled, `Subscription.request(long n)` MAY synchronously call `onComplete` or `onError` on this (or other) subscriber(s). |
| <a name="3.12">12</a> | While the `Subscription` is not cancelled, `Subscription.cancel()` MUST request the `Publisher` to eventually stop signaling its `Subscriber`. The operation is NOT REQUIRED to affect the `Subscription` immediately. |
| <a name="3.13">13</a> | While the `Subscription` is not cancelled, `Subscription.cancel()` MUST request the `Publisher` to eventually drop any references to the corresponding subscriber. Re-subscribing with the same `Subscriber` object is discouraged [see [2.12](#2.12)], but this specification does not mandate that it is disallowed since that would mean having to store previously cancelled subscriptions indefinitely. |
| <a name="3.14">14</a> | While the `Subscription` is not cancelled, calling `Subscription.cancel` MAY cause the `Publisher`, if stateful, to transition into the `shut-down` state if no other `Subscription` exists at this point [see [1.12](#1.12)].
| <a name="3.14">14</a> | While the `Subscription` is not cancelled, calling `Subscription.cancel` MAY cause the `Publisher`, if stateful, to transition into the `shut-down` state if no other `Subscription` exists at this point [see [1.9](#1.9)].
| <a name="3.15">15</a> | Calling `Subscription.cancel` MUST return normally. The only legal way to signal failure to a `Subscriber` is via the `onError` method. |
| <a name="3.16">16</a> | Calling `Subscription.request` MUST return normally. The only legal way to signal failure to a `Subscriber` is via the `onError` method. |
| <a name="3.17">17</a> | A `Subscription` MUST support an unbounded number of calls to request and MUST support a demand (sum requested - sum delivered) up to 2^63-1 (`java.lang.Long.MAX_VALUE`). A demand equal or greater than 2^63-1 (`java.lang.Long.MAX_VALUE`) MAY be considered by the `Publisher` as “effectively unbounded”[[1](#footnote-3-1)]. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ private void doSubscribe() {
if (iterator == null)
iterator = Collections.<T>emptyList().iterator(); // So we can assume that `iterator` is never null
} catch(final Throwable t) {
subscriber.onSubscribe(new Subscription() { // We need to make sure we signal onSubscribe before onError, obeying rule 1.9
@Override public void cancel() {}
@Override public void request(long n) {}
});
terminateDueTo(t); // Here we send onError, obeying rule 1.09
}

Expand Down Expand Up @@ -177,7 +181,7 @@ private void terminateDueTo(final Throwable t) {
cancelled = true; // When we signal onError, the subscription must be considered as cancelled, as per rule 1.6
try {
subscriber.onError(t); // Then we signal the error downstream, to the `Subscriber`
} catch(final Throwable t2) { // If `onError` throws an exception, this is a spec violation according to rule 1.12, and all we can do is to log it.
} catch(final Throwable t2) { // If `onError` throws an exception, this is a spec violation according to rule 1.9, and all we can do is to log it.
(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ private final void handleOnSubscribe(final Subscription s) {

private final void handleOnNext(final T element) {
if (!done) { // If we aren't already done
if(subscription == null) { // Check for spec violation of 2.1
(new IllegalStateException("Someone violated the Reactive Streams rule 2.1 by signalling OnNext before `Subscription.request`. (no Subscription)")).printStackTrace(System.err);
if(subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
// Check for spec violation of 2.1 and 1.09
(new IllegalStateException("Someone violated the Reactive Streams rule 1.09 and 2.1 by signalling OnNext before `Subscription.request`. (no Subscription)")).printStackTrace(System.err);
} else {
try {
if (whenNext(element)) {
Expand All @@ -116,7 +117,7 @@ private final void handleOnNext(final T element) {
done(); // This is legal according to rule 2.6
}
} catch(final Throwable t) {
done();
done();
try {
onError(t);
} catch(final Throwable t2) {
Expand All @@ -130,21 +131,32 @@ private final void handleOnNext(final T element) {

// Here it is important that we do not violate 2.2 and 2.3 by calling methods on the `Subscription` or `Publisher`
private void handleOnComplete() {
done = true; // Obey rule 2.4
whenComplete();
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
// Publisher is not allowed to signal onComplete before onSubscribe according to rule 1.09
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")).printStackTrace(System.err);
} else {
done = true; // Obey rule 2.4
whenComplete();
}
}

// Here it is important that we do not violate 2.2 and 2.3 by calling methods on the `Subscription` or `Publisher`
private void handleOnError(final Throwable error) {
done = true; // Obey rule 2.4
whenError(error);
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
// Publisher is not allowed to signal onError before onSubscribe according to rule 1.09
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")).printStackTrace(System.err);
} else {
done = true; // Obey rule 2.4
whenError(error);
}
}

// We implement the OnX methods on `Subscriber` to send Signals that we will process asycnhronously, but only one at a time

@Override public final void onSubscribe(final Subscription s) {
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null`
if (s == null) throw null;

signal(new OnSubscribe(s));
}

Expand Down Expand Up @@ -180,7 +192,6 @@ private void handleOnError(final Throwable error) {
try {
final Signal s = inboundSignals.poll(); // We take a signal off the queue
if (!done) { // If we're done, we shouldn't process any more signals, obeying rule 2.8

// Below we simply unpack the `Signal`s and invoke the corresponding methods
if (s instanceof OnNext<?>)
handleOnNext(((OnNext<T>)s).next);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,32 @@ public abstract class SyncSubscriber<T> implements Subscriber<T> {
}

@Override public void onNext(final T element) {
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null`
if (element == null) throw null;
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onNext prior to onSubscribe.")).printStackTrace(System.err);
} else {
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null`
if (element == null) throw null;

if (!done) { // If we aren't already done
try {
if (foreach(element)) {
try {
subscription.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time
} catch(final Throwable t) {
// Subscription.request is not allowed to throw according to rule 3.16
(new IllegalStateException(subscription + " violated the Reactive Streams rule 3.16 by throwing an exception from cancel.", t)).printStackTrace(System.err);
if (!done) { // If we aren't already done
try {
if (foreach(element)) {
try {
subscription.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time
} catch (final Throwable t) {
// Subscription.request is not allowed to throw according to rule 3.16
(new IllegalStateException(subscription + " violated the Reactive Streams rule 3.16 by throwing an exception from cancel.", t)).printStackTrace(System.err);
}
} else {
done();
}
} else {
} catch (final Throwable t) {
done();
}
} catch(final Throwable t) {
done();
try {
onError(t);
} catch(final Throwable t2) {
//Subscriber.onError is not allowed to throw an exception, according to rule 2.13
(new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err);
try {
onError(t);
} catch (final Throwable t2) {
//Subscriber.onError is not allowed to throw an exception, according to rule 2.13
(new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err);
}
}
}
}
Expand All @@ -86,14 +90,22 @@ private void done() {
protected abstract boolean foreach(final T element);

@Override public void onError(final Throwable t) {
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null`
if (t == null) throw null;
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")).printStackTrace(System.err);
} else {
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null`
if (t == null) throw null;
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
}
}

@Override public void onComplete() {
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")).printStackTrace(System.err);
} else {
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.reactivestreams.example.unicast;

import java.lang.Override;
import java.util.Collections;
import java.util.Iterator;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -30,7 +31,11 @@ public IterablePublisherTest() {
}

@Override public Publisher<Integer> createErrorStatePublisher() {
return null;
return new AsyncIterablePublisher<Integer>(new Iterable<Integer>() {
@Override public Iterator<Integer> iterator() {
throw new RuntimeException("Error state signal!");
}
}, e);
}

@Override public long maxElementsFromPublisher() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.testng.annotations.AfterClass;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.Iterator;

@Test // Must be here for TestNG to find and run this, do not remove
public class UnboundedIntegerIncrementPublisherTest extends PublisherVerification<Integer> {
Expand All @@ -25,7 +26,11 @@ public UnboundedIntegerIncrementPublisherTest() {
}

@Override public Publisher<Integer> createErrorStatePublisher() {
return null;
return new AsyncIterablePublisher<Integer>(new Iterable<Integer>() {
@Override public Iterator<Integer> iterator() {
throw new RuntimeException("Error state signal!");
}
}, e);
}

@Override public long maxElementsFromPublisher() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,13 @@ public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwabl
}

@Override @Test
public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable {
publisherVerification.required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe();
public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable {
publisherVerification.required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe();
}

@Override @Test
public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable {
publisherVerification.required_spec109_mustIssueOnSubscribeForNonNullSubscriber();
}

@Override @Test
Expand Down
Loading