diff --git a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java index a94fd047ae..8fd5739fb9 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java @@ -38,6 +38,8 @@ public class CompositeSubscription implements Subscription { */ private AtomicBoolean unsubscribed = new AtomicBoolean(false); private final ConcurrentHashMap subscriptions = new ConcurrentHashMap(); + + private final Object guard = new Object(); public CompositeSubscription(List subscriptions) { for (Subscription s : subscriptions) { @@ -87,19 +89,32 @@ public boolean isUnsubscribed() { return unsubscribed.get(); } - public synchronized void add(Subscription s) { - if (unsubscribed.get()) { - s.unsubscribe(); - } else { - subscriptions.put(s, Boolean.TRUE); + public void add(Subscription s) { + Subscription q = null; + synchronized (guard) { + if (unsubscribed.get()) { + q = s; + } else { + subscriptions.put(s, Boolean.TRUE); + } + } + if (q != null) { + q.unsubscribe(); } } @Override - public synchronized void unsubscribe() { - if (unsubscribed.compareAndSet(false, true)) { + public void unsubscribe() { + List toUnsubscribe = null; + synchronized (guard) { + if (unsubscribed.compareAndSet(false, true)) { + toUnsubscribe = new ArrayList(subscriptions.keySet()); + subscriptions.clear(); + } + } + if (toUnsubscribe != null) { Collection es = null; - for (Subscription s : subscriptions.keySet()) { + for (Subscription s : toUnsubscribe) { try { s.unsubscribe(); } catch (Throwable e) { diff --git a/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java index 74ed285f77..ff12f2f3a8 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java @@ -15,7 +15,6 @@ */ package rx.subscriptions; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import rx.Observable; @@ -28,33 +27,45 @@ */ public class MultipleAssignmentSubscription implements Subscription { - private final AtomicBoolean unsubscribed = new AtomicBoolean(false); - private AtomicReference subscription = new AtomicReference(); - + private final AtomicReference inner = new AtomicReference(); + private static final Subscription UNSUBSCRIBED = new Subscription() { + @Override + public void unsubscribe() { + } + }; public boolean isUnsubscribed() { - return unsubscribed.get(); + return inner.get() == UNSUBSCRIBED; } @Override - public synchronized void unsubscribe() { - unsubscribed.set(true); - Subscription s = getSubscription(); + public void unsubscribe() { + Subscription s = inner.getAndSet(UNSUBSCRIBED); if (s != null) { s.unsubscribe(); } - } - public synchronized void setSubscription(Subscription s) { - if (unsubscribed.get()) { - s.unsubscribe(); - } else { - subscription.set(s); - } + public void setSubscription(Subscription s) { + do { + Subscription r = inner.get(); + if (r == UNSUBSCRIBED) { + s.unsubscribe(); + return; + } else { + if (inner.compareAndSet(r, s)) { + return; + } + } + } while (true); } public Subscription getSubscription() { - return subscription.get(); + Subscription r = inner.get(); + // don't leak the unsubscription sentinel + if (r == UNSUBSCRIBED) { + return Subscriptions.empty(); + } + return r; } } diff --git a/rxjava-core/src/test/java/rx/subscriptions/MultipleAssignmentSubscriptionTest.java b/rxjava-core/src/test/java/rx/subscriptions/MultipleAssignmentSubscriptionTest.java new file mode 100644 index 0000000000..8e4f782be2 --- /dev/null +++ b/rxjava-core/src/test/java/rx/subscriptions/MultipleAssignmentSubscriptionTest.java @@ -0,0 +1,68 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subscriptions; + +import junit.framework.Assert; +import org.junit.Before; +import org.junit.Test; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import rx.Subscription; +import static rx.subscriptions.Subscriptions.create; +import rx.util.functions.Action0; + +public class MultipleAssignmentSubscriptionTest { + Action0 unsubscribe; + Subscription s; + @Before + public void before() { + unsubscribe = mock(Action0.class); + s = create(unsubscribe); + } + @Test + public void testNoUnsubscribeWhenReplaced() { + MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); + + mas.setSubscription(s); + mas.setSubscription(null); + mas.unsubscribe(); + + verify(unsubscribe, never()).call(); + + } + @Test + public void testUnsubscribeWhenParentUnsubscribes() { + MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); + mas.setSubscription(s); + mas.unsubscribe(); + mas.unsubscribe(); + + verify(unsubscribe, times(1)).call(); + + Assert.assertEquals(true, mas.isUnsubscribed()); + } + @Test + public void testUnsubscribedDoesntLeakSentinel() { + MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); + + mas.setSubscription(s); + mas.unsubscribe(); + + Assert.assertEquals(true, mas.getSubscription() == Subscriptions.empty()); + } +}