@@ -320,6 +320,11 @@ static final class ReplaySubscriber<T> extends Subscriber<T> implements Subscrip
320
320
/** The upstream producer. */
321
321
volatile Producer producer ;
322
322
323
+ /** The queue that holds producers with request changes that need to be coordinated. */
324
+ List <InnerProducer <T >> coordinationQueue ;
325
+ /** Indicate that all request amounts should be considered. */
326
+ boolean coordinateAll ;
327
+
323
328
public ReplaySubscriber (AtomicReference <ReplaySubscriber <T >> current ,
324
329
ReplayBuffer <T > buffer ) {
325
330
this .buffer = buffer ;
@@ -435,7 +440,7 @@ public void setProducer(Producer p) {
435
440
throw new IllegalStateException ("Only a single producer can be set on a Subscriber." );
436
441
}
437
442
producer = p ;
438
- manageRequests ();
443
+ manageRequests (null );
439
444
replay ();
440
445
}
441
446
@@ -478,70 +483,115 @@ public void onCompleted() {
478
483
/**
479
484
* Coordinates the request amounts of various child Subscribers.
480
485
*/
481
- void manageRequests () {
486
+ void manageRequests (InnerProducer < T > inner ) {
482
487
// if the upstream has completed, no more requesting is possible
483
488
if (isUnsubscribed ()) {
484
489
return ;
485
490
}
486
491
synchronized (this ) {
487
492
if (emitting ) {
493
+ if (inner != null ) {
494
+ List <InnerProducer <T >> q = coordinationQueue ;
495
+ if (q == null ) {
496
+ q = new ArrayList <InnerProducer <T >>();
497
+ coordinationQueue = q ;
498
+ }
499
+ q .add (inner );
500
+ } else {
501
+ coordinateAll = true ;
502
+ }
488
503
missed = true ;
489
504
return ;
490
505
}
491
506
emitting = true ;
492
507
}
508
+
509
+ long ri = maxChildRequested ;
510
+ long maxTotalRequested ;
511
+
512
+ if (inner != null ) {
513
+ maxTotalRequested = Math .max (ri , inner .totalRequested .get ());
514
+ } else {
515
+ maxTotalRequested = ri ;
516
+
517
+ @ SuppressWarnings ("unchecked" )
518
+ InnerProducer <T >[] a = producers .get ();
519
+ for (InnerProducer <T > rp : a ) {
520
+ maxTotalRequested = Math .max (maxTotalRequested , rp .totalRequested .get ());
521
+ }
522
+
523
+ }
524
+ makeRequest (maxTotalRequested , ri );
525
+
493
526
for (;;) {
494
527
// if the upstream has completed, no more requesting is possible
495
528
if (isUnsubscribed ()) {
496
529
return ;
497
530
}
498
531
499
- @ SuppressWarnings ("unchecked" )
500
- InnerProducer <T >[] a = producers .get ();
532
+ List <InnerProducer <T >> q ;
533
+ boolean all ;
534
+ synchronized (this ) {
535
+ if (!missed ) {
536
+ emitting = false ;
537
+ return ;
538
+ }
539
+ missed = false ;
540
+ q = coordinationQueue ;
541
+ coordinationQueue = null ;
542
+ all = coordinateAll ;
543
+ coordinateAll = false ;
544
+ }
501
545
502
- long ri = maxChildRequested ;
503
- long maxTotalRequests = ri ;
546
+ ri = maxChildRequested ;
547
+ maxTotalRequested = ri ;
504
548
505
- for (InnerProducer <T > rp : a ) {
506
- maxTotalRequests = Math .max (maxTotalRequests , rp .totalRequested .get ());
549
+ if (q != null ) {
550
+ for (InnerProducer <T > rp : q ) {
551
+ maxTotalRequested = Math .max (maxTotalRequested , rp .totalRequested .get ());
552
+ }
553
+ }
554
+
555
+ if (all ) {
556
+ @ SuppressWarnings ("unchecked" )
557
+ InnerProducer <T >[] a = producers .get ();
558
+ for (InnerProducer <T > rp : a ) {
559
+ maxTotalRequested = Math .max (maxTotalRequested , rp .totalRequested .get ());
560
+ }
507
561
}
508
562
509
- long ur = maxUpstreamRequested ;
510
- Producer p = producer ;
563
+ makeRequest (maxTotalRequested , ri );
564
+ }
565
+ }
566
+
567
+ void makeRequest (long maxTotalRequests , long previousTotalRequests ) {
568
+ long ur = maxUpstreamRequested ;
569
+ Producer p = producer ;
511
570
512
- long diff = maxTotalRequests - ri ;
513
- if (diff != 0 ) {
514
- maxChildRequested = maxTotalRequests ;
515
- if (p != null ) {
516
- if (ur != 0L ) {
517
- maxUpstreamRequested = 0L ;
518
- p .request (ur + diff );
519
- } else {
520
- p .request (diff );
521
- }
571
+ long diff = maxTotalRequests - previousTotalRequests ;
572
+ if (diff != 0 ) {
573
+ maxChildRequested = maxTotalRequests ;
574
+ if (p != null ) {
575
+ if (ur != 0L ) {
576
+ maxUpstreamRequested = 0L ;
577
+ p .request (ur + diff );
522
578
} else {
523
- // collect upstream request amounts until there is a producer for them
524
- long u = ur + diff ;
525
- if (u < 0 ) {
526
- u = Long .MAX_VALUE ;
527
- }
528
- maxUpstreamRequested = u ;
579
+ p .request (diff );
529
580
}
530
- } else
531
- // if there were outstanding upstream requests and we have a producer
532
- if (ur != 0L && p != null ) {
533
- maxUpstreamRequested = 0L ;
534
- // fire the accumulated requests
535
- p .request (ur );
536
- }
537
-
538
- synchronized (this ) {
539
- if (!missed ) {
540
- emitting = false ;
541
- return ;
581
+ } else {
582
+ // collect upstream request amounts until there is a producer for them
583
+ long u = ur + diff ;
584
+ if (u < 0 ) {
585
+ u = Long .MAX_VALUE ;
542
586
}
543
- missed = false ;
587
+ maxUpstreamRequested = u ;
544
588
}
589
+ } else
590
+ // if there were outstanding upstream requests and we have a producer
591
+ if (ur != 0L && p != null ) {
592
+ maxUpstreamRequested = 0L ;
593
+ // fire the accumulated requests
594
+ p .request (ur );
545
595
}
546
596
}
547
597
@@ -631,7 +681,7 @@ public void request(long n) {
631
681
addTotalRequested (n );
632
682
// if successful, notify the parent dispacher this child can receive more
633
683
// elements
634
- parent .manageRequests ();
684
+ parent .manageRequests (this );
635
685
636
686
parent .buffer .replay (this );
637
687
return ;
@@ -712,7 +762,7 @@ public void unsubscribe() {
712
762
// let's assume this child had 0 requested before the unsubscription while
713
763
// the others had non-zero. By removing this 'blocking' child, the others
714
764
// are now free to receive events
715
- parent .manageRequests ();
765
+ parent .manageRequests (this );
716
766
}
717
767
}
718
768
}
0 commit comments