From cdf7ff9ce6ae5347dfa261a101283c7d43bd8cc6 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Sat, 7 Feb 2015 21:30:52 -0800 Subject: [PATCH] rx.Task Adds `rx.Task` as a "scalar Observable" for representing work with a single return value. See https://github.com/ReactiveX/RxJava/issues/1594 rx.Future/Task This provides a type similar to `Future` in that it represents a scalar unit of work, but it is lazy like an `Observable` and many `Task`s can be combined into an `Observable` stream. Note how `Task.zip` returns `Task` whereas `Task.merge` returns `Observable`. NOTE: This is for experimentation and feedback at this time. Items requiring review and work that I'm particularly aware of: - naming of `OnExecute` - naming of `TaskObserver` (this one in particular I don't like) - design and implementation of `Task.Promise` - should the public `lift` use the `Observable.Operator` or should that only be for internal reuse? - should we have a public `lift` that uses a `Task.Operator`? - the `Task.toObservable` implementation right now is efficient but will likely break something so it likely needs to change to use `subscribe` - implementation of this merge variant: `Task merge(final Task> source)` - several operators currently just wrap as `Observable` to reuse existing operators ... is that okay performance wise? - Javadocs Examples of using this class: ```java import rx.Observable; import rx.Task; import rx.Task.Promise; public class TaskExamples { public static void main(String... args) { // scalar synchronous value Task t1 = Task.create(t -> { t.onSuccess("Hello World!"); }); // scalar synchronous value using helper method Task t2 = Task.just(1); // synchronous error Task error = Task.create(t -> { t.onError(new RuntimeException("failed!")); }); // executing t1.subscribe(System.out::println); t2.subscribe(System.out::println); error.subscribe(System.out::println, e -> System.out.println(e.getMessage())); // scalar Tasks for request/response like a Future getData(1).subscribe(System.out::println); getDataUsingPromise(2).subscribe(System.out::println); // combining Tasks into another Task Task zipped = Task.zip(t1, t2, (a, b) -> a + " -- " + b); // combining Tasks into an Observable stream Observable merged = Task.merge(t1, t2.map(String::valueOf), getData(3)); Observable mergeWith = t1.mergeWith(t2.map(String::valueOf)); zipped.subscribe(v -> System.out.println("zipped => " + v)); merged.subscribe(v -> System.out.println("merged => " + v)); mergeWith.subscribe(v -> System.out.println("mergeWith => " + v)); } /** * Example of an async scalar execution using Task.create *

* This shows the lazy, idiomatic approach for Rx exactly like an Observable except scalar. * * @param arg * @return */ public static Task getData(int arg) { return Task.create(s -> { new Thread(() -> { try { Thread.sleep(500); } catch (Exception e) { e.printStackTrace(); } // deliver value s.onSuccess("Data=" + arg); }).start(); }); } /** * Example of an async scalar execution using a Task.Promise *

* This shows how an eager (hot) process would work like using a Future. * * @param arg * @return */ public static Task getDataUsingPromise(int arg) { Task.Promise p = Promise.create(); new Thread(() -> { try { Thread.sleep(500); } catch (Exception e) { e.printStackTrace(); } // deliver value p.onSuccess("Data=" + arg); }).start(); return p.getTask(); } } ``` --- src/main/java/rx/Task.java | 1950 ++++++++++++++++++++++++++++++++ src/test/java/rx/TaskTest.java | 196 ++++ 2 files changed, 2146 insertions(+) create mode 100644 src/main/java/rx/Task.java create mode 100644 src/test/java/rx/TaskTest.java diff --git a/src/main/java/rx/Task.java b/src/main/java/rx/Task.java new file mode 100644 index 0000000000..7de0d57061 --- /dev/null +++ b/src/main/java/rx/Task.java @@ -0,0 +1,1950 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package rx; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import rx.Observable.OnSubscribe; +import rx.Observable.Operator; +import rx.annotations.Experimental; +import rx.exceptions.Exceptions; +import rx.exceptions.OnErrorNotImplementedException; +import rx.functions.Action1; +import rx.functions.Func1; +import rx.functions.Func2; +import rx.functions.Func3; +import rx.functions.Func4; +import rx.functions.Func5; +import rx.functions.Func6; +import rx.functions.Func7; +import rx.functions.Func8; +import rx.functions.Func9; +import rx.internal.operators.OnSubscribeToObservableFuture; +import rx.internal.operators.OperatorMap; +import rx.internal.operators.OperatorObserveOn; +import rx.internal.operators.OperatorOnErrorReturn; +import rx.internal.operators.OperatorSubscribeOn; +import rx.internal.operators.OperatorTimeout; +import rx.internal.operators.OperatorZip; +import rx.observables.BlockingObservable; +import rx.observers.SafeSubscriber; +import rx.plugins.RxJavaObservableExecutionHook; +import rx.plugins.RxJavaPlugins; +import rx.schedulers.Schedulers; +import rx.subscriptions.Subscriptions; + +/** + * The Task class that implements the Reactive Pattern for scalars (single values). See {@link Observable} for a stream or vector of values. + *

+ * The documentation for this class makes use of marble diagrams. The following legend explains these diagrams: + *

+ * + *

+ * For more information see the ReactiveX documentation. + * + * @param + * the type of the item emitted by the Task + */ +@Experimental +public class Task { + + final OnSubscribe onSubscribe; + + /** + * Creates a Task with a Function to execute when it is subscribed to (executed). + *

+ * Note: Use {@link #create(OnExecute)} to create a Task, instead of this constructor, + * unless you specifically have a need for inheritance. + * + * @param f + * {@link OnExecute} to be executed when {@link #execute(TaskObserver)} or {@link #subscribe(Subscriber)} is called + */ + protected Task(final OnExecute f) { + // bridge between OnSubscribe (which all Operators and Observables use) and OnExecute (for Task) + this.onSubscribe = new OnSubscribe() { + + @Override + public void call(final Subscriber s) { + f.call(new TaskObserver() { + + @Override + public void onSuccess(T value) { + // TODO validate error handling if onNext throws + s.onNext(value); + s.onCompleted(); + } + + @Override + public void onError(Throwable error) { + // TODO validate error handling if onError throws + s.onError(error); + } + + }); + } + + }; + } + + private Task(final OnSubscribe f) { + this.onSubscribe = f; + } + + private static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook(); + + /** + * Returns a Task that will execute the specified function when a {@link TaskObserver} executes it or {@link Subscriber} subscribes to it. + *

+ * + *

+ * Write the function you pass to {@code create} so that it behaves as a Task: It should invoke the + * TaskExecutor {@link TaskObserver#onSuccess onSuccess} and {@link TaskObserver#onError onError} methods appropriately. + *

+ * A well-formed Task must invoke either the TaskExecutor's {@code onSuccess} method exactly once or + * its {@code onError} method exactly once. + *

+ *

+ *
Scheduler:
+ *
{@code create} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the type of the item that this Task emits + * @param f + * a function that accepts an {@code TaskExecutor}, and invokes its {@code onSuccess} or {@code onError} methods as appropriate + * @return a Task that, when a {@link Subscriber} subscribes to it, will execute the specified function + * @see ReactiveX operators documentation: Create + */ + public final static Task create(OnExecute f) { + return new Task(f); // TODO need hook + } + + /** + * Invoked when Task.execute is called. + */ + public static interface OnExecute extends Action1> { + // cover for generics insanity + } + + // TODO what should be the name of this? + public static interface TaskObserver { + + public void onSuccess(T value); + + public void onError(Throwable error); + } + + public static class Promise { + + private final Object lock = new Object(); + private TaskObserver child; + private T v; + private Throwable error; + private final Task task; + + public static Promise create() { + return new Promise(); + } + + Promise() { + this.task = Task.create(new OnExecute() { + + @Override + public void call(TaskObserver c) { + T _v = null; + Throwable _t = null; + synchronized (lock) { + child = c; + _v = v; + _t = error; + } + if (_v != null) { + // if a value is already set emit it + c.onSuccess(_v); + } else if (_t != null) { + // if a Throwable is already set emit it + c.onError(_t); + } + } + + }); + } + + public final void onSuccess(T value) { + TaskObserver c = null; + synchronized (lock) { + if (child != null) { + c = child; + } else { + v = value; + } + } + // emit outside of lock if we have an observer + if (c != null) { + c.onSuccess(value); + } + } + + public final void onError(Throwable t) { + TaskObserver c = null; + synchronized (lock) { + if (child != null) { + c = child; + } else { + t = error; + } + } + // emit outside of lock if we have an observer + if (c != null) { + c.onError(t); + } + } + + public final Task getTask() { + return task; + } + } + + /** + * Lifts a function to the current Task and returns a new Task that when subscribed to will pass + * the values of the current Task through the Operator function. + *

+ * In other words, this allows chaining TaskExecutors together on a Task for acting on the values within + * the Task. + *

{@code + * task.map(...).filter(...).lift(new OperatorA()).lift(new OperatorB(...)).subscribe() + * }

+ * If the operator you are creating is designed to act on the item emitted by a source + * Task, use {@code lift}. If your operator is designed to transform the source Task as a whole + * (for instance, by applying a particular set of existing RxJava operators to it) use {@link #compose}. + *

+ *
Scheduler:
+ *
{@code lift} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param lift + * the Operator that implements the Task-operating function to be applied to the source Task + * @return a Task that is the result of applying the lifted Operator to the source Task + * @see RxJava wiki: Implementing Your Own Operators + */ + public final Task lift(final Operator lift) { + return new Task(new OnSubscribe() { + @Override + public void call(Subscriber o) { + try { + final Subscriber st = hook.onLift(lift).call(o); + try { + // new Subscriber created and being subscribed with so 'onStart' it + st.onStart(); + onSubscribe.call(st); + } catch (Throwable e) { + // localized capture of errors rather than it skipping all operators + // and ending up in the try/catch of the subscribe method which then + // prevents onErrorResumeNext and other similar approaches to error handling + if (e instanceof OnErrorNotImplementedException) { + throw (OnErrorNotImplementedException) e; + } + st.onError(e); + } + } catch (Throwable e) { + if (e instanceof OnErrorNotImplementedException) { + throw (OnErrorNotImplementedException) e; + } + // if the lift function failed all we can do is pass the error to the final Subscriber + // as we don't have the operator available to us + o.onError(e); + } + } + }); + } + + /** + * Transform an Observable by applying a particular Transformer function to it. + *

+ * This method operates on the Observable itself whereas {@link #lift} operates on the Observable's + * Subscribers or Observers. + *

+ * If the operator you are creating is designed to act on the individual items emitted by a source + * Observable, use {@link #lift}. If your operator is designed to transform the source Observable as a whole + * (for instance, by applying a particular set of existing RxJava operators to it) use {@code compose}. + *

+ *
Scheduler:
+ *
{@code compose} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param transformer + * implements the function that transforms the source Observable + * @return the source Observable, transformed by the transformer function + * @see RxJava wiki: Implementing Your Own Operators + */ + @SuppressWarnings("unchecked") + public Task compose(Transformer transformer) { + return ((Transformer) transformer).call(this); + } + + /** + * Transformer function used by {@link #compose}. + * + * @warn more complete description needed + */ + public static interface Transformer extends Func1, Task> { + // cover for generics insanity + } + + private static Observable toObservable(Task t) { + // is this sufficient, or do I need to keep the outer Task and subscribe to it? + return Observable.create(t.onSubscribe); + } + + /** + * INTERNAL: Used with lift and operators. + * + * Converts the source {@code Task} into an {@code Task>} that emits the + * source Observable as its single emission. + *

+ * + *

+ *
Scheduler:
+ *
{@code nest} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return an Observable that emits a single item: the source Observable + * @see ReactiveX operators documentation: To + */ + private final Task> nest() { + return Task.just(toObservable(this)); + } + + /* ********************************************************************************************************* + * Operators Below Here + * ********************************************************************************************************* + */ + + /** + * Returns an Observable that emits the items emitted by two Tasks, one after the other, without + * interleaving them. + *

+ * + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * an Task to be concatenated + * @param t2 + * an Task to be concatenated + * @return an Observable that emits items emitted by the two source Observables, one after the other, + * without interleaving them + * @see ReactiveX operators documentation: Concat + */ + public final static Observable concat(Task t1, Task t2) { + return Observable.concat(toObservable(t1), toObservable(t2)); + } + + /** + * Returns an Observable that emits the items emitted by three Tasks, one after the other, without + * interleaving them. + *

+ * + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Task to be concatenated + * @param t2 + * a Task to be concatenated + * @param t3 + * a Task to be concatenated + * @return an Observable that emits items emitted by the three source Observables, one after the other, + * without interleaving them + * @see ReactiveX operators documentation: Concat + */ + public final static Observable concat(Task t1, Task t2, Task t3) { + return Observable.concat(toObservable(t1), toObservable(t2), toObservable(t3)); + } + + /** + * Returns an Observable that emits the items emitted by four Observables, one after the other, without + * interleaving them. + *

+ * + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Task to be concatenated + * @param t2 + * a Task to be concatenated + * @param t3 + * a Task to be concatenated + * @param t4 + * a Task to be concatenated + * @return an Observable that emits items emitted by the four source Observables, one after the other, + * without interleaving them + * @see ReactiveX operators documentation: Concat + */ + public final static Observable concat(Task t1, Task t2, Task t3, Task t4) { + return Observable.concat(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4)); + } + + /** + * Returns an Observable that emits the items emitted by five Observables, one after the other, without + * interleaving them. + *

+ * + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Task to be concatenated + * @param t2 + * a Task to be concatenated + * @param t3 + * a Task to be concatenated + * @param t4 + * a Task to be concatenated + * @param t5 + * a Task to be concatenated + * @return an Observable that emits items emitted by the five source Observables, one after the other, + * without interleaving them + * @see ReactiveX operators documentation: Concat + */ + public final static Observable concat(Task t1, Task t2, Task t3, Task t4, Task t5) { + return Observable.concat(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4), toObservable(t5)); + } + + /** + * Returns an Observable that emits the items emitted by six Observables, one after the other, without + * interleaving them. + *

+ * + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Task to be concatenated + * @param t2 + * a Task to be concatenated + * @param t3 + * a Task to be concatenated + * @param t4 + * a Task to be concatenated + * @param t5 + * a Task to be concatenated + * @param t6 + * a Task to be concatenated + * @return an Observable that emits items emitted by the six source Observables, one after the other, + * without interleaving them + * @see ReactiveX operators documentation: Concat + */ + public final static Observable concat(Task t1, Task t2, Task t3, Task t4, Task t5, Task t6) { + return Observable.concat(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4), toObservable(t5), toObservable(t6)); + } + + /** + * Returns an Observable that emits the items emitted by seven Observables, one after the other, without + * interleaving them. + *

+ * + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Task to be concatenated + * @param t2 + * a Task to be concatenated + * @param t3 + * a Task to be concatenated + * @param t4 + * a Task to be concatenated + * @param t5 + * a Task to be concatenated + * @param t6 + * a Task to be concatenated + * @param t7 + * a Task to be concatenated + * @return an Observable that emits items emitted by the seven source Observables, one after the other, + * without interleaving them + * @see ReactiveX operators documentation: Concat + */ + public final static Observable concat(Task t1, Task t2, Task t3, Task t4, Task t5, Task t6, Task t7) { + return Observable.concat(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4), toObservable(t5), toObservable(t6), toObservable(t7)); + } + + /** + * Returns an Observable that emits the items emitted by eight Observables, one after the other, without + * interleaving them. + *

+ * + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Task to be concatenated + * @param t2 + * a Task to be concatenated + * @param t3 + * a Task to be concatenated + * @param t4 + * a Task to be concatenated + * @param t5 + * a Task to be concatenated + * @param t6 + * a Task to be concatenated + * @param t7 + * a Task to be concatenated + * @param t8 + * a Task to be concatenated + * @return an Observable that emits items emitted by the eight source Observables, one after the other, + * without interleaving them + * @see ReactiveX operators documentation: Concat + */ + public final static Observable concat(Task t1, Task t2, Task t3, Task t4, Task t5, Task t6, Task t7, Task t8) { + return Observable.concat(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4), toObservable(t5), toObservable(t6), toObservable(t7), toObservable(t8)); + } + + /** + * Returns an Observable that emits the items emitted by nine Observables, one after the other, without + * interleaving them. + *

+ * + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Task to be concatenated + * @param t2 + * a Task to be concatenated + * @param t3 + * a Task to be concatenated + * @param t4 + * a Task to be concatenated + * @param t5 + * a Task to be concatenated + * @param t6 + * a Task to be concatenated + * @param t7 + * a Task to be concatenated + * @param t8 + * a Task to be concatenated + * @param t9 + * a Task to be concatenated + * @return an Observable that emits items emitted by the nine source Observables, one after the other, + * without interleaving them + * @see ReactiveX operators documentation: Concat + */ + public final static Observable concat(Task t1, Task t2, Task t3, Task t4, Task t5, Task t6, Task t7, Task t8, Task t9) { + return Observable.concat(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4), toObservable(t5), toObservable(t6), toObservable(t7), toObservable(t8), toObservable(t9)); + } + + /** + * Returns an Observable that invokes an {@link Observer}'s {@link Observer#onError onError} method when the + * Observer subscribes to it. + *

+ * + *

+ *
Scheduler:
+ *
{@code error} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param exception + * the particular Throwable to pass to {@link Observer#onError onError} + * @param + * the type of the items (ostensibly) emitted by the Observable + * @return an Observable that invokes the {@link Observer}'s {@link Observer#onError onError} method when + * the Observer subscribes to it + * @see ReactiveX operators documentation: Throw + */ + public final static Task error(final Throwable exception) { + return Task.create(new OnExecute() { + + @Override + public void call(TaskObserver te) { + te.onError(exception); + } + + }); + } + + /** + * Converts a {@link Future} into an Observable. + *

+ * + *

+ * You can convert any object that supports the {@link Future} interface into an Observable that emits the + * return value of the {@link Future#get} method of that object, by passing the object into the {@code from} method. + *

+ * Important note: This Observable is blocking; you cannot unsubscribe from it. + *

+ *
Scheduler:
+ *
{@code from} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param future + * the source {@link Future} + * @param + * the type of object that the {@link Future} returns, and also the type of item to be emitted by + * the resulting Observable + * @return an Observable that emits the item from the source {@link Future} + * @see ReactiveX operators documentation: From + */ + public final static Task from(Future future) { + return new Task(OnSubscribeToObservableFuture.toObservableFuture(future)); + } + + /** + * Converts a {@link Future} into an Observable, with a timeout on the Future. + *

+ * + *

+ * You can convert any object that supports the {@link Future} interface into an Observable that emits the + * return value of the {@link Future#get} method of that object, by passing the object into the {@code from} method. + *

+ * Important note: This Observable is blocking; you cannot unsubscribe from it. + *

+ *
Scheduler:
+ *
{@code from} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param future + * the source {@link Future} + * @param timeout + * the maximum time to wait before calling {@code get} + * @param unit + * the {@link TimeUnit} of the {@code timeout} argument + * @param + * the type of object that the {@link Future} returns, and also the type of item to be emitted by + * the resulting Observable + * @return an Observable that emits the item from the source {@link Future} + * @see ReactiveX operators documentation: From + */ + public final static Task from(Future future, long timeout, TimeUnit unit) { + return new Task(OnSubscribeToObservableFuture.toObservableFuture(future, timeout, unit)); + } + + /** + * Converts a {@link Future}, operating on a specified {@link Scheduler}, into an Observable. + *

+ * + *

+ * You can convert any object that supports the {@link Future} interface into an Observable that emits the + * return value of the {@link Future#get} method of that object, by passing the object into the {@code from} method. + *

+ *
Scheduler:
+ *
you specify which {@link Scheduler} this operator will use
+ *
+ * + * @param future + * the source {@link Future} + * @param scheduler + * the {@link Scheduler} to wait for the Future on. Use a Scheduler such as {@link Schedulers#io()} that can block and wait on the Future + * @param + * the type of object that the {@link Future} returns, and also the type of item to be emitted by + * the resulting Observable + * @return an Observable that emits the item from the source {@link Future} + * @see ReactiveX operators documentation: From + */ + public final static Task from(Future future, Scheduler scheduler) { + return new Task(OnSubscribeToObservableFuture.toObservableFuture(future)).subscribeOn(scheduler); + } + + /** + * Returns an Observable that emits a single item and then completes. + *

+ * + *

+ * To convert any object into an Observable that emits that object, pass that object into the {@code just} method. + *

+ * This is similar to the {@link #from(java.lang.Object[])} method, except that {@code from} will convert + * an {@link Iterable} object into an Observable that emits each of the items in the Iterable, one at a + * time, while the {@code just} method converts an Iterable into an Observable that emits the entire + * Iterable as a single item. + *

+ *
Scheduler:
+ *
{@code just} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param value + * the item to emit + * @param + * the type of that item + * @return an Observable that emits {@code value} as a single item and then completes + * @see ReactiveX operators documentation: Just + */ + public final static Task just(final T value) { + // TODO add similar optimization as ScalarSynchronousObservable + return Task.create(new OnExecute() { + + @Override + public void call(TaskObserver te) { + te.onSuccess(value); + } + + }); + } + + // /** + // * Flattens an Observable that emits Observables into a single Observable that emits the items emitted by + // * those Observables, without any transformation. + // *

+ // * + // *

+ // * You can combine the items emitted by multiple Observables so that they appear as a single Observable, by + // * using the {@code merge} method. + // *

+ // *
Scheduler:
+ // *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ // *
+ // * + // * @param source + // * an Observable that emits Observables + // * @return an Observable that emits items that are the result of flattening the Observables emitted by the {@code source} Observable + // * @see ReactiveX operators documentation: Merge + // */ + // public final static Observable merge(Observable> source) { + // return source.lift(OperatorMerge. instance(false)); + // } + + /** + * Flattens a Task that emits a Task into a single Task that emits the items emitted by + * the nested Task, without any transformation. + *

+ * + *

+ *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param source + * a Task that emits a Task + * @return a Task that emits the item that is the result of flattening the Task emitted by the {@code source} Task + * @see ReactiveX operators documentation: Merge + */ + public final static Task merge(final Task> source) { + // // TODO This approach is a hack using Observable.merge + // Task> m2 = source.map(new Func1, Observable>() { + // + // @Override + // public Observable call(Task t) { + // return toObservable(t); + // } + // + // }); + // Observable merged = Observable.merge(toObservable(m2)); + // return new Task(merged.onSubscribe); + + // TODO this is a quick hack that ignores backpressure etc + // TODO can we do this with lift instead? + return Task.create(new OnExecute() { + + @Override + public void call(final TaskObserver child) { + source.execute(new TaskObserver>() { + + @Override + public void onSuccess(Task t) { + t.execute(child); + } + + @Override + public void onError(Throwable error) { + child.onError(error); + } + + }); + } + }); + } + + /** + * Flattens two Observables into a single Observable, without any transformation. + *

+ * + *

+ * You can combine items emitted by multiple Observables so that they appear as a single Observable, by + * using the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Task to be merged + * @param t2 + * a Task to be merged + * @return an Observable that emits all of the items emitted by the source Observables + * @see ReactiveX operators documentation: Merge + */ + public final static Observable merge(Task t1, Task t2) { + return Observable.merge(toObservable(t1), toObservable(t2)); + } + + /** + * Flattens three Observables into a single Observable, without any transformation. + *

+ * + *

+ * You can combine items emitted by multiple Observables so that they appear as a single Observable, by + * using the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Task to be merged + * @param t2 + * a Task to be merged + * @param t3 + * a Task to be merged + * @return an Observable that emits all of the items emitted by the source Observables + * @see ReactiveX operators documentation: Merge + */ + public final static Observable merge(Task t1, Task t2, Task t3) { + return Observable.merge(toObservable(t1), toObservable(t2), toObservable(t3)); + } + + /** + * Flattens four Observables into a single Observable, without any transformation. + *

+ * + *

+ * You can combine items emitted by multiple Observables so that they appear as a single Observable, by + * using the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Task to be merged + * @param t2 + * a Task to be merged + * @param t3 + * a Task to be merged + * @param t4 + * a Task to be merged + * @return an Observable that emits all of the items emitted by the source Observables + * @see ReactiveX operators documentation: Merge + */ + public final static Observable merge(Task t1, Task t2, Task t3, Task t4) { + return Observable.merge(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4)); + } + + /** + * Flattens five Observables into a single Observable, without any transformation. + *

+ * + *

+ * You can combine items emitted by multiple Observables so that they appear as a single Observable, by + * using the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Task to be merged + * @param t2 + * a Task to be merged + * @param t3 + * a Task to be merged + * @param t4 + * a Task to be merged + * @param t5 + * a Task to be merged + * @return an Observable that emits all of the items emitted by the source Observables + * @see ReactiveX operators documentation: Merge + */ + public final static Observable merge(Task t1, Task t2, Task t3, Task t4, Task t5) { + return Observable.merge(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4), toObservable(t5)); + } + + /** + * Flattens six Observables into a single Observable, without any transformation. + *

+ * + *

+ * You can combine items emitted by multiple Observables so that they appear as a single Observable, by + * using the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Task to be merged + * @param t2 + * a Task to be merged + * @param t3 + * a Task to be merged + * @param t4 + * a Task to be merged + * @param t5 + * a Task to be merged + * @param t6 + * a Task to be merged + * @return an Observable that emits all of the items emitted by the source Observables + * @see ReactiveX operators documentation: Merge + */ + public final static Observable merge(Task t1, Task t2, Task t3, Task t4, Task t5, Task t6) { + return Observable.merge(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4), toObservable(t5), toObservable(t6)); + } + + /** + * Flattens seven Observables into a single Observable, without any transformation. + *

+ * + *

+ * You can combine items emitted by multiple Observables so that they appear as a single Observable, by + * using the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Task to be merged + * @param t2 + * a Task to be merged + * @param t3 + * a Task to be merged + * @param t4 + * a Task to be merged + * @param t5 + * a Task to be merged + * @param t6 + * a Task to be merged + * @param t7 + * a Task to be merged + * @return an Observable that emits all of the items emitted by the source Observables + * @see ReactiveX operators documentation: Merge + */ + public final static Observable merge(Task t1, Task t2, Task t3, Task t4, Task t5, Task t6, Task t7) { + return Observable.merge(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4), toObservable(t5), toObservable(t6), toObservable(t7)); + } + + /** + * Flattens eight Observables into a single Observable, without any transformation. + *

+ * + *

+ * You can combine items emitted by multiple Observables so that they appear as a single Observable, by + * using the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Task to be merged + * @param t2 + * a Task to be merged + * @param t3 + * a Task to be merged + * @param t4 + * a Task to be merged + * @param t5 + * a Task to be merged + * @param t6 + * a Task to be merged + * @param t7 + * a Task to be merged + * @param t8 + * a Task to be merged + * @return an Observable that emits all of the items emitted by the source Observables + * @see ReactiveX operators documentation: Merge + */ + public final static Observable merge(Task t1, Task t2, Task t3, Task t4, Task t5, Task t6, Task t7, Task t8) { + return Observable.merge(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4), toObservable(t5), toObservable(t6), toObservable(t7), toObservable(t8)); + } + + /** + * Flattens nine Observables into a single Observable, without any transformation. + *

+ * + *

+ * You can combine items emitted by multiple Observables so that they appear as a single Observable, by + * using the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Task to be merged + * @param t2 + * a Task to be merged + * @param t3 + * a Task to be merged + * @param t4 + * a Task to be merged + * @param t5 + * a Task to be merged + * @param t6 + * a Task to be merged + * @param t7 + * a Task to be merged + * @param t8 + * a Task to be merged + * @param t9 + * a Task to be merged + * @return an Observable that emits all of the items emitted by the source Observables + * @see ReactiveX operators documentation: Merge + */ + public final static Observable merge(Task t1, Task t2, Task t3, Task t4, Task t5, Task t6, Task t7, Task t8, Task t9) { + return Observable.merge(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4), toObservable(t5), toObservable(t6), toObservable(t7), toObservable(t8), toObservable(t9)); + } + + /** + * Returns an Observable that emits the results of a specified combiner function applied to combinations of + * two items emitted, in sequence, by two other Observables. + *

+ * + *

{@code zip} applies this function in strict sequence, so the first item emitted by the new Observable + * will be the result of the function applied to the first item emitted by {@code o1} and the first item + * emitted by {@code o2}; the second item emitted by the new Observable will be the result of the function + * applied to the second item emitted by {@code o1} and the second item emitted by {@code o2}; and so forth. + *

+ * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of the source Observable that + * emits the fewest + * items. + *

+ *
Scheduler:
+ *
{@code zip} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param o1 + * the first source Observable + * @param o2 + * a second source Observable + * @param zipFunction + * a function that, when applied to an item emitted by each of the source Observables, results + * in an item that will be emitted by the resulting Observable + * @return an Observable that emits the zipped results + * @see ReactiveX operators documentation: Zip + */ + public final static Task zip(Task o1, Task o2, final Func2 zipFunction) { + return just(new Observable[] { toObservable(o1), toObservable(o2) }).lift(new OperatorZip(zipFunction)); + } + + /** + * Returns an Observable that emits the results of a specified combiner function applied to combinations of + * three items emitted, in sequence, by three other Observables. + *

+ * + *

{@code zip} applies this function in strict sequence, so the first item emitted by the new Observable + * will be the result of the function applied to the first item emitted by {@code o1}, the first item + * emitted by {@code o2}, and the first item emitted by {@code o3}; the second item emitted by the new + * Observable will be the result of the function applied to the second item emitted by {@code o1}, the + * second item emitted by {@code o2}, and the second item emitted by {@code o3}; and so forth. + *

+ * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of the source Observable that + * emits the fewest + * items. + *

+ *
Scheduler:
+ *
{@code zip} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param o1 + * the first source Observable + * @param o2 + * a second source Observable + * @param o3 + * a third source Observable + * @param zipFunction + * a function that, when applied to an item emitted by each of the source Observables, results in + * an item that will be emitted by the resulting Observable + * @return an Observable that emits the zipped results + * @see ReactiveX operators documentation: Zip + */ + public final static Task zip(Task o1, Task o2, Task o3, Func3 zipFunction) { + return just(new Observable[] { toObservable(o1), toObservable(o2), toObservable(o3) }).lift(new OperatorZip(zipFunction)); + } + + /** + * Returns an Observable that emits the results of a specified combiner function applied to combinations of + * four items emitted, in sequence, by four other Observables. + *

+ * + *

{@code zip} applies this function in strict sequence, so the first item emitted by the new Observable + * will be the result of the function applied to the first item emitted by {@code o1}, the first item + * emitted by {@code o2}, the first item emitted by {@code o3}, and the first item emitted by {@code 04}; + * the second item emitted by the new Observable will be the result of the function applied to the second + * item emitted by each of those Observables; and so forth. + *

+ * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of the source Observable that + * emits the fewest + * items. + *

+ *
Scheduler:
+ *
{@code zip} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param o1 + * the first source Observable + * @param o2 + * a second source Observable + * @param o3 + * a third source Observable + * @param o4 + * a fourth source Observable + * @param zipFunction + * a function that, when applied to an item emitted by each of the source Observables, results in + * an item that will be emitted by the resulting Observable + * @return an Observable that emits the zipped results + * @see ReactiveX operators documentation: Zip + */ + public final static Task zip(Task o1, Task o2, Task o3, Task o4, Func4 zipFunction) { + return just(new Observable[] { toObservable(o1), toObservable(o2), toObservable(o3), toObservable(o4) }).lift(new OperatorZip(zipFunction)); + } + + /** + * Returns an Observable that emits the results of a specified combiner function applied to combinations of + * five items emitted, in sequence, by five other Observables. + *

+ * + *

{@code zip} applies this function in strict sequence, so the first item emitted by the new Observable + * will be the result of the function applied to the first item emitted by {@code o1}, the first item + * emitted by {@code o2}, the first item emitted by {@code o3}, the first item emitted by {@code o4}, and + * the first item emitted by {@code o5}; the second item emitted by the new Observable will be the result of + * the function applied to the second item emitted by each of those Observables; and so forth. + *

+ * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of the source Observable that + * emits the fewest + * items. + *

+ *
Scheduler:
+ *
{@code zip} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param o1 + * the first source Observable + * @param o2 + * a second source Observable + * @param o3 + * a third source Observable + * @param o4 + * a fourth source Observable + * @param o5 + * a fifth source Observable + * @param zipFunction + * a function that, when applied to an item emitted by each of the source Observables, results in + * an item that will be emitted by the resulting Observable + * @return an Observable that emits the zipped results + * @see ReactiveX operators documentation: Zip + */ + public final static Task zip(Task o1, Task o2, Task o3, Task o4, Task o5, Func5 zipFunction) { + return just(new Observable[] { toObservable(o1), toObservable(o2), toObservable(o3), toObservable(o4), toObservable(o5) }).lift(new OperatorZip(zipFunction)); + } + + /** + * Returns an Observable that emits the results of a specified combiner function applied to combinations of + * six items emitted, in sequence, by six other Observables. + *

+ * + *

{@code zip} applies this function in strict sequence, so the first item emitted by the new Observable + * will be the result of the function applied to the first item emitted by each source Observable, the + * second item emitted by the new Observable will be the result of the function applied to the second item + * emitted by each of those Observables, and so forth. + *

+ * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of the source Observable that + * emits the fewest + * items. + *

+ *
Scheduler:
+ *
{@code zip} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param o1 + * the first source Observable + * @param o2 + * a second source Observable + * @param o3 + * a third source Observable + * @param o4 + * a fourth source Observable + * @param o5 + * a fifth source Observable + * @param o6 + * a sixth source Observable + * @param zipFunction + * a function that, when applied to an item emitted by each of the source Observables, results in + * an item that will be emitted by the resulting Observable + * @return an Observable that emits the zipped results + * @see ReactiveX operators documentation: Zip + */ + public final static Task zip(Task o1, Task o2, Task o3, Task o4, Task o5, Task o6, + Func6 zipFunction) { + return just(new Observable[] { toObservable(o1), toObservable(o2), toObservable(o3), toObservable(o4), toObservable(o5), toObservable(o6) }).lift(new OperatorZip(zipFunction)); + } + + /** + * Returns an Observable that emits the results of a specified combiner function applied to combinations of + * seven items emitted, in sequence, by seven other Observables. + *

+ * + *

{@code zip} applies this function in strict sequence, so the first item emitted by the new Observable + * will be the result of the function applied to the first item emitted by each source Observable, the + * second item emitted by the new Observable will be the result of the function applied to the second item + * emitted by each of those Observables, and so forth. + *

+ * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of the source Observable that + * emits the fewest + * items. + *

+ *
Scheduler:
+ *
{@code zip} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param o1 + * the first source Observable + * @param o2 + * a second source Observable + * @param o3 + * a third source Observable + * @param o4 + * a fourth source Observable + * @param o5 + * a fifth source Observable + * @param o6 + * a sixth source Observable + * @param o7 + * a seventh source Observable + * @param zipFunction + * a function that, when applied to an item emitted by each of the source Observables, results in + * an item that will be emitted by the resulting Observable + * @return an Observable that emits the zipped results + * @see ReactiveX operators documentation: Zip + */ + public final static Task zip(Task o1, Task o2, Task o3, Task o4, Task o5, Task o6, Task o7, + Func7 zipFunction) { + return just(new Observable[] { toObservable(o1), toObservable(o2), toObservable(o3), toObservable(o4), toObservable(o5), toObservable(o6), toObservable(o7) }).lift(new OperatorZip(zipFunction)); + } + + /** + * Returns an Observable that emits the results of a specified combiner function applied to combinations of + * eight items emitted, in sequence, by eight other Observables. + *

+ * + *

{@code zip} applies this function in strict sequence, so the first item emitted by the new Observable + * will be the result of the function applied to the first item emitted by each source Observable, the + * second item emitted by the new Observable will be the result of the function applied to the second item + * emitted by each of those Observables, and so forth. + *

+ * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of the source Observable that + * emits the fewest + * items. + *

+ *
Scheduler:
+ *
{@code zip} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param o1 + * the first source Observable + * @param o2 + * a second source Observable + * @param o3 + * a third source Observable + * @param o4 + * a fourth source Observable + * @param o5 + * a fifth source Observable + * @param o6 + * a sixth source Observable + * @param o7 + * a seventh source Observable + * @param o8 + * an eighth source Observable + * @param zipFunction + * a function that, when applied to an item emitted by each of the source Observables, results in + * an item that will be emitted by the resulting Observable + * @return an Observable that emits the zipped results + * @see ReactiveX operators documentation: Zip + */ + public final static Task zip(Task o1, Task o2, Task o3, Task o4, Task o5, Task o6, Task o7, Task o8, + Func8 zipFunction) { + return just(new Observable[] { toObservable(o1), toObservable(o2), toObservable(o3), toObservable(o4), toObservable(o5), toObservable(o6), toObservable(o7), toObservable(o8) }).lift(new OperatorZip(zipFunction)); + } + + /** + * Returns an Observable that emits the results of a specified combiner function applied to combinations of + * nine items emitted, in sequence, by nine other Observables. + *

+ * + *

{@code zip} applies this function in strict sequence, so the first item emitted by the new Observable + * will be the result of the function applied to the first item emitted by each source Observable, the + * second item emitted by the new Observable will be the result of the function applied to the second item + * emitted by each of those Observables, and so forth. + *

+ * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of the source Observable that + * emits the fewest + * items. + *

+ *
Scheduler:
+ *
{@code zip} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param o1 + * the first source Observable + * @param o2 + * a second source Observable + * @param o3 + * a third source Observable + * @param o4 + * a fourth source Observable + * @param o5 + * a fifth source Observable + * @param o6 + * a sixth source Observable + * @param o7 + * a seventh source Observable + * @param o8 + * an eighth source Observable + * @param o9 + * a ninth source Observable + * @param zipFunction + * a function that, when applied to an item emitted by each of the source Observables, results in + * an item that will be emitted by the resulting Observable + * @return an Observable that emits the zipped results + * @see ReactiveX operators documentation: Zip + */ + public final static Task zip(Task o1, Task o2, Task o3, Task o4, Task o5, Task o6, Task o7, Task o8, + Task o9, Func9 zipFunction) { + return just(new Observable[] { toObservable(o1), toObservable(o2), toObservable(o3), toObservable(o4), toObservable(o5), toObservable(o6), toObservable(o7), toObservable(o8), toObservable(o9) }).lift(new OperatorZip(zipFunction)); + } + + /** + * Returns an Observable that emits the items emitted from the current Observable, then the next, one after + * the other, without interleaving them. + *

+ * + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Task to be concatenated after the current + * @return an Observable that emits items emitted by the two source Observables, one after the other, + * without interleaving them + * @see ReactiveX operators documentation: Concat + */ + public final Observable concatWith(Task t1) { + return concat(this, t1); + } + + /** + * Returns an Observable that emits items based on applying a function that you supply to each item emitted + * by the source Observable, where that function returns an Observable, and then merging those resulting + * Observables and emitting the results of this merger. + *

+ * + *

+ *
Scheduler:
+ *
{@code flatMap} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param func + * a function that, when applied to an item emitted by the source Observable, returns an + * Observable + * @return an Observable that emits the result of applying the transformation function to each item emitted + * by the source Observable and merging the results of the Observables obtained from this + * transformation + * @see ReactiveX operators documentation: FlatMap + */ + public final Task flatMap(final Func1> func) { + return merge(map(func)); + } + + /** + * Returns an Observable that emits items based on applying a function that you supply to each item emitted + * by the source Observable, where that function returns an Observable, and then merging those resulting + * Observables and emitting the results of this merger. + *

+ * + *

+ *
Scheduler:
+ *
{@code flatMap} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param func + * a function that, when applied to an item emitted by the source Observable, returns an + * Observable + * @return an Observable that emits the result of applying the transformation function to each item emitted + * by the source Observable and merging the results of the Observables obtained from this + * transformation + * @see ReactiveX operators documentation: FlatMap + */ + public final Observable flatMapObservable(Func1> func) { + return Observable.merge(toObservable(map(func))); + } + + /** + * Returns a Task that applies a specified function to the item emitted by the source Task and + * emits the result of this function applications. + *

+ * + *

+ *
Scheduler:
+ *
{@code map} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param func + * a function to apply to the item emitted by the Task + * @return a Task that emits the item from the source Task, transformed by the specified function + * @see ReactiveX operators documentation: Map + */ + public final Task map(Func1 func) { + return lift(new OperatorMap(func)); + } + + /** + * Flattens this and another Observable into a single Observable, without any transformation. + *

+ * + *

+ * You can combine items emitted by multiple Observables so that they appear as a single Observable, by + * using the {@code mergeWith} method. + *

+ *
Scheduler:
+ *
{@code mergeWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Task to be merged + * @return an Observable that emits all of the items emitted by the source Observables + * @see ReactiveX operators documentation: Merge + */ + public final Observable mergeWith(Task t1) { + return merge(this, t1); + } + + /** + * Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler}, + * asynchronously with an unbounded buffer. + *

+ * + *

+ *
Scheduler:
+ *
you specify which {@link Scheduler} this operator will use
+ *
+ * + * @param scheduler + * the {@link Scheduler} to notify {@link Observer}s on + * @return the source Observable modified so that its {@link Observer}s are notified on the specified {@link Scheduler} + * @see ReactiveX operators documentation: ObserveOn + * @see RxJava Threading Examples + * @see #subscribeOn + */ + public final Task observeOn(Scheduler scheduler) { + return lift(new OperatorObserveOn(scheduler)); + } + + /** + * Instructs an Observable to emit an item (returned by a specified function) rather than invoking {@link Observer#onError onError} if it encounters an error. + *

+ * + *

+ * By default, when an Observable encounters an error that prevents it from emitting the expected item to + * its {@link Observer}, the Observable invokes its Observer's {@code onError} method, and then quits + * without invoking any more of its Observer's methods. The {@code onErrorReturn} method changes this + * behavior. If you pass a function ({@code resumeFunction}) to an Observable's {@code onErrorReturn} method, if the original Observable encounters an error, instead of invoking its Observer's + * {@code onError} method, it will instead emit the return value of {@code resumeFunction}. + *

+ * You can use this to prevent errors from propagating or to supply fallback data should errors be + * encountered. + *

+ *
Scheduler:
+ *
{@code onErrorReturn} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param resumeFunction + * a function that returns an item that the new Observable will emit if the source Observable + * encounters an error + * @return the original Observable with appropriately modified behavior + * @see ReactiveX operators documentation: Catch + */ + public final Task onErrorReturn(Func1 resumeFunction) { + return lift(new OperatorOnErrorReturn(resumeFunction)); + } + + public final void execute(final TaskObserver te) { + subscribe(new Subscriber() { + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + te.onError(e); + } + + @Override + public void onNext(T t) { + te.onSuccess(t); + } + + }); + + } + + /** + * Subscribes to an Observable but ignore its emissions and notifications. + *
+ *
Scheduler:
+ *
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving items before + * the Observable has finished sending them + * @throws OnErrorNotImplementedException + * if the Observable tries to call {@code onError} + * @see ReactiveX operators documentation: Subscribe + */ + public final Subscription subscribe() { + return subscribe(new Subscriber() { + + @Override + public final void onCompleted() { + // do nothing + } + + @Override + public final void onError(Throwable e) { + throw new OnErrorNotImplementedException(e); + } + + @Override + public final void onNext(T args) { + // do nothing + } + + }); + } + + /** + * Subscribes to an Observable and provides a callback to handle the items it emits. + *
+ *
Scheduler:
+ *
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param onNext + * the {@code Action1} you have designed to accept emissions from the Observable + * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving items before + * the Observable has finished sending them + * @throws IllegalArgumentException + * if {@code onNext} is null + * @throws OnErrorNotImplementedException + * if the Observable tries to call {@code onError} + * @see ReactiveX operators documentation: Subscribe + */ + public final Subscription subscribe(final Action1 onSuccess) { + if (onSuccess == null) { + throw new IllegalArgumentException("onSuccess can not be null"); + } + + return subscribe(new Subscriber() { + + @Override + public final void onCompleted() { + // do nothing + } + + @Override + public final void onError(Throwable e) { + throw new OnErrorNotImplementedException(e); + } + + @Override + public final void onNext(T args) { + onSuccess.call(args); + } + + }); + } + + /** + * Subscribes to an Observable and provides callbacks to handle the items it emits and any error + * notification it issues. + *
+ *
Scheduler:
+ *
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param onNext + * the {@code Action1} you have designed to accept emissions from the Observable + * @param onError + * the {@code Action1} you have designed to accept any error notification from the + * Observable + * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving items before + * the Observable has finished sending them + * @see ReactiveX operators documentation: Subscribe + * @throws IllegalArgumentException + * if {@code onNext} is null, or + * if {@code onError} is null + */ + public final Subscription subscribe(final Action1 onSuccess, final Action1 onError) { + if (onSuccess == null) { + throw new IllegalArgumentException("onSuccess can not be null"); + } + if (onError == null) { + throw new IllegalArgumentException("onError can not be null"); + } + + return subscribe(new Subscriber() { + + @Override + public final void onCompleted() { + // do nothing + } + + @Override + public final void onError(Throwable e) { + onError.call(e); + } + + @Override + public final void onNext(T args) { + onSuccess.call(args); + } + + }); + } + + /** + * Subscribes to an Observable and invokes {@link OnSubscribe} function without any contract protection, + * error handling, unsubscribe, or execution hooks. + *

+ * Use this only for implementing an {@link Operator} that requires nested subscriptions. For other + * purposes, use {@link #subscribe(Subscriber)} which ensures the Rx contract and other functionality. + *

+ *
Scheduler:
+ *
{@code unsafeSubscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param subscriber + * the Subscriber that will handle emissions and notifications from the Observable + * @return a {@link Subscription} reference with which the {@link Subscriber} can stop receiving items + * before the Observable has completed + */ + public final Subscription unsafeSubscribe(Subscriber subscriber) { + try { + // new Subscriber so onStart it + subscriber.onStart(); + // TODO add back the hook + // hook.onSubscribeStart(this, onSubscribe).call(subscriber); + onSubscribe.call(subscriber); + return hook.onSubscribeReturn(subscriber); + } catch (Throwable e) { + // special handling for certain Throwable/Error/Exception types + Exceptions.throwIfFatal(e); + // if an unhandled error occurs executing the onSubscribe we will propagate it + try { + subscriber.onError(hook.onSubscribeError(e)); + } catch (OnErrorNotImplementedException e2) { + // special handling when onError is not implemented ... we just rethrow + throw e2; + } catch (Throwable e2) { + // if this happens it means the onError itself failed (perhaps an invalid function implementation) + // so we are unable to propagate the error correctly and will just throw + RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); + // TODO could the hook be the cause of the error in the on error handling. + hook.onSubscribeError(r); + // TODO why aren't we throwing the hook's return value. + throw r; + } + return Subscriptions.unsubscribed(); + } + } + + /** + * Subscribes to an Observable and provides a Subscriber that implements functions to handle the items the + * Observable emits and any error or completion notification it issues. + *

+ * A typical implementation of {@code subscribe} does the following: + *

    + *
  1. It stores a reference to the Subscriber in a collection object, such as a {@code List} object.
  2. + *
  3. It returns a reference to the {@link Subscription} interface. This enables Subscribers to + * unsubscribe, that is, to stop receiving items and notifications before the Observable completes, which + * also invokes the Subscriber's {@link Subscriber#onCompleted onCompleted} method.
  4. + *

+ * An {@code Observable} instance is responsible for accepting all subscriptions and notifying all + * Subscribers. Unless the documentation for a particular {@code Observable} implementation indicates + * otherwise, Subscriber should make no assumptions about the order in which multiple Subscribers will + * receive their notifications. + *

+ * For more information see the + * ReactiveX documentation. + *

+ *
Scheduler:
+ *
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param subscriber + * the {@link Subscriber} that will handle emissions and notifications from the Observable + * @return a {@link Subscription} reference with which Subscribers that are {@link Observer}s can + * unsubscribe from the Observable + * @throws IllegalStateException + * if {@code subscribe} is unable to obtain an {@code OnSubscribe<>} function + * @throws IllegalArgumentException + * if the {@link Subscriber} provided as the argument to {@code subscribe} is {@code null} + * @throws OnErrorNotImplementedException + * if the {@link Subscriber}'s {@code onError} method is null + * @throws RuntimeException + * if the {@link Subscriber}'s {@code onError} method itself threw a {@code Throwable} + * @see ReactiveX operators documentation: Subscribe + */ + public final Subscription subscribe(Subscriber subscriber) { + // validate and proceed + if (subscriber == null) { + throw new IllegalArgumentException("observer can not be null"); + } + if (onSubscribe == null) { + throw new IllegalStateException("onSubscribe function can not be null."); + /* + * the subscribe function can also be overridden but generally that's not the appropriate approach + * so I won't mention that in the exception + */ + } + + // new Subscriber so onStart it + subscriber.onStart(); + + /* + * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls + * to user code from within an Observer" + */ + // if not already wrapped + if (!(subscriber instanceof SafeSubscriber)) { + // assign to `observer` so we return the protected version + subscriber = new SafeSubscriber(subscriber); + } + + // The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks. + try { + // allow the hook to intercept and/or decorate + // TODO add back the hook + // hook.onSubscribeStart(this, onSubscribe).call(subscriber); + onSubscribe.call(subscriber); + return hook.onSubscribeReturn(subscriber); + } catch (Throwable e) { + // special handling for certain Throwable/Error/Exception types + Exceptions.throwIfFatal(e); + // if an unhandled error occurs executing the onSubscribe we will propagate it + try { + subscriber.onError(hook.onSubscribeError(e)); + } catch (OnErrorNotImplementedException e2) { + // special handling when onError is not implemented ... we just rethrow + throw e2; + } catch (Throwable e2) { + // if this happens it means the onError itself failed (perhaps an invalid function implementation) + // so we are unable to propagate the error correctly and will just throw + RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); + // TODO could the hook be the cause of the error in the on error handling. + hook.onSubscribeError(r); + // TODO why aren't we throwing the hook's return value. + throw r; + } + return Subscriptions.unsubscribed(); + } + } + + /** + * Asynchronously subscribes Observers to this Observable on the specified {@link Scheduler}. + *

+ * + *

+ *
Scheduler:
+ *
you specify which {@link Scheduler} this operator will use
+ *
+ * + * @param scheduler + * the {@link Scheduler} to perform subscription actions on + * @return the source Observable modified so that its subscriptions happen on the + * specified {@link Scheduler} + * @see ReactiveX operators documentation: SubscribeOn + * @see RxJava Threading Examples + * @see #observeOn + */ + public final Task subscribeOn(Scheduler scheduler) { + return nest().lift(new OperatorSubscribeOn(scheduler)); + } + + /** + * Returns an Observable that mirrors the source Observable but applies a timeout policy for each emitted + * item. If the next item isn't emitted within the specified timeout duration starting from its predecessor, + * the resulting Observable terminates and notifies observers of a {@code TimeoutException}. + *

+ * + *

+ *
Scheduler:
+ *
This version of {@code timeout} operates by default on the {@code computation} {@link Scheduler}.
+ *
+ * + * @param timeout + * maximum duration between emitted items before a timeout occurs + * @param timeUnit + * the unit of time that applies to the {@code timeout} argument. + * @return the source Observable modified to notify observers of a {@code TimeoutException} in case of a + * timeout + * @see ReactiveX operators documentation: Timeout + */ + public final Task timeout(long timeout, TimeUnit timeUnit) { + return timeout(timeout, timeUnit, null, Schedulers.computation()); + } + + /** + * Returns an Observable that mirrors the source Observable but applies a timeout policy for each emitted + * item, where this policy is governed on a specified Scheduler. If the next item isn't emitted within the + * specified timeout duration starting from its predecessor, the resulting Observable terminates and + * notifies observers of a {@code TimeoutException}. + *

+ * + *

+ *
Scheduler:
+ *
you specify which {@link Scheduler} this operator will use
+ *
+ * + * @param timeout + * maximum duration between items before a timeout occurs + * @param timeUnit + * the unit of time that applies to the {@code timeout} argument + * @param scheduler + * the Scheduler to run the timeout timers on + * @return the source Observable modified to notify observers of a {@code TimeoutException} in case of a + * timeout + * @see ReactiveX operators documentation: Timeout + */ + public final Task timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler) { + return timeout(timeout, timeUnit, null, scheduler); + } + + /** + * Returns an Observable that mirrors the source Observable but applies a timeout policy for each emitted + * item. If the next item isn't emitted within the specified timeout duration starting from its predecessor, + * the resulting Observable begins instead to mirror a fallback Observable. + *

+ * + *

+ *
Scheduler:
+ *
This version of {@code timeout} operates by default on the {@code computation} {@link Scheduler}.
+ *
+ * + * @param timeout + * maximum duration between items before a timeout occurs + * @param timeUnit + * the unit of time that applies to the {@code timeout} argument + * @param other + * the fallback Observable to use in case of a timeout + * @return the source Observable modified to switch to the fallback Observable in case of a timeout + * @see ReactiveX operators documentation: Timeout + */ + public final Task timeout(long timeout, TimeUnit timeUnit, Task other) { + return timeout(timeout, timeUnit, other, Schedulers.computation()); + } + + /** + * Returns an Observable that mirrors the source Observable but applies a timeout policy for each emitted + * item using a specified Scheduler. If the next item isn't emitted within the specified timeout duration + * starting from its predecessor, the resulting Observable begins instead to mirror a fallback Observable. + *

+ * + *

+ *
Scheduler:
+ *
you specify which {@link Scheduler} this operator will use
+ *
+ * + * @param timeout + * maximum duration between items before a timeout occurs + * @param timeUnit + * the unit of time that applies to the {@code timeout} argument + * @param other + * the Observable to use as the fallback in case of a timeout + * @param scheduler + * the {@link Scheduler} to run the timeout timers on + * @return the source Observable modified so that it will switch to the fallback Observable in case of a + * timeout + * @see ReactiveX operators documentation: Timeout + */ + public final Task timeout(long timeout, TimeUnit timeUnit, Task other, Scheduler scheduler) { + return lift(new OperatorTimeout(timeout, timeUnit, toObservable(other), scheduler)); + } + + /** + * TODO TEMPORARY? Should we have a BlockingTask? + * + * Converts a Task into a {@link BlockingObservable} (an Observable with blocking operators). + *
+ *
Scheduler:
+ *
{@code toBlocking} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return a {@code BlockingObservable} version of this Observable + * @see ReactiveX operators documentation: To + * @Experimental + */ + @Experimental + public final BlockingObservable toBlocking() { + return BlockingObservable.from(toObservable(this)); + } + + /** + * Returns an Observable that emits items that are the result of applying a specified function to pairs of + * values, one each from the source Observable and another specified Observable. + *

+ * + *

+ *
Scheduler:
+ *
{@code zipWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the type of items emitted by the {@code other} Observable + * @param + * the type of items emitted by the resulting Observable + * @param other + * the other Observable + * @param zipFunction + * a function that combines the pairs of items from the two Observables to generate the items to + * be emitted by the resulting Observable + * @return an Observable that pairs up values from the source Observable and the {@code other} Observable + * and emits the results of {@code zipFunction} applied to these pairs + * @see ReactiveX operators documentation: Zip + */ + public final Task zipWith(Task other, Func2 zipFunction) { + return zip(this, other, zipFunction); + } + +} diff --git a/src/test/java/rx/TaskTest.java b/src/test/java/rx/TaskTest.java new file mode 100644 index 0000000000..b01e31f802 --- /dev/null +++ b/src/test/java/rx/TaskTest.java @@ -0,0 +1,196 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package rx; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; + +import rx.Task.OnExecute; +import rx.Task.TaskObserver; +import rx.functions.Func1; +import rx.functions.Func2; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; + +public class TaskTest { + + @Test + public void testHelloWorld() { + TestSubscriber ts = new TestSubscriber(); + Task.just("Hello World!").subscribe(ts); + ts.assertReceivedOnNext(Arrays.asList("Hello World!")); + } + + @Test + public void testHelloWorld2() { + final AtomicReference v = new AtomicReference(); + Task.just("Hello World!").execute(new TaskObserver() { + + @Override + public void onSuccess(String value) { + v.set(value); + } + + @Override + public void onError(Throwable error) { + + } + + }); + assertEquals("Hello World!", v.get()); + } + + @Test + public void testMap() { + TestSubscriber ts = new TestSubscriber(); + Task.just("A") + .map(new Func1() { + + @Override + public String call(String s) { + return s + "B"; + } + + }) + .subscribe(ts); + ts.assertReceivedOnNext(Arrays.asList("AB")); + } + + @Test + public void testZip() { + TestSubscriber ts = new TestSubscriber(); + Task a = Task.just("A"); + Task b = Task.just("B"); + + Task.zip(a, b, new Func2() { + + @Override + public String call(String a, String b) { + return a + b; + } + + }) + .subscribe(ts); + ts.assertReceivedOnNext(Arrays.asList("AB")); + } + + @Test + public void testZipWith() { + TestSubscriber ts = new TestSubscriber(); + + Task.just("A").zipWith(Task.just("B"), new Func2() { + + @Override + public String call(String a, String b) { + return a + b; + } + + }) + .subscribe(ts); + ts.assertReceivedOnNext(Arrays.asList("AB")); + } + + @Test + public void testMerge() { + TestSubscriber ts = new TestSubscriber(); + Task a = Task.just("A"); + Task b = Task.just("B"); + + Task.merge(a, b).subscribe(ts); + ts.assertReceivedOnNext(Arrays.asList("A", "B")); + } + + @Test + public void testMergeWith() { + TestSubscriber ts = new TestSubscriber(); + + Task.just("A").mergeWith(Task.just("B")).subscribe(ts); + ts.assertReceivedOnNext(Arrays.asList("A", "B")); + } + + @Test + public void testCreateSuccess() { + TestSubscriber ts = new TestSubscriber(); + Task.create(new OnExecute() { + + @Override + public void call(TaskObserver s) { + s.onSuccess("Hello"); + } + + }).subscribe(ts); + ts.assertReceivedOnNext(Arrays.asList("Hello")); + } + + @Test + public void testCreateError() { + TestSubscriber ts = new TestSubscriber(); + Task.create(new OnExecute() { + + @Override + public void call(TaskObserver s) { + s.onError(new RuntimeException("fail")); + } + + }).subscribe(ts); + assertEquals(1, ts.getOnErrorEvents().size()); + } + + @Test + public void testAsync() { + TestSubscriber ts = new TestSubscriber(); + Task.just("Hello") + .subscribeOn(Schedulers.io()) + .map(new Func1() { + + @Override + public String call(String v) { + System.out.println("SubscribeOn Thread: " + Thread.currentThread()); + return v; + } + + }) + .observeOn(Schedulers.computation()) + .map(new Func1() { + + @Override + public String call(String v) { + System.out.println("ObserveOn Thread: " + Thread.currentThread()); + return v; + } + + }) + .subscribe(ts); + ts.awaitTerminalEvent(); + ts.assertReceivedOnNext(Arrays.asList("Hello")); + } + + @Test + public void testFlatMap() { + TestSubscriber ts = new TestSubscriber(); + Task.just("Hello").flatMap(new Func1>() { + + @Override + public Task call(String s) { + return Task.just(s + " World!").subscribeOn(Schedulers.computation()); + } + + }).subscribe(ts); + ts.awaitTerminalEvent(); + ts.assertReceivedOnNext(Arrays.asList("Hello World!")); + } +}