Skip to content

Commit 65d3ea5

Browse files
committed
Clarifies the signalling sequence in the spec and
adds TCK verification to ensure signal ordering is proper, also amends the examples to reflect the spec change.
1 parent b56ee39 commit 65d3ea5

File tree

10 files changed

+216
-64
lines changed

10 files changed

+216
-64
lines changed

README.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,12 @@ A *Publisher* is a provider of a potentially unbounded number of sequenced eleme
5757
In response to a call to `Publisher.subscribe(Subscriber)` the possible invocation sequences for methods on the `Subscriber` are given by the following protocol:
5858

5959
```
60-
onError | (onSubscribe onNext* (onError | onComplete)?)
60+
onSubscribe onNext* (onError | onComplete)?
6161
```
6262

63+
This means that `onSubscribe` is always signalled,
64+
followed by a possibly unbounded number of `onNext` signals (as requested by `Subscriber`) followed by an `onError` signal if there is a failure, or an `onComplete` signal when no more elements are available—all as long as the `Subscription` is not cancelled.
65+
6366
#### NOTES
6467

6568
- The specifications below use binding words in capital letters from https://www.ietf.org/rfc/rfc2119.txt
@@ -87,7 +90,7 @@ public interface Publisher<T> {
8790
| <a name="1.6">6</a> | If a `Publisher` signals either `onError` or `onComplete` on a `Subscriber`, that `Subscriber`’s `Subscription` MUST be considered cancelled. |
8891
| <a name="1.7">7</a> | Once a terminal state has been signaled (`onError`, `onComplete`) it is REQUIRED that no further signals occur. |
8992
| <a name="1.8">8</a> | If a `Subscription` is cancelled its `Subscriber` MUST eventually stop being signaled. |
90-
| <a name="1.9">9</a> | Calling `Publisher.subscribe` MUST return normally except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject a `Subscriber`) is via the `onError` method. |
93+
| <a name="1.9">9</a> | `Publisher.subscribe` MUST call `onSubscribe` on the provided `Subscriber` prior to any other signals to that `Subscriber` and MUST return normally, except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject the `Subscriber`) is by calling `onError` (after calling `onSubscribe`). |
9194
| <a name="1.10">10</a> | `Publisher.subscribe` MAY be called as many times as wanted but MUST be with a different `Subscriber` each time [see [2.12](#2.12)]. |
9295
| <a name="1.11">11</a> | A `Publisher` MAY support multiple `Subscriber`s and decides whether each `Subscription` is unicast or multicast. |
9396
| <a name="1.12">12</a> | A `Publisher` MUST produce the same elements, starting with the oldest element still available, in the same sequence for all its subscribers and MAY produce the stream elements at (temporarily) differing rates to different subscribers. |

examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ private void doSubscribe() {
106106
if (iterator == null)
107107
iterator = Collections.<T>emptyList().iterator(); // So we can assume that `iterator` is never null
108108
} catch(final Throwable t) {
109+
subscriber.onSubscribe(new Subscription() { // We need to make sure we signal onSubscribe before onError, obeying rule 1.12
110+
@Override public void cancel() {}
111+
@Override public void request(long n) {}
112+
});
109113
terminateDueTo(t); // Here we send onError, obeying rule 1.09
110114
}
111115

examples/src/main/java/org/reactivestreams/example/unicast/AsyncSubscriber.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,9 @@ private final void handleOnSubscribe(final Subscription s) {
101101

102102
private final void handleOnNext(final T element) {
103103
if (!done) { // If we aren't already done
104-
if(subscription == null) { // Check for spec violation of 2.1
105-
(new IllegalStateException("Someone violated the Reactive Streams rule 2.1 by signalling OnNext before `Subscription.request`. (no Subscription)")).printStackTrace(System.err);
104+
if(subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
105+
// Check for spec violation of 2.1 and 1.09
106+
(new IllegalStateException("Someone violated the Reactive Streams rule 1.09 and 2.1 by signalling OnNext before `Subscription.request`. (no Subscription)")).printStackTrace(System.err);
106107
} else {
107108
try {
108109
if (whenNext(element)) {
@@ -116,7 +117,7 @@ private final void handleOnNext(final T element) {
116117
done(); // This is legal according to rule 2.6
117118
}
118119
} catch(final Throwable t) {
119-
done();
120+
done();
120121
try {
121122
onError(t);
122123
} catch(final Throwable t2) {
@@ -130,21 +131,32 @@ private final void handleOnNext(final T element) {
130131

131132
// Here it is important that we do not violate 2.2 and 2.3 by calling methods on the `Subscription` or `Publisher`
132133
private void handleOnComplete() {
133-
done = true; // Obey rule 2.4
134-
whenComplete();
134+
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
135+
// Publisher is not allowed to signal onComplete before onSubscribe according to rule 1.09
136+
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")).printStackTrace(System.err);
137+
} else {
138+
done = true; // Obey rule 2.4
139+
whenComplete();
140+
}
135141
}
136142

137143
// Here it is important that we do not violate 2.2 and 2.3 by calling methods on the `Subscription` or `Publisher`
138144
private void handleOnError(final Throwable error) {
139-
done = true; // Obey rule 2.4
140-
whenError(error);
145+
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
146+
// Publisher is not allowed to signal onError before onSubscribe according to rule 1.09
147+
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")).printStackTrace(System.err);
148+
} else {
149+
done = true; // Obey rule 2.4
150+
whenError(error);
151+
}
141152
}
142153

143154
// We implement the OnX methods on `Subscriber` to send Signals that we will process asycnhronously, but only one at a time
144155

145156
@Override public final void onSubscribe(final Subscription s) {
146157
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null`
147158
if (s == null) throw null;
159+
148160
signal(new OnSubscribe(s));
149161
}
150162

@@ -180,7 +192,6 @@ private void handleOnError(final Throwable error) {
180192
try {
181193
final Signal s = inboundSignals.poll(); // We take a signal off the queue
182194
if (!done) { // If we're done, we shouldn't process any more signals, obeying rule 2.8
183-
184195
// Below we simply unpack the `Signal`s and invoke the corresponding methods
185196
if (s instanceof OnNext<?>)
186197
handleOnNext(((OnNext<T>)s).next);

examples/src/main/java/org/reactivestreams/example/unicast/SyncSubscriber.java

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -41,28 +41,32 @@ public abstract class SyncSubscriber<T> implements Subscriber<T> {
4141
}
4242

4343
@Override public void onNext(final T element) {
44-
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null`
45-
if (element == null) throw null;
44+
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
45+
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onNext prior to onSubscribe.")).printStackTrace(System.err);
46+
} else {
47+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null`
48+
if (element == null) throw null;
4649

47-
if (!done) { // If we aren't already done
48-
try {
49-
if (foreach(element)) {
50-
try {
51-
subscription.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time
52-
} catch(final Throwable t) {
53-
// Subscription.request is not allowed to throw according to rule 3.16
54-
(new IllegalStateException(subscription + " violated the Reactive Streams rule 3.16 by throwing an exception from cancel.", t)).printStackTrace(System.err);
50+
if (!done) { // If we aren't already done
51+
try {
52+
if (foreach(element)) {
53+
try {
54+
subscription.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time
55+
} catch (final Throwable t) {
56+
// Subscription.request is not allowed to throw according to rule 3.16
57+
(new IllegalStateException(subscription + " violated the Reactive Streams rule 3.16 by throwing an exception from cancel.", t)).printStackTrace(System.err);
58+
}
59+
} else {
60+
done();
5561
}
56-
} else {
62+
} catch (final Throwable t) {
5763
done();
58-
}
59-
} catch(final Throwable t) {
60-
done();
61-
try {
62-
onError(t);
63-
} catch(final Throwable t2) {
64-
//Subscriber.onError is not allowed to throw an exception, according to rule 2.13
65-
(new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err);
64+
try {
65+
onError(t);
66+
} catch (final Throwable t2) {
67+
//Subscriber.onError is not allowed to throw an exception, according to rule 2.13
68+
(new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err);
69+
}
6670
}
6771
}
6872
}
@@ -86,14 +90,22 @@ private void done() {
8690
protected abstract boolean foreach(final T element);
8791

8892
@Override public void onError(final Throwable t) {
89-
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null`
90-
if (t == null) throw null;
91-
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
92-
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
93+
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
94+
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")).printStackTrace(System.err);
95+
} else {
96+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null`
97+
if (t == null) throw null;
98+
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
99+
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
100+
}
93101
}
94102

95103
@Override public void onComplete() {
96-
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
97-
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
104+
if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
105+
(new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")).printStackTrace(System.err);
106+
} else {
107+
// Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
108+
// And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
109+
}
98110
}
99111
}

examples/src/test/java/org/reactivestreams/example/unicast/IterablePublisherTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.reactivestreams.example.unicast;
22

3+
import java.lang.Override;
34
import java.util.Collections;
45
import java.util.Iterator;
56
import org.reactivestreams.Publisher;
@@ -30,7 +31,11 @@ public IterablePublisherTest() {
3031
}
3132

3233
@Override public Publisher<Integer> createErrorStatePublisher() {
33-
return null;
34+
return new AsyncIterablePublisher<Integer>(new Iterable<Integer>() {
35+
@Override public Iterator<Integer> iterator() {
36+
throw new RuntimeException("Error state signal!");
37+
}
38+
}, e);
3439
}
3540

3641
@Override public long maxElementsFromPublisher() {

examples/src/test/java/org/reactivestreams/example/unicast/UnboundedIntegerIncrementPublisherTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.testng.annotations.AfterClass;
99
import java.util.concurrent.Executors;
1010
import java.util.concurrent.ExecutorService;
11+
import java.util.Iterator;
1112

1213
@Test // Must be here for TestNG to find and run this, do not remove
1314
public class UnboundedIntegerIncrementPublisherTest extends PublisherVerification<Integer> {
@@ -25,7 +26,11 @@ public UnboundedIntegerIncrementPublisherTest() {
2526
}
2627

2728
@Override public Publisher<Integer> createErrorStatePublisher() {
28-
return null;
29+
return new AsyncIterablePublisher<Integer>(new Iterable<Integer>() {
30+
@Override public Iterator<Integer> iterator() {
31+
throw new RuntimeException("Error state signal!");
32+
}
33+
}, e);
2934
}
3035

3136
@Override public long maxElementsFromPublisher() {

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,8 +277,13 @@ public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwabl
277277
}
278278

279279
@Override @Test
280-
public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable {
281-
publisherVerification.required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe();
280+
public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable {
281+
publisherVerification.required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe();
282+
}
283+
284+
@Override @Test
285+
public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable {
286+
publisherVerification.required_spec109_mustIssueOnSubscribeForNonNullSubscriber();
282287
}
283288

284289
@Override @Test

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

Lines changed: 66 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.testng.annotations.BeforeMethod;
1515
import org.testng.annotations.Test;
1616

17+
import java.lang.Override;
1718
import java.lang.ref.ReferenceQueue;
1819
import java.lang.ref.WeakReference;
1920
import java.util.ArrayList;
@@ -344,16 +345,24 @@ public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable {
344345
whenHasErrorPublisherTest(new PublisherTestRun<T>() {
345346
@Override
346347
public void run(final Publisher<T> pub) throws InterruptedException {
347-
final Latch latch = new Latch(env);
348+
final Latch onErrorlatch = new Latch(env);
349+
final Latch onSubscribeLatch = new Latch(env);
348350
pub.subscribe(new TestEnvironment.TestSubscriber<T>(env) {
351+
@Override
352+
public void onSubscribe(Subscription subs) {
353+
onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
354+
onSubscribeLatch.close();
355+
}
349356
@Override
350357
public void onError(Throwable cause) {
351-
latch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub));
352-
latch.close();
358+
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
359+
onErrorlatch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub));
360+
onErrorlatch.close();
353361
}
354362
});
355363

356-
latch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", pub));
364+
onSubscribeLatch.expectClose("Should have received onSubscribe");
365+
onErrorlatch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", pub));
357366

358367
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
359368
}
@@ -432,6 +441,7 @@ public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnError
432441
notVerified(); // can we meaningfully test this?
433442
}
434443

444+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.9
435445
@Override @Test
436446
public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable {
437447
notVerified(); // can we meaningfully test this?
@@ -440,40 +450,79 @@ public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws T
440450
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.9
441451
@Override @Test
442452
public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable {
443-
activePublisherTest(0, false, new PublisherTestRun<T>() {
444-
@Override
445-
public void run(Publisher<T> pub) throws Throwable {
446-
try {
447-
pub.subscribe(null);
448-
env.flop(String.format("Publisher (%s) did not throw a NullPointerException when given a null Subscribe in subscribe", pub));
449-
} catch (NullPointerException npe) {
450-
}
451-
env.verifyNoAsyncErrors();
453+
activePublisherTest(0, false, new PublisherTestRun<T>() {
454+
@Override
455+
public void run(Publisher<T> pub) throws Throwable {
456+
try {
457+
pub.subscribe(null);
458+
env.flop("Publisher did not throw a NullPointerException when given a null Subscribe in subscribe");
459+
} catch (NullPointerException npe) {
452460
}
461+
env.verifyNoAsyncErrors();
462+
}
453463
});
454464
}
455465

456-
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.09
466+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.9
467+
@Override @Test
468+
public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable {
469+
activePublisherTest(0, false, new PublisherTestRun<T>() {
470+
@Override
471+
public void run(Publisher<T> pub) throws Throwable {
472+
final Latch onSubscribeLatch = new Latch(env);
473+
pub.subscribe(new Subscriber<T>() {
474+
@Override
475+
public void onError(Throwable cause) {
476+
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
477+
}
478+
479+
@Override
480+
public void onSubscribe(Subscription subs) {
481+
onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
482+
onSubscribeLatch.close();
483+
}
484+
485+
@Override
486+
public void onNext(T elem) {
487+
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onNext always");
488+
}
489+
490+
@Override
491+
public void onComplete() {
492+
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onComplete always");
493+
}
494+
});
495+
onSubscribeLatch.expectClose("Should have received onSubscribe");
496+
env.verifyNoAsyncErrors();
497+
}
498+
});
499+
}
500+
501+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.9
457502
@Override @Test
458-
public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable {
503+
public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable {
459504
whenHasErrorPublisherTest(new PublisherTestRun<T>() {
460505
@Override
461506
public void run(Publisher<T> pub) throws Throwable {
462507
final Latch onErrorLatch = new Latch(env);
508+
final Latch onSubscribeLatch = new Latch(env);
463509
ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) {
464510
@Override
465511
public void onError(Throwable cause) {
512+
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
466513
onErrorLatch.assertOpen("Only one onError call expected");
467514
onErrorLatch.close();
468515
}
469516

470517
@Override
471518
public void onSubscribe(Subscription subs) {
472-
env.flop("onSubscribe should not be called if Publisher is unable to subscribe a Subscriber");
519+
onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
520+
onSubscribeLatch.close();
473521
}
474522
};
475523
pub.subscribe(sub);
476-
onErrorLatch.assertClosed("Should have received onError");
524+
onSubscribeLatch.expectClose("Should have received onSubscribe");
525+
onErrorLatch.expectClose("Should have received onError");
477526

478527
env.verifyNoAsyncErrors();
479528
}

0 commit comments

Comments
 (0)