diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index e11f9b7056..021352a79c 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -26,6 +26,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import rx.joins.Pattern2; import rx.joins.Plan0; @@ -106,7 +108,6 @@ import rx.operators.SafeObservableSubscription; import rx.operators.SafeObserver; import rx.plugins.RxJavaErrorHandler; -import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; import rx.schedulers.Schedulers; import rx.subjects.AsyncSubject; @@ -137,11 +138,11 @@ /** * The Observable interface that implements the Reactive Pattern. *

- * This interface provides overloaded methods for subscribing as well as - * delegate methods to the various operators. + * This interface provides overloaded methods for subscribing as well as delegate methods to the + * various operators. *

- * The documentation for this interface makes use of marble diagrams. The - * following legend explains these diagrams: + * The documentation for this interface makes use of marble diagrams. The following legend explains + * these diagrams: *

* *

@@ -152,40 +153,321 @@ */ public class Observable { - private final static ConcurrentHashMap internalClassMap = new ConcurrentHashMap(); + /** + * Executed when 'unsubscribe' is invoked on a {@link PartialSubscription}. + */ + public interface OnPartialUnsubscribeFunc extends Function { + void onUnsubscribe(); + } + + /** + * Executed when 'subscribe' is invoked on a {@link PartialSubscription}. + */ + public interface OnPartialSubscribeFunc extends Function { + void onSubscribe(Observer observer); + } + + final static ConcurrentHashMap internalClassMap = new ConcurrentHashMap(); /** - * Executed when 'subscribe' is invoked. + * Executed when 'getSubscription' is invoked. */ - private final OnSubscribeFunc onSubscribe; + private final OnGetSubscriptionFunc onGetSubscription; /** - * Function interface for work to be performed when an {@link Observable} - * is subscribed to via {@link Observable#subscribe(Observer)} + * Function interface for work to be performed when an {@link Observable} is subscription is + * created via {@link Observable#getSubscription()} * * @param */ - public static interface OnSubscribeFunc extends Function { - - public Subscription onSubscribe(Observer t1); + public static interface OnGetSubscriptionFunc extends Function { + public PartialSubscription onGetSubscription(); + } + /** + * Function interface for work to be performed when an {@link Observable} is subscribed to via + * {@link Observable#subscribe(Observer)} + * + * @param + * @deprecated use OnGetSubscriptionFunc + */ + @Deprecated + public static interface OnSubscribeFunc extends Function { + public Subscription onSubscribe(Observer observer); } /** - * Observable with Function to execute when subscribed to. + * Observable with Function to execute when subscription is created but before work has started. *

- * NOTE: Use {@link #create(OnSubscribeFunc)} to create an Observable - * instead of this constructor unless you specifically have a need for - * inheritance. + * NOTE: Use {@link #create(OnGetSubscriptionFunc)} to create an Observable instead of this + * constructor unless you specifically have a need for inheritance. * - * @param onSubscribe {@link OnSubscribeFunc} to be executed when - * {@link #subscribe(Observer)} is called + * @param onGetSubscription + * {@link OnGetSubscriptionFunc} to be executed when {@link #getSubscription()} is + * called */ - protected Observable(OnSubscribeFunc onSubscribe) { - this.onSubscribe = onSubscribe; + protected Observable(OnGetSubscriptionFunc onGetSubscription) { + this.onGetSubscription = onGetSubscription; + } + + public static final class PartialSubscription implements Subscription { + private AtomicReference> subscribedObserver = new AtomicReference>(null); + private static final Observer UNSUBSCRIBED_OBSERVER = new Observer() { + @Override + public void onCompleted() { + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onNext(Object args) { + } + }; + + private final OnPartialSubscribeFunc onPartialSubscribe; + + private final OnPartialUnsubscribeFunc onPartialUnsubscribe; + + public PartialSubscription(OnPartialSubscribeFunc onPartialSubscribe, OnPartialUnsubscribeFunc onPartialUnsubscribe) { + this.onPartialSubscribe = onPartialSubscribe; + this.onPartialUnsubscribe = onPartialUnsubscribe; + } + + private boolean unsubscribed = false; + + @Override + @SuppressWarnings("unchecked") + public void unsubscribe() { + Observer observer = subscribedObserver.get(); + if (observer == null) { + subscribedObserver.compareAndSet(null, UNSUBSCRIBED_OBSERVER); + observer = subscribedObserver.get(); + } + try { + onPartialUnsubscribe.onUnsubscribe(); + } catch (Throwable e) { + observer.onError(e); + } finally { + unsubscribed = true; + } + + } + + public boolean isUnsubscribed() { + return unsubscribed; + } + + /** + * Whether a given {@link Function} is an internal implementation inside + * rx.* packages or not. + *

+ * For why this is being used see https://github.com/Netflix/RxJava/issues/216 for + * discussion on "Guideline 6.4: Protect calls to user code from within an operator" + * + * Note: If strong reasons for not depending on package names comes up then the + * implementation of this method can change to looking for a marker interface. + * + * @param o + * @return {@code true} if the given function is an internal implementation, + * and {@code false} otherwise. + */ + private boolean isInternalImplementation(Object o) { + if (o == null) { + return true; + } + // prevent double-wrapping (yeah it happens) + if (o instanceof SafeObserver) { + return true; + } + + Class clazz = o.getClass(); + if (internalClassMap.containsKey(clazz)) { + // don't need to do reflection + return internalClassMap.get(clazz); + } else { + // we treat the following package as "internal" and don't wrap it + Package p = o.getClass().getPackage(); // it can be null + Boolean isInternal = (p != null && p.getName().startsWith("rx.operators")); + internalClassMap.put(clazz, isInternal); + return isInternal; + } + } + + /** + * Allow the {@link RxJavaErrorHandler} to receive the exception from + * onError. + * + * @param e + */ + private void handleError(Throwable e) { + // onError should be rare so we'll only fetch when needed + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + } + + public void subscribe(Observer observer) { + if (!subscribedObserver.compareAndSet(null, observer)) { + // someone else has already subscribed to this PartialSubscription + if (subscribedObserver.get() == UNSUBSCRIBED_OBSERVER) { + // this means that the subscription was unsubscribed before the subscribe was called. + return; + } + else { + throw new IllegalStateException("A PartialSubscription can only be subscribed to once. The first Oserver is "+ subscribedObserver.get() +" the second Observer is "+ observer); + } + } + + // allow the hook to intercept and/or decorate + OnPartialSubscribeFunc onPartialSubscribeFunction = onPartialSubscribe;// hook.onPartialSubscribeStart(this, + // onPartialSubscribe); + // validate and proceed + if (observer == null) { + throw new IllegalArgumentException("observer can not be null"); + } + if (onPartialSubscribeFunction == null) { + throw new IllegalStateException("onSubscribe function can not be null."); + // the subscribe function can also be overridden but generally that's not the + // appropriate approach so I won't mention that in the exception + } + try { + /** + * See https://github.com/Netflix/RxJava/issues/216 for discussion on + * "Guideline 6.4: Protect calls to user code from within an operator" + */ + if (isInternalImplementation(observer)) { + onPartialSubscribeFunction.onSubscribe(observer); + } else { + onPartialSubscribeFunction.onSubscribe(new SafeObserver(this, observer)); + } + } catch (OnErrorNotImplementedException e) { + // special handling when onError is not implemented ... we just rethrow + throw e; + } catch (Throwable e) { + // if an unhandled error occurs executing the onSubscribe we will propagate it + try { + observer.onError(e);// hook.onSubscribeError(this, e)); + } catch (OnErrorNotImplementedException e2) { + // special handling when onError is not implemented ... we just rethrow + throw e2; + } catch (Throwable e2) { + // if this happens it means the onError itself failed (perhaps an invalid + // function implementation) + // so we are unable to propagate the error correctly and will just throw + RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); + // hook.onSubscribeError(this, r); + throw r; + } + } + } + + private class ObserverFunctionWrapper implements Observer { + private final Action1 onNext; + private final Action1 onError; + private final Action0 onCompleted; + + public ObserverFunctionWrapper(final Action1 onNext, final Action1 onError, final Action0 onCompleted) { + this.onNext = onNext; + this.onError = onError; + this.onCompleted = onCompleted; + } + + @Override + public void onCompleted() { + if (onCompleted != null) + onCompleted.call(); + } + + @Override + public void onError(Throwable e) { + handleError(e); + if (onError == null) + throw new OnErrorNotImplementedException(e); + onError.call(e); + } + + @Override + public void onNext(T data) { + if (onNext != null) { + onNext.call(data); + } + } + } + + /** + * Subscribe and ignore all events. + * + * @return + */ + public void subscribe() { + subscribe(new ObserverFunctionWrapper(null, null, null)); + } + + /** + * An {@link Observer} must call an Observable's {@code subscribe} method + * in order to receive items and notifications from the Observable. + * + * @param onNext + * @return + */ + public void subscribe(final Action1 onNext) { + if (onNext == null) { + throw new IllegalArgumentException("onNext can not be null"); + } + + subscribe(new ObserverFunctionWrapper(onNext, null, null)); + } + + /** + * An {@link Observer} must call an Observable's {@code subscribe} method in + * order to receive items and notifications from the Observable. + * + * @param onNext + * @param onError + * @return + */ + public void subscribe(final Action1 onNext, final Action1 onError) { + if (onNext == null) { + throw new IllegalArgumentException("onNext can not be null"); + } + if (onError == null) { + throw new IllegalArgumentException("onError can not be null"); + } + + subscribe(new ObserverFunctionWrapper(onNext, onError, null)); + } + + /** + * An {@link Observer} must call an Observable's {@code subscribe} method in + * order to receive items and notifications from the Observable. + * + * @param onNext + * @param onError + * @param onComplete + * @return + */ + public void subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete) { + if (onNext == null) { + throw new IllegalArgumentException("onNext can not be null"); + } + if (onError == null) { + throw new IllegalArgumentException("onError can not be null"); + } + if (onComplete == null) { + throw new IllegalArgumentException("onComplete can not be null"); + } + + subscribe(new ObserverFunctionWrapper(onNext, onError, onComplete)); + } + + public static PartialSubscription create(OnPartialSubscribeFunc onPartialSubscribe, OnPartialUnsubscribeFunc onPartialUnsubscribe) { + return new PartialSubscription(onPartialSubscribe, onPartialUnsubscribe); + } } - private final static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook(); + public PartialSubscription getSubscription() { + return onGetSubscription.onGetSubscription(); + } /** * An {@link Observer} must call an Observable's {@code subscribe} method in @@ -217,55 +499,13 @@ protected Observable(OnSubscribeFunc onSubscribe) { * @throws IllegalArgumentException if the {@link Observer} provided as the * argument to {@code subscribe()} is * {@code null} + * @deprecated see {@link Observable#getSubscription()} */ + @Deprecated public Subscription subscribe(Observer observer) { - // allow the hook to intercept and/or decorate - OnSubscribeFunc onSubscribeFunction = hook.onSubscribeStart(this, onSubscribe); - // validate and proceed - if (observer == null) { - throw new IllegalArgumentException("observer can not be null"); - } - if (onSubscribeFunction == null) { - throw new IllegalStateException("onSubscribe function can not be null."); - // the subscribe function can also be overridden but generally that's not the appropriate approach so I won't mention that in the exception - } - try { - /** - * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" - */ - if (isInternalImplementation(observer)) { - Subscription s = onSubscribeFunction.onSubscribe(observer); - if (s == null) { - // this generally shouldn't be the case on a 'trusted' onSubscribe but in case it happens - // we want to gracefully handle it the same as AtomicObservableSubscription does - return hook.onSubscribeReturn(this, Subscriptions.empty()); - } else { - return hook.onSubscribeReturn(this, s); - } - } else { - SafeObservableSubscription subscription = new SafeObservableSubscription(); - subscription.wrap(onSubscribeFunction.onSubscribe(new SafeObserver(subscription, observer))); - return hook.onSubscribeReturn(this, subscription); - } - } catch (OnErrorNotImplementedException e) { - // special handling when onError is not implemented ... we just rethrow - throw e; - } catch (Throwable e) { - // if an unhandled error occurs executing the onSubscribe we will propagate it - try { - observer.onError(hook.onSubscribeError(this, e)); - } catch (OnErrorNotImplementedException e2) { - // special handling when onError is not implemented ... we just rethrow - throw e2; - } catch (Throwable e2) { - // if this happens it means the onError itself failed (perhaps an invalid function implementation) - // so we are unable to propagate the error correctly and will just throw - RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); - hook.onSubscribeError(this, r); - throw r; - } - return Subscriptions.empty(); - } + PartialSubscription partial = getSubscription(); + partial.subscribe(observer); + return partial; } /** @@ -299,86 +539,39 @@ public Subscription subscribe(Observer observer) { * finished sending them * @throws IllegalArgumentException if an argument to {@code subscribe()} * is {@code null} + * @deprecated see {@link Observable#getSubscription()} */ + @Deprecated public Subscription subscribe(Observer observer, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(observer); } - /** - * Protects against errors being thrown from Observer implementations and - * ensures onNext/onError/onCompleted contract compliance. - *

- * See https://github.com/Netflix/RxJava/issues/216 for a discussion on - * "Guideline 6.4: Protect calls to user code from within an operator" - */ - private Subscription protectivelyWrapAndSubscribe(Observer o) { - SafeObservableSubscription subscription = new SafeObservableSubscription(); - return subscription.wrap(subscribe(new SafeObserver(subscription, o))); - } - /** * Subscribe and ignore all events. - * - * @return + * + * @return + * @deprecated see {@link Observable#getSubscription()} */ + @Deprecated public Subscription subscribe() { - return protectivelyWrapAndSubscribe(new Observer() { - - @Override - public void onCompleted() { - // do nothing - } - - @Override - public void onError(Throwable e) { - handleError(e); - throw new OnErrorNotImplementedException(e); - } - - @Override - public void onNext(T args) { - // do nothing - } - - }); + PartialSubscription partial = getSubscription(); + partial.subscribe(); + return partial; } - + /** * An {@link Observer} must call an Observable's {@code subscribe} method * in order to receive items and notifications from the Observable. * * @param onNext - * @return + * @return + * @deprecated see {@link Observable#getSubscription()} */ + @Deprecated public Subscription subscribe(final Action1 onNext) { - if (onNext == null) { - throw new IllegalArgumentException("onNext can not be null"); - } - - /** - * Wrapping since raw functions provided by the user are being invoked. - * - * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" - */ - return protectivelyWrapAndSubscribe(new Observer() { - - @Override - public void onCompleted() { - // do nothing - } - - @Override - public void onError(Throwable e) { - handleError(e); - throw new OnErrorNotImplementedException(e); - } - - @Override - public void onNext(T args) { - onNext.call(args); - } - - }); + PartialSubscription partial = getSubscription(); + partial.subscribe(onNext); + return partial; } /** @@ -388,7 +581,9 @@ public void onNext(T args) { * @param onNext * @param scheduler * @return + * @deprecated see {@link Observable#getSubscription()} */ + @Deprecated public Subscription subscribe(final Action1 onNext, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(onNext); } @@ -400,39 +595,13 @@ public Subscription subscribe(final Action1 onNext, Scheduler schedul * @param onNext * @param onError * @return + * @deprecated see {@link Observable#getSubscription()} */ + @Deprecated public Subscription subscribe(final Action1 onNext, final Action1 onError) { - if (onNext == null) { - throw new IllegalArgumentException("onNext can not be null"); - } - if (onError == null) { - throw new IllegalArgumentException("onError can not be null"); - } - - /** - * Wrapping since raw functions provided by the user are being invoked. - * - * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" - */ - return protectivelyWrapAndSubscribe(new Observer() { - - @Override - public void onCompleted() { - // do nothing - } - - @Override - public void onError(Throwable e) { - handleError(e); - onError.call(e); - } - - @Override - public void onNext(T args) { - onNext.call(args); - } - - }); + PartialSubscription partial = getSubscription(); + partial.subscribe(onNext, onError); + return partial; } /** @@ -443,7 +612,9 @@ public void onNext(T args) { * @param onError * @param scheduler * @return + * @deprecated see {@link Observable#getSubscription()} */ + @Deprecated public Subscription subscribe(final Action1 onNext, final Action1 onError, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(onNext, onError); } @@ -456,42 +627,13 @@ public Subscription subscribe(final Action1 onNext, final Action1 onNext, final Action1 onError, final Action0 onComplete) { - if (onNext == null) { - throw new IllegalArgumentException("onNext can not be null"); - } - if (onError == null) { - throw new IllegalArgumentException("onError can not be null"); - } - if (onComplete == null) { - throw new IllegalArgumentException("onComplete can not be null"); - } - - /** - * Wrapping since raw functions provided by the user are being invoked. - * - * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" - */ - return protectivelyWrapAndSubscribe(new Observer() { - - @Override - public void onCompleted() { - onComplete.call(); - } - - @Override - public void onError(Throwable e) { - handleError(e); - onError.call(e); - } - - @Override - public void onNext(T args) { - onNext.call(args); - } - - }); + PartialSubscription partial = getSubscription(); + partial.subscribe(onNext, onError, onComplete); + return partial; } /** @@ -503,7 +645,9 @@ public void onNext(T args) { * @param onComplete * @param scheduler * @return + * @deprecated see {@link Observable#getSubscription()} */ + @Deprecated public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(onNext, onError, onComplete); } @@ -512,29 +656,21 @@ public Subscription subscribe(final Action1 onNext, final Action1 result type + * @param subject + * the {@link Subject} for the {@link ConnectableObservable} to push source items + * into + * @param + * result type * @return a {@link ConnectableObservable} that upon connection causes the - * source Observable to push results into the specified - * {@link Subject} - * @see RxJava Wiki: Observable.publish() and Observable.multicast() + * source Observable to push results into the specified {@link Subject} + * @see RxJava + * Wiki: Observable.publish() and Observable.multicast() */ public ConnectableObservable multicast(Subject subject) { return OperationMulticast.multicast(this, subject); } - /** - * Allow the {@link RxJavaErrorHandler} to receive the exception from - * onError. - * - * @param e - */ - private void handleError(Throwable e) { - // onError should be rare so we'll only fetch when needed - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); - } - /** * An Observable that never sends any information to an {@link Observer}. * @@ -544,13 +680,19 @@ private void handleError(Throwable e) { */ private static class NeverObservable extends Observable { public NeverObservable() { - super(new OnSubscribeFunc() { - + super(new OnGetSubscriptionFunc() { @Override - public Subscription onSubscribe(Observer t1) { - return Subscriptions.empty(); + public PartialSubscription onGetSubscription() { + return PartialSubscription.create(new OnPartialSubscribeFunc() { + @Override + public void onSubscribe(Observer observer) { + } + }, new OnPartialUnsubscribeFunc() { + @Override + public void onUnsubscribe() { + } + }); } - }); } } @@ -564,24 +706,22 @@ public Subscription onSubscribe(Observer t1) { private static class ThrowObservable extends Observable { public ThrowObservable(final Throwable exception) { - super(new OnSubscribeFunc() { - - /** - * Accepts an {@link Observer} and calls its - * {@link Observer#onError onError} method. - * - * @param observer an {@link Observer} of this Observable - * @return a reference to the subscription - */ + super(new OnGetSubscriptionFunc() { @Override - public Subscription onSubscribe(Observer observer) { - observer.onError(exception); - return Subscriptions.empty(); + public PartialSubscription onGetSubscription() { + return PartialSubscription.create(new OnPartialSubscribeFunc() { + @Override + public void onSubscribe(Observer observer) { + observer.onError(exception); + } + }, new OnPartialUnsubscribeFunc() { + @Override + public void onUnsubscribe() { + } + }); } - }); } - } /** @@ -602,16 +742,40 @@ public Subscription onSubscribe(Observer observer) { * See Rx Design * Guidelines (PDF) for detailed information. * - * @param the type of the items that this Observable emits - * @param func a function that accepts an {@code Observer}, invokes its - * {@code onNext}, {@code onError}, and {@code onCompleted} - * methods as appropriate, and returns a {@link Subscription} to - * allow the Observer to cancel the subscription + * @param + * the type of the items that this Observable emits + * @param func + * a function that accepts an {@code Observer}, invokes its {@code onNext}, + * {@code onError}, and {@code onCompleted} methods as appropriate, and returns a + * {@link Subscription} to + * allow the Observer to cancel the subscription * @return an Observable that, when an {@link Observer} subscribes to it, * will execute the given function - * @see RxJava Wiki: create() + * @see RxJava + * Wiki: create() */ - public static Observable create(OnSubscribeFunc func) { + @Deprecated + public static Observable create(final OnSubscribeFunc func) { + return new Observable(new OnGetSubscriptionFunc() { + @Override + public PartialSubscription onGetSubscription() { + final SafeObservableSubscription subscription = new SafeObservableSubscription(); + return PartialSubscription.create(new OnPartialSubscribeFunc() { + @Override + public void onSubscribe(Observer observer) { + subscription.wrap(func.onSubscribe(observer)); + } + }, new OnPartialUnsubscribeFunc() { + @Override + public void onUnsubscribe() { + subscription.unsubscribe(); + } + }); + } + }); + } + + public static Observable create(OnGetSubscriptionFunc func) { return new Observable(func); } diff --git a/rxjava-core/src/main/java/rx/Observer.java b/rxjava-core/src/main/java/rx/Observer.java index 3d35066ba4..e8abd83425 100644 --- a/rxjava-core/src/main/java/rx/Observer.java +++ b/rxjava-core/src/main/java/rx/Observer.java @@ -53,5 +53,5 @@ public interface Observer { * * @param args */ - public void onNext(T args); + public void onNext(T data); } diff --git a/rxjava-core/src/main/java/rx/observables/ConnectableObservable.java b/rxjava-core/src/main/java/rx/observables/ConnectableObservable.java index 1826905eaf..cb8d99d999 100644 --- a/rxjava-core/src/main/java/rx/observables/ConnectableObservable.java +++ b/rxjava-core/src/main/java/rx/observables/ConnectableObservable.java @@ -37,8 +37,8 @@ public abstract class ConnectableObservable extends Observable { - protected ConnectableObservable(OnSubscribeFunc onSubscribe) { - super(onSubscribe); + protected ConnectableObservable(OnGetSubscriptionFunc onGetSubscription) { + super(onGetSubscription); } /** diff --git a/rxjava-core/src/main/java/rx/observables/GroupedObservable.java b/rxjava-core/src/main/java/rx/observables/GroupedObservable.java index f40e8c40af..02fb295eb2 100644 --- a/rxjava-core/src/main/java/rx/observables/GroupedObservable.java +++ b/rxjava-core/src/main/java/rx/observables/GroupedObservable.java @@ -31,8 +31,8 @@ public class GroupedObservable extends Observable { private final K key; - public GroupedObservable(K key, OnSubscribeFunc onSubscribe) { - super(onSubscribe); + public GroupedObservable(K key, OnGetSubscriptionFunc onGetSubscription) { + super(onGetSubscription); this.key = key; } diff --git a/rxjava-core/src/main/java/rx/operators/OperationCast.java b/rxjava-core/src/main/java/rx/operators/OperationCast.java index dc54c204f6..84fd447678 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationCast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationCast.java @@ -16,7 +16,7 @@ package rx.operators; import rx.Observable; -import rx.Observable.OnSubscribeFunc; +import rx.Observable.OnGetSubscriptionFunc; import rx.util.functions.Func1; /** @@ -24,7 +24,7 @@ */ public class OperationCast { - public static OnSubscribeFunc cast( + public static OnGetSubscriptionFunc cast( Observable source, final Class klass) { return OperationMap.map(source, new Func1() { public R call(T t) { diff --git a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java index a30b97523f..9813775000 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java +++ b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java @@ -162,21 +162,21 @@ private static class GroupedSubject extends GroupedObservable implem static GroupedSubject create(final K key, final GroupBy parent) { final AtomicReference> subscribedObserver = new AtomicReference>(OperationGroupBy. emptyObserver()); - return new GroupedSubject(key, new OnSubscribeFunc() { - - private final SafeObservableSubscription subscription = new SafeObservableSubscription(); + return new GroupedSubject(key, new OnGetSubscriptionFunc() { @Override - public Subscription onSubscribe(Observer observer) { - // register Observer - subscribedObserver.set(observer); - - parent.subscribeKey(key); - - return subscription.wrap(new Subscription() { + public PartialSubscription onGetSubscription() { + return PartialSubscription.create(new OnPartialSubscribeFunc() { @Override - public void unsubscribe() { - // we remove the Observer so we stop emitting further events (they will be ignored if parent continues to send) + public void onSubscribe(Observer observer) { + subscribedObserver.set(observer); + parent.subscribeKey(key); + } + }, new OnPartialUnsubscribeFunc() { + @Override + public void onUnsubscribe() { + // we remove the Observer so we stop emitting further events (they will + // be ignored if parent continues to send) subscribedObserver.set(OperationGroupBy. emptyObserver()); // now we need to notify the parent that we're unsubscribed parent.unsubscribeKey(key); @@ -188,8 +188,8 @@ public void unsubscribe() { private final AtomicReference> subscribedObserver; - public GroupedSubject(K key, OnSubscribeFunc onSubscribe, AtomicReference> subscribedObserver) { - super(key, onSubscribe); + public GroupedSubject(K key, OnGetSubscriptionFunc onGetSubscription, AtomicReference> subscribedObserver) { + super(key, onGetSubscription); this.subscribedObserver = subscribedObserver; } @@ -214,12 +214,12 @@ private static Observer emptyObserver() { return new Observer() { @Override public void onCompleted() { - // do nothing + // do nothing } @Override public void onError(Throwable e) { - // do nothing + // do nothing } @Override diff --git a/rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java b/rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java index 63a271a688..864bc0d31c 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java +++ b/rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java @@ -21,7 +21,11 @@ import java.util.Map; import rx.Observable; +import rx.Observable.OnGetSubscriptionFunc; +import rx.Observable.OnPartialSubscribeFunc; +import rx.Observable.OnPartialUnsubscribeFunc; import rx.Observable.OnSubscribeFunc; +import rx.Observable.PartialSubscription; import rx.Observer; import rx.Subscription; import rx.observables.GroupedObservable; @@ -29,7 +33,6 @@ import rx.subjects.Subject; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.SerialSubscription; -import rx.subscriptions.Subscriptions; import rx.util.functions.Func1; /** @@ -197,11 +200,19 @@ public void onCompleted() { } } - protected static OnSubscribeFunc neverSubscribe() { - return new OnSubscribeFunc() { + protected static OnGetSubscriptionFunc neverSubscribe() { + return new OnGetSubscriptionFunc() { @Override - public Subscription onSubscribe(Observer t1) { - return Subscriptions.empty(); + public PartialSubscription onGetSubscription() { + return PartialSubscription.create(new OnPartialSubscribeFunc() { + @Override + public void onSubscribe(Observer observer) { + } + }, new OnPartialUnsubscribeFunc() { + @Override + public void onUnsubscribe() { + } + }); } }; } diff --git a/rxjava-core/src/main/java/rx/operators/OperationMap.java b/rxjava-core/src/main/java/rx/operators/OperationMap.java index d78b5dc6ef..69327b9819 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMap.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMap.java @@ -15,8 +15,15 @@ */ package rx.operators; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + import rx.Observable; +import rx.Observable.OnGetSubscriptionFunc; +import rx.Observable.OnPartialSubscribeFunc; +import rx.Observable.OnPartialUnsubscribeFunc; import rx.Observable.OnSubscribeFunc; +import rx.Observable.PartialSubscription; import rx.Observer; import rx.Subscription; import rx.util.functions.Func1; @@ -42,9 +49,10 @@ public final class OperationMap { * the type of the input sequence. * @param * the type of the output sequence. - * @return a sequence that is the result of applying the transformation function to each item in the input sequence. + * @return a sequence that is the result of applying the transformation function to each item in + * the input sequence. */ - public static OnSubscribeFunc map(final Observable sequence, final Func1 func) { + public static OnGetSubscriptionFunc map(final Observable sequence, final Func1 func) { return mapWithIndex(sequence, new Func2() { @Override public R call(T value, @SuppressWarnings("unused") Integer unused) { @@ -60,26 +68,30 @@ public R call(T value, @SuppressWarnings("unused") Integer unused) { * @param sequence * the input sequence. * @param func - * a function to apply to each item in the sequence. The function gets the index of the emitted item + * a function to apply to each item in the sequence. The function gets the index of + * the emitted item * as additional parameter. * @param * the type of the input sequence. * @param * the type of the output sequence. - * @return a sequence that is the result of applying the transformation function to each item in the input sequence. + * @return a sequence that is the result of applying the transformation function to each item in + * the input sequence. */ - public static OnSubscribeFunc mapWithIndex(final Observable sequence, final Func2 func) { - return new OnSubscribeFunc() { + public static OnGetSubscriptionFunc mapWithIndex(final Observable sequence, final Func2 func) { + return new OnGetSubscriptionFunc() { @Override - public Subscription onSubscribe(Observer observer) { - return new MapObservable(sequence, func).onSubscribe(observer); + public PartialSubscription onGetSubscription() { + return new MapObservable(sequence, func).onGetSubscription(); } }; } /** - * Accepts a sequence of observable sequences and a transformation function. Returns a flattened sequence that is the result of - * applying the transformation function to each item in the sequence of each observable sequence. + * Accepts a sequence of observable sequences and a transformation function. Returns a flattened + * sequence that is the result of + * applying the transformation function to each item in the sequence of each observable + * sequence. *

* The closure should return an Observable which will then be merged. * @@ -91,21 +103,23 @@ public Subscription onSubscribe(Observer observer) { * the type of the input sequence. * @param * the type of the output sequence. - * @return a sequence that is the result of applying the transformation function to each item in the input sequence. + * @return a sequence that is the result of applying the transformation function to each item in + * the input sequence. */ public static OnSubscribeFunc mapMany(Observable sequence, Func1> func) { return OperationMerge.merge(Observable.create(map(sequence, func))); } /** - * An observable sequence that is the result of applying a transformation to each item in an input sequence. + * An observable sequence that is the result of applying a transformation to each item in an + * input sequence. * * @param * the type of the input sequence. * @param * the type of the output sequence. */ - private static class MapObservable implements OnSubscribeFunc { + private static class MapObservable implements OnGetSubscriptionFunc { public MapObservable(Observable sequence, Func2 func) { this.sequence = sequence; this.func = func; @@ -116,25 +130,38 @@ public MapObservable(Observable sequence, Func2 observer) { - final SafeObservableSubscription subscription = new SafeObservableSubscription(); - return subscription.wrap(sequence.subscribe(new SafeObserver(subscription, new Observer() { - @Override - public void onNext(T value) { - observer.onNext(func.call(value, index)); - index++; - } + public PartialSubscription onGetSubscription() { + final AtomicReference> subscription = new AtomicReference>(); + return PartialSubscription.create(new OnPartialSubscribeFunc() { @Override - public void onError(Throwable ex) { - observer.onError(ex); + public void onSubscribe(final Observer observer) { + subscription.set(sequence.getSubscription()); + subscription.get().subscribe(new Observer() { + @Override + public void onNext(T value) { + observer.onNext(func.call(value, index)); + index++; + } + + @Override + public void onError(Throwable ex) { + observer.onError(ex); + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } + }); } + }, new OnPartialUnsubscribeFunc() { @Override - public void onCompleted() { - observer.onCompleted(); + public void onUnsubscribe() { + subscription.get().unsubscribe(); } - }))); + }); } } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationMulticast.java b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java index b634b2dbac..66bfabf653 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMulticast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java @@ -35,10 +35,10 @@ private static class MulticastConnectableObservable extends ConnectableObs private Subscription subscription; public MulticastConnectableObservable(Observable source, final Subject subject) { - super(new OnSubscribeFunc() { + super(new OnGetSubscriptionFunc() { @Override - public Subscription onSubscribe(Observer observer) { - return subject.subscribe(observer); + public PartialSubscription onGetSubscription() { + return (PartialSubscription) subject.getSubscription(); } }); this.source = source; diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index 877ba4d5f3..aa16843440 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -15,13 +15,16 @@ */ package rx.operators; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import rx.Observable; -import rx.Observable.OnSubscribeFunc; +import rx.Observable.OnGetSubscriptionFunc; +import rx.Observable.OnPartialSubscribeFunc; +import rx.Observable.OnPartialUnsubscribeFunc; +import rx.Observable.PartialSubscription; import rx.Observer; -import rx.Subscription; -import rx.subscriptions.Subscriptions; /** * Returns an Observable that emits the first num items emitted by the source @@ -29,10 +32,10 @@ *

* *

- * You can choose to pay attention only to the first num items emitted by an - * Observable by using the take operation. This operation returns an Observable that will invoke a - * subscribing Observer's onNext function a maximum of num times before - * invoking onCompleted. + * You can choose to pay attention only to the first num items emitted by an Observable + * by using the take operation. This operation returns an Observable that will invoke a subscribing + * Observer's onNext function a maximum of num times before invoking + * onCompleted. */ public final class OperationTake { @@ -41,18 +44,13 @@ public final class OperationTake { * * @param items * @param num - * @return the specified number of contiguous values from the start of the given observable sequence + * @return the specified number of contiguous values from the start of the given observable + * sequence */ - public static OnSubscribeFunc take(final Observable items, final int num) { - // wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take rather than 1 handing both, which is not thread-safe. - return new OnSubscribeFunc() { - - @Override - public Subscription onSubscribe(Observer observer) { - return new Take(items, num).onSubscribe(observer); - } - - }; + public static OnGetSubscriptionFunc take(final Observable items, final int num) { + // wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we + // will have 2 instances of Take rather than 1 handing both, which is not thread-safe. + return new Take(items, num); } /** @@ -66,10 +64,10 @@ public Subscription onSubscribe(Observer observer) { * * @param */ - private static class Take implements OnSubscribeFunc { + private static class Take implements OnGetSubscriptionFunc { private final Observable items; private final int num; - private final SafeObservableSubscription subscription = new SafeObservableSubscription(); + private final AtomicReference> subscription = new AtomicReference>(); private Take(Observable items, int num) { this.items = items; @@ -77,30 +75,26 @@ private Take(Observable items, int num) { } @Override - public Subscription onSubscribe(Observer observer) { - if (num < 1) { - items.subscribe(new Observer() - { - @Override - public void onCompleted() - { - } - - @Override - public void onError(Throwable e) - { + public PartialSubscription onGetSubscription() { + return PartialSubscription.create(new OnPartialSubscribeFunc() { + @Override + public void onSubscribe(Observer observer) { + PartialSubscription partialSubscription = items.getSubscription(); + subscription.set(partialSubscription); + if (num < 1) { + // signal that we don't really want any values by unsubscribing before we've + // even subscribed. + subscription.get().unsubscribe(); } - @Override - public void onNext(T args) - { - } - }).unsubscribe(); - observer.onCompleted(); - return Subscriptions.empty(); - } - - return subscription.wrap(items.subscribe(new ItemObserver(observer))); + partialSubscription.subscribe(new ItemObserver(observer)); + } + }, new OnPartialUnsubscribeFunc() { + @Override + public void onUnsubscribe() { + subscription.get().unsubscribe(); + } + }); } private class ItemObserver implements Observer { @@ -145,7 +139,7 @@ public void onNext(T args) { } catch (Throwable ex) { hasEmitedError = true; observer.onError(ex); - subscription.unsubscribe(); + subscription.get().unsubscribe(); return; } if (count == num) { @@ -153,12 +147,12 @@ public void onNext(T args) { } } if (count >= num) { - // this will work if the sequence is asynchronous, it will have no effect on a synchronous observable - subscription.unsubscribe(); + // OLD: this will work if the sequence is asynchronous, it will have no effect on a synchronous observable + // NEW: with the two phase getSubscription and subscribe event synchronous observables can be interrupted. + subscription.get().unsubscribe(); } } } - } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationTimestamp.java b/rxjava-core/src/main/java/rx/operators/OperationTimestamp.java index a1044ba748..be9ea3ffb3 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTimestamp.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTimestamp.java @@ -16,8 +16,8 @@ package rx.operators; import rx.Observable; -import rx.Observable.OnSubscribeFunc; import rx.Scheduler; +import rx.Observable.OnGetSubscriptionFunc; import rx.util.Timestamped; import rx.util.functions.Func1; @@ -37,7 +37,7 @@ public final class OperationTimestamp { * the type of the input sequence. * @return a sequence of timestamped values created by adding timestamps to each item in the input sequence. */ - public static OnSubscribeFunc> timestamp(Observable sequence) { + public static OnGetSubscriptionFunc> timestamp(Observable sequence) { return OperationMap.map(sequence, new Func1>() { @Override public Timestamped call(T value) { @@ -48,7 +48,7 @@ public Timestamped call(T value) { /** * Timestamp the source elements based on the timing provided by the scheduler. */ - public static OnSubscribeFunc> timestamp(Observable source, final Scheduler scheduler) { + public static OnGetSubscriptionFunc> timestamp(Observable source, final Scheduler scheduler) { return OperationMap.map(source, new Func1>() { @Override public Timestamped call(T value) { diff --git a/rxjava-core/src/main/java/rx/operators/OperationToObservableIterable.java b/rxjava-core/src/main/java/rx/operators/OperationToObservableIterable.java index a8a970bdd8..77df06cfda 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToObservableIterable.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToObservableIterable.java @@ -15,7 +15,13 @@ */ package rx.operators; -import rx.Observable.OnSubscribeFunc; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import rx.Observable.OnGetSubscriptionFunc; +import rx.Observable.OnPartialSubscribeFunc; +import rx.Observable.OnPartialUnsubscribeFunc; +import rx.Observable.PartialSubscription; import rx.Observer; import rx.Subscription; import rx.subscriptions.Subscriptions; @@ -30,24 +36,38 @@ */ public final class OperationToObservableIterable { - public static OnSubscribeFunc toObservableIterable(Iterable list) { + public static OnGetSubscriptionFunc toObservableIterable(Iterable list) { return new ToObservableIterable(list); } - private static class ToObservableIterable implements OnSubscribeFunc { + private static class ToObservableIterable implements OnGetSubscriptionFunc { public ToObservableIterable(Iterable list) { this.iterable = list; } public Iterable iterable; - public Subscription onSubscribe(Observer observer) { - for (T item : iterable) { - observer.onNext(item); - } - observer.onCompleted(); - - return Subscriptions.empty(); + @Override + public PartialSubscription onGetSubscription() { + final AtomicReference> subscription = new AtomicReference>(); + + subscription.set(PartialSubscription.create(new OnPartialSubscribeFunc() { + @Override + public void onSubscribe(Observer observer) { + for (T item : iterable) { + if (!subscription.get().isUnsubscribed()) + observer.onNext(item); + } + if (!subscription.get().isUnsubscribed()) + observer.onCompleted(); + } + }, new OnPartialUnsubscribeFunc() { + @Override + public void onUnsubscribe() { + } + })); + + return subscription.get(); } } } diff --git a/rxjava-core/src/main/java/rx/operators/SafeObserver.java b/rxjava-core/src/main/java/rx/operators/SafeObserver.java index 3e6508dac9..99f16f7319 100644 --- a/rxjava-core/src/main/java/rx/operators/SafeObserver.java +++ b/rxjava-core/src/main/java/rx/operators/SafeObserver.java @@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import rx.Observer; +import rx.Subscription; import rx.plugins.RxJavaPlugins; import rx.util.CompositeException; import rx.util.OnErrorNotImplementedException; @@ -59,9 +60,9 @@ public class SafeObserver implements Observer { private final Observer actual; private final AtomicBoolean isFinished = new AtomicBoolean(false); - private final SafeObservableSubscription subscription; + private final Subscription subscription; - public SafeObserver(SafeObservableSubscription subscription, Observer actual) { + public SafeObserver(Subscription subscription, Observer actual) { this.subscription = subscription; this.actual = actual; } diff --git a/rxjava-core/src/main/java/rx/subjects/AbstractSubject.java b/rxjava-core/src/main/java/rx/subjects/AbstractSubject.java index 30db331ac2..c59c034532 100644 --- a/rxjava-core/src/main/java/rx/subjects/AbstractSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/AbstractSubject.java @@ -24,70 +24,62 @@ import rx.Notification; import rx.Observer; -import rx.Subscription; -import rx.operators.SafeObservableSubscription; -import rx.subscriptions.Subscriptions; import rx.util.functions.Action2; public abstract class AbstractSubject extends Subject { - protected AbstractSubject(rx.Observable.OnSubscribeFunc onSubscribe) { - super(onSubscribe); + protected AbstractSubject(OnGetSubscriptionFunc onGetSubscription) { + super(onGetSubscription); } protected static class SubjectState { - protected final ConcurrentHashMap> observers = new ConcurrentHashMap>(); + protected final ConcurrentHashMap> observers = new ConcurrentHashMap>(); protected final AtomicReference> currentValue = new AtomicReference>(); protected final AtomicBoolean completed = new AtomicBoolean(); protected final ReentrantLock SUBSCRIPTION_LOCK = new ReentrantLock(); } - protected static OnSubscribeFunc getOnSubscribeFunc(final SubjectState state, final Action2, Observer> onEach) { - return new OnSubscribeFunc() { + protected static OnGetSubscriptionFunc getOnGetSubscriptionFunc(final SubjectState state, final Action2, Observer> onEach) { + return new OnGetSubscriptionFunc() { @Override - public Subscription onSubscribe(Observer observer) { - /* - * Subscription needs to be synchronized with terminal states to ensure - * race conditions are handled. When subscribing we must make sure - * onComplete/onError is correctly emitted to all observers, even if it - * comes in while the onComplete/onError is being propagated. - */ - state.SUBSCRIPTION_LOCK.lock(); - try { - if (state.completed.get()) { - emitNotification(state.currentValue.get(), observer); - if (onEach != null) { - onEach.call(state, observer); - } - return Subscriptions.empty(); - } else { - // the subject is not completed so we subscribe - final SafeObservableSubscription subscription = new SafeObservableSubscription(); - - subscription.wrap(new Subscription() { - @Override - public void unsubscribe() { - // on unsubscribe remove it from the map of outbound observers to notify - state.observers.remove(subscription); + public PartialSubscription onGetSubscription() { + final Object marker = new Object(); + return PartialSubscription.create(new OnPartialSubscribeFunc() { + @Override + public void onSubscribe(Observer observer) { + /* + * Subscription needs to be synchronized with terminal states to ensure + * race conditions are handled. When subscribing we must make sure + * onComplete/onError is correctly emitted to all observers, even if it + * comes in while the onComplete/onError is being propagated. + */ + state.SUBSCRIPTION_LOCK.lock(); + try { + if (state.completed.get()) { + emitNotification(state.currentValue.get(), observer); + if (onEach != null) { + onEach.call(state, observer); + } + } else { + // on subscribe add it to the map of outbound observers to notify + state.observers.put(marker, observer); + + // invoke onSubscribe logic + if (onEach != null) { + onEach.call(state, observer); + } } - }); - - // on subscribe add it to the map of outbound observers to notify - state.observers.put(subscription, observer); - - // invoke onSubscribe logic - if (onEach != null) { - onEach.call(state, observer); + } finally { + state.SUBSCRIPTION_LOCK.unlock(); } - - return subscription; } - } finally { - state.SUBSCRIPTION_LOCK.unlock(); - } - + }, new OnPartialUnsubscribeFunc() { + @Override + public void onUnsubscribe() { + state.observers.remove(marker); + } + }); } - }; } @@ -110,7 +102,7 @@ protected static void emitNotification(Notification value, Observer void emitNotification(final SubjectState state, final Action2, Observer> onEach) { - for (Subscription s : snapshotOfObservers(state)) { + for (Object s : snapshotOfObservers(state)) { Observer o = state.observers.get(s); // emit notifications to this observer emitNotification(state.currentValue.get(), o); @@ -133,7 +125,7 @@ protected void emitNotificationAndTerminate(final SubjectState state, final A state.SUBSCRIPTION_LOCK.lock(); try { if (state.completed.compareAndSet(false, true)) { - for (Subscription s : snapshotOfObservers(state)) { + for (Object s : snapshotOfObservers(state)) { Observer o = state.observers.get(s); // emit notifications to this observer emitNotification(state.currentValue.get(), o); @@ -161,7 +153,7 @@ protected void emitNotificationAndTerminate(final SubjectState state, final A * * @return List> */ - private static Collection snapshotOfObservers(final SubjectState state) { - return new ArrayList(state.observers.keySet()); + private static Collection snapshotOfObservers(final SubjectState state) { + return new ArrayList(state.observers.keySet()); } } diff --git a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java index a9c18be805..dd776ea18f 100644 --- a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java @@ -17,6 +17,7 @@ import rx.Notification; import rx.Observer; +import rx.Observable.OnGetSubscriptionFunc; import rx.util.functions.Action2; /** @@ -57,7 +58,7 @@ public class AsyncSubject extends AbstractSubject { */ public static AsyncSubject create() { final SubjectState state = new SubjectState(); - OnSubscribeFunc onSubscribe = getOnSubscribeFunc(state, new Action2, Observer>() { + OnGetSubscriptionFunc onSubscribe = getOnGetSubscriptionFunc(state, new Action2, Observer>() { @Override public void call(SubjectState state, Observer o) { @@ -76,8 +77,8 @@ public void call(SubjectState state, Observer o) { private final SubjectState state; - protected AsyncSubject(OnSubscribeFunc onSubscribe, SubjectState state) { - super(onSubscribe); + protected AsyncSubject(OnGetSubscriptionFunc onGetSubscription, SubjectState state) { + super(onGetSubscription); this.state = state; } diff --git a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java index 053a51f0cf..720ebb2dc7 100644 --- a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java @@ -17,6 +17,7 @@ import rx.Notification; import rx.Observer; +import rx.Observable.OnGetSubscriptionFunc; import rx.util.functions.Action2; /** @@ -72,7 +73,7 @@ public static BehaviorSubject create(T defaultValue) { final SubjectState state = new SubjectState(); // set a default value so subscriptions will immediately receive this until a new notification is received state.currentValue.set(new Notification(defaultValue)); - OnSubscribeFunc onSubscribe = getOnSubscribeFunc(state, new Action2, Observer>() { + OnGetSubscriptionFunc onSubscribe = getOnGetSubscriptionFunc(state, new Action2, Observer>() { @Override public void call(SubjectState state, Observer o) { @@ -88,8 +89,8 @@ public void call(SubjectState state, Observer o) { private final SubjectState state; - protected BehaviorSubject(OnSubscribeFunc onSubscribe, SubjectState state) { - super(onSubscribe); + protected BehaviorSubject(OnGetSubscriptionFunc onGetSubscription, SubjectState state) { + super(onGetSubscription); this.state = state; } diff --git a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java index 8cf2d75747..16c7d8ee66 100644 --- a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java @@ -17,6 +17,7 @@ import rx.Notification; import rx.Observer; +import rx.Observable.OnGetSubscriptionFunc; /** * Subject that, once and {@link Observer} has subscribed, publishes all subsequent events to the subscriber. @@ -44,14 +45,14 @@ public class PublishSubject extends AbstractSubject { public static PublishSubject create() { final SubjectState state = new SubjectState(); - OnSubscribeFunc onSubscribe = getOnSubscribeFunc(state, null); - return new PublishSubject(onSubscribe, state); + OnGetSubscriptionFunc onGetSubscription = getOnGetSubscriptionFunc(state, null); + return new PublishSubject(onGetSubscription, state); } private final SubjectState state; - protected PublishSubject(OnSubscribeFunc onSubscribe, SubjectState state) { - super(onSubscribe); + protected PublishSubject(OnGetSubscriptionFunc onGetSubscription, SubjectState state) { + super(onGetSubscription); this.state = state; } diff --git a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java index 13d0cc3b57..358fd4003f 100644 --- a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java +++ b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java @@ -16,15 +16,12 @@ package rx.subjects; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import rx.Notification; import rx.Observer; -import rx.Subscription; -import rx.subscriptions.Subscriptions; -import rx.util.functions.Func1; /** * Subject that retains all events and will replay them to an {@link Observer} that subscribes. @@ -33,140 +30,95 @@ *

* Example usage: *

- *

 {@code
-
- * eplaySubject subject = ReplaySubject.create();
-  subject.onNext("one");
-  subject.onNext("two");
-  subject.onNext("three");
-  subject.onCompleted();
-
-  // both of the following will get the onNext/onCompleted calls from above
-  subject.subscribe(observer1);
-  subject.subscribe(observer2);
-
-  } 
+ * 
+ * 
+ * {
+ *     @code
+ *     eplaySubject<Object> subject = ReplaySubject.create();
+ *     subject.onNext("one");
+ *     subject.onNext("two");
+ *     subject.onNext("three");
+ *     subject.onCompleted();
+ * 
+ *     // both of the following will get the onNext/onCompleted calls from above
+ *     subject.subscribe(observer1);
+ *     subject.subscribe(observer2);
+ * 
+ * }
+ * 
* * @param */ public final class ReplaySubject extends Subject { - - private boolean isDone = false; - private Throwable exception = null; - private final Map> subscriptions = new HashMap>(); - private final List history = Collections.synchronizedList(new ArrayList()); + private final Map> subscriptions = new HashMap>(); + private final List> history = new ArrayList>(); public static ReplaySubject create() { - return new ReplaySubject(new DelegateSubscriptionFunc()); + return new ReplaySubject(new ReplayOnGetSubscription()); } - private ReplaySubject(DelegateSubscriptionFunc onSubscribe) { - super(onSubscribe); - onSubscribe.wrap(new SubscriptionFunc()); + private ReplaySubject(ReplayOnGetSubscription onGetSubscription) { + super(onGetSubscription); + onGetSubscription.history = history; + onGetSubscription.subscriptions = subscriptions; } - private static final class DelegateSubscriptionFunc implements OnSubscribeFunc - { - private Func1, ? extends Subscription> delegate = null; - - public void wrap(Func1, ? extends Subscription> delegate) - { - if (this.delegate != null) { - throw new UnsupportedOperationException("delegate already set"); - } - this.delegate = delegate; - } + private static final class ReplayOnGetSubscription implements OnGetSubscriptionFunc { + private Map> subscriptions; + private List> history; @Override - public Subscription onSubscribe(Observer observer) - { - return delegate.call(observer); - } - } - - private class SubscriptionFunc implements Func1, Subscription> - { - @Override - public Subscription call(Observer observer) { - int item = 0; - Subscription subscription; - - for (;;) { - while (item < history.size()) { - observer.onNext(history.get(item++)); - } - - synchronized (subscriptions) { - if (item < history.size()) { - continue; - } - - if (exception != null) { - observer.onError(exception); - return Subscriptions.empty(); + public PartialSubscription onGetSubscription() { + final Object marker = new Object(); + return PartialSubscription.create(new OnPartialSubscribeFunc() { + @Override + public void onSubscribe(Observer observer) { + int item = 0; + + for (;;) { + while (item < history.size()) { + history.get(item++).accept(observer); + } + + synchronized (subscriptions) { + if (item < history.size()) { + continue; + } + subscriptions.put(marker, observer); + break; + } } - if (isDone) { - observer.onCompleted(); - return Subscriptions.empty(); - } - - subscription = new RepeatSubjectSubscription(); - subscriptions.put(subscription, observer); - break; } - } - - return subscription; + }, new OnPartialUnsubscribeFunc() { + @Override + public void onUnsubscribe() { + subscriptions.remove(marker); + } + }); } } - private class RepeatSubjectSubscription implements Subscription - { - @Override - public void unsubscribe() - { - synchronized (subscriptions) { - subscriptions.remove(this); - } - } + @Override + public void onCompleted() { + propgate(new Notification()); } @Override - public void onCompleted() - { - synchronized (subscriptions) { - isDone = true; - for (Observer observer : new ArrayList>(subscriptions.values())) { - observer.onCompleted(); - } - subscriptions.clear(); - } + public void onError(Throwable e) { + propgate(new Notification(e)); } @Override - public void onError(Throwable e) - { - synchronized (subscriptions) { - if (isDone) { - return; - } - isDone = true; - exception = e; - for (Observer observer : new ArrayList>(subscriptions.values())) { - observer.onError(e); - } - subscriptions.clear(); - } + public void onNext(T args) { + propgate(new Notification(args)); } - @Override - public void onNext(T args) - { + public void propgate(Notification n) { synchronized (subscriptions) { - history.add(args); + history.add(n); for (Observer observer : new ArrayList>(subscriptions.values())) { - observer.onNext(args); + n.accept(observer); } } } diff --git a/rxjava-core/src/main/java/rx/subjects/Subject.java b/rxjava-core/src/main/java/rx/subjects/Subject.java index 0fb675c4ea..7a08b59d2b 100644 --- a/rxjava-core/src/main/java/rx/subjects/Subject.java +++ b/rxjava-core/src/main/java/rx/subjects/Subject.java @@ -19,7 +19,7 @@ import rx.Observer; public abstract class Subject extends Observable implements Observer { - protected Subject(OnSubscribeFunc onSubscribe) { + protected Subject(OnGetSubscriptionFunc onSubscribe) { super(onSubscribe); } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationToObservableIterableTest.java b/rxjava-core/src/test/java/rx/operators/OperationToObservableIterableTest.java index 8d8be93dcb..fbb44fa2c2 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationToObservableIterableTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationToObservableIterableTest.java @@ -15,6 +15,7 @@ */ package rx.operators; +import static org.junit.Assert.*; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; import static rx.operators.OperationToObservableIterable.*; @@ -26,6 +27,7 @@ import rx.Observable; import rx.Observer; +import rx.util.functions.Func1; public class OperationToObservableIterableTest { @@ -42,4 +44,15 @@ public void testIterable() { verify(aObserver, Mockito.never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } + + @Test + public void testInterruptable() { + Func1 expensive = mock(Func1.class); + when(expensive.call(any(Integer.class))).thenReturn(10); + + int v = Observable.from(1, 2, 3).map(expensive).take(1).toBlockingObservable().single(); + + assertEquals(10, v); + verify(expensive, times(1)).call(any(Integer.class)); + } }