diff --git a/src/main/java/rx/internal/operators/OperatorReplay.java b/src/main/java/rx/internal/operators/OperatorReplay.java index 86774cbd39..33e14b15de 100644 --- a/src/main/java/rx/internal/operators/OperatorReplay.java +++ b/src/main/java/rx/internal/operators/OperatorReplay.java @@ -410,6 +410,7 @@ boolean add(InnerProducer producer) { * Atomically removes the given producer from the producers array. * @param producer the producer to remove */ + @SuppressWarnings("unchecked") void remove(InnerProducer producer) { if (terminated) { return; @@ -419,6 +420,9 @@ void remove(InnerProducer producer) { return; } producers.remove(producer); + if (producers.isEmpty()) { + producersCache = EMPTY; + } producersVersion++; } } @@ -643,7 +647,7 @@ static final class InnerProducer extends AtomicLong implements Producer, Subs */ final ReplaySubscriber parent; /** The actual child subscriber. */ - final Subscriber child; + Subscriber child; /** * Holds an object that represents the current location in the buffer. * Guarded by the emitter loop. @@ -784,6 +788,8 @@ public void unsubscribe() { // the others had non-zero. By removing this 'blocking' child, the others // are now free to receive events parent.manageRequests(this); + // break the reference + child = null; } } } @@ -878,20 +884,25 @@ public void replay(InnerProducer output) { Integer destIndexObject = output.index(); int destIndex = destIndexObject != null ? destIndexObject : 0; + Subscriber child = output.child; + if (child == null) { + return; + } + long r = output.get(); long e = 0L; while (e != r && destIndex < sourceIndex) { Object o = get(destIndex); try { - if (nl.accept(output.child, o)) { + if (nl.accept(child, o)) { return; } } catch (Throwable err) { Exceptions.throwIfFatal(err); output.unsubscribe(); if (!nl.isError(o) && !nl.isCompleted(o)) { - output.child.onError(OnErrorThrowable.addValueAsLastCause(err, nl.getValue(o))); + child.onError(OnErrorThrowable.addValueAsLastCause(err, nl.getValue(o))); } return; } @@ -1066,6 +1077,11 @@ public final void replay(InnerProducer output) { return; } + Subscriber child = output.child; + if (child == null) { + return; + } + long r = output.get(); long e = 0L; @@ -1074,7 +1090,7 @@ public final void replay(InnerProducer output) { if (v != null) { Object o = leaveTransform(v.value); try { - if (nl.accept(output.child, o)) { + if (nl.accept(child, o)) { output.index = null; return; } @@ -1083,7 +1099,7 @@ public final void replay(InnerProducer output) { Exceptions.throwIfFatal(err); output.unsubscribe(); if (!nl.isError(o) && !nl.isCompleted(o)) { - output.child.onError(OnErrorThrowable.addValueAsLastCause(err, nl.getValue(o))); + child.onError(OnErrorThrowable.addValueAsLastCause(err, nl.getValue(o))); } return; } diff --git a/src/test/java/rx/internal/operators/OperatorReplayTest.java b/src/test/java/rx/internal/operators/OperatorReplayTest.java index fafe4ebc58..d3415d6e6e 100644 --- a/src/test/java/rx/internal/operators/OperatorReplayTest.java +++ b/src/test/java/rx/internal/operators/OperatorReplayTest.java @@ -19,6 +19,7 @@ import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; +import java.lang.management.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -1495,4 +1496,85 @@ public void timeSizeDefaultScheduler() { ts.assertNoErrors(); ts.assertCompleted(); } + + void replayNoRetention(Func1, ConnectableObservable> replayOp) throws InterruptedException { + System.gc(); + + Thread.sleep(500); + + MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage(); + long initial = memHeap.getUsed(); + + System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0); + + PublishSubject ps = PublishSubject.create(); + + ConnectableObservable co = replayOp.call(ps); + + Subscription s = co.subscribe(new Action1() { + int[] array = new int[1024 * 1024 * 32]; + + @Override + public void call(Integer t) { + System.out.println(array.length); + } + }); + + co.connect(); + ps.onNext(1); + + memHeap = memoryMXBean.getHeapMemoryUsage(); + long middle = memHeap.getUsed(); + + System.out.printf("Starting: %.3f MB%n", middle / 1024.0 / 1024.0); + + s.unsubscribe(); + s = null; + + System.gc(); + + Thread.sleep(500); + + memHeap = memoryMXBean.getHeapMemoryUsage(); + long finish = memHeap.getUsed(); + + System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0); + + if (finish > initial * 5) { + fail(String.format("Leak: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, middle / 1024 / 1024.0, finish / 1024 / 1024d)); + } + + } + + @Test + public void replayNoRetentionUnbounded() throws Exception { + replayNoRetention(new Func1, ConnectableObservable>() { + @Override + public ConnectableObservable call(Observable o) { + return o.replay(); + } + }); + } + + @Test + public void replayNoRetentionSizeBound() throws Exception { + replayNoRetention(new Func1, ConnectableObservable>() { + @Override + public ConnectableObservable call(Observable o) { + return o.replay(1); + } + }); + } + + @Test + public void replayNoRetentionTimebound() throws Exception { + replayNoRetention(new Func1, ConnectableObservable>() { + @Override + public ConnectableObservable call(Observable o) { + return o.replay(1, TimeUnit.DAYS); + } + }); + } + } \ No newline at end of file