Skip to content

1.x: Added Single execution hooks #3696

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 17, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import rx.observers.SerializedSubscriber;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.plugins.RxJavaSingleExecutionHook;
import rx.schedulers.Schedulers;
import rx.singles.BlockingSingle;
import rx.subscriptions.Subscriptions;
Expand Down Expand Up @@ -101,7 +102,7 @@ private Single(final Observable.OnSubscribe<T> f) {
this.onSubscribe = f;
}

static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
static RxJavaSingleExecutionHook hook = RxJavaPlugins.getInstance().getSingleExecutionHook();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you remove final?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind, I just saw your last comment.


/**
* Returns a Single that will execute the specified function when a {@link SingleSubscriber} executes it or
Expand Down Expand Up @@ -130,7 +131,7 @@ private Single(final Observable.OnSubscribe<T> f) {
* @see <a href="http://reactivex.io/documentation/operators/create.html">ReactiveX operators documentation: Create</a>
*/
public static <T> Single<T> create(OnSubscribe<T> f) {
return new Single<T>(f); // TODO need hook
return new Single<T>(hook.onCreate(f));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs test

}

/**
Expand Down Expand Up @@ -1570,14 +1571,12 @@ public final void onNext(T args) {
* @param subscriber
* the Subscriber that will handle the emission or notification from the Single
*/
public final void unsafeSubscribe(Subscriber<? super T> subscriber) {
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I strive to be as much as possible compatible with Observable type. Do you have some proposition how to handle it? If we remove Subscription as return type then we won't be able to decorate or replace the Subscription instance in hook.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right right, I mean that we will have to release it as 1.2 instead of 1.1.x, I guess.

Maintainers: what do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unsafesubscribe is called by operators and they don't use the returned Subscription. These unsubscribe the Subscriber.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Single isn't stable API so this is acceptable if it makes the API better.

On Fri, Feb 19, 2016, 10:46 AM Artem Zinnatullin [email protected]
wrote:

In src/main/java/rx/Single.java
#3696 (comment):

@@ -1569,14 +1569,12 @@ public final void onNext(T args) {
* @param subscriber
* the Subscriber that will handle the emission or notification from the Single
*/

  • public final void unsafeSubscribe(Subscriber<? super T> subscriber) {
  • public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {

Looks like this is incompatible binary change
https://docs.oracle.com/javase/specs/jls/se7/html/jls-13.html#jls-13.4.15


Reply to this email directly or view it on GitHub
https://github.com/ReactiveX/RxJava/pull/3696/files#r53476625.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

k, let's just mention this as kind of warning in changelog

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this is a breaking binary change, but thanks to the fact that Single is not yet marked as stable, I vote for making this change.

try {
// new Subscriber so onStart it
subscriber.onStart();
// TODO add back the hook
// hook.onSubscribeStart(this, onSubscribe).call(subscriber);
onSubscribe.call(subscriber);
hook.onSubscribeReturn(subscriber);
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs test

return hook.onSubscribeReturn(subscriber);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs test

} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
Expand All @@ -1594,6 +1593,7 @@ public final void unsafeSubscribe(Subscriber<? super T> subscriber) {
// TODO why aren't we throwing the hook's return value.
throw r;
}
return Subscriptions.unsubscribed();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs test

}
}

Expand Down Expand Up @@ -1685,9 +1685,7 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
// The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
try {
// allow the hook to intercept and/or decorate
// TODO add back the hook
// hook.onSubscribeStart(this, onSubscribe).call(subscriber);
onSubscribe.call(subscriber);
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs test

return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Expand Down
44 changes: 44 additions & 0 deletions src/main/java/rx/plugins/RxJavaPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class RxJavaPlugins {

private final AtomicReference<RxJavaErrorHandler> errorHandler = new AtomicReference<RxJavaErrorHandler>();
private final AtomicReference<RxJavaObservableExecutionHook> observableExecutionHook = new AtomicReference<RxJavaObservableExecutionHook>();
private final AtomicReference<RxJavaSingleExecutionHook> singleExecutionHook = new AtomicReference<RxJavaSingleExecutionHook>();
private final AtomicReference<RxJavaSchedulersHook> schedulersHook = new AtomicReference<RxJavaSchedulersHook>();

/**
Expand All @@ -68,6 +69,7 @@ public static RxJavaPlugins getInstance() {
/* package accessible for unit tests */void reset() {
INSTANCE.errorHandler.set(null);
INSTANCE.observableExecutionHook.set(null);
INSTANCE.singleExecutionHook.set(null);
INSTANCE.schedulersHook.set(null);
}

Expand Down Expand Up @@ -156,6 +158,48 @@ public void registerObservableExecutionHook(RxJavaObservableExecutionHook impl)
}
}

/**
* Retrieves the instance of {@link RxJavaSingleExecutionHook} to use based on order of precedence as
* defined in {@link RxJavaPlugins} class header.
* <p>
* Override the default by calling {@link #registerSingleExecutionHook(RxJavaSingleExecutionHook)}
* or by setting the property {@code rxjava.plugin.RxJavaSingleExecutionHook.implementation} with the
* full classname to load.
*
* @return {@link RxJavaSingleExecutionHook} implementation to use
*/
public RxJavaSingleExecutionHook getSingleExecutionHook() {
if (singleExecutionHook.get() == null) {
// check for an implementation from System.getProperty first
Object impl = getPluginImplementationViaProperty(RxJavaSingleExecutionHook.class, System.getProperties());
if (impl == null) {
// nothing set via properties so initialize with default
singleExecutionHook.compareAndSet(null, RxJavaSingleExecutionHookDefault.getInstance());
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
} else {
// we received an implementation from the system property so use it
singleExecutionHook.compareAndSet(null, (RxJavaSingleExecutionHook) impl);
}
}
return singleExecutionHook.get();
}

/**
* Register an {@link RxJavaSingleExecutionHook} implementation as a global override of any injected or
* default implementations.
*
* @param impl
* {@link RxJavaSingleExecutionHook} implementation
* @throws IllegalStateException
* if called more than once or after the default was initialized (if usage occurs before trying
* to register)
*/
public void registerSingleExecutionHook(RxJavaSingleExecutionHook impl) {
if (!singleExecutionHook.compareAndSet(null, impl)) {
throw new IllegalStateException("Another strategy was already registered: " + singleExecutionHook.get());
}
}

/* test */ static Object getPluginImplementationViaProperty(Class<?> pluginClass, Properties props) {
final String classSimpleName = pluginClass.getSimpleName();
/*
Expand Down
120 changes: 120 additions & 0 deletions src/main/java/rx/plugins/RxJavaSingleExecutionHook.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.plugins;

import rx.Observable;
import rx.Single;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func1;

/**
* Abstract ExecutionHook with invocations at different lifecycle points of {@link Single} execution with a
* default no-op implementation.
* <p>
* See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins:
* <a href="https://github.com/ReactiveX/RxJava/wiki/Plugins">https://github.com/ReactiveX/RxJava/wiki/Plugins</a>.
* <p>
* <b>Note on thread-safety and performance:</b>
* <p>
* A single implementation of this class will be used globally so methods on this class will be invoked
* concurrently from multiple threads so all functionality must be thread-safe.
* <p>
* Methods are also invoked synchronously and will add to execution time of the single so all behavior
* should be fast. If anything time-consuming is to be done it should be spawned asynchronously onto separate
* worker threads.
*
*/
public abstract class RxJavaSingleExecutionHook {
/**
* Invoked during the construction by {@link Single#create(Single.OnSubscribe)}
* <p>
* This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra
* logging, metrics and other such things and pass-thru the function.
*
* @param f
* original {@link Single.OnSubscribe}<{@code T}> to be executed
* @return {@link Single.OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just
* returned as a pass-thru
*/
public <T> Single.OnSubscribe<T> onCreate(Single.OnSubscribe<T> f) {
return f;
}

/**
* Invoked before {@link Single#subscribe(Subscriber)} is about to be executed.
* <p>
* This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra
* logging, metrics and other such things and pass-thru the function.
*
* @param onSubscribe
* original {@link Observable.OnSubscribe}<{@code T}> to be executed
* @return {@link Observable.OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just
* returned as a pass-thru
*/
public <T> Observable.OnSubscribe<T> onSubscribeStart(Single<? extends T> singleInstance, final Observable.OnSubscribe<T> onSubscribe) {
// pass-thru by default
return onSubscribe;
}

/**
* Invoked after successful execution of {@link Single#subscribe(Subscriber)} with returned
* {@link Subscription}.
* <p>
* This can be used to decorate or replace the {@link Subscription} instance or just perform extra logging,
* metrics and other such things and pass-thru the subscription.
*
* @param subscription
* original {@link Subscription}
* @return {@link Subscription} subscription that can be modified, decorated, replaced or just returned as a
* pass-thru
*/
public <T> Subscription onSubscribeReturn(Subscription subscription) {
// pass-thru by default
return subscription;
}

/**
* Invoked after failed execution of {@link Single#subscribe(Subscriber)} with thrown Throwable.
* <p>
* This is <em>not</em> errors emitted via {@link Subscriber#onError(Throwable)} but exceptions thrown when
* attempting to subscribe to a {@link Func1}<{@link Subscriber}{@code <T>}, {@link Subscription}>.
*
* @param e
* Throwable thrown by {@link Single#subscribe(Subscriber)}
* @return Throwable that can be decorated, replaced or just returned as a pass-thru
*/
public <T> Throwable onSubscribeError(Throwable e) {
// pass-thru by default
return e;
}

/**
* Invoked just as the operator functions is called to bind two operations together into a new
* {@link Single} and the return value is used as the lifted function
* <p>
* This can be used to decorate or replace the {@link Observable.Operator} instance or just perform extra
* logging, metrics and other such things and pass-thru the onSubscribe.
*
* @param lift
* original {@link Observable.Operator}{@code <R, T>}
* @return {@link Observable.Operator}{@code <R, T>} function that can be modified, decorated, replaced or just
* returned as a pass-thru
*/
public <T, R> Observable.Operator<? extends R, ? super T> onLift(final Observable.Operator<? extends R, ? super T> lift) {
return lift;
}
}
28 changes: 28 additions & 0 deletions src/main/java/rx/plugins/RxJavaSingleExecutionHookDefault.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.plugins;

/**
* Default no-op implementation of {@link RxJavaSingleExecutionHook}
*/
class RxJavaSingleExecutionHookDefault extends RxJavaSingleExecutionHook {

private static final RxJavaSingleExecutionHookDefault INSTANCE = new RxJavaSingleExecutionHookDefault();

public static RxJavaSingleExecutionHook getInstance() {
return INSTANCE;
}
}
Loading