Skip to content

Commit 00f4fac

Browse files
committed
Clarifies the signalling sequence in the spec and
adds TCK verification to ensure signal ordering is proper.
1 parent 9333df9 commit 00f4fac

File tree

8 files changed

+146
-20
lines changed

8 files changed

+146
-20
lines changed

README.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,12 @@ A *Publisher* is a provider of a potentially unbounded number of sequenced eleme
5757
In response to a call to `Publisher.subscribe(Subscriber)` the possible invocation sequences for methods on the `Subscriber` are given by the following protocol:
5858

5959
```
60-
onError | (onSubscribe onNext* (onError | onComplete)?)
60+
onSubscribe onNext* (onError | onComplete)?
6161
```
6262

63+
This means that `onSubscribe` is always signalled,
64+
followed by a possibly unbounded number of `onNext` signals (as requested by `Subscriber`) followed by an `onError` signal if there is a failure, or an `onComplete` signal when no more elements are available—all as long as the `Subscription` is not cancelled.
65+
6366
#### NOTES
6467

6568
- The specifications below use binding words in capital letters from https://www.ietf.org/rfc/rfc2119.txt
@@ -87,10 +90,10 @@ public interface Publisher<T> {
8790
| <a name="1.6">6</a> | If a `Publisher` signals either `onError` or `onComplete` on a `Subscriber`, that `Subscriber`’s `Subscription` MUST be considered cancelled. |
8891
| <a name="1.7">7</a> | Once a terminal state has been signaled (`onError`, `onComplete`) it is REQUIRED that no further signals occur. |
8992
| <a name="1.8">8</a> | If a `Subscription` is cancelled its `Subscriber` MUST eventually stop being signaled. |
90-
| <a name="1.9">9</a> | Calling `Publisher.subscribe` MUST return normally except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject a `Subscriber`) is via the `onError` method. |
93+
| <a name="1.9">9</a> | `Publisher.subscribe` MUST signal `onSubscribe` on the provided `Subscriber` prior to any other signals to that `Subscriber` and MUST return normally, except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject the `Subscriber`) is by calling `onError` (after calling `onSubscribe`). |
9194
| <a name="1.10">10</a> | `Publisher.subscribe` MAY be called as many times as wanted but MUST be with a different `Subscriber` each time [see [2.12](#2.12)]. |
9295
| <a name="1.11">11</a> | A `Publisher` MAY support multi-subscribe and choose whether each `Subscription` is unicast or multicast. |
93-
| <a name="1.12">12</a> | A `Publisher` MAY reject calls to its `subscribe` method if it is unable or unwilling to serve them [[1](#footnote-1-1)]. If rejecting it MUST do this by calling `onError` on the `Subscriber` passed to `Publisher.subscribe` instead of calling `onSubscribe`. |
96+
| <a name="1.12">12</a> | A `Publisher` MAY reject calls to its `subscribe` method if it is unable or unwilling to serve them [[1](#footnote-1-1)]. If rejecting it MUST do this by calling `onError` on the `Subscriber` passed to `Publisher.subscribe` after calling `onSubscribe`. |
9497
| <a name="1.13">13</a> | A `Publisher` MUST produce the same elements, starting with the oldest element still available, in the same sequence for all its subscribers and MAY produce the stream elements at (temporarily) differing rates to different subscribers. |
9598

9699
[<a name="footnote-1-1">1</a>] : A stateful Publisher can be overwhelmed, bounded by a finite number of underlying resources, exhausted, shut-down or in a failed state.

examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ private void doSubscribe() {
106106
if (iterator == null)
107107
iterator = Collections.<T>emptyList().iterator(); // So we can assume that `iterator` is never null
108108
} catch(final Throwable t) {
109+
subscriber.onSubscribe(new Subscription() { // We need to make sure we signal onSubscribe before onError, obeying rule 1.12
110+
@Override public void cancel() {}
111+
@Override public void request(long n) {}
112+
});
109113
terminateDueTo(t); // Here we send onError, obeying rule 1.12
110114
}
111115

examples/src/test/java/org/reactivestreams/example/unicast/IterablePublisherTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.reactivestreams.example.unicast;
22

3+
import java.lang.Override;
34
import java.util.Collections;
45
import java.util.Iterator;
56
import org.reactivestreams.Publisher;
@@ -30,7 +31,11 @@ public IterablePublisherTest() {
3031
}
3132

3233
@Override public Publisher<Integer> createErrorStatePublisher() {
33-
return null;
34+
return new AsyncIterablePublisher<Integer>(new Iterable<Integer>() {
35+
@Override public Iterator<Integer> iterator() {
36+
throw new RuntimeException("Error state signal!");
37+
}
38+
}, e);
3439
}
3540

3641
@Override public long maxElementsFromPublisher() {

examples/src/test/java/org/reactivestreams/example/unicast/UnboundedIntegerIncrementPublisherTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.testng.annotations.AfterClass;
99
import java.util.concurrent.Executors;
1010
import java.util.concurrent.ExecutorService;
11+
import java.util.Iterator;
1112

1213
@Test // Must be here for TestNG to find and run this, do not remove
1314
public class UnboundedIntegerIncrementPublisherTest extends PublisherVerification<Integer> {
@@ -25,7 +26,11 @@ public UnboundedIntegerIncrementPublisherTest() {
2526
}
2627

2728
@Override public Publisher<Integer> createErrorStatePublisher() {
28-
return null;
29+
return new AsyncIterablePublisher<Integer>(new Iterable<Integer>() {
30+
@Override public Iterator<Integer> iterator() {
31+
throw new RuntimeException("Error state signal!");
32+
}
33+
}, e);
2934
}
3035

3136
@Override public long maxElementsFromPublisher() {

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,11 @@ public void optional_spec111_maySupportMultiSubscribe() throws Throwable {
286286
publisherVerification.optional_spec111_maySupportMultiSubscribe();
287287
}
288288

289+
@Override @Test
290+
public void required_spec112_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable {
291+
publisherVerification.required_spec112_mustIssueOnSubscribeForNonNullSubscriber();
292+
}
293+
289294
@Override @Test
290295
public void required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable {
291296
publisherVerification.required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe();

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

Lines changed: 64 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.testng.annotations.BeforeMethod;
1515
import org.testng.annotations.Test;
1616

17+
import java.lang.Override;
1718
import java.lang.ref.ReferenceQueue;
1819
import java.lang.ref.WeakReference;
1920
import java.util.ArrayList;
@@ -344,16 +345,24 @@ public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable {
344345
whenHasErrorPublisherTest(new PublisherTestRun<T>() {
345346
@Override
346347
public void run(final Publisher<T> pub) throws InterruptedException {
347-
final Latch latch = new Latch(env);
348+
final Latch onErrorlatch = new Latch(env);
349+
final Latch onSubscribeLatch = new Latch(env);
348350
pub.subscribe(new TestEnvironment.TestSubscriber<T>(env) {
351+
@Override
352+
public void onSubscribe(Subscription subs) {
353+
onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
354+
onSubscribeLatch.close();
355+
}
349356
@Override
350357
public void onError(Throwable cause) {
351-
latch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub));
352-
latch.close();
358+
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
359+
onErrorlatch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub));
360+
onErrorlatch.close();
353361
}
354362
});
355363

356-
latch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", pub));
364+
onSubscribeLatch.expectClose("Should have received onSubscribe");
365+
onErrorlatch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", pub));
357366

358367
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
359368
}
@@ -432,6 +441,7 @@ public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnError
432441
notVerified(); // can we meaningfully test this?
433442
}
434443

444+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.9
435445
@Override @Test
436446
public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable {
437447
notVerified(); // can we meaningfully test this?
@@ -440,16 +450,16 @@ public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws T
440450
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.9
441451
@Override @Test
442452
public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable {
443-
activePublisherTest(0, false, new PublisherTestRun<T>() {
444-
@Override
445-
public void run(Publisher<T> pub) throws Throwable {
446-
try {
447-
pub.subscribe(null);
448-
env.flop(String.format("Publisher (%s) did not throw a NullPointerException when given a null Subscribe in subscribe", pub));
449-
} catch (NullPointerException npe) {
450-
}
451-
env.verifyNoAsyncErrors();
453+
activePublisherTest(0, false, new PublisherTestRun<T>() {
454+
@Override
455+
public void run(Publisher<T> pub) throws Throwable {
456+
try {
457+
pub.subscribe(null);
458+
env.flop("Publisher did not throw a NullPointerException when given a null Subscribe in subscribe");
459+
} catch (NullPointerException npe) {
452460
}
461+
env.verifyNoAsyncErrors();
462+
}
453463
});
454464
}
455465

@@ -473,27 +483,66 @@ public void run(Publisher<T> pub) throws Throwable {
473483
});
474484
}
475485

486+
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.12
487+
@Override @Test
488+
public void required_spec112_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable {
489+
activePublisherTest(0, false, new PublisherTestRun<T>() {
490+
@Override
491+
public void run(Publisher<T> pub) throws Throwable {
492+
final Latch onSubscribeLatch = new Latch(env);
493+
pub.subscribe(new Subscriber<T>() {
494+
@Override
495+
public void onError(Throwable cause) {
496+
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
497+
}
498+
499+
@Override
500+
public void onSubscribe(Subscription subs) {
501+
onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
502+
onSubscribeLatch.close();
503+
}
504+
505+
@Override
506+
public void onNext(T elem) {
507+
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onNext always");
508+
}
509+
510+
@Override
511+
public void onComplete() {
512+
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onComplete always");
513+
}
514+
});
515+
onSubscribeLatch.expectClose("Should have received onSubscribe");
516+
env.verifyNoAsyncErrors();
517+
}
518+
});
519+
}
520+
476521
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.12
477522
@Override @Test
478523
public void required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable {
479524
whenHasErrorPublisherTest(new PublisherTestRun<T>() {
480525
@Override
481526
public void run(Publisher<T> pub) throws Throwable {
482527
final Latch onErrorLatch = new Latch(env);
528+
final Latch onSubscribeLatch = new Latch(env);
483529
ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) {
484530
@Override
485531
public void onError(Throwable cause) {
532+
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
486533
onErrorLatch.assertOpen("Only one onError call expected");
487534
onErrorLatch.close();
488535
}
489536

490537
@Override
491538
public void onSubscribe(Subscription subs) {
492-
env.flop("onSubscribe should not be called if Publisher is unable to subscribe a Subscriber");
539+
onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
540+
onSubscribeLatch.close();
493541
}
494542
};
495543
pub.subscribe(sub);
496-
onErrorLatch.assertClosed("Should have received onError");
544+
onSubscribeLatch.expectClose("Should have received onSubscribe");
545+
onErrorLatch.expectClose("Should have received onError");
497546

498547
env.verifyNoAsyncErrors();
499548
}

tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public interface PublisherVerificationRules {
2424
void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable;
2525
void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable;
2626
void optional_spec111_maySupportMultiSubscribe() throws Throwable;
27+
void required_spec112_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable;
2728
void required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe() throws Throwable;
2829
void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable;
2930
void required_spec113_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable;

tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,19 @@ public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSigna
250250
}, "Unexpected element 0 received after stream completed");
251251
}
252252

253+
@Test
254+
public void required_spec109_subscribeThrowNPEOnNullSubscriber_shouldFailIfDoesntThrowNPE() throws Throwable {
255+
requireTestFailure(new ThrowingRunnable() {
256+
@Override public void run() throws Throwable {
257+
customPublisherVerification(new Publisher<Integer>() {
258+
@Override public void subscribe(final Subscriber<? super Integer> s) {
259+
260+
}
261+
}).required_spec109_subscribeThrowNPEOnNullSubscriber();
262+
}
263+
}, "Publisher did not throw a NullPointerException when given a null Subscribe in subscribe");
264+
}
265+
253266
@Test
254267
public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice_shouldFailBy_skippingSinceOptional() throws Throwable {
255268
requireTestFailure(new ThrowingRunnable() {
@@ -264,12 +277,52 @@ public void optional_spec111_maySupportMultiSubscribe_shouldFailBy_actuallyPass(
264277
noopPublisherVerification().optional_spec111_maySupportMultiSubscribe();
265278
}
266279

280+
@Test
281+
public void required_spec112_mustIssueOnSubscribeForNonNullSubscriber_mustFailIfOnCompleteHappensFirst() throws Throwable {
282+
requireTestFailure(new ThrowingRunnable() {
283+
@Override public void run() throws Throwable {
284+
customPublisherVerification(new Publisher<Integer>() {
285+
@Override public void subscribe(Subscriber<? super Integer> s) {
286+
s.onComplete();
287+
}
288+
}).required_spec112_mustIssueOnSubscribeForNonNullSubscriber();
289+
}
290+
}, "onSubscribe should be called prior to onComplete always");
291+
}
292+
293+
@Test
294+
public void required_spec112_mustIssueOnSubscribeForNonNullSubscriber_mustFailIfOnNextHappensFirst() throws Throwable {
295+
requireTestFailure(new ThrowingRunnable() {
296+
@Override public void run() throws Throwable {
297+
customPublisherVerification(new Publisher<Integer>() {
298+
@Override public void subscribe(Subscriber<? super Integer> s) {
299+
s.onNext(1337);
300+
}
301+
}).required_spec112_mustIssueOnSubscribeForNonNullSubscriber();
302+
}
303+
}, "onSubscribe should be called prior to onNext always");
304+
}
305+
306+
@Test
307+
public void required_spec112_mustIssueOnSubscribeForNonNullSubscriber_mustFailIfOnErrorHappensFirst() throws Throwable {
308+
requireTestFailure(new ThrowingRunnable() {
309+
@Override public void run() throws Throwable {
310+
customPublisherVerification(new Publisher<Integer>() {
311+
@Override public void subscribe(Subscriber<? super Integer> s) {
312+
s.onError(new TestException());
313+
}
314+
}).required_spec112_mustIssueOnSubscribeForNonNullSubscriber();
315+
}
316+
}, "onSubscribe should be called prior to onError always");
317+
}
318+
267319
@Test
268320
public void optional_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe_shouldFail() throws Throwable {
269321
requireTestFailure(new ThrowingRunnable() {
270322
@Override public void run() throws Throwable {
271323
customPublisherVerification(SKIP, new Publisher<Integer>() {
272324
@Override public void subscribe(Subscriber<? super Integer> s) {
325+
s.onSubscribe(new NoopSubscription());
273326
}
274327
}).required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe();
275328
}
@@ -280,6 +333,7 @@ public void optional_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwil
280333
public void optional_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe_actuallyPass() throws Throwable {
281334
customPublisherVerification(SKIP, new Publisher<Integer>() {
282335
@Override public void subscribe(Subscriber<? super Integer> s) {
336+
s.onSubscribe(new NoopSubscription());
283337
s.onError(new RuntimeException("Sorry, I'm busy now. Call me later."));
284338
}
285339
}).required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe();

0 commit comments

Comments
 (0)