Skip to content

Commit a946146

Browse files
committed
Attempt to clarify the signalling sequence in the spec
1 parent 9333df9 commit a946146

File tree

6 files changed

+43
-11
lines changed

6 files changed

+43
-11
lines changed

README.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,12 @@ A *Publisher* is a provider of a potentially unbounded number of sequenced eleme
5757
In response to a call to `Publisher.subscribe(Subscriber)` the possible invocation sequences for methods on the `Subscriber` are given by the following protocol:
5858

5959
```
60-
onError | (onSubscribe onNext* (onError | onComplete)?)
60+
onSubscribe onNext* (onError | onComplete)?
6161
```
6262

63+
This means that `onSubscribe` is always signalled,
64+
followed by a possibly unbounded number of `onNext` signals (as requested by `Subscriber`) followed by an `onError` signal if there is a failure, or an `onComplete` signal when no more elements are available—all as long as the `Subscription` is not cancelled.
65+
6366
#### NOTES
6467

6568
- The specifications below use binding words in capital letters from https://www.ietf.org/rfc/rfc2119.txt
@@ -87,10 +90,10 @@ public interface Publisher<T> {
8790
| <a name="1.6">6</a> | If a `Publisher` signals either `onError` or `onComplete` on a `Subscriber`, that `Subscriber`’s `Subscription` MUST be considered cancelled. |
8891
| <a name="1.7">7</a> | Once a terminal state has been signaled (`onError`, `onComplete`) it is REQUIRED that no further signals occur. |
8992
| <a name="1.8">8</a> | If a `Subscription` is cancelled its `Subscriber` MUST eventually stop being signaled. |
90-
| <a name="1.9">9</a> | Calling `Publisher.subscribe` MUST return normally except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject a `Subscriber`) is via the `onError` method. |
93+
| <a name="1.9">9</a> | `Publisher.subscribe` MUST signal `onSubscribe` on the provided `Subscriber` prior to any other signals to that `Subscriber` and MUST return normally, except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject the `Subscriber`) is by calling `onError` (after calling `onSubscribe`). |
9194
| <a name="1.10">10</a> | `Publisher.subscribe` MAY be called as many times as wanted but MUST be with a different `Subscriber` each time [see [2.12](#2.12)]. |
9295
| <a name="1.11">11</a> | A `Publisher` MAY support multi-subscribe and choose whether each `Subscription` is unicast or multicast. |
93-
| <a name="1.12">12</a> | A `Publisher` MAY reject calls to its `subscribe` method if it is unable or unwilling to serve them [[1](#footnote-1-1)]. If rejecting it MUST do this by calling `onError` on the `Subscriber` passed to `Publisher.subscribe` instead of calling `onSubscribe`. |
96+
| <a name="1.12">12</a> | A `Publisher` MAY reject calls to its `subscribe` method if it is unable or unwilling to serve them [[1](#footnote-1-1)]. If rejecting it MUST do this by calling `onError` on the `Subscriber` passed to `Publisher.subscribe` after calling `onSubscribe`. |
9497
| <a name="1.13">13</a> | A `Publisher` MUST produce the same elements, starting with the oldest element still available, in the same sequence for all its subscribers and MAY produce the stream elements at (temporarily) differing rates to different subscribers. |
9598

9699
[<a name="footnote-1-1">1</a>] : A stateful Publisher can be overwhelmed, bounded by a finite number of underlying resources, exhausted, shut-down or in a failed state.

examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ private void doSubscribe() {
106106
if (iterator == null)
107107
iterator = Collections.<T>emptyList().iterator(); // So we can assume that `iterator` is never null
108108
} catch(final Throwable t) {
109+
subscriber.onSubscribe(new Subscription() { // We need to make sure we signal onSubscribe before onError, obeying rule 1.12
110+
@Override public void cancel() {}
111+
@Override public void request(long n) {}
112+
});
109113
terminateDueTo(t); // Here we send onError, obeying rule 1.12
110114
}
111115

examples/src/test/java/org/reactivestreams/example/unicast/IterablePublisherTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.reactivestreams.example.unicast;
22

3+
import java.lang.Override;
34
import java.util.Collections;
45
import java.util.Iterator;
56
import org.reactivestreams.Publisher;
@@ -30,7 +31,11 @@ public IterablePublisherTest() {
3031
}
3132

3233
@Override public Publisher<Integer> createErrorStatePublisher() {
33-
return null;
34+
return new AsyncIterablePublisher<Integer>(new Iterable<Integer>() {
35+
@Override public Iterator<Integer> iterator() {
36+
throw new RuntimeException("Error state signal!");
37+
}
38+
}, e);
3439
}
3540

3641
@Override public long maxElementsFromPublisher() {

examples/src/test/java/org/reactivestreams/example/unicast/UnboundedIntegerIncrementPublisherTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.testng.annotations.AfterClass;
99
import java.util.concurrent.Executors;
1010
import java.util.concurrent.ExecutorService;
11+
import java.util.Iterator;
1112

1213
@Test // Must be here for TestNG to find and run this, do not remove
1314
public class UnboundedIntegerIncrementPublisherTest extends PublisherVerification<Integer> {
@@ -25,7 +26,11 @@ public UnboundedIntegerIncrementPublisherTest() {
2526
}
2627

2728
@Override public Publisher<Integer> createErrorStatePublisher() {
28-
return null;
29+
return new AsyncIterablePublisher<Integer>(new Iterable<Integer>() {
30+
@Override public Iterator<Integer> iterator() {
31+
throw new RuntimeException("Error state signal!");
32+
}
33+
}, e);
2934
}
3035

3136
@Override public long maxElementsFromPublisher() {

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.testng.annotations.BeforeMethod;
1515
import org.testng.annotations.Test;
1616

17+
import java.lang.Override;
1718
import java.lang.ref.ReferenceQueue;
1819
import java.lang.ref.WeakReference;
1920
import java.util.ArrayList;
@@ -344,16 +345,24 @@ public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable {
344345
whenHasErrorPublisherTest(new PublisherTestRun<T>() {
345346
@Override
346347
public void run(final Publisher<T> pub) throws InterruptedException {
347-
final Latch latch = new Latch(env);
348+
final Latch onErrorlatch = new Latch(env);
349+
final Latch onSubscribeLatch = new Latch(env);
348350
pub.subscribe(new TestEnvironment.TestSubscriber<T>(env) {
351+
@Override
352+
public void onSubscribe(Subscription subs) {
353+
onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
354+
onSubscribeLatch.close();
355+
}
349356
@Override
350357
public void onError(Throwable cause) {
351-
latch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub));
352-
latch.close();
358+
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
359+
onErrorlatch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub));
360+
onErrorlatch.close();
353361
}
354362
});
355363

356-
latch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", pub));
364+
onSubscribeLatch.expectClose("Should have received onSubscribe");
365+
onErrorlatch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", pub));
357366

358367
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
359368
}
@@ -480,20 +489,24 @@ public void required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwil
480489
@Override
481490
public void run(Publisher<T> pub) throws Throwable {
482491
final Latch onErrorLatch = new Latch(env);
492+
final Latch onSubscribeLatch = new Latch(env);
483493
ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) {
484494
@Override
485495
public void onError(Throwable cause) {
496+
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
486497
onErrorLatch.assertOpen("Only one onError call expected");
487498
onErrorLatch.close();
488499
}
489500

490501
@Override
491502
public void onSubscribe(Subscription subs) {
492-
env.flop("onSubscribe should not be called if Publisher is unable to subscribe a Subscriber");
503+
onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
504+
onSubscribeLatch.close();
493505
}
494506
};
495507
pub.subscribe(sub);
496-
onErrorLatch.assertClosed("Should have received onError");
508+
onSubscribeLatch.expectClose("Should have received onSubscribe");
509+
onErrorLatch.expectClose("Should have received onError");
497510

498511
env.verifyNoAsyncErrors();
499512
}

tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,7 @@ public void optional_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwil
270270
@Override public void run() throws Throwable {
271271
customPublisherVerification(SKIP, new Publisher<Integer>() {
272272
@Override public void subscribe(Subscriber<? super Integer> s) {
273+
s.onSubscribe(new NoopSubscription());
273274
}
274275
}).required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe();
275276
}
@@ -280,6 +281,7 @@ public void optional_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwil
280281
public void optional_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe_actuallyPass() throws Throwable {
281282
customPublisherVerification(SKIP, new Publisher<Integer>() {
282283
@Override public void subscribe(Subscriber<? super Integer> s) {
284+
s.onSubscribe(new NoopSubscription());
283285
s.onError(new RuntimeException("Sorry, I'm busy now. Call me later."));
284286
}
285287
}).required_spec112_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorInsteadOfOnSubscribe();

0 commit comments

Comments
 (0)