Skip to content

Commit 464568d

Browse files
committed
1.x: replay now has O(1) subscription and O(1) request coordination cost
1 parent 92fe02d commit 464568d

File tree

3 files changed

+433
-111
lines changed

3 files changed

+433
-111
lines changed

src/main/java/rx/internal/operators/OperatorReplay.java

Lines changed: 166 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121

2222
import rx.*;
2323
import rx.Observable;
24-
import rx.exceptions.Exceptions;
25-
import rx.exceptions.OnErrorThrowable;
24+
import rx.exceptions.*;
2625
import rx.functions.*;
26+
import rx.internal.util.OpenHashSet;
2727
import rx.observables.ConnectableObservable;
2828
import rx.schedulers.Timestamped;
2929
import rx.subscriptions.Subscriptions;
@@ -299,8 +299,16 @@ static final class ReplaySubscriber<T> extends Subscriber<T> implements Subscrip
299299
/** Indicates a terminated ReplaySubscriber. */
300300
static final InnerProducer[] TERMINATED = new InnerProducer[0];
301301

302-
/** Tracks the subscribed producers. */
303-
final AtomicReference<InnerProducer[]> producers;
302+
/** Indicates no further InnerProducers are accepted. */
303+
volatile boolean terminated;
304+
/** Tracks the subscribed producers. Guarded by itself. */
305+
final OpenHashSet<InnerProducer<T>> producers;
306+
/** Contains a copy of the producers. Modified only from the source side. */
307+
InnerProducer<T>[] producersCache;
308+
/** Contains number of modifications to the producers set.*/
309+
volatile long producersVersion;
310+
/** Contains the number of modifications that the producersCache holds. */
311+
long producersCacheVersion;
304312
/**
305313
* Atomically changed from false to true by connect to make sure the
306314
* connection is only performed by one thread.
@@ -320,12 +328,19 @@ static final class ReplaySubscriber<T> extends Subscriber<T> implements Subscrip
320328
/** The upstream producer. */
321329
volatile Producer producer;
322330

331+
/** The queue that holds producers with request changes that need to be coordinated. */
332+
List<InnerProducer<T>> coordinationQueue;
333+
/** Indicate that all request amounts should be considered. */
334+
boolean coordinateAll;
335+
336+
@SuppressWarnings("unchecked")
323337
public ReplaySubscriber(AtomicReference<ReplaySubscriber<T>> current,
324338
ReplayBuffer<T> buffer) {
325339
this.buffer = buffer;
326340

327341
this.nl = NotificationLite.instance();
328-
this.producers = new AtomicReference<InnerProducer[]>(EMPTY);
342+
this.producers = new OpenHashSet<InnerProducer<T>>();
343+
this.producersCache = EMPTY;
329344
this.shouldConnect = new AtomicBoolean();
330345
// make sure the source doesn't produce values until the child subscribers
331346
// expressed their request amounts
@@ -336,7 +351,15 @@ void init() {
336351
add(Subscriptions.create(new Action0() {
337352
@Override
338353
public void call() {
339-
ReplaySubscriber.this.producers.getAndSet(TERMINATED);
354+
if (!terminated) {
355+
synchronized (producers) {
356+
if (!terminated) {
357+
producers.terminate();
358+
producersVersion++;
359+
terminated = true;
360+
}
361+
}
362+
}
340363
// unlike OperatorPublish, we can't null out the terminated so
341364
// late subscribers can still get replay
342365
// current.compareAndSet(ReplaySubscriber.this, null);
@@ -355,76 +378,34 @@ boolean add(InnerProducer<T> producer) {
355378
if (producer == null) {
356379
throw new NullPointerException();
357380
}
358-
// the state can change so we do a CAS loop to achieve atomicity
359-
for (;;) {
360-
// get the current producer array
361-
InnerProducer[] c = producers.get();
362-
// if this subscriber-to-source reached a terminal state by receiving
363-
// an onError or onCompleted, just refuse to add the new producer
364-
if (c == TERMINATED) {
381+
if (terminated) {
382+
return false;
383+
}
384+
synchronized (producers) {
385+
if (terminated) {
365386
return false;
366387
}
367-
// we perform a copy-on-write logic
368-
int len = c.length;
369-
InnerProducer[] u = new InnerProducer[len + 1];
370-
System.arraycopy(c, 0, u, 0, len);
371-
u[len] = producer;
372-
// try setting the producers array
373-
if (producers.compareAndSet(c, u)) {
374-
return true;
375-
}
376-
// if failed, some other operation succeded (another add, remove or termination)
377-
// so retry
388+
389+
producers.add(producer);
390+
producersVersion++;
378391
}
392+
return true;
379393
}
380394

381395
/**
382396
* Atomically removes the given producer from the producers array.
383397
* @param producer the producer to remove
384398
*/
385399
void remove(InnerProducer<T> producer) {
386-
// the state can change so we do a CAS loop to achieve atomicity
387-
for (;;) {
388-
// let's read the current producers array
389-
InnerProducer[] c = producers.get();
390-
// if it is either empty or terminated, there is nothing to remove so we quit
391-
if (c == EMPTY || c == TERMINATED) {
392-
return;
393-
}
394-
// let's find the supplied producer in the array
395-
// although this is O(n), we don't expect too many child subscribers in general
396-
int j = -1;
397-
int len = c.length;
398-
for (int i = 0; i < len; i++) {
399-
if (c[i].equals(producer)) {
400-
j = i;
401-
break;
402-
}
403-
}
404-
// we didn't find it so just quit
405-
if (j < 0) {
406-
return;
407-
}
408-
// we do copy-on-write logic here
409-
InnerProducer[] u;
410-
// we don't create a new empty array if producer was the single inhabitant
411-
// but rather reuse an empty array
412-
if (len == 1) {
413-
u = EMPTY;
414-
} else {
415-
// otherwise, create a new array one less in size
416-
u = new InnerProducer[len - 1];
417-
// copy elements being before the given producer
418-
System.arraycopy(c, 0, u, 0, j);
419-
// copy elements being after the given producer
420-
System.arraycopy(c, j + 1, u, j, len - j - 1);
421-
}
422-
// try setting this new array as
423-
if (producers.compareAndSet(c, u)) {
400+
if (terminated) {
401+
return;
402+
}
403+
synchronized (producers) {
404+
if (terminated) {
424405
return;
425406
}
426-
// if we failed, it means something else happened
427-
// (a concurrent add/remove or termination), we need to retry
407+
producers.remove(producer);
408+
producersVersion++;
428409
}
429410
}
430411

@@ -435,7 +416,7 @@ public void setProducer(Producer p) {
435416
throw new IllegalStateException("Only a single producer can be set on a Subscriber.");
436417
}
437418
producer = p;
438-
manageRequests();
419+
manageRequests(null);
439420
replay();
440421
}
441422

@@ -478,81 +459,157 @@ public void onCompleted() {
478459
/**
479460
* Coordinates the request amounts of various child Subscribers.
480461
*/
481-
void manageRequests() {
462+
void manageRequests(InnerProducer<T> inner) {
482463
// if the upstream has completed, no more requesting is possible
483464
if (isUnsubscribed()) {
484465
return;
485466
}
486467
synchronized (this) {
487468
if (emitting) {
469+
if (inner != null) {
470+
List<InnerProducer<T>> q = coordinationQueue;
471+
if (q == null) {
472+
q = new ArrayList<InnerProducer<T>>();
473+
coordinationQueue = q;
474+
}
475+
q.add(inner);
476+
} else {
477+
coordinateAll = true;
478+
}
488479
missed = true;
489480
return;
490481
}
491482
emitting = true;
492483
}
484+
485+
long ri = maxChildRequested;
486+
long maxTotalRequested;
487+
488+
if (inner != null) {
489+
maxTotalRequested = Math.max(ri, inner.totalRequested.get());
490+
} else {
491+
maxTotalRequested = ri;
492+
493+
InnerProducer<T>[] a = copyProducers();
494+
for (InnerProducer<T> rp : a) {
495+
if (rp != null) {
496+
maxTotalRequested = Math.max(maxTotalRequested, rp.totalRequested.get());
497+
}
498+
}
499+
500+
}
501+
makeRequest(maxTotalRequested, ri);
502+
493503
for (;;) {
494504
// if the upstream has completed, no more requesting is possible
495505
if (isUnsubscribed()) {
496506
return;
497507
}
498508

499-
@SuppressWarnings("unchecked")
500-
InnerProducer<T>[] a = producers.get();
501-
502-
long ri = maxChildRequested;
503-
long maxTotalRequests = ri;
504-
505-
for (InnerProducer<T> rp : a) {
506-
maxTotalRequests = Math.max(maxTotalRequests, rp.totalRequested.get());
509+
List<InnerProducer<T>> q;
510+
boolean all;
511+
synchronized (this) {
512+
if (!missed) {
513+
emitting = false;
514+
return;
515+
}
516+
missed = false;
517+
q = coordinationQueue;
518+
coordinationQueue = null;
519+
all = coordinateAll;
520+
coordinateAll = false;
507521
}
508522

509-
long ur = maxUpstreamRequested;
510-
Producer p = producer;
523+
ri = maxChildRequested;
524+
maxTotalRequested = ri;
511525

512-
long diff = maxTotalRequests - ri;
513-
if (diff != 0) {
514-
maxChildRequested = maxTotalRequests;
515-
if (p != null) {
516-
if (ur != 0L) {
517-
maxUpstreamRequested = 0L;
518-
p.request(ur + diff);
519-
} else {
520-
p.request(diff);
521-
}
522-
} else {
523-
// collect upstream request amounts until there is a producer for them
524-
long u = ur + diff;
525-
if (u < 0) {
526-
u = Long.MAX_VALUE;
526+
if (q != null) {
527+
for (InnerProducer<T> rp : q) {
528+
maxTotalRequested = Math.max(maxTotalRequested, rp.totalRequested.get());
529+
}
530+
}
531+
532+
if (all) {
533+
InnerProducer<T>[] a = copyProducers();
534+
for (InnerProducer<T> rp : a) {
535+
if (rp != null) {
536+
maxTotalRequested = Math.max(maxTotalRequested, rp.totalRequested.get());
527537
}
528-
maxUpstreamRequested = u;
529538
}
530-
} else
531-
// if there were outstanding upstream requests and we have a producer
532-
if (ur != 0L && p != null) {
533-
maxUpstreamRequested = 0L;
534-
// fire the accumulated requests
535-
p.request(ur);
536539
}
537540

538-
synchronized (this) {
539-
if (!missed) {
540-
emitting = false;
541-
return;
541+
makeRequest(maxTotalRequested, ri);
542+
}
543+
}
544+
545+
InnerProducer<T>[] copyProducers() {
546+
synchronized (producers) {
547+
Object[] a = producers.values();
548+
int n = a.length;
549+
@SuppressWarnings("unchecked")
550+
InnerProducer<T>[] result = new InnerProducer[n];
551+
System.arraycopy(a, 0, result, 0, n);
552+
return result;
553+
}
554+
}
555+
556+
void makeRequest(long maxTotalRequests, long previousTotalRequests) {
557+
long ur = maxUpstreamRequested;
558+
Producer p = producer;
559+
560+
long diff = maxTotalRequests - previousTotalRequests;
561+
if (diff != 0) {
562+
maxChildRequested = maxTotalRequests;
563+
if (p != null) {
564+
if (ur != 0L) {
565+
maxUpstreamRequested = 0L;
566+
p.request(ur + diff);
567+
} else {
568+
p.request(diff);
542569
}
543-
missed = false;
570+
} else {
571+
// collect upstream request amounts until there is a producer for them
572+
long u = ur + diff;
573+
if (u < 0) {
574+
u = Long.MAX_VALUE;
575+
}
576+
maxUpstreamRequested = u;
544577
}
578+
} else
579+
// if there were outstanding upstream requests and we have a producer
580+
if (ur != 0L && p != null) {
581+
maxUpstreamRequested = 0L;
582+
// fire the accumulated requests
583+
p.request(ur);
545584
}
546585
}
547586

548587
/**
549588
* Tries to replay the buffer contents to all known subscribers.
550589
*/
590+
@SuppressWarnings("unchecked")
551591
void replay() {
552-
@SuppressWarnings("unchecked")
553-
InnerProducer<T>[] a = producers.get();
554-
for (InnerProducer<T> rp : a) {
555-
buffer.replay(rp);
592+
InnerProducer<T>[] pc = producersCache;
593+
if (producersCacheVersion != producersVersion) {
594+
synchronized (producers) {
595+
pc = producersCache;
596+
// if the producers hasn't changed do nothing
597+
// otherwise make a copy of the current set of producers
598+
Object[] a = producers.values();
599+
int n = a.length;
600+
if (pc.length != n) {
601+
pc = new InnerProducer[n];
602+
producersCache = pc;
603+
}
604+
System.arraycopy(a, 0, pc, 0, n);
605+
producersCacheVersion = producersVersion;
606+
}
607+
}
608+
ReplayBuffer<T> b = buffer;
609+
for (InnerProducer<T> rp : pc) {
610+
if (rp != null) {
611+
b.replay(rp);
612+
}
556613
}
557614
}
558615
}
@@ -631,7 +688,7 @@ public void request(long n) {
631688
addTotalRequested(n);
632689
// if successful, notify the parent dispacher this child can receive more
633690
// elements
634-
parent.manageRequests();
691+
parent.manageRequests(this);
635692

636693
parent.buffer.replay(this);
637694
return;
@@ -712,7 +769,7 @@ public void unsubscribe() {
712769
// let's assume this child had 0 requested before the unsubscription while
713770
// the others had non-zero. By removing this 'blocking' child, the others
714771
// are now free to receive events
715-
parent.manageRequests();
772+
parent.manageRequests(this);
716773
}
717774
}
718775
}
@@ -852,8 +909,6 @@ public void replay(InnerProducer<T> output) {
852909

853910
/**
854911
* Represents a node in a bounded replay buffer's linked list.
855-
*
856-
* @param <T> the contained value type
857912
*/
858913
static final class Node extends AtomicReference<Node> {
859914
/** */

0 commit comments

Comments
 (0)