Skip to content

1.x: replay().refCount() avoid leaking items between connections #5181

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/main/java/rx/internal/operators/OnSubscribeRefCount.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,13 @@ void cleanup() {
// and set the subscriptionCount to 0
lock.lock();
try {

if (baseSubscription == currentBase) {
// backdoor into the ConnectableObservable to cleanup and reset its state
if (source instanceof Subscription) {
((Subscription)source).unsubscribe();
}

baseSubscription.unsubscribe();
baseSubscription = new CompositeSubscription();
subscriptionCount.set(0);
Expand All @@ -148,7 +154,13 @@ public void call() {
lock.lock();
try {
if (baseSubscription == current) {

if (subscriptionCount.decrementAndGet() == 0) {
// backdoor into the ConnectableObservable to cleanup and reset its state
if (source instanceof Subscription) {
((Subscription)source).unsubscribe();
}

baseSubscription.unsubscribe();
// need a new baseSubscription because once
// unsubscribed stays that way
Expand Down
13 changes: 12 additions & 1 deletion src/main/java/rx/internal/operators/OperatorReplay.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import rx.schedulers.Timestamped;
import rx.subscriptions.Subscriptions;

public final class OperatorReplay<T> extends ConnectableObservable<T> {
public final class OperatorReplay<T> extends ConnectableObservable<T> implements Subscription {
/** The source observable. */
final Observable<? extends T> source;
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
Expand Down Expand Up @@ -254,6 +254,17 @@ private OperatorReplay(OnSubscribe<T> onSubscribe, Observable<? extends T> sourc
this.bufferFactory = bufferFactory;
}

@Override
public void unsubscribe() {
current.lazySet(null);
}

@Override
public boolean isUnsubscribed() {
ReplaySubscriber<T> ps = current.get();
return ps == null || ps.isUnsubscribed();
}

@Override
public void connect(Action1<? super Subscription> connection) {
boolean doConnect;
Expand Down
153 changes: 153 additions & 0 deletions src/test/java/rx/internal/operators/OnSubscribeRefCountTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;

import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
Expand All @@ -31,6 +32,7 @@
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.functions.*;
import rx.observables.ConnectableObservable;
import rx.observers.*;
import rx.schedulers.*;
import rx.subjects.ReplaySubject;
Expand Down Expand Up @@ -611,4 +613,155 @@ public void call(Throwable t) {
assertNotNull("First subscriber didn't get the error", err1);
assertNotNull("Second subscriber didn't get the error", err2);
}

Observable<Object> source;

@Test
public void replayNoLeak() throws Exception {
System.gc();
Thread.sleep(100);

long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

source = Observable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
return new byte[100 * 1000 * 1000];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't WeakReference be a more determined way to detect memory leak?

AtomicReference<WeakReference<Object>> ref = new AtomicReference<>();

Observable
  .fromCallable {
    Object o = new Object();
    ref.set(new WeakReference(o));
    return o;
  }
  .replay(1)
  .refCount();

…

for (i = 0; i < 10; i++) { System.gc(); Thread.sleep(1); }

assertThat(ref.get().get()).isNull()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the ~100M is almost as apparent.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, yes and no. I wouldn't trust heap of a process in which I do not control Threads and allocations (both IDE and Gradle start separate JVM with own wrappers around your tests to execute them).

Also, I'm afraid that allocation of such a relatively huge piece of memory as an array may result in OOM on constrained environment like Travis CI.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's why only 100M, the OOMKiller jumps into action around 800 MB. Plus, the scheduler tests already use such large amount of memory and no kills for those.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok then!

}
})
.replay(1)
.refCount();

source.subscribe();

System.gc();
Thread.sleep(100);

long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

source = null;
assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after);
}

@Test
public void replayNoLeak2() throws Exception {
System.gc();
Thread.sleep(100);

long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

source = Observable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
return new byte[100 * 1000 * 1000];
}
}).concatWith(Observable.never())
.replay(1)
.refCount();

Subscription s1 = source.subscribe();
Subscription s2 = source.subscribe();

s1.unsubscribe();
s2.unsubscribe();

s1 = null;
s2 = null;

System.gc();
Thread.sleep(100);

long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

source = null;
assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after);
}

static final class ExceptionData extends Exception {
private static final long serialVersionUID = -6763898015338136119L;

public final Object data;

public ExceptionData(Object data) {
this.data = data;
}
}

@Test
public void publishNoLeak() throws Exception {
System.gc();
Thread.sleep(100);

long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

source = Observable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
throw new ExceptionData(new byte[100 * 1000 * 1000]);
}
})
.publish()
.refCount();

Action1<Throwable> err = Actions.empty();
source.subscribe(Actions.empty(), err);

System.gc();
Thread.sleep(100);

long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

source = null;
assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after);
}

@Test
public void publishNoLeak2() throws Exception {
System.gc();
Thread.sleep(100);

long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

source = Observable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
return new byte[100 * 1000 * 1000];
}
}).concatWith(Observable.never())
.publish()
.refCount();

Subscription s1 = source.test(0);
Subscription s2 = source.test(0);

s1.unsubscribe();
s2.unsubscribe();

s1 = null;
s2 = null;

System.gc();
Thread.sleep(100);

long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

source = null;
assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after);
}

@Test
public void replayIsUnsubscribed() {
ConnectableObservable<Integer> co = Observable.just(1)
.replay();

assertTrue(((Subscription)co).isUnsubscribed());

Subscription s = co.connect();

assertFalse(((Subscription)co).isUnsubscribed());

s.unsubscribe();

assertTrue(((Subscription)co).isUnsubscribed());
}
}