21
21
22
22
import rx .*;
23
23
import rx .Observable ;
24
- import rx .exceptions .Exceptions ;
25
- import rx .exceptions .OnErrorThrowable ;
24
+ import rx .exceptions .*;
26
25
import rx .functions .*;
26
+ import rx .internal .util .OpenHashSet ;
27
27
import rx .observables .ConnectableObservable ;
28
28
import rx .schedulers .Timestamped ;
29
29
import rx .subscriptions .Subscriptions ;
@@ -303,8 +303,16 @@ static final class ReplaySubscriber<T> extends Subscriber<T> implements Subscrip
303
303
/** Indicates a terminated ReplaySubscriber. */
304
304
static final InnerProducer [] TERMINATED = new InnerProducer [0 ];
305
305
306
- /** Tracks the subscribed producers. */
307
- final AtomicReference <InnerProducer []> producers ;
306
+ /** Indicates no further InnerProducers are accepted. */
307
+ volatile boolean terminated ;
308
+ /** Tracks the subscribed producers. Guarded by itself. */
309
+ final OpenHashSet <InnerProducer <T >> producers ;
310
+ /** Contains a copy of the producers. Modified only from the source side. */
311
+ InnerProducer <T >[] producersCache ;
312
+ /** Contains number of modifications to the producers set.*/
313
+ volatile long producersVersion ;
314
+ /** Contains the number of modifications that the producersCache holds. */
315
+ long producersCacheVersion ;
308
316
/**
309
317
* Atomically changed from false to true by connect to make sure the
310
318
* connection is only performed by one thread.
@@ -324,12 +332,19 @@ static final class ReplaySubscriber<T> extends Subscriber<T> implements Subscrip
324
332
/** The upstream producer. */
325
333
volatile Producer producer ;
326
334
335
+ /** The queue that holds producers with request changes that need to be coordinated. */
336
+ List <InnerProducer <T >> coordinationQueue ;
337
+ /** Indicate that all request amounts should be considered. */
338
+ boolean coordinateAll ;
339
+
340
+ @ SuppressWarnings ("unchecked" )
327
341
public ReplaySubscriber (AtomicReference <ReplaySubscriber <T >> current ,
328
342
ReplayBuffer <T > buffer ) {
329
343
this .buffer = buffer ;
330
344
331
345
this .nl = NotificationLite .instance ();
332
- this .producers = new AtomicReference <InnerProducer []>(EMPTY );
346
+ this .producers = new OpenHashSet <InnerProducer <T >>();
347
+ this .producersCache = EMPTY ;
333
348
this .shouldConnect = new AtomicBoolean ();
334
349
// make sure the source doesn't produce values until the child subscribers
335
350
// expressed their request amounts
@@ -340,7 +355,15 @@ void init() {
340
355
add (Subscriptions .create (new Action0 () {
341
356
@ Override
342
357
public void call () {
343
- ReplaySubscriber .this .producers .getAndSet (TERMINATED );
358
+ if (!terminated ) {
359
+ synchronized (producers ) {
360
+ if (!terminated ) {
361
+ producers .terminate ();
362
+ producersVersion ++;
363
+ terminated = true ;
364
+ }
365
+ }
366
+ }
344
367
// unlike OperatorPublish, we can't null out the terminated so
345
368
// late subscribers can still get replay
346
369
// current.compareAndSet(ReplaySubscriber.this, null);
@@ -359,76 +382,34 @@ boolean add(InnerProducer<T> producer) {
359
382
if (producer == null ) {
360
383
throw new NullPointerException ();
361
384
}
362
- // the state can change so we do a CAS loop to achieve atomicity
363
- for (;;) {
364
- // get the current producer array
365
- InnerProducer [] c = producers .get ();
366
- // if this subscriber-to-source reached a terminal state by receiving
367
- // an onError or onCompleted, just refuse to add the new producer
368
- if (c == TERMINATED ) {
385
+ if (terminated ) {
386
+ return false ;
387
+ }
388
+ synchronized (producers ) {
389
+ if (terminated ) {
369
390
return false ;
370
391
}
371
- // we perform a copy-on-write logic
372
- int len = c .length ;
373
- InnerProducer [] u = new InnerProducer [len + 1 ];
374
- System .arraycopy (c , 0 , u , 0 , len );
375
- u [len ] = producer ;
376
- // try setting the producers array
377
- if (producers .compareAndSet (c , u )) {
378
- return true ;
379
- }
380
- // if failed, some other operation succeeded (another add, remove or termination)
381
- // so retry
392
+
393
+ producers .add (producer );
394
+ producersVersion ++;
382
395
}
396
+ return true ;
383
397
}
384
398
385
399
/**
386
400
* Atomically removes the given producer from the producers array.
387
401
* @param producer the producer to remove
388
402
*/
389
403
void remove (InnerProducer <T > producer ) {
390
- // the state can change so we do a CAS loop to achieve atomicity
391
- for (;;) {
392
- // let's read the current producers array
393
- InnerProducer [] c = producers .get ();
394
- // if it is either empty or terminated, there is nothing to remove so we quit
395
- if (c == EMPTY || c == TERMINATED ) {
396
- return ;
397
- }
398
- // let's find the supplied producer in the array
399
- // although this is O(n), we don't expect too many child subscribers in general
400
- int j = -1 ;
401
- int len = c .length ;
402
- for (int i = 0 ; i < len ; i ++) {
403
- if (c [i ].equals (producer )) {
404
- j = i ;
405
- break ;
406
- }
407
- }
408
- // we didn't find it so just quit
409
- if (j < 0 ) {
410
- return ;
411
- }
412
- // we do copy-on-write logic here
413
- InnerProducer [] u ;
414
- // we don't create a new empty array if producer was the single inhabitant
415
- // but rather reuse an empty array
416
- if (len == 1 ) {
417
- u = EMPTY ;
418
- } else {
419
- // otherwise, create a new array one less in size
420
- u = new InnerProducer [len - 1 ];
421
- // copy elements being before the given producer
422
- System .arraycopy (c , 0 , u , 0 , j );
423
- // copy elements being after the given producer
424
- System .arraycopy (c , j + 1 , u , j , len - j - 1 );
425
- }
426
- // try setting this new array as
427
- if (producers .compareAndSet (c , u )) {
404
+ if (terminated ) {
405
+ return ;
406
+ }
407
+ synchronized (producers ) {
408
+ if (terminated ) {
428
409
return ;
429
410
}
430
- // if we failed, it means something else happened
431
- // (a concurrent add/remove or termination), we need to retry
411
+ producers . remove ( producer );
412
+ producersVersion ++;
432
413
}
433
414
}
434
415
@@ -439,7 +420,7 @@ public void setProducer(Producer p) {
439
420
throw new IllegalStateException ("Only a single producer can be set on a Subscriber." );
440
421
}
441
422
producer = p ;
442
- manageRequests ();
423
+ manageRequests (null );
443
424
replay ();
444
425
}
445
426
@@ -482,81 +463,157 @@ public void onCompleted() {
482
463
/**
483
464
* Coordinates the request amounts of various child Subscribers.
484
465
*/
485
- void manageRequests () {
466
+ void manageRequests (InnerProducer < T > inner ) {
486
467
// if the upstream has completed, no more requesting is possible
487
468
if (isUnsubscribed ()) {
488
469
return ;
489
470
}
490
471
synchronized (this ) {
491
472
if (emitting ) {
473
+ if (inner != null ) {
474
+ List <InnerProducer <T >> q = coordinationQueue ;
475
+ if (q == null ) {
476
+ q = new ArrayList <InnerProducer <T >>();
477
+ coordinationQueue = q ;
478
+ }
479
+ q .add (inner );
480
+ } else {
481
+ coordinateAll = true ;
482
+ }
492
483
missed = true ;
493
484
return ;
494
485
}
495
486
emitting = true ;
496
487
}
488
+
489
+ long ri = maxChildRequested ;
490
+ long maxTotalRequested ;
491
+
492
+ if (inner != null ) {
493
+ maxTotalRequested = Math .max (ri , inner .totalRequested .get ());
494
+ } else {
495
+ maxTotalRequested = ri ;
496
+
497
+ InnerProducer <T >[] a = copyProducers ();
498
+ for (InnerProducer <T > rp : a ) {
499
+ if (rp != null ) {
500
+ maxTotalRequested = Math .max (maxTotalRequested , rp .totalRequested .get ());
501
+ }
502
+ }
503
+
504
+ }
505
+ makeRequest (maxTotalRequested , ri );
506
+
497
507
for (;;) {
498
508
// if the upstream has completed, no more requesting is possible
499
509
if (isUnsubscribed ()) {
500
510
return ;
501
511
}
502
512
503
- @ SuppressWarnings ("unchecked" )
504
- InnerProducer <T >[] a = producers .get ();
505
-
506
- long ri = maxChildRequested ;
507
- long maxTotalRequests = ri ;
508
-
509
- for (InnerProducer <T > rp : a ) {
510
- maxTotalRequests = Math .max (maxTotalRequests , rp .totalRequested .get ());
513
+ List <InnerProducer <T >> q ;
514
+ boolean all ;
515
+ synchronized (this ) {
516
+ if (!missed ) {
517
+ emitting = false ;
518
+ return ;
519
+ }
520
+ missed = false ;
521
+ q = coordinationQueue ;
522
+ coordinationQueue = null ;
523
+ all = coordinateAll ;
524
+ coordinateAll = false ;
511
525
}
512
526
513
- long ur = maxUpstreamRequested ;
514
- Producer p = producer ;
527
+ ri = maxChildRequested ;
528
+ maxTotalRequested = ri ;
515
529
516
- long diff = maxTotalRequests - ri ;
517
- if (diff != 0 ) {
518
- maxChildRequested = maxTotalRequests ;
519
- if (p != null ) {
520
- if (ur != 0L ) {
521
- maxUpstreamRequested = 0L ;
522
- p .request (ur + diff );
523
- } else {
524
- p .request (diff );
525
- }
526
- } else {
527
- // collect upstream request amounts until there is a producer for them
528
- long u = ur + diff ;
529
- if (u < 0 ) {
530
- u = Long .MAX_VALUE ;
530
+ if (q != null ) {
531
+ for (InnerProducer <T > rp : q ) {
532
+ maxTotalRequested = Math .max (maxTotalRequested , rp .totalRequested .get ());
533
+ }
534
+ }
535
+
536
+ if (all ) {
537
+ InnerProducer <T >[] a = copyProducers ();
538
+ for (InnerProducer <T > rp : a ) {
539
+ if (rp != null ) {
540
+ maxTotalRequested = Math .max (maxTotalRequested , rp .totalRequested .get ());
531
541
}
532
- maxUpstreamRequested = u ;
533
542
}
534
- } else
535
- // if there were outstanding upstream requests and we have a producer
536
- if (ur != 0L && p != null ) {
537
- maxUpstreamRequested = 0L ;
538
- // fire the accumulated requests
539
- p .request (ur );
540
543
}
541
544
542
- synchronized (this ) {
543
- if (!missed ) {
544
- emitting = false ;
545
- return ;
545
+ makeRequest (maxTotalRequested , ri );
546
+ }
547
+ }
548
+
549
+ InnerProducer <T >[] copyProducers () {
550
+ synchronized (producers ) {
551
+ Object [] a = producers .values ();
552
+ int n = a .length ;
553
+ @ SuppressWarnings ("unchecked" )
554
+ InnerProducer <T >[] result = new InnerProducer [n ];
555
+ System .arraycopy (a , 0 , result , 0 , n );
556
+ return result ;
557
+ }
558
+ }
559
+
560
+ void makeRequest (long maxTotalRequests , long previousTotalRequests ) {
561
+ long ur = maxUpstreamRequested ;
562
+ Producer p = producer ;
563
+
564
+ long diff = maxTotalRequests - previousTotalRequests ;
565
+ if (diff != 0 ) {
566
+ maxChildRequested = maxTotalRequests ;
567
+ if (p != null ) {
568
+ if (ur != 0L ) {
569
+ maxUpstreamRequested = 0L ;
570
+ p .request (ur + diff );
571
+ } else {
572
+ p .request (diff );
546
573
}
547
- missed = false ;
574
+ } else {
575
+ // collect upstream request amounts until there is a producer for them
576
+ long u = ur + diff ;
577
+ if (u < 0 ) {
578
+ u = Long .MAX_VALUE ;
579
+ }
580
+ maxUpstreamRequested = u ;
548
581
}
582
+ } else
583
+ // if there were outstanding upstream requests and we have a producer
584
+ if (ur != 0L && p != null ) {
585
+ maxUpstreamRequested = 0L ;
586
+ // fire the accumulated requests
587
+ p .request (ur );
549
588
}
550
589
}
551
590
552
591
/**
553
592
* Tries to replay the buffer contents to all known subscribers.
554
593
*/
594
+ @ SuppressWarnings ("unchecked" )
555
595
void replay () {
556
- @ SuppressWarnings ("unchecked" )
557
- InnerProducer <T >[] a = producers .get ();
558
- for (InnerProducer <T > rp : a ) {
559
- buffer .replay (rp );
596
+ InnerProducer <T >[] pc = producersCache ;
597
+ if (producersCacheVersion != producersVersion ) {
598
+ synchronized (producers ) {
599
+ pc = producersCache ;
600
+ // if the producers hasn't changed do nothing
601
+ // otherwise make a copy of the current set of producers
602
+ Object [] a = producers .values ();
603
+ int n = a .length ;
604
+ if (pc .length != n ) {
605
+ pc = new InnerProducer [n ];
606
+ producersCache = pc ;
607
+ }
608
+ System .arraycopy (a , 0 , pc , 0 , n );
609
+ producersCacheVersion = producersVersion ;
610
+ }
611
+ }
612
+ ReplayBuffer <T > b = buffer ;
613
+ for (InnerProducer <T > rp : pc ) {
614
+ if (rp != null ) {
615
+ b .replay (rp );
616
+ }
560
617
}
561
618
}
562
619
}
@@ -635,7 +692,7 @@ public void request(long n) {
635
692
addTotalRequested (n );
636
693
// if successful, notify the parent dispatcher this child can receive more
637
694
// elements
638
- parent .manageRequests ();
695
+ parent .manageRequests (this );
639
696
640
697
parent .buffer .replay (this );
641
698
return ;
@@ -716,7 +773,7 @@ public void unsubscribe() {
716
773
// let's assume this child had 0 requested before the unsubscription while
717
774
// the others had non-zero. By removing this 'blocking' child, the others
718
775
// are now free to receive events
719
- parent .manageRequests ();
776
+ parent .manageRequests (this );
720
777
}
721
778
}
722
779
}
@@ -856,8 +913,6 @@ public void replay(InnerProducer<T> output) {
856
913
857
914
/**
858
915
* Represents a node in a bounded replay buffer's linked list.
859
- *
860
- * @param <T> the contained value type
861
916
*/
862
917
static final class Node extends AtomicReference <Node > {
863
918
/** */
0 commit comments