#0.17.0 Release Notes
Version 0.17.0 contains significant signature changes that substantially improve the handling of synchronous Observables and simplify Schedulers. Many of the changes come with backwards-compatible deprecated methods to ease migration, while some are breaking.
The new signatures related to `Observable` in this release are:
// A new create method takes `OnSubscribe` instead of `OnSubscribeFunc`
public final static <T> Observable<T> create(OnSubscribe<T> f)
// The new OnSubscribe type accepts a Subscriber instead of Observer and does not return a Subscription
public static interface OnSubscribe<T> extends Action1<Subscriber<? super T>>
// Subscriber is an Observer + Subscription
public abstract class Subscriber<T> implements Observer<T>, Subscription
// The main `subscribe` behavior receives a Subscriber instead of Observer
public final Subscription subscribe(Subscriber<? super T> observer)
// Subscribing with an Observer however is still appropriate
// and the Observer is automatically converted into a Subscriber
public final Subscription subscribe(Observer<? super T> observer)
// A new 'lift' function allows composing Operator implementations together
public <R> Observable<R> lift(Func1<Subscriber<? super R>, Subscriber<? super T>> lift)
Also changed is the `Scheduler` interface, which is much simpler:
public abstract class Scheduler {
    public Subscription schedule(Action1<Scheduler.Inner> action);
    public Subscription schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
    public Subscription schedulePeriodically(Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit);
    public long now();
    public int degreeOfParallelism();
    public abstract static class Inner implements Subscription {
        public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
        public abstract void schedule(Action1<Scheduler.Inner> action);
        public long now();
    }
}
This release applies many lessons learned over the past year and seeks to streamline the API before we hit 1.0.
As shown in the code above, the changes fall into two major areas:
1) Lift/OnSubscribe/Subscriber
Changes that allow unsubscribing from synchronous Observables without needing to add concurrency.
2) Schedulers
Simplification of the `Scheduler` interface, and a clearer distinction between "outer" and "inner" Schedulers for recursion.
Lift/OnSubscribe/Subscriber
The new types `Subscriber` and `OnSubscribe`, along with the new `lift` operator, have been added. The reasons and benefits are as follows:
1) Synchronous Unsubscribe
RxJava versions up until 0.16.x are unable to unsubscribe from a synchronous Observable such as this:
Observable<Integer> oi = Observable.create(new OnSubscribeFunc<Integer>() {
    @Override
    public Subscription onSubscribe(Observer<? super Integer> observer) {
        // the loop runs to completion before any Subscription is returned
        for (int i = 1; i < 1000000; i++) {
            observer.onNext(i);
        }
        observer.onCompleted();
        return Subscriptions.empty();
    }
});
Subscribing to this `Observable` will always run the loop to completion and emit every value, even if unsubscribed, such as via `oi.take(10)`.
Version 0.17.0 fixes this issue by injecting the `Subscription` into the `OnSubscribe` function to allow code like this:
Observable<Integer> oi = Observable.create(new OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        // we now receive a Subscriber instead of Observer
        for (int i = 1; i < 1000000; i++) {
            // the OnSubscribe can now check for isUnsubscribed
            if (subscriber.isUnsubscribed()) {
                return;
            }
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }
});
Subscribing to this will now correctly emit only 10 `onNext` values and then unsubscribe:
// subscribe with an Observer
oi.take(10).subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(Integer t) {
        System.out.println("Received: " + t);
    }
});
Alternatively, the new `Subscriber` type can be used, and the `Subscriber` itself can `unsubscribe`:
// or subscribe with a Subscriber which supports unsubscribe
oi.subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(Integer t) {
        System.out.println("Received: " + t);
        if (t >= 10) {
            // a Subscriber can unsubscribe
            this.unsubscribe();
        }
    }
});
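To make the reasoning behind this change concrete, here is a minimal sketch of why the old shape could not stop a synchronous loop. It uses simplified stand-in types (`Handle`, `oldStyleSource`, `newStyleSource` are hypothetical names, not RxJava classes) and only counts how many values the source produces when the consumer wants 10 of them.

```java
import java.util.function.IntConsumer;

// Simplified stand-ins for illustration only; these are NOT the RxJava types.
final class SyncUnsubscribeSketch {

    /** Stand-in for a Subscription: just an unsubscribed flag. */
    static final class Handle {
        private volatile boolean unsubscribed;
        void unsubscribe() { unsubscribed = true; }
        boolean isUnsubscribed() { return unsubscribed; }
    }

    /** Old shape: the handle is only returned after the loop has already finished. */
    static Handle oldStyleSource(int total, IntConsumer observer, int[] produced) {
        for (int i = 1; i <= total; i++) {
            produced[0]++;
            observer.accept(i);
        }
        return new Handle();
    }

    /** New shape: the handle is passed in, so the loop can check it on every pass. */
    static void newStyleSource(int total, Handle subscriber, IntConsumer observer, int[] produced) {
        for (int i = 1; i <= total; i++) {
            if (subscriber.isUnsubscribed()) {
                return;
            }
            produced[0]++;
            observer.accept(i);
        }
    }

    static int producedWithOldStyle(int total, int wanted) {
        int[] produced = {0};
        int[] received = {0};
        // The consumer can ignore extra values but has nothing to unsubscribe yet.
        Handle late = oldStyleSource(total, v -> {
            if (received[0] < wanted) {
                received[0]++;
            }
        }, produced);
        late.unsubscribe(); // too late: the source has already emitted everything
        return produced[0];
    }

    static int producedWithNewStyle(int total, int wanted) {
        int[] produced = {0};
        int[] received = {0};
        Handle handle = new Handle();
        newStyleSource(total, handle, v -> {
            if (++received[0] >= wanted) {
                handle.unsubscribe(); // the source sees this on its next iteration
            }
        }, produced);
        return produced[0];
    }
}
```

With a source of 1000 values and a consumer that wants 10, the old shape produces all 1000 while the new shape stops after 10, without adding a thread.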
2) Custom Operator Chaining
Because Java doesn't support extension methods, the only way to apply custom operators without getting them added to `rx.Observable` is to use static methods. This has meant code like this:
MyCustomOperators.operate(observable.map(...).filter(...).take(5)).map(...).subscribe()
In reality we want:
observable.map(...).filter(...).take(5).myCustomOperator().map(...).subscribe()
Using the newly added `lift` we can get quite close to this:
observable.map(...).filter(...).take(5).lift(MyCustomOperator.operate()).map(...).subscribe()
Here is how the proposed `lift` method looks if all operators were applied with it:
Observable<String> os = OBSERVABLE_OF_INTEGERS.lift(TAKE_5).lift(MAP_INTEGER_TO_STRING);
Along with the `lift` function comes a new `Operator` signature:
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>
All operator implementations in the `rx.operators` package will over time be migrated to this new signature.
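As a rough illustration of how `lift` composes functions of this shape, the following sketch models the idea with simplified stand-in types. `Sub`, `Source`, `Op` and `Obs` are hypothetical names standing in for `Subscriber`, `OnSubscribe`, `Operator` and `Observable`; this is not the RxJava implementation. In particular, the sketch does not link subscriptions between stages the way a real `Subscriber` can, and its `take` assumes a limit of at least 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified stand-ins for illustration only; these are NOT the RxJava classes.
final class LiftSketch {

    /** Stand-in for Subscriber: an observer that also carries an unsubscribed flag. */
    abstract static class Sub<T> {
        private volatile boolean unsubscribed;
        abstract void onNext(T value);
        abstract void onCompleted();
        void unsubscribe() { unsubscribed = true; }
        boolean isUnsubscribed() { return unsubscribed; }
    }

    /** Stand-in for OnSubscribe. */
    interface Source<T> {
        void call(Sub<? super T> subscriber);
    }

    /** Stand-in for Operator<R, T>: turns the downstream subscriber into an upstream one. */
    interface Op<R, T> extends Function<Sub<? super R>, Sub<? super T>> {
    }

    /** Stand-in for Observable with only create, lift and subscribe. */
    static final class Obs<T> {
        private final Source<T> source;

        private Obs(Source<T> source) { this.source = source; }

        static <T> Obs<T> create(Source<T> source) { return new Obs<>(source); }

        // lift: on subscribe, wrap the downstream subscriber and hand the wrapper upstream
        <R> Obs<R> lift(Op<R, T> op) {
            return new Obs<R>(downstream -> source.call(op.apply(downstream)));
        }

        void subscribe(Sub<? super T> subscriber) { source.call(subscriber); }
    }

    /** A custom operator written against the Op shape (assumes limit >= 1). */
    static <T> Op<T, T> take(int limit) {
        return downstream -> new Sub<T>() {
            int count;
            @Override void onNext(T value) {
                if (isUnsubscribed()) {
                    return;
                }
                downstream.onNext(value);
                if (++count >= limit) {
                    downstream.onCompleted();
                    unsubscribe(); // the source checks this flag in its loop
                }
            }
            @Override void onCompleted() {
                if (!isUnsubscribed()) {
                    downstream.onCompleted();
                }
            }
        };
    }

    static <T, R> Op<R, T> map(Function<? super T, ? extends R> f) {
        return downstream -> new Sub<T>() {
            @Override void onNext(T value) { downstream.onNext(f.apply(value)); }
            @Override void onCompleted() { downstream.onCompleted(); }
        };
    }

    static List<String> demo() {
        List<String> out = new ArrayList<>();
        int[] produced = {0};
        Obs<Integer> numbers = Obs.create(s -> {
            for (int i = 1; i <= 1000; i++) {
                if (s.isUnsubscribed()) {
                    return;
                }
                produced[0]++;
                s.onNext(i);
            }
            s.onCompleted();
        });
        numbers.lift(LiftSketch.<Integer>take(3))
               .lift(LiftSketch.<Integer, String>map(i -> "#" + i))
               .subscribe(new Sub<String>() {
                   @Override void onNext(String v) { out.add(v); }
                   @Override void onCompleted() { out.add("done"); }
               });
        out.add("produced=" + produced[0]);
        return out;
    }
}
```

Each `lift` call only wraps the subscribe function, so nothing runs until `subscribe` is called; at that point the subscriber is wrapped from the bottom of the chain upward and the source receives the outermost wrapper.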
3) Simpler Operator Implementations
The `lift` operator injects the necessary `Observer` and `Subscription` instances (via the new `Subscriber` type) and eliminates (for most use cases) the need for manual subscription management. Because the `Subscription` is available in scope, there is no need for awkward coding patterns that create a `Subscription`, close over it, return it, and account for synchronous versus asynchronous execution.
For example, the body of `fromIterable` is simply:
public void call(Subscriber<? super T> o) {
    for (T i : is) {
        if (o.isUnsubscribed()) {
            return;
        }
        o.onNext(i);
    }
    o.onCompleted();
}
The `take` operator is:
public final class OperatorTake<T> implements Operator<T, T> {
    final int limit;
    public OperatorTake(int limit) {
        this.limit = limit;
    }
    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> o) {
        CompositeSubscription parent = new CompositeSubscription();
        if (limit == 0) {
            o.onCompleted();
            parent.unsubscribe();
        }
        return new Subscriber<T>(parent) {
            int count = 0;
            boolean completed = false;
            @Override
            public void onCompleted() {
                if (!completed) {
                    o.onCompleted();
                }
            }
            @Override
            public void onError(Throwable e) {
                if (!completed) {
                    o.onError(e);
                }
            }
            @Override
            public void onNext(T i) {
                if (!isUnsubscribed()) {
                    o.onNext(i);
                    if (++count >= limit) {
                        completed = true;
                        o.onCompleted();
                        unsubscribe();
                    }
                }
            }
        };
    }
}
4) Recursion/Loop Performance
The `fromIterable` use case is 20x faster when implemented as a loop instead of a recursive scheduler (see a18b8c1).
In several places we can remove the recursive scheduling that was originally used for unsubscribe support and use a loop instead.
Schedulers
Schedulers were greatly simplified to a design based around `Action1<Inner>`.
public abstract class Scheduler {
    public Subscription schedule(Action1<Scheduler.Inner> action);
    public Subscription schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
    public Subscription schedulePeriodically(Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit);
    public long now();
    public int degreeOfParallelism();
    public abstract static class Inner implements Subscription {
        public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
        public abstract void schedule(Action1<Scheduler.Inner> action);
        public long now();
    }
}
This design change originated from four findings:
- It was very easy to cause memory leaks or inadvertent parallel execution since the distinction between outer and inner scheduling was not obvious.
To solve this, the new design explicitly has the outer `Scheduler` and then `Scheduler.Inner` for recursion.
- The passing of state is not useful since scheduling over network boundaries with this model does not work.
In this new design all state-passing signatures have been removed. This was determined while implementing a `RemoteScheduler` that attempted to use `observeOn` to transition execution from one machine to another. This does not work because it requires serializing/deserializing the state of the entire execution stack. Migration of work over the network has been found to be better suited to explicit boundaries established by Subjects. Thus, the complications within the Schedulers are unnecessary.
- The number of overloads offering different ways of doing the same thing was confusing.
This new design removes all but the essential and simplest methods.
- A scheduled task could not do work in a loop and easily be unsubscribed, which generally meant less efficient recursive scheduling.
This new design applies principles similar to those used with `lift`/`create`/`OnSubscribe`/`Subscriber` and injects the `Subscription` via the `Inner` interface so a running task can check `isUnsubscribed()`.
With this new design, the simplest execution of a single task is:
Schedulers.newThread().schedule(new Action1<Inner>() {
    @Override
    public void call(Inner inner) {
        doWork();
    }
});
Recursion is easily invoked:
Schedulers.newThread().schedule(new Action1<Inner>() {
    @Override
    public void call(Inner inner) {
        doWork();
        // recurse until unsubscribed (the schedule will do nothing if unsubscribed)
        inner.schedule(this);
    }
});
The use of `Action1<Inner>` on both the outer and inner levels means that a recursive action can simply refer to `this` and it works.
Similar to the new `lift`/`create` pattern with `Subscriber`, the `Inner` is also a `Subscription`, so it allows efficient loops with `unsubscribe` support:
Schedulers.newThread().schedule(new Action1<Inner>() {
    @Override
    public void call(Inner inner) {
        while (!inner.isUnsubscribed()) {
            doWork();
        }
    }
});
An action can now `unsubscribe` the `Scheduler.Inner`:
Schedulers.newThread().schedule(new Action1<Inner>() {
    @Override
    public void call(Inner inner) {
        while (!inner.isUnsubscribed()) {
            int i = doOtherWork();
            if (i > 100) {
                // an Action can cause the Scheduler to unsubscribe and stop
                inner.unsubscribe();
            }
        }
    }
});
Typically just stopping is sufficient:
Schedulers.newThread().schedule(new Action1<Inner>() {
    @Override
    public void call(Inner inner) {
        int i = doOtherWork();
        if (i < 10) {
            // recurse while the result is below 10
            inner.schedule(this);
        }
    }
});
but if other work in other tasks is being done and you want to unsubscribe conditionally you could:
Schedulers.newThread().schedule(new Action1<Inner>() {
    @Override
    public void call(Inner inner) {
        int i = doOtherWork();
        if (i < 10) {
            // recurse while the result is below 10
            inner.schedule(this);
        } else {
            inner.unsubscribe();
        }
    }
});
and the recursion can be delayed:
Schedulers.newThread().schedule(new Action1<Inner>() {
    @Override
    public void call(Inner inner) {
        doWork();
        // recurse until unsubscribed ... but delay the recursion
        inner.schedule(this, 500, TimeUnit.MILLISECONDS);
    }
});
The methods on `Inner` never return a `Subscription` because they always run on a single thread/event-loop/actor/etc. and are controlled by the `Subscription` returned by the initial `Scheduler.schedule` method. This is part of clarifying the contract.
Thus an `unsubscribe` controlled from the outside would be done like this:
Subscription s = Schedulers.newThread().schedule(new Action1<Inner>() {
    @Override
    public void call(Inner inner) {
        while (!inner.isUnsubscribed()) {
            doWork();
        }
    }
});
// unsubscribe from outside
s.unsubscribe();
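One way to picture the outer/inner split described in this section is the toy model below. It is only a sketch under simplifying assumptions: `Inner` and `schedule` here are hypothetical stand-ins that run everything on the calling thread through a queue, not the RxJava `Scheduler` implementation, and there is no delay or thread handling. It shows the two properties the text relies on: recursion re-submits the same action to the same `Inner`, and scheduling after unsubscribe does nothing.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Toy model for illustration only; NOT the RxJava Scheduler implementation.
final class InnerSchedulerSketch {

    /** Stand-in for Scheduler.Inner: a work queue that is also the unsubscribe handle. */
    static final class Inner {
        private final Queue<Consumer<Inner>> queue = new ArrayDeque<>();
        private boolean unsubscribed;

        /** Returns nothing: all work on this Inner is controlled by the Inner itself. */
        void schedule(Consumer<Inner> action) {
            if (!unsubscribed) {
                queue.add(action);
            }
        }

        void unsubscribe() {
            unsubscribed = true;
            queue.clear();
        }

        boolean isUnsubscribed() {
            return unsubscribed;
        }

        private void drain() {
            Consumer<Inner> next;
            while (!unsubscribed && (next = queue.poll()) != null) {
                next.accept(this);
            }
        }
    }

    /** Stand-in for the outer schedule call: creates one Inner and runs it on the caller's thread. */
    static Inner schedule(Consumer<Inner> action) {
        Inner inner = new Inner();
        inner.schedule(action);
        inner.drain();
        return inner;
    }

    /** Runs a recursive action until it has executed `limit` times, then unsubscribes. */
    static int countRecursions(int limit) {
        int[] runs = {0};
        schedule(new Consumer<Inner>() {
            @Override
            public void accept(Inner inner) {
                if (++runs[0] < limit) {
                    inner.schedule(this); // same action, same Inner
                } else {
                    inner.unsubscribe();
                }
            }
        });
        return runs[0];
    }
}
```

Because the recursive call goes through the `Inner` that was handed to the action, every repetition stays on the same queue, which is the property that avoids inadvertent parallel execution in this model.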
Migration Path
1) Lift/OnSubscribe/Subscriber
The `lift` function will not be used by most and is additive, so it will not affect backwards compatibility. The `Subscriber` type is also additive and for most use cases does not need to be used directly; the `Observer` interface can continue being used.
The previous `create(OnSubscribeFunc f)` signature has been deprecated, so existing code will still work but will now produce warnings. Please begin migrating code, as this signature will be deleted prior to the 1.0 release.
Code such as this:
Observable.create(new OnSubscribeFunc<Integer>() {
    @Override
    public Subscription onSubscribe(Observer<? super Integer> o) {
        o.onNext(1);
        o.onNext(2);
        o.onCompleted();
        return Subscriptions.empty();
    }
});
should change to this:
Observable.create(new OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onNext(2);
        subscriber.onCompleted();
    }
});
If concurrency was being injected:
Observable.create(new OnSubscribeFunc<Integer>() {
    @Override
    public Subscription onSubscribe(final Observer<? super Integer> o) {
        final BooleanSubscription s = new BooleanSubscription();
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                int i = 0;
                // keep emitting until unsubscribed
                while (!s.isUnsubscribed()) {
                    o.onNext(i++);
                }
            }
        });
        t.start();
        return s;
    }
});
you may no longer need it and can implement like this instead:
Observable.create(new OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        int i = 0;
        // keep emitting until unsubscribed
        while (!subscriber.isUnsubscribed()) {
            subscriber.onNext(i++);
        }
    }
});
or can just simplify the `Subscription` management:
Observable.create(new OnSubscribe<Integer>() {
    @Override
    public void call(final Subscriber<? super Integer> subscriber) {
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                int i = 0;
                // keep emitting until unsubscribed
                while (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(i++);
                }
            }
        });
        t.start();
    }
});
or use a `Scheduler`:
Observable.create(new OnSubscribe<Integer>() {
    @Override
    public void call(final Subscriber<? super Integer> subscriber) {
        Schedulers.io().schedule(new Action1<Inner>() {
            @Override
            public void call(Inner inner) {
                int i = 0;
                // keep emitting until unsubscribed
                while (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(i++);
                }
            }
        });
    }
});
or use `subscribeOn`, which now works to make synchronous `Observables` async while supporting `unsubscribe` (this didn't work before):
Observable.create(new OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        int i = 0;
        // keep emitting until unsubscribed
        while (!subscriber.isUnsubscribed()) {
            subscriber.onNext(i++);
        }
    }
}).subscribeOn(Schedulers.newThread());
2) Schedulers
Custom `Scheduler` implementations will need to be re-implemented, and any direct use of the `Scheduler` interface will also need to be updated.
3) Subscription
If you have custom `Subscription` implementations you will see they now need an `isUnsubscribed()` method.
You can either add this method, or wrap your function using `Subscriptions.create` and it will handle the `isUnsubscribed` behavior and execute your function when `unsubscribe()` is called.
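The behavior described above can be sketched with stand-in types as follows. The `Subscription` interface and `create` helper here are hypothetical local definitions that mirror the description, not the RxJava classes; whether the real `Subscriptions.create` guards against repeated `unsubscribe()` calls in exactly this way is an assumption of the sketch.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in types for illustration only; these are NOT the RxJava classes.
final class SubscriptionMigrationSketch {

    /** Stand-in with the two methods the text says a Subscription now needs. */
    interface Subscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    /** Hypothetical helper in the spirit of Subscriptions.create. */
    static Subscription create(Runnable cleanup) {
        AtomicBoolean unsubscribed = new AtomicBoolean();
        return new Subscription() {
            @Override
            public void unsubscribe() {
                // flip the flag once and run the wrapped function only on that first call
                if (unsubscribed.compareAndSet(false, true)) {
                    cleanup.run();
                }
            }

            @Override
            public boolean isUnsubscribed() {
                return unsubscribed.get();
            }
        };
    }
}
```

A custom implementation that adds `isUnsubscribed()` by hand would need the same two pieces: a thread-safe flag and cleanup logic that runs when the flag is first set.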
The Future...
This is hopefully the last of the major refactors to rxjava-core and we're approaching version 1.0. We have most if not all operators from Rx.Net that we want or intend to port. We think we have got the `create`/`subscribe` signatures as we want them, and the `Subscription` and `Scheduler` interfaces are now clean.
We still need to improve some of the `Subject` implementations, particularly `ReplaySubject`. After this release we are beginning to focus on cleaning up all of the operator implementations, stabilizing, fixing bugs and performance tuning.
We appreciate your usage, feedback and contributions and hope the library is creating value for you!