Skip to content

Commit 5e5d5a2

Browse files
authored
2.x: Fix excess item retention in the other replay components (#5898)
1 parent 9281281 commit 5e5d5a2

File tree

6 files changed

+548
-8
lines changed

6 files changed

+548
-8
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -806,6 +806,15 @@ public final void complete() {
806806
truncateFinal();
807807
}
808808

809+
final void trimHead() {
810+
Node head = get();
811+
if (head.value != null) {
812+
Node n = new Node(null, 0L);
813+
n.lazySet(head.get());
814+
set(n);
815+
}
816+
}
817+
809818
@Override
810819
public final void replay(InnerSubscription<T> output) {
811820
synchronized (output) {
@@ -909,7 +918,7 @@ void truncate() {
909918
* based on its properties (i.e., truncate but the very last node).
910919
*/
911920
void truncateFinal() {
912-
921+
trimHead();
913922
}
914923
/* test */ final void collect(Collection<? super T> output) {
915924
Node n = getHead();

src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,16 @@ final void removeFirst() {
619619
// can't null out the head's value because of late replayers would see null
620620
setFirst(next);
621621
}
622+
623+
final void trimHead() {
624+
Node head = get();
625+
if (head.value != null) {
626+
Node n = new Node(null);
627+
n.lazySet(head.get());
628+
set(n);
629+
}
630+
}
631+
622632
/* test */ final void removeSome(int n) {
623633
Node head = get();
624634
while (n > 0) {
@@ -733,7 +743,7 @@ Object leaveTransform(Object value) {
733743
* based on its properties (i.e., truncate but the very last node).
734744
*/
735745
void truncateFinal() {
736-
746+
trimHead();
737747
}
738748
/* test */ final void collect(Collection<? super T> output) {
739749
Node n = getHead();

src/main/java/io/reactivex/processors/ReplayProcessor.java

+63-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.reactivestreams.*;
2222

2323
import io.reactivex.Scheduler;
24-
import io.reactivex.annotations.CheckReturnValue;
24+
import io.reactivex.annotations.*;
2525
import io.reactivex.internal.functions.ObjectHelper;
2626
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2727
import io.reactivex.internal.util.*;
@@ -362,6 +362,24 @@ public Throwable getThrowable() {
362362
return null;
363363
}
364364

365+
/**
366+
* Makes sure the item cached by the head node in a bounded
367+
* ReplayProcessor is released (as it is never part of a replay).
368+
* <p>
369+
* By default, live bounded buffers will remember one item before
370+
* the currently receivable one to ensure subscribers can always
371+
* receive a continuous sequence of items. A terminated ReplayProcessor
372+
* automatically releases this inaccessible item.
373+
* <p>
374+
* The method must be called sequentially, similar to the standard
375+
* {@code onXXX} methods.
376+
* @since 2.1.11 - experimental
377+
*/
378+
@Experimental
379+
public void cleanupBuffer() {
380+
buffer.trimHead();
381+
}
382+
365383
/**
366384
* Returns a single value the Subject currently has or null if no such value exists.
367385
* <p>The method is thread-safe.
@@ -499,6 +517,12 @@ interface ReplayBuffer<T> {
499517
boolean isDone();
500518

501519
Throwable getError();
520+
521+
/**
522+
* Make sure an old inaccessible head value is released
523+
* in a bounded buffer.
524+
*/
525+
void trimHead();
502526
}
503527

504528
static final class ReplaySubscription<T> extends AtomicInteger implements Subscription {
@@ -568,6 +592,11 @@ public void complete() {
568592
done = true;
569593
}
570594

595+
@Override
596+
public void trimHead() {
597+
// not applicable for an unbounded buffer
598+
}
599+
571600
@Override
572601
public T getValue() {
573602
int s = size;
@@ -771,14 +800,25 @@ public void next(T value) {
771800
@Override
772801
public void error(Throwable ex) {
773802
error = ex;
803+
trimHead();
774804
done = true;
775805
}
776806

777807
@Override
778808
public void complete() {
809+
trimHead();
779810
done = true;
780811
}
781812

813+
@Override
814+
public void trimHead() {
815+
if (head.value != null) {
816+
Node<T> n = new Node<T>(null);
817+
n.lazySet(head.get());
818+
head = n;
819+
}
820+
}
821+
782822
@Override
783823
public boolean isDone() {
784824
return done;
@@ -992,19 +1032,39 @@ void trimFinal() {
9921032
for (;;) {
9931033
TimedNode<T> next = h.get();
9941034
if (next == null) {
995-
head = h;
1035+
if (h.value != null) {
1036+
head = new TimedNode<T>(null, 0L);
1037+
} else {
1038+
head = h;
1039+
}
9961040
break;
9971041
}
9981042

9991043
if (next.time > limit) {
1000-
head = h;
1044+
if (h.value != null) {
1045+
TimedNode<T> n = new TimedNode<T>(null, 0L);
1046+
n.lazySet(h.get());
1047+
head = n;
1048+
} else {
1049+
head = h;
1050+
}
10011051
break;
10021052
}
10031053

10041054
h = next;
10051055
}
10061056
}
10071057

1058+
1059+
@Override
1060+
public void trimHead() {
1061+
if (head.value != null) {
1062+
TimedNode<T> n = new TimedNode<T>(null, 0L);
1063+
n.lazySet(head.get());
1064+
head = n;
1065+
}
1066+
}
1067+
10081068
@Override
10091069
public void next(T value) {
10101070
TimedNode<T> n = new TimedNode<T>(value, scheduler.now(unit));

src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java

+169-1
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121
import java.util.concurrent.*;
2222
import java.util.concurrent.atomic.*;
2323

24-
import io.reactivex.annotations.NonNull;
2524
import org.junit.*;
2625
import org.mockito.InOrder;
2726
import org.reactivestreams.*;
2827

2928
import io.reactivex.*;
3029
import io.reactivex.Scheduler.Worker;
30+
import io.reactivex.annotations.NonNull;
3131
import io.reactivex.disposables.Disposable;
3232
import io.reactivex.exceptions.TestException;
3333
import io.reactivex.flowables.ConnectableFlowable;
@@ -1775,4 +1775,172 @@ public void badRequest() {
17751775
.replay()
17761776
);
17771777
}
1778+
1779+
@Test
1780+
public void noHeadRetentionCompleteSize() {
1781+
PublishProcessor<Integer> source = PublishProcessor.create();
1782+
1783+
FlowableReplay<Integer> co = (FlowableReplay<Integer>)source
1784+
.replay(1);
1785+
1786+
// the backpressure coordination would not accept items from source otherwise
1787+
co.test();
1788+
1789+
co.connect();
1790+
1791+
BoundedReplayBuffer<Integer> buf = (BoundedReplayBuffer<Integer>)(co.current.get().buffer);
1792+
1793+
source.onNext(1);
1794+
source.onNext(2);
1795+
source.onComplete();
1796+
1797+
assertNull(buf.get().value);
1798+
1799+
Object o = buf.get();
1800+
1801+
buf.trimHead();
1802+
1803+
assertSame(o, buf.get());
1804+
}
1805+
1806+
@Test
1807+
public void noHeadRetentionErrorSize() {
1808+
PublishProcessor<Integer> source = PublishProcessor.create();
1809+
1810+
FlowableReplay<Integer> co = (FlowableReplay<Integer>)source
1811+
.replay(1);
1812+
1813+
co.test();
1814+
1815+
co.connect();
1816+
1817+
BoundedReplayBuffer<Integer> buf = (BoundedReplayBuffer<Integer>)(co.current.get().buffer);
1818+
1819+
source.onNext(1);
1820+
source.onNext(2);
1821+
source.onError(new TestException());
1822+
1823+
assertNull(buf.get().value);
1824+
1825+
Object o = buf.get();
1826+
1827+
buf.trimHead();
1828+
1829+
assertSame(o, buf.get());
1830+
}
1831+
1832+
@Test
1833+
public void noHeadRetentionSize() {
1834+
PublishProcessor<Integer> source = PublishProcessor.create();
1835+
1836+
FlowableReplay<Integer> co = (FlowableReplay<Integer>)source
1837+
.replay(1);
1838+
1839+
co.test();
1840+
1841+
co.connect();
1842+
1843+
BoundedReplayBuffer<Integer> buf = (BoundedReplayBuffer<Integer>)(co.current.get().buffer);
1844+
1845+
source.onNext(1);
1846+
source.onNext(2);
1847+
1848+
assertNotNull(buf.get().value);
1849+
1850+
buf.trimHead();
1851+
1852+
assertNull(buf.get().value);
1853+
1854+
Object o = buf.get();
1855+
1856+
buf.trimHead();
1857+
1858+
assertSame(o, buf.get());
1859+
}
1860+
1861+
@Test
1862+
public void noHeadRetentionCompleteTime() {
1863+
PublishProcessor<Integer> source = PublishProcessor.create();
1864+
1865+
FlowableReplay<Integer> co = (FlowableReplay<Integer>)source
1866+
.replay(1, TimeUnit.MINUTES, Schedulers.computation());
1867+
1868+
co.test();
1869+
1870+
co.connect();
1871+
1872+
BoundedReplayBuffer<Integer> buf = (BoundedReplayBuffer<Integer>)(co.current.get().buffer);
1873+
1874+
source.onNext(1);
1875+
source.onNext(2);
1876+
source.onComplete();
1877+
1878+
assertNull(buf.get().value);
1879+
1880+
Object o = buf.get();
1881+
1882+
buf.trimHead();
1883+
1884+
assertSame(o, buf.get());
1885+
}
1886+
1887+
@Test
1888+
public void noHeadRetentionErrorTime() {
1889+
PublishProcessor<Integer> source = PublishProcessor.create();
1890+
1891+
FlowableReplay<Integer> co = (FlowableReplay<Integer>)source
1892+
.replay(1, TimeUnit.MINUTES, Schedulers.computation());
1893+
1894+
co.test();
1895+
1896+
co.connect();
1897+
1898+
BoundedReplayBuffer<Integer> buf = (BoundedReplayBuffer<Integer>)(co.current.get().buffer);
1899+
1900+
source.onNext(1);
1901+
source.onNext(2);
1902+
source.onError(new TestException());
1903+
1904+
assertNull(buf.get().value);
1905+
1906+
Object o = buf.get();
1907+
1908+
buf.trimHead();
1909+
1910+
assertSame(o, buf.get());
1911+
}
1912+
1913+
@Test
1914+
public void noHeadRetentionTime() {
1915+
TestScheduler sch = new TestScheduler();
1916+
1917+
PublishProcessor<Integer> source = PublishProcessor.create();
1918+
1919+
FlowableReplay<Integer> co = (FlowableReplay<Integer>)source
1920+
.replay(1, TimeUnit.MILLISECONDS, sch);
1921+
1922+
co.test();
1923+
1924+
co.connect();
1925+
1926+
BoundedReplayBuffer<Integer> buf = (BoundedReplayBuffer<Integer>)(co.current.get().buffer);
1927+
1928+
source.onNext(1);
1929+
1930+
sch.advanceTimeBy(2, TimeUnit.MILLISECONDS);
1931+
1932+
source.onNext(2);
1933+
1934+
assertNotNull(buf.get().value);
1935+
1936+
buf.trimHead();
1937+
1938+
assertNull(buf.get().value);
1939+
1940+
Object o = buf.get();
1941+
1942+
buf.trimHead();
1943+
1944+
assertSame(o, buf.get());
1945+
}
17781946
}

0 commit comments

Comments
 (0)