Skip to content

1.x: just() now supports backpressure (+ related fixes/changes) #3614

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -8330,7 +8330,7 @@ public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
return create(new OperatorSubscribeOn<T>(this, scheduler));
}

/**
Expand Down
39 changes: 37 additions & 2 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1698,8 +1698,43 @@ public void onNext(T t) {
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #observeOn
*/
public final Single<T> subscribeOn(Scheduler scheduler) {
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
public final Single<T> subscribeOn(final Scheduler scheduler) {
return create(new OnSubscribe<T>() {
@Override
public void call(final SingleSubscriber<? super T> t) {
final Scheduler.Worker w = scheduler.createWorker();
t.add(w);

w.schedule(new Action0() {
@Override
public void call() {
SingleSubscriber<T> ssub = new SingleSubscriber<T>() {
@Override
public void onSuccess(T value) {
try {
t.onSuccess(value);
} finally {
w.unsubscribe();
}
}

@Override
public void onError(Throwable error) {
try {
t.onError(error);
} finally {
w.unsubscribe();
}
}
};

t.add(ssub);

Single.this.subscribe(ssub);
}
});
}
});
}

/**
Expand Down
124 changes: 56 additions & 68 deletions src/main/java/rx/internal/operators/OperatorSubscribeOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,96 +15,84 @@
*/
package rx.internal.operators;

import rx.Observable;
import rx.Observable.Operator;
import rx.Producer;
import rx.Scheduler;
import rx.*;
import rx.Observable.OnSubscribe;
import rx.Scheduler.Worker;
import rx.Subscriber;
import rx.functions.Action0;

/**
* Subscribes Observers on the specified {@code Scheduler}.
* <p>
* <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/subscribeOn.png" alt="">
*
* @param <T> the value type of the actual source
*/
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

private final Scheduler scheduler;
final Scheduler scheduler;
final Observable<T> source;

public OperatorSubscribeOn(Scheduler scheduler) {
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}

@Override
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
return new Subscriber<Observable<T>>(subscriber) {

@Override
public void onCompleted() {
// ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}


inner.schedule(new Action0() {
@Override
public void onNext(final Observable<T> o) {
inner.schedule(new Action0() {

public void call() {
final Thread t = Thread.currentThread();

Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void call() {
final Thread t = Thread.currentThread();
o.unsafeSubscribe(new Subscriber<T>(subscriber) {

@Override
public void onCompleted() {
subscriber.onCompleted();
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

@Override
public void onNext(T t) {
subscriber.onNext(t);
}

public void onNext(T t) {
subscriber.onNext(t);
}

@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}

@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}

@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void setProducer(final Producer producer) {
subscriber.setProducer(new Producer() {

@Override
public void request(final long n) {
if (Thread.currentThread() == t) {
// don't schedule if we're already on the thread (primarily for first setProducer call)
// see unit test 'testSetProducerSynchronousRequest' for more context on this
producer.request(n);
} else {
inner.schedule(new Action0() {

@Override
public void call() {
producer.request(n);
}
});
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
}

});
});
}
}

});
}
});
};

source.unsafeSubscribe(s);
}

};
});
}
}
}
Loading