Description
I'd like to implement a publisher which resubscribes to a source publisher a number of times (unless an error happens in one of those runs). The optimization is to only have a single instance of the mediator Subscriber
which is always resubscribed (trampolined to prevent SO) to the source when the previous subscription fires onComplete
. However, §2.12 forbids this reuse and if the source enforces this in some way (i.e., storing the current set of subscribers in a thread-safe set and removing the completed one after the call to onComplete
) the repeat can't work.
Here is the code of the publisher I'm talking about.
public final class PublisherRepeat<T> implements Publisher<T> {
final Publisher<? extends T> source;
final long count;
public PublisherRepeat(Publisher<? extends T> source, long count) {
this.source = source;
this.count = count;
}
@Override
public void subscribe(Subscriber<? super T> s) {
SubscriptionArbiter sa = new SubscriptionArbiter();
s.onSubscribe(sa);
RepeatSubscriber<T> rs = new RepeatSubscriber<>(s, count, sa, source);
rs.subscribeNext();
}
static final class RepeatSubscriber<T> extends AtomicInteger implements Subscriber<T> {
/** */
private static final long serialVersionUID = -7098360935104053232L;
final Subscriber<? super T> actual;
final SubscriptionArbiter sa;
final Publisher<? extends T> source;
long remaining;
public RepeatSubscriber(Subscriber<? super T> actual, long count,
SubscriptionArbiter sa, Publisher<? extends T> source) {
this.actual = actual;
this.sa = sa;
this.source = source;
this.remaining = count;
}
@Override
public void onSubscribe(Subscription s) {
sa.setSubscription(s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
long r = remaining;
if (r != Long.MAX_VALUE) {
remaining = r - 1;
}
if (r != 0L) {
subscribeNext();
} else {
actual.onComplete();
}
}
/**
* Subscribes to the source again via trampolining.
*/
void subscribeNext() {
if (getAndIncrement() == 0) {
int missed = 1;
for (;;) {
source.subscribe(this);
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
}
}
}
(SubscriptionArbiter
makes sure any unfulfilled request is re-requested from the Publisher in the next turn.)
Therefore, this object equality requirement gives me trouble. Do I interpret §2.12 correctly or rule §2.4 and/or §1.6 actually allows this, i.e., reuse is allowed the moment the publisher is about to call onError
or onComplete
?
(This also affects operators such as concat
and retry
where the terminal event triggers a new subscription with the same mediator Subscriber
).