Skip to content

Commit 6cf6b90

Browse files
committed
Clarifies the signalling sequence in the spec and
adds TCK verification to ensure signal ordering is proper, also amends the examples to reflect the spec change.
1 parent 9333df9 commit 6cf6b90

File tree

10 files changed

+202
-53
lines changed

10 files changed

+202
-53
lines changed

README.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,12 @@ A *Publisher* is a provider of a potentially unbounded number of sequenced eleme
5757
In response to a call to `Publisher.subscribe(Subscriber)` the possible invocation sequences for methods on the `Subscriber` are given by the following protocol:
5858

5959
```
60-
onError | (onSubscribe onNext* (onError | onComplete)?)
60+
onSubscribe onNext* (onError | onComplete)?
6161
```
6262

63+
This means that `onSubscribe` is always signalled,
64+
followed by a possibly unbounded number of `onNext` signals (as requested by `Subscriber`) followed by an `onError` signal if there is a failure, or an `onComplete` signal when no more elements are available—all as long as the `Subscription` is not cancelled.
65+
6366
#### NOTES
6467

6568
- The specifications below use binding words in capital letters from https://www.ietf.org/rfc/rfc2119.txt
@@ -87,10 +90,10 @@ public interface Publisher<T> {
8790
| <a name="1.6">6</a> | If a `Publisher` signals either `onError` or `onComplete` on a `Subscriber`, that `Subscriber`’s `Subscription` MUST be considered cancelled. |
8891
| <a name="1.7">7</a> | Once a terminal state has been signaled (`onError`, `onComplete`) it is REQUIRED that no further signals occur. |
8992
| <a name="1.8">8</a> | If a `Subscription` is cancelled its `Subscriber` MUST eventually stop being signaled. |
90-
| <a name="1.9">9</a> | Calling `Publisher.subscribe` MUST return normally except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject a `Subscriber`) is via the `onError` method. |
93+
| <a name="1.9">9</a> | `Publisher.subscribe` MUST call `onSubscribe` on the provided `Subscriber` prior to any other signals to that `Subscriber` and MUST return normally, except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject the `Subscriber`) is by calling `onError` (after calling `onSubscribe`). |
9194
| <a name="1.10">10</a> | `Publisher.subscribe` MAY be called as many times as wanted but MUST be with a different `Subscriber` each time [see [2.12](#2.12)]. |
9295
| <a name="1.11">11</a> | A `Publisher` MAY support multi-subscribe and choose whether each `Subscription` is unicast or multicast. |
93-
| <a name="1.12">12</a> | A `Publisher` MAY reject calls to its `subscribe` method if it is unable or unwilling to serve them [[1](#footnote-1-1)]. If rejecting it MUST do this by calling `onError` on the `Subscriber` passed to `Publisher.subscribe` instead of calling `onSubscribe`. |
96+
| <a name="1.12">12</a> | A `Publisher` MAY reject calls to its `subscribe` method if it is unable or unwilling to serve them [[1](#footnote-1-1)]. If rejecting it MUST do this by calling `onError` on the `Subscriber` passed to `Publisher.subscribe` after calling `onSubscribe`. |
9497
| <a name="1.13">13</a> | A `Publisher` MUST produce the same elements, starting with the oldest element still available, in the same sequence for all its subscribers and MAY produce the stream elements at (temporarily) differing rates to different subscribers. |
9598

9699
[<a name="footnote-1-1">1</a>] : A stateful Publisher can be overwhelmed, bounded by a finite number of underlying resources, exhausted, shut-down or in a failed state.

examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ private void doSubscribe() {
106106
if (iterator == null)
107107
iterator = Collections.<T>emptyList().iterator(); // So we can assume that `iterator` is never null
108108
} catch(final Throwable t) {
109+
subscriber.onSubscribe(new Subscription() { // We need to make sure we signal onSubscribe before onError, obeying rule 1.12
110+
@Override public void cancel() {}
111+
@Override public void request(long n) {}
112+
});
109113
terminateDueTo(t); // Here we send onError, obeying rule 1.12
110114
}
111115

examples/src/main/java/org/reactivestreams/example/unicast/AsyncSubscriber.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,9 @@ private final void handleOnSubscribe(final Subscription s) {
101101

102102
private final void handleOnNext(final T element) {
103103
if (!done) { // If we aren't already done
104-
if(subscription == null) { // Check for spec violation of 2.1
105-
(new IllegalStateException("Someone violated the Reactive Streams rule 2.1 by signalling OnNext before `Subscription.request`. (no Subscription)")).printStackTrace(System.err);
104+
if(subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
105+
// Check for spec violation of 2.1 and 1.09
106+
(new IllegalStateException("Someone violated the Reactive Streams rule 1.09 and 2.1 by signalling OnNext before `Subscription.request`. (no Subscription)")).printStackTrace(System.err);
106107
} else {
107108
try {
108109
if (whenNext(element)) {
@@ -116,7 +117,7 @@ private final void handleOnNext(final T element) {
116117
done(); // This is legal according to rule 2.6
117118
}
118119
} catch(final Throwable t) {
119-
done();
120+
done();
120121
try {
121122
onError(t);
122123
} catch(final Throwable t2) {
@@ -130,21 +131,32 @@ private final void handleOnNext(final T element) {
130131

131132
// Here it is important that we do not violate 2.2 and 2.3 by calling methods on the `Subscription` or `Publisher`
132133
private void handleOnComplete() {
133-
done = true; // Obey rule 2.4
134-
whenComplete();
134+
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
135+
// Publisher is not allowed to signal onComplete before onSubscribe according to rule 1.09
136+
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")).printStackTrace(System.err);
137+
} else {
138+
done = true; // Obey rule 2.4
139+
whenComplete();
140+
}
135141
}
136142

137143
// Here it is important that we do not violate 2.2 and 2.3 by calling methods on the `Subscription` or `Publisher`
138144
private void handleOnError(final Throwable error) {
139-
done = true; // Obey rule 2.4
140-
whenError(error);
145+
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
146+
// Publisher is not allowed to signal onError before onSubscribe according to rule 1.09
147+
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")).printStackTrace(System.err);
148+
} else {
149+
done = true; // Obey rule 2.4
150+
whenError(error);
151+
}
141152
}
142153

143154
// We implement the OnX methods on `Subscriber` to send Signals that we will process asycnhronously, but only one at a time
144155

145156
@Override public final void onSubscribe(final Subscription s) {
146157
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null`
147158
if (s == null) throw null;
159+
148160
signal(new OnSubscribe(s));
149161
}
150162

@@ -180,7 +192,6 @@ private void handleOnError(final Throwable error) {
180192
try {
181193
final Signal s = inboundSignals.poll(); // We take a signal off the queue
182194
if (!done) { // If we're done, we shouldn't process any more signals, obeying rule 2.8
183-
184195
// Below we simply unpack the `Signal`s and invoke the corresponding methods
185196
if (s instanceof OnNext<?>)
186197
handleOnNext(((OnNext<T>)s).next);

examples/src/main/java/org/reactivestreams/example/unicast/SyncSubscriber.java

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -41,28 +41,32 @@ public abstract class SyncSubscriber<T> implements Subscriber<T> {
4141
}
4242

4343
@Override public void onNext(final T element) {
44-
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null`
45-
if (element == null) throw null;
44+
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
45+
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onNext prior to onSubscribe.")).printStackTrace(System.err);
46+
} else {
47+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null`
48+
if (element == null) throw null;
4649

47-
if (!done) { // If we aren't already done
48-
try {
49-
if (foreach(element)) {
50-
try {
51-
subscription.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time
52-
} catch(final Throwable t) {
53-
// Subscription.request is not allowed to throw according to rule 3.16
54-
(new IllegalStateException(subscription + " violated the Reactive Streams rule 3.16 by throwing an exception from cancel.", t)).printStackTrace(System.err);
50+
if (!done) { // If we aren't already done
51+
try {
52+
if (foreach(element)) {
53+
try {
54+
subscription.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time
55+
} catch (final Throwable t) {
56+
// Subscription.request is not allowed to throw according to rule 3.16
57+
(new IllegalStateException(subscription + " violated the Reactive Streams rule 3.16 by throwing an exception from cancel.", t)).printStackTrace(System.err);
58+
}
59+
} else {
60+
done();
5561
}
56-
} else {
62+
} catch (final Throwable t) {
5763
done();
58-
}
59-
} catch(final Throwable t) {
60-
done();
61-
try {
62-
onError(t);
63-
} catch(final Throwable t2) {
64-
//Subscriber.onError is not allowed to throw an exception, according to rule 2.13
65-
(new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err);
64+
try {
65+
onError(t);
66+
} catch (final Throwable t2) {
67+
//Subscriber.onError is not allowed to throw an exception, according to rule 2.13
68+
(new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err);
69+
}
6670
}
6771
}
6872
}
@@ -86,14 +90,22 @@ private void done() {
8690
protected abstract boolean foreach(final T element);
8791

8892
@Override public void onError(final Throwable t) {
89-
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null`
90-
if (t == null) throw null;
91-
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
92-
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
93+
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
94+
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")).printStackTrace(System.err);
95+
} else {
96+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null`
97+
if (t == null) throw null;
98+
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
99+
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
100+
}
93101
}
94102

95103
@Override public void onComplete() {
96-
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
97-
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
104+
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
105+
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")).printStackTrace(System.err);
106+
} else {
107+
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
108+
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
109+
}
98110
}
99111
}

examples/src/test/java/org/reactivestreams/example/unicast/IterablePublisherTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.reactivestreams.example.unicast;
22

3+
import java.lang.Override;
34
import java.util.Collections;
45
import java.util.Iterator;
56
import org.reactivestreams.Publisher;
@@ -30,7 +31,11 @@ public IterablePublisherTest() {
3031
}
3132

3233
@Override public Publisher<Integer> createErrorStatePublisher() {
33-
return null;
34+
return new AsyncIterablePublisher<Integer>(new Iterable<Integer>() {
35+
@Override public Iterator<Integer> iterator() {
36+
throw new RuntimeException("Error state signal!");
37+
}
38+
}, e);
3439
}
3540

3641
@Override public long maxElementsFromPublisher() {

examples/src/test/java/org/reactivestreams/example/unicast/UnboundedIntegerIncrementPublisherTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.testng.annotations.AfterClass;
99
import java.util.concurrent.Executors;
1010
import java.util.concurrent.ExecutorService;
11+
import java.util.Iterator;
1112

1213
@Test // Must be here for TestNG to find and run this, do not remove
1314
public class UnboundedIntegerIncrementPublisherTest extends PublisherVerification<Integer> {
@@ -25,7 +26,11 @@ public UnboundedIntegerIncrementPublisherTest() {
2526
}
2627

2728
@Override public Publisher<Integer> createErrorStatePublisher() {
28-
return null;
29+
return new AsyncIterablePublisher<Integer>(new Iterable<Integer>() {
30+
@Override public Iterator<Integer> iterator() {
31+
throw new RuntimeException("Error state signal!");
32+
}
33+
}, e);
2934
}
3035

3136
@Override public long maxElementsFromPublisher() {

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,11 @@ public void optional_spec111_maySupportMultiSubscribe() throws Throwable {
286286
publisherVerification.optional_spec111_maySupportMultiSubscribe();
287287
}
288288

289+
@Override @Test
290+
public void required_spec112_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable {
291+
publisherVerification.required_spec112_mustIssueOnSubscribeForNonNullSubscriber();
292+
}
293+
289294
@Override @Test
290295
public void required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable {
291296
publisherVerification.required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe();

0 commit comments

Comments
 (0)