Skip to content

Changes to allow the initiating of work on an Observable to be done after the Subscription is available. #548

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
644 changes: 404 additions & 240 deletions rxjava-core/src/main/java/rx/Observable.java

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/Observer.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,5 @@ public interface Observer<T> {
*
* @param args
*/
public void onNext(T args);
public void onNext(T data);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@

public abstract class ConnectableObservable<T> extends Observable<T> {

protected ConnectableObservable(OnSubscribeFunc<T> onSubscribe) {
super(onSubscribe);
protected ConnectableObservable(OnGetSubscriptionFunc<T> onGetSubscription) {
super(onGetSubscription);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
public class GroupedObservable<K, T> extends Observable<T> {
private final K key;

public GroupedObservable(K key, OnSubscribeFunc<T> onSubscribe) {
super(onSubscribe);
public GroupedObservable(K key, OnGetSubscriptionFunc<T> onGetSubscription) {
super(onGetSubscription);
this.key = key;
}

Expand Down
4 changes: 2 additions & 2 deletions rxjava-core/src/main/java/rx/operators/OperationCast.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
package rx.operators;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observable.OnGetSubscriptionFunc;
import rx.util.functions.Func1;

/**
* Converts the elements of an observable sequence to the specified type.
*/
public class OperationCast {

public static <T, R> OnSubscribeFunc<R> cast(
public static <T, R> OnGetSubscriptionFunc<R> cast(
Observable<? extends T> source, final Class<R> klass) {
return OperationMap.map(source, new Func1<T, R>() {
public R call(T t) {
Expand Down
32 changes: 16 additions & 16 deletions rxjava-core/src/main/java/rx/operators/OperationGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,21 +162,21 @@ private static class GroupedSubject<K, T> extends GroupedObservable<K, T> implem

static <K, T> GroupedSubject<K, T> create(final K key, final GroupBy<K, T> parent) {
final AtomicReference<Observer<? super T>> subscribedObserver = new AtomicReference<Observer<? super T>>(OperationGroupBy.<T> emptyObserver());
return new GroupedSubject<K, T>(key, new OnSubscribeFunc<T>() {

private final SafeObservableSubscription subscription = new SafeObservableSubscription();
return new GroupedSubject<K, T>(key, new OnGetSubscriptionFunc<T>() {

@Override
public Subscription onSubscribe(Observer<? super T> observer) {
// register Observer
subscribedObserver.set(observer);

parent.subscribeKey(key);

return subscription.wrap(new Subscription() {
public PartialSubscription<T> onGetSubscription() {
return PartialSubscription.create(new OnPartialSubscribeFunc<T>() {
@Override
public void unsubscribe() {
// we remove the Observer so we stop emitting further events (they will be ignored if parent continues to send)
public void onSubscribe(Observer<? super T> observer) {
subscribedObserver.set(observer);
parent.subscribeKey(key);
}
}, new OnPartialUnsubscribeFunc() {
@Override
public void onUnsubscribe() {
// we remove the Observer so we stop emitting further events (they will
// be ignored if parent continues to send)
subscribedObserver.set(OperationGroupBy.<T> emptyObserver());
// now we need to notify the parent that we're unsubscribed
parent.unsubscribeKey(key);
Expand All @@ -188,8 +188,8 @@ public void unsubscribe() {

private final AtomicReference<Observer<? super T>> subscribedObserver;

public GroupedSubject(K key, OnSubscribeFunc<T> onSubscribe, AtomicReference<Observer<? super T>> subscribedObserver) {
super(key, onSubscribe);
public GroupedSubject(K key, OnGetSubscriptionFunc<T> onGetSubscription, AtomicReference<Observer<? super T>> subscribedObserver) {
super(key, onGetSubscription);
this.subscribedObserver = subscribedObserver;
}

Expand All @@ -214,12 +214,12 @@ private static <T> Observer<T> emptyObserver() {
return new Observer<T>() {
@Override
public void onCompleted() {
// do nothing
// do nothing
}

@Override
public void onError(Throwable e) {
// do nothing
// do nothing
}

@Override
Expand Down
21 changes: 16 additions & 5 deletions rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@
import java.util.Map;

import rx.Observable;
import rx.Observable.OnGetSubscriptionFunc;
import rx.Observable.OnPartialSubscribeFunc;
import rx.Observable.OnPartialUnsubscribeFunc;
import rx.Observable.OnSubscribeFunc;
import rx.Observable.PartialSubscription;
import rx.Observer;
import rx.Subscription;
import rx.observables.GroupedObservable;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SerialSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;

/**
Expand Down Expand Up @@ -197,11 +200,19 @@ public void onCompleted() {

}
}
protected static <T> OnSubscribeFunc<T> neverSubscribe() {
return new OnSubscribeFunc<T>() {
protected static <T> OnGetSubscriptionFunc<T> neverSubscribe() {
return new OnGetSubscriptionFunc<T>() {
@Override
public Subscription onSubscribe(Observer<? super T> t1) {
return Subscriptions.empty();
public PartialSubscription<T> onGetSubscription() {
return PartialSubscription.create(new OnPartialSubscribeFunc<T>() {
@Override
public void onSubscribe(Observer<? super T> observer) {
}
}, new OnPartialUnsubscribeFunc() {
@Override
public void onUnsubscribe() {
}
});
}
};
}
Expand Down
79 changes: 53 additions & 26 deletions rxjava-core/src/main/java/rx/operators/OperationMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,15 @@
*/
package rx.operators;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.Observable.OnGetSubscriptionFunc;
import rx.Observable.OnPartialSubscribeFunc;
import rx.Observable.OnPartialUnsubscribeFunc;
import rx.Observable.OnSubscribeFunc;
import rx.Observable.PartialSubscription;
import rx.Observer;
import rx.Subscription;
import rx.util.functions.Func1;
Expand All @@ -42,9 +49,10 @@ public final class OperationMap {
* the type of the input sequence.
* @param <R>
* the type of the output sequence.
* @return a sequence that is the result of applying the transformation function to each item in the input sequence.
* @return a sequence that is the result of applying the transformation function to each item in
* the input sequence.
*/
public static <T, R> OnSubscribeFunc<R> map(final Observable<? extends T> sequence, final Func1<? super T, ? extends R> func) {
public static <T, R> OnGetSubscriptionFunc<R> map(final Observable<? extends T> sequence, final Func1<? super T, ? extends R> func) {
return mapWithIndex(sequence, new Func2<T, Integer, R>() {
@Override
public R call(T value, @SuppressWarnings("unused") Integer unused) {
Expand All @@ -60,26 +68,30 @@ public R call(T value, @SuppressWarnings("unused") Integer unused) {
* @param sequence
* the input sequence.
* @param func
* a function to apply to each item in the sequence. The function gets the index of the emitted item
* a function to apply to each item in the sequence. The function gets the index of
* the emitted item
* as additional parameter.
* @param <T>
* the type of the input sequence.
* @param <R>
* the type of the output sequence.
* @return a sequence that is the result of applying the transformation function to each item in the input sequence.
* @return a sequence that is the result of applying the transformation function to each item in
* the input sequence.
*/
public static <T, R> OnSubscribeFunc<R> mapWithIndex(final Observable<? extends T> sequence, final Func2<? super T, Integer, ? extends R> func) {
return new OnSubscribeFunc<R>() {
public static <T, R> OnGetSubscriptionFunc<R> mapWithIndex(final Observable<? extends T> sequence, final Func2<? super T, Integer, ? extends R> func) {
return new OnGetSubscriptionFunc<R>() {
@Override
public Subscription onSubscribe(Observer<? super R> observer) {
return new MapObservable<T, R>(sequence, func).onSubscribe(observer);
public PartialSubscription<R> onGetSubscription() {
return new MapObservable<T, R>(sequence, func).onGetSubscription();
}
};
}

/**
* Accepts a sequence of observable sequences and a transformation function. Returns a flattened sequence that is the result of
* applying the transformation function to each item in the sequence of each observable sequence.
* Accepts a sequence of observable sequences and a transformation function. Returns a flattened
* sequence that is the result of
* applying the transformation function to each item in the sequence of each observable
* sequence.
* <p>
* The closure should return an Observable which will then be merged.
*
Expand All @@ -91,21 +103,23 @@ public Subscription onSubscribe(Observer<? super R> observer) {
* the type of the input sequence.
* @param <R>
* the type of the output sequence.
* @return a sequence that is the result of applying the transformation function to each item in the input sequence.
* @return a sequence that is the result of applying the transformation function to each item in
* the input sequence.
*/
public static <T, R> OnSubscribeFunc<R> mapMany(Observable<? extends T> sequence, Func1<? super T, ? extends Observable<? extends R>> func) {
return OperationMerge.merge(Observable.create(map(sequence, func)));
}

/**
* An observable sequence that is the result of applying a transformation to each item in an input sequence.
* An observable sequence that is the result of applying a transformation to each item in an
* input sequence.
*
* @param <T>
* the type of the input sequence.
* @param <R>
* the type of the output sequence.
*/
private static class MapObservable<T, R> implements OnSubscribeFunc<R> {
private static class MapObservable<T, R> implements OnGetSubscriptionFunc<R> {
public MapObservable(Observable<? extends T> sequence, Func2<? super T, Integer, ? extends R> func) {
this.sequence = sequence;
this.func = func;
Expand All @@ -116,25 +130,38 @@ public MapObservable(Observable<? extends T> sequence, Func2<? super T, Integer,
private int index;

@Override
public Subscription onSubscribe(final Observer<? super R> observer) {
final SafeObservableSubscription subscription = new SafeObservableSubscription();
return subscription.wrap(sequence.subscribe(new SafeObserver<T>(subscription, new Observer<T>() {
@Override
public void onNext(T value) {
observer.onNext(func.call(value, index));
index++;
}
public PartialSubscription<R> onGetSubscription() {
final AtomicReference<PartialSubscription<? extends T>> subscription = new AtomicReference<PartialSubscription<? extends T>>();
return PartialSubscription.create(new OnPartialSubscribeFunc<R>() {

@Override
public void onError(Throwable ex) {
observer.onError(ex);
public void onSubscribe(final Observer<? super R> observer) {
subscription.set(sequence.getSubscription());
subscription.get().subscribe(new Observer<T>() {
@Override
public void onNext(T value) {
observer.onNext(func.call(value, index));
index++;
}

@Override
public void onError(Throwable ex) {
observer.onError(ex);
}

@Override
public void onCompleted() {
observer.onCompleted();
}
});
}
}, new OnPartialUnsubscribeFunc() {

@Override
public void onCompleted() {
observer.onCompleted();
public void onUnsubscribe() {
subscription.get().unsubscribe();
}
})));
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ private static class MulticastConnectableObservable<T, R> extends ConnectableObs
private Subscription subscription;

public MulticastConnectableObservable(Observable<? extends T> source, final Subject<? super T, ? extends R> subject) {
super(new OnSubscribeFunc<R>() {
super(new OnGetSubscriptionFunc<R>() {
@Override
public Subscription onSubscribe(Observer<? super R> observer) {
return subject.subscribe(observer);
public PartialSubscription<R> onGetSubscription() {
return (PartialSubscription<R>) subject.getSubscription();
}
});
this.source = source;
Expand Down
Loading