From 520bedf1e757ad829056cc50adc57b75ec134516 Mon Sep 17 00:00:00 2001 From: Matt Jacobs Date: Wed, 27 Mar 2013 14:36:54 -0700 Subject: [PATCH] * Removed all methods working with Objects from rxjava-core - This allows for type safety in statically-typed languages - This prevents dynamically-typed languages from hooking into rxjava-core. -- See #204 for details on code generation for dynamic languages * Added Scala implicits into rx.lang.scala.RxImplicits * Added tests of most methods on Observable using Scala functions * Fixed Scala Gradle/ScalaTest build --- language-adaptors/rxjava-scala/build.gradle | 9 +- .../scala/rx/lang/scala/RxImplicits.scala | 596 +++++++++ .../scala/rx/lang/scala/ScalaAdaptor.scala | 204 ---- rxjava-core/src/main/java/rx/Observable.java | 1068 +---------------- .../src/main/java/rx/subjects/Subject.java | 6 +- 5 files changed, 629 insertions(+), 1254 deletions(-) create mode 100644 language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/RxImplicits.scala delete mode 100644 language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ScalaAdaptor.scala diff --git a/language-adaptors/rxjava-scala/build.gradle b/language-adaptors/rxjava-scala/build.gradle index b0ac3b942b..fdeddde269 100644 --- a/language-adaptors/rxjava-scala/build.gradle +++ b/language-adaptors/rxjava-scala/build.gradle @@ -6,6 +6,7 @@ apply plugin: 'osgi' tasks.withType(ScalaCompile) { scalaCompileOptions.fork = true scalaCompileOptions.unchecked = true + scalaCompileOptions.setAdditionalParameters(['-feature']) configure(scalaCompileOptions.forkOptions) { memoryMaximumSize = '1g' @@ -24,14 +25,18 @@ dependencies { provided 'org.mockito:mockito-core:1.8.5' testCompile 'org.scalatest:scalatest_2.10:1.9.1' + testCompile 'junit:junit:4.10' + testCompile 'org.mockito:mockito-core:1.8.5' } +sourceSets.test.scala.srcDir 'src/main/scala' + task test(overwrite: true, dependsOn: testClasses) << { ant.taskdef(name: 'scalatest', classname: 'org.scalatest.tools.ScalaTestAntTask', - classpath: sourceSets.test.runtimeClasspath.asPath + classpath: configurations.testRuntime.asPath + ':' + compileScala.destinationDir ) - ant.scalatest(runpath: sourceSets.test.classesDir, + ant.scalatest(runpath: sourceSets.test.output.classesDir, haltonfailure: 'true', fork: 'false') {reporter(type: 'stdout')} } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/RxImplicits.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/RxImplicits.scala new file mode 100644 index 0000000000..7d9388764f --- /dev/null +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/RxImplicits.scala @@ -0,0 +1,596 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.scala + +object RxImplicits { + import java.{ lang => jlang } + import language.implicitConversions + + import rx.Observable + import rx.util.functions._ + + /** + * Converts 0-arg function to Rx Action0 + */ + implicit def scalaFunction0ProducingUnitToAction0(f: (() => Unit)): Action0 = + new Action0 { + def call(): Unit = f() + } + + /** + * Converts 1-arg function to Rx Action1 + */ + implicit def scalaFunction1ProducingUnitToAction1[A](f: (A => Unit)): Action1[A] = + new Action1[A] { + def call(a: A): Unit = f(a) + } + + /** + * Converts 1-arg predicate to Rx Func1[A, java.lang.Boolean] + */ + implicit def scalaBooleanFunction1ToRxBooleanFunc1[A](f: (A => Boolean)): Func1[A, jlang.Boolean] = + new Func1[A, jlang.Boolean] { + def call(a: A): jlang.Boolean = f(a).booleanValue + } + + /** + * Converts a specific function shape (used in takeWhile) to the equivalent Java types with an Rx Func2 + */ + implicit def convertTakeWhileFuncToRxFunc2[A](f: (A, Int) => Boolean): Func2[A, jlang.Integer, jlang.Boolean] = + new Func2[A, jlang.Integer, jlang.Boolean] { + def call(a: A, b: jlang.Integer): jlang.Boolean = f(a, b).booleanValue + } + + /** + * Converts a function shaped ilke compareTo into the equivalent Rx Func2 + */ + implicit def convertComparisonFuncToRxFunc2[A](f: (A, A) => Int): Func2[A, A, jlang.Integer] = + new Func2[A, A, jlang.Integer] { + def call(a1: A, a2: A): jlang.Integer = f(a1, a2).intValue + } + + /* + * This implicit allows Scala code to use any exception type and still work + * with invariant Func1 interface + */ + implicit def exceptionFunction1ToRxExceptionFunc1[A <: Exception, B](f: (A => B)): Func1[Exception, B] = + new Func1[Exception, B] { + def call(ex: Exception): B = f(ex.asInstanceOf[A]) + } + + /** + * The following implicits convert functions of different arities into the Rx equivalents + */ + implicit def scalaFunction0ToRxFunc0[A](f: () => A): Func0[A] = + new Func0[A] { + def call(): A = f() + } + + implicit def scalaFunction1ToRxFunc1[A, B](f: (A => B)): Func1[A, B] = + new Func1[A, B] { + def call(a: A): B = f(a) + } + + implicit def scalaFunction2ToRxFunc2[A, B, C](f: (A, B) => C): Func2[A, B, C] = + new Func2[A, B, C] { + def call(a: A, b: B) = f(a, b) + } + + implicit def scalaFunction3ToRxFunc3[A, B, C, D](f: (A, B, C) => D): Func3[A, B, C, D] = + new Func3[A, B, C, D] { + def call(a: A, b: B, c: C) = f(a, b, c) + } + + implicit def scalaFunction4ToRxFunc4[A, B, C, D, E](f: (A, B, C, D) => E): Func4[A, B, C, D, E] = + new Func4[A, B, C, D, E] { + def call(a: A, b: B, c: C, d: D) = f(a, b, c, d) + } + + /** + * This implicit class implements all of the methods necessary for including Observables in a + * for-comprehension. Note that return type is always Observable, so that the ScalaObservable + * type never escapes the for-comprehension + */ + implicit class ScalaObservable[A](wrapped: Observable[A]) { + def map[B](f: A => B): Observable[B] = wrapped.map(f) + def flatMap[B](f: A => Observable[B]): Observable[B] = wrapped.mapMany(f) + def foreach(f: A => Unit): Unit = wrapped.forEach(f) + def withFilter(p: A => Boolean): WithFilter = new WithFilter(p) + + class WithFilter(p: A => Boolean) { + def map[B](f: A => B): Observable[B] = wrapped.filter(p).map(f) + def flatMap[B](f: A => Observable[B]): Observable[B] = wrapped.filter(p).flatMap(f) + def foreach(f: A => Unit): Unit = wrapped.filter(p).forEach(f) + def withFilter(p: A => Boolean): Observable[A] = wrapped.filter(p) + } + } +} + +import org.scalatest.junit.JUnitSuite + +class UnitTestSuite extends JUnitSuite { + import rx.lang.scala.RxImplicits._ + + import org.junit.{ Before, Test } + import org.junit.Assert._ + import org.mockito.Matchers.any + import org.mockito.Mockito._ + import org.mockito.{ MockitoAnnotations, Mock } + import rx.{ Notification, Observer, Observable, Subscription } + import rx.observables.GroupedObservable + import collection.mutable.ArrayBuffer + import collection.JavaConverters._ + + @Mock private[this] + val observer: Observer[Any] = null + + @Mock private[this] + val subscription: Subscription = null + + val isOdd = (i: Int) => i % 2 == 1 + val isEven = (i: Int) => i % 2 == 0 + + class ObservableWithException(s: Subscription, values: String*) extends Observable[String] { + var t: Thread = null + + override def subscribe(observer: Observer[String]): Subscription = { + println("ObservableWithException subscribed to ...") + t = new Thread(new Runnable() { + override def run() { + try { + println("running ObservableWithException thread") + values.toList.foreach(v => { + println("ObservableWithException onNext: " + v) + observer.onNext(v) + }) + throw new RuntimeException("Forced Failure") + } catch { + case ex: Exception => observer.onError(ex) + } + } + }) + println("starting ObservableWithException thread") + t.start() + println("done starting ObservableWithException thread") + s + } + } + + @Before def before { + MockitoAnnotations.initMocks(this) + } + + // tests of static methods + + @Test def testSingle { + assertEquals(1, Observable.from(1).single) + } + + @Test def testSinglePredicate { + val found = Observable.from(1, 2, 3).single(isEven) + assertEquals(2, found) + } + + @Test def testSingleOrDefault { + assertEquals(0, Observable.from().singleOrDefault(0)) + assertEquals(1, Observable.from(1).singleOrDefault(0)) + try { + Observable.from(1, 2, 3).singleOrDefault(0) + fail("Did not catch any exception, expected IllegalStateException") + } catch { + case ex: IllegalStateException => println("Caught expected IllegalStateException") + case ex: Throwable => fail("Caught unexpected exception " + ex.getCause + ", expected IllegalStateException") + } + } + + @Test def testSingleOrDefaultPredicate { + assertEquals(2, Observable.from(1, 2, 3).singleOrDefault(0, isEven)) + assertEquals(0, Observable.from(1, 3).singleOrDefault(0, isEven)) + try { + Observable.from(1, 2, 3).singleOrDefault(0, isOdd) + fail("Did not catch any exception, expected IllegalStateException") + } catch { + case ex: IllegalStateException => println("Caught expected IllegalStateException") + case ex: Throwable => fail("Caught unexpected exception " + ex.getCause + ", expected IllegalStateException") + } + } + + @Test def testFromJavaInterop { + val observable = Observable.from(List(1, 2, 3).asJava) + assertSubscribeReceives(observable)(1, 2, 3) + } + + @Test def testSubscribe { + val observable = Observable.from("1", "2", "3") + assertSubscribeReceives(observable)("1", "2", "3") + } + + //should not compile - adapted from https://gist.github.com/jmhofer/5195589 + /*@Test def testSubscribeOnInt() { + val observable = Observable.from("1", "2", "3") + observable.subscribe((arg: Int) => { + println("testSubscribe: arg = " + arg) + }) + }*/ + + @Test def testDefer { + val lazyObservableFactory = () => Observable.from(1, 2) + val observable = Observable.defer(lazyObservableFactory) + assertSubscribeReceives(observable)(1, 2) + } + + @Test def testJust { + val observable = Observable.just("foo") + assertSubscribeReceives(observable)("foo") + } + + @Test def testMerge { + val observable1 = Observable.from(1, 2, 3) + val observable2 = Observable.from(4, 5, 6) + val observableList = List(observable1, observable2).asJava + val merged = Observable.merge(observableList) + assertSubscribeReceives(merged)(1, 2, 3, 4, 5, 6) + } + + @Test def testFlattenMerge { + val observable = Observable.from(Observable.from(1, 2, 3)) + val merged = Observable.merge(observable) + assertSubscribeReceives(merged)(1, 2, 3) + } + + @Test def testSequenceMerge { + val observable1 = Observable.from(1, 2, 3) + val observable2 = Observable.from(4, 5, 6) + val merged = Observable.merge(observable1, observable2) + assertSubscribeReceives(merged)(1, 2, 3, 4, 5, 6) + } + + @Test def testConcat { + val observable1 = Observable.from(1, 2, 3) + val observable2 = Observable.from(4, 5, 6) + val concatenated = Observable.concat(observable1, observable2) + assertSubscribeReceives(concatenated)(1, 2, 3, 4, 5, 6) + } + + @Test def testSynchronize { + val observable = Observable.from(1, 2, 3) + val synchronized = Observable.synchronize(observable) + assertSubscribeReceives(synchronized)(1, 2, 3) + } + + @Test def testZip3() { + val numbers = Observable.from(1, 2, 3) + val colors = Observable.from("red", "green", "blue") + val names = Observable.from("lion-o", "cheetara", "panthro") + + case class Character(id: Int, color: String, name: String) + + val liono = Character(1, "red", "lion-o") + val cheetara = Character(2, "green", "cheetara") + val panthro = Character(3, "blue", "panthro") + + val characters = Observable.zip(numbers, colors, names, Character.apply _) + assertSubscribeReceives(characters)(liono, cheetara, panthro) + } + + @Test def testZip4() { + val numbers = Observable.from(1, 2, 3) + val colors = Observable.from("red", "green", "blue") + val names = Observable.from("lion-o", "cheetara", "panthro") + val isLeader = Observable.from(true, false, false) + + case class Character(id: Int, color: String, name: String, isLeader: Boolean) + + val liono = Character(1, "red", "lion-o", true) + val cheetara = Character(2, "green", "cheetara", false) + val panthro = Character(3, "blue", "panthro", false) + + val characters = Observable.zip(numbers, colors, names, isLeader, Character.apply _) + assertSubscribeReceives(characters)(liono, cheetara, panthro) + } + + //tests of instance methods + + // missing tests for : takeUntil, groupBy, next, mostRecent + + @Test def testFilter { + val numbers = Observable.from(1, 2, 3, 4, 5, 6, 7, 8, 9) + val observable = numbers.filter(isEven) + assertSubscribeReceives(observable)(2, 4, 6, 8) + } + + @Test def testLast { + val observable = Observable.from(1, 2, 3, 4) + assertEquals(4, observable.last) + } + + @Test def testLastPredicate { + val observable = Observable.from(1, 2, 3, 4) + assertEquals(3, observable.last(isOdd)) + } + + @Test def testLastOrDefault { + val observable = Observable.from(1, 2, 3, 4) + assertEquals(4, observable.lastOrDefault(5)) + assertEquals(5, Observable.from[Int]().lastOrDefault(5)) + } + + @Test def testLastOrDefaultPredicate { + val observable = Observable.from(1, 2, 3, 4) + assertEquals(3, observable.lastOrDefault(5, isOdd)) + assertEquals(5, Observable.from[Int]().lastOrDefault(5, isOdd)) + } + + @Test def testMap { + val numbers = Observable.from(1, 2, 3, 4, 5, 6, 7, 8, 9) + val mappedNumbers = ArrayBuffer.empty[Int] + numbers.map((x: Int) => x * x).subscribe((squareVal: Int) => { + mappedNumbers.append(squareVal) + }) + assertEquals(List(1, 4, 9, 16, 25, 36, 49, 64, 81), mappedNumbers.toList) + } + + @Test def testMapMany { + val numbers = Observable.from(1, 2, 3, 4) + val f = (i: Int) => Observable.from(List(i, -i).asJava) + val mappedNumbers = ArrayBuffer.empty[Int] + numbers.mapMany(f).subscribe((i: Int) => { + mappedNumbers.append(i) + }) + assertEquals(List(1, -1, 2, -2, 3, -3, 4, -4), mappedNumbers.toList) + } + + @Test def testMaterialize { + val observable = Observable.from(1, 2, 3, 4) + val expectedNotifications: List[Notification[Int]] = + ((1.to(4).map(i => new Notification(i))) :+ new Notification()).toList + val actualNotifications: ArrayBuffer[Notification[Int]] = ArrayBuffer.empty + observable.materialize.subscribe((n: Notification[Int]) => { + actualNotifications.append(n) + }) + assertEquals(expectedNotifications, actualNotifications.toList) + } + + @Test def testDematerialize { + val notifications: List[Notification[Int]] = + ((1.to(4).map(i => new Notification(i))) :+ new Notification()).toList + val observableNotifications: Observable[Notification[Int]] = + Observable.from(notifications.asJava) + val observable: Observable[Int] = + observableNotifications.dematerialize() + assertSubscribeReceives(observable)(1, 2, 3, 4) + } + + @Test def testOnErrorResumeNextObservableNoError { + val observable = Observable.from(1, 2, 3, 4) + val resumeObservable = Observable.from(5, 6, 7, 8) + val observableWithErrorHandler = observable.onErrorResumeNext(resumeObservable) + assertSubscribeReceives(observableWithErrorHandler)(1, 2, 3, 4) + } + + @Test def testOnErrorResumeNextObservableErrorOccurs { + val observable = new ObservableWithException(subscription, "foo", "bar") + val resumeObservable = Observable.from("a", "b", "c", "d") + val observableWithErrorHandler = observable.onErrorResumeNext(resumeObservable) + observableWithErrorHandler.subscribe(observer.asInstanceOf[Observer[String]]) + + try { + observable.t.join() + } catch { + case ex: InterruptedException => fail(ex.getMessage) + } + + List("foo", "bar", "a", "b", "c", "d").foreach(t => verify(observer, times(1)).onNext(t)) + verify(observer, never()).onError(any(classOf[Exception])) + verify(observer, times(1)).onCompleted() + } + + @Test def testOnErrorResumeNextFuncNoError { + val observable = Observable.from(1, 2, 3, 4) + val resumeFunc = (ex: RuntimeException) => Observable.from(5, 6, 7, 8) + val observableWithErrorHandler = observable.onErrorResumeNext(resumeFunc) + assertSubscribeReceives(observableWithErrorHandler)(1, 2, 3, 4) + } + + @Test def testOnErrorResumeNextFuncErrorOccurs { + val observable = new ObservableWithException(subscription, "foo", "bar") + val resumeFunc = (ex: RuntimeException) => Observable.from("a", "b", "c", "d") + val observableWithErrorHandler = observable.onErrorResumeNext(resumeFunc) + observableWithErrorHandler.subscribe(observer.asInstanceOf[Observer[String]]) + + try { + observable.t.join() + } catch { + case ex: InterruptedException => fail(ex.getMessage) + } + + List("foo", "bar", "a", "b", "c", "d").foreach(t => verify(observer, times(1)).onNext(t)) + verify(observer, never()).onError(any(classOf[Exception])) + verify(observer, times(1)).onCompleted() + } + + @Test def testOnErrorReturnFuncNoError { + val observable = Observable.from(1, 2, 3, 4) + val returnFunc = (ex: RuntimeException) => 87 + val observableWithErrorHandler = observable.onErrorReturn(returnFunc) + assertSubscribeReceives(observableWithErrorHandler)(1, 2, 3, 4) + } + + @Test def testOnErrorReturnFuncErrorOccurs { + val observable = new ObservableWithException(subscription, "foo", "bar") + val returnFunc = (ex: RuntimeException) => "baz" + val observableWithErrorHandler = observable.onErrorReturn(returnFunc) + observableWithErrorHandler.subscribe(observer.asInstanceOf[Observer[String]]) + + try { + observable.t.join() + } catch { + case ex: InterruptedException => fail(ex.getMessage) + } + + List("foo", "bar", "baz").foreach(t => verify(observer, times(1)).onNext(t)) + verify(observer, never()).onError(any(classOf[Exception])) + verify(observer, times(1)).onCompleted() + } + + @Test def testReduce { + val observable = Observable.from(1, 2, 3, 4) + assertEquals(10, observable.reduce((a: Int, b: Int) => a + b).single) + } + + @Test def testReduceWithInitialValue { + val observable = Observable.from(1, 2, 3, 4) + assertEquals(110, observable.reduce(100, (a: Int, b: Int) => a + b).single) + } + + @Test def testSkip { + val observable = Observable.from(1, 2, 3, 4) + val skipped = observable.skip(2) + assertSubscribeReceives(skipped)(3, 4) + } + + /** + * Both testTake and testTakeWhileWithIndex exposed a bug with unsubscribes not properly propagating. + * observable.take(2) produces onNext(first), onNext(second), and 4 onCompleteds + * it should produce onNext(first), onNext(second), and 1 onCompleted + * + * Switching to Observable.create(OperationTake.take(observable, 2)) works as expected + */ + @Test def testTake { + import rx.operators._ + + val observable = Observable.from(1, 2, 3, 4, 5) + val took = Observable.create(OperationTake.take(observable, 2)) + assertSubscribeReceives(took)(1, 2) + } + + @Test def testTakeWhile { + val observable = Observable.from(1, 3, 5, 6, 7, 9, 11) + val took = observable.takeWhile(isOdd) + assertSubscribeReceives(took)(1, 3, 5) + } + + /*@Test def testTakeWhileWithIndex { + val observable = Observable.from(1, 3, 5, 6, 7, 9, 11, 12, 13, 15, 17) + val took = observable.takeWhileWithIndex((i: Int, idx: Int) => isOdd(i) && idx > 4) + assertSubscribeReceives(took)(9, 11) + }*/ + + @Test def testTakeLast { + val observable = Observable.from(1, 2, 3, 4, 5, 6, 7, 8, 9) + val tookLast = observable.takeLast(3) + assertSubscribeReceives(tookLast)(7, 8, 9) + } + + @Test def testToList { + val observable = Observable.from(1, 2, 3, 4) + val toList = observable.toList + assertSubscribeReceives(toList)(List(1, 2, 3, 4).asJava) + } + + @Test def testToSortedList { + val observable = Observable.from(1, 3, 4, 2) + val toSortedList = observable.toSortedList + assertSubscribeReceives(toSortedList)(List(1, 2, 3, 4).asJava) + } + + @Test def testToArbitrarySortedList { + val observable = Observable.from("a", "aaa", "aaaa", "aa") + val sortByLength = (s1: String, s2: String) => s1.length.compareTo(s2.length) + val toSortedList = observable.toSortedList(sortByLength) + assertSubscribeReceives(toSortedList)(List("a", "aa", "aaa", "aaaa").asJava) + } + + @Test def testToIterable { + val observable = Observable.from(1, 2) + val it = observable.toIterable.iterator + assertTrue(it.hasNext) + assertEquals(1, it.next) + assertTrue(it.hasNext) + assertEquals(2, it.next) + assertFalse(it.hasNext) + } + + @Test def testStartWith { + val observable = Observable.from(1, 2, 3, 4) + val newStart = observable.startWith(-1, 0) + assertSubscribeReceives(newStart)(-1, 0, 1, 2, 3, 4) + } + + @Test def testOneLineForComprehension { + val mappedObservable = for { + i: Int <- Observable.from(1, 2, 3, 4) + } yield i + 1 + assertSubscribeReceives(mappedObservable)(2, 3, 4, 5) + assertFalse(mappedObservable.isInstanceOf[ScalaObservable[_]]) + } + + @Test def testSimpleMultiLineForComprehension { + val flatMappedObservable = for { + i: Int <- Observable.from(1, 2, 3, 4) + j: Int <- Observable.from(1, 10, 100, 1000) + } yield i + j + assertSubscribeReceives(flatMappedObservable)(2, 12, 103, 1004) + assertFalse(flatMappedObservable.isInstanceOf[ScalaObservable[_]]) + } + + @Test def testMultiLineForComprehension { + val doubler = (i: Int) => Observable.from(i, i) + val flatMappedObservable = for { + i: Int <- Observable.from(1, 2, 3, 4) + j: Int <- doubler(i) + } yield j + //can't use assertSubscribeReceives since each number comes in 2x + flatMappedObservable.subscribe(observer.asInstanceOf[Observer[Int]]) + List(1, 2, 3, 4).foreach(i => verify(observer, times(2)).onNext(i)) + verify(observer, never()).onError(any(classOf[Exception])) + verify(observer, times(1)).onCompleted() + assertFalse(flatMappedObservable.isInstanceOf[ScalaObservable[_]]) + } + + @Test def testFilterInForComprehension { + val doubler = (i: Int) => Observable.from(i, i) + val filteredObservable = for { + i: Int <- Observable.from(1, 2, 3, 4) + j: Int <- doubler(i) if isOdd(i) + } yield j + //can't use assertSubscribeReceives since each number comes in 2x + filteredObservable.subscribe(observer.asInstanceOf[Observer[Int]]) + List(1, 3).foreach(i => verify(observer, times(2)).onNext(i)) + verify(observer, never()).onError(any(classOf[Exception])) + verify(observer, times(1)).onCompleted() + assertFalse(filteredObservable.isInstanceOf[ScalaObservable[_]]) + } + + @Test def testForEachForComprehension { + val doubler = (i: Int) => Observable.from(i, i) + val intBuffer = ArrayBuffer.empty[Int] + val forEachComprehension = for { + i: Int <- Observable.from(1, 2, 3, 4) + j: Int <- doubler(i) if isEven(i) + } { + intBuffer.append(j) + } + assertEquals(List(2, 2, 4, 4), intBuffer.toList) + } + + private def assertSubscribeReceives[T](o: Observable[T])(values: T*) = { + o.subscribe(observer.asInstanceOf[Observer[T]]) + values.toList.foreach(t => verify(observer, times(1)).onNext(t)) + verify(observer, never()).onError(any(classOf[Exception])) + verify(observer, times(1)).onCompleted() + } +} diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ScalaAdaptor.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ScalaAdaptor.scala deleted file mode 100644 index 584c7c587f..0000000000 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ScalaAdaptor.scala +++ /dev/null @@ -1,204 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.lang.scala - -import rx.util.functions.FunctionLanguageAdaptor -import org.junit.{Assert, Before, Test} -import rx.Observable -import org.scalatest.junit.JUnitSuite -import org.mockito.Mockito._ -import org.mockito.{MockitoAnnotations, Mock} - -import scala.collection.JavaConverters._ -import collection.mutable.ArrayBuffer - -class ScalaAdaptor extends FunctionLanguageAdaptor { - - val ON_NEXT = "onNext" - val ON_ERROR = "onError" - val ON_COMPLETED = "onCompleted" - - def getFunctionClass: Array[Class[_]] = { - return Array(classOf[Map[String, _]], classOf[(AnyRef) => Object], classOf[(AnyRef, AnyRef) => Object], - classOf[(AnyRef, AnyRef, AnyRef) => Object], classOf[(AnyRef, AnyRef, AnyRef, AnyRef) => Object], - classOf[(AnyRef, AnyRef, AnyRef, AnyRef) => Object], - classOf[(AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object], - classOf[(AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object], - classOf[(AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) =>Object], - classOf[(AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object], - classOf[(AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object], - classOf[(AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object], - classOf[(AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object], - classOf[(AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object], - classOf[(AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object], - classOf[(AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object], - classOf[(AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object], - classOf[(AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object], - classOf[(AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object], - classOf[(AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object], - classOf[(AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object], - classOf[(AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object]) - } - - def call(function: AnyRef, args: Array[AnyRef]) : Object = { - function match { - case (func: Map[String, _]) => return matchOption(func.get(ON_NEXT), args) - case _ => return matchFunction(function, args) - } - } - - private def matchOption(funcOption: Option[_], args: Array[AnyRef]) : Object = { - funcOption match { - case Some(func: AnyRef) => return matchFunction(func, args) - case _ => return None - } - } - - private def matchFunction(function: AnyRef, args: Array[AnyRef]) : Object = function match { - case (f: ((AnyRef) => Object)) => return f(args(0)) - case (f: ((AnyRef, AnyRef) => Object)) => return f(args(0), args(1)) - case (f: ((AnyRef, AnyRef, AnyRef) => Object)) => return f(args(0), args(1), args(2)) - case (f: ((AnyRef, AnyRef, AnyRef, AnyRef) => Object)) => - return f(args(0), args(1), args(2), args(3)) - case (f: ((AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object)) => - return f(args(0), args(1), args(2), args(3), args(4)) - case (f: ((AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object)) => - return f(args(0), args(1), args(2), args(3), args(4), args(5)) - case (f: ((AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object)) => - return f(args(0), args(1), args(2), args(3), args(4), args(5), args(6)) - case (f: ((AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object)) => - return f(args(0), args(1), args(2), args(3), args(4), args(5), args(6), args(7)) - case (f: ((AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object)) => - return f(args(0), args(1), args(2), args(3), args(4), args(5), args(6), args(7), args(8)) - case (f: ((AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object)) => - return f(args(0), args(1), args(2), args(3), args(4), args(5), args(6), args(7), args(8), args(9)) - case (f: ((AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object)) => - return f(args(0), args(1), args(2), args(3), args(4), args(5), args(6), args(7), args(8), args(9), args(10)) - case (f: ((AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object)) => - return f(args(0), args(1), args(2), args(3), args(4), args(5), args(6), args(7), args(8), args(9), args(10), args(11)) - case (f: ((AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object)) => - return f(args(0), args(1), args(2), args(3), args(4), args(5), args(6), args(7), args(8), args(9), args(10), args(11), args(12)) - case (f: ((AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object)) => - return f(args(0), args(1), args(2), args(3), args(4), args(5), args(6), args(7), args(8), args(9), args(10), args(11), args(12), args(13)) - case (f: ((AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object)) => - return f(args(0), args(1), args(2), args(3), args(4), args(5), args(6), args(7), args(8), args(9), args(10), args(11), args(12), args(13), args(14)) - case (f: ((AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object)) => - return f(args(0), args(1), args(2), args(3), args(4), args(5), args(6), args(7), args(8), args(9), args(10), args(11), args(12), args(13), args(14), args(15)) - case (f: ((AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object)) => - return f(args(0), args(1), args(2), args(3), args(4), args(5), args(6), args(7), args(8), args(9), args(10), args(11), args(12), args(13), args(14), args(15), args(16)) - case (f: ((AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object)) => - return f(args(0), args(1), args(2), args(3), args(4), args(5), args(6), args(7), args(8), args(9), args(10), args(11), args(12), args(13), args(14), args(15), args(16), args(17)) - case (f: ((AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object)) => - return f(args(0), args(1), args(2), args(3), args(4), args(5), args(6), args(7), args(8), args(9), args(10), args(11), args(12), args(13), args(14), args(15), args(16), args(17), args(18)) - case (f: ((AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object)) => - return f(args(0), args(1), args(2), args(3), args(4), args(5), args(6), args(7), args(8), args(9), args(10), args(11), args(12), args(13), args(14), args(15), args(16), args(17), args(18), args(19)) - case (f: ((AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object)) => - return f(args(0), args(1), args(2), args(3), args(4), args(5), args(6), args(7), args(8), args(9), args(10), args(11), args(12), args(13), args(14), args(15), args(16), args(17), args(18), args(19), args(20)) - case (f: ((AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef, AnyRef) => Object)) => - return f(args(0), args(1), args(2), args(3), args(4), args(5), args(6), args(7), args(8), args(9), args(10), args(11), args(12), args(13), args(14), args(15), args(16), args(17), args(18), args(19), args(20), args(21)) - - } -} - -class UnitTestSuite extends JUnitSuite { - @Mock private[this] - val assertion: ScriptAssertion = null - - @Before def before { - MockitoAnnotations.initMocks(this) - } - - @Test def testTake() { - Observable.toObservable("1", "2", "3").take(1).subscribe(Map( - "onNext" -> ((callback: String) => { - print("testTake: callback = " + callback) - assertion.received(callback) - }) - )) - verify(assertion, times(1)).received("1") - } - - @Test def testClosureVersusMap() { - // using closure - Observable.toObservable("1", "2", "3") - .take(2) - .subscribe((callback: String) => { - println(callback) - }) - - // using Map of closures - Observable.toObservable("1", "2", "3") - .take(2) - .subscribe(Map( - "onNext" -> ((callback: String) => { - println(callback) - }))) - } - - @Test def testFilterWithToList() { - val numbers = Observable.toObservable[Int](1, 2, 3, 4, 5, 6, 7, 8, 9) - numbers.filter((x: Int) => 0 == (x % 2)).toList().subscribe( - (callback: java.util.List[Int]) => { - val lst = callback.asScala.toList - println("filter onNext -> got " + lst) - assertion.received(lst) - } - ) - verify(assertion, times(1)).received(List(2,4,6,8)) - } - - @Test def testTakeLast() { - val numbers = Observable.toObservable[Int](1, 2, 3, 4, 5, 6, 7, 8, 9) - numbers.takeLast(1).subscribe((callback: Int) => { - println("testTakeLast: onNext -> got " + callback) - assertion.received(callback) - }) - verify(assertion, times(1)).received(9) - } - - @Test def testMap() { - val numbers = Observable.toObservable(1, 2, 3, 4, 5, 6, 7, 8, 9) - val mappedNumbers = new ArrayBuffer[Int]() - numbers.map(((x: Int)=> { x * x })).subscribe(((squareVal: Int) => { - println("square is " + squareVal ) - mappedNumbers += squareVal - })) - Assert.assertEquals(List(1,4,9,16,25,36,49,64,81), mappedNumbers.toList) - - } - - @Test def testZip() { - val numbers = Observable.toObservable(1, 2, 3) - val colors = Observable.toObservable("red", "green", "blue") - val characters = Observable.toObservable("lion-o", "cheetara", "panthro") - - Observable.zip(numbers.toList, colors.toList, characters.toList, ((n: java.util.List[Int], c: java.util.List[String], t: java.util.List[String]) => { Map( - "numbers" -> n, - "colors" -> c, - "thundercats" -> t - )})).subscribe((m: Map[String, _]) => { - println("zipped map is " + m.toString()) - }) - - - } - - trait ScriptAssertion { - def error(ex: Exception) - - def received(obj: Any) - } -} diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index d24e48106c..ccbb498d0b 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -184,70 +184,6 @@ public Subscription subscribe(Observer observer) { } } - @SuppressWarnings({ "rawtypes", "unchecked" }) - public Subscription subscribe(final Map callbacks) { - // lookup and memoize onNext - Object _onNext = callbacks.get("onNext"); - if (_onNext == null) { - throw new RuntimeException("onNext must be implemented"); - } - final FuncN onNext = Functions.from(_onNext); - - return subscribe(new Observer() { - - public void onCompleted() { - Object onComplete = callbacks.get("onCompleted"); - if (onComplete != null) { - Functions.from(onComplete).call(); - } - } - - public void onError(Exception e) { - handleError(e); - Object onError = callbacks.get("onError"); - if (onError != null) { - Functions.from(onError).call(e); - } - } - - public void onNext(Object args) { - onNext.call(args); - } - - }); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public Subscription subscribe(final Object o) { - if (o instanceof Observer) { - // in case a dynamic language is not correctly handling the overloaded methods and we receive an Observer just forward to the correct method. - return subscribe((Observer) o); - } - - // lookup and memoize onNext - if (o == null) { - throw new RuntimeException("onNext must be implemented"); - } - final FuncN onNext = Functions.from(o); - - return subscribe(new Observer() { - - public void onCompleted() { - // do nothing - } - - public void onError(Exception e) { - handleError(e); - // no callback defined - } - - public void onNext(Object args) { - onNext.call(args); - } - - }); - } - public Subscription subscribe(final Action1 onNext) { return subscribe(new Observer() { @@ -271,34 +207,6 @@ public void onNext(T args) { }); } - @SuppressWarnings({ "rawtypes", "unchecked" }) - public Subscription subscribe(final Object onNext, final Object onError) { - // lookup and memoize onNext - if (onNext == null) { - throw new RuntimeException("onNext must be implemented"); - } - final FuncN onNextFunction = Functions.from(onNext); - - return subscribe(new Observer() { - - public void onCompleted() { - // do nothing - } - - public void onError(Exception e) { - handleError(e); - if (onError != null) { - Functions.from(onError).call(e); - } - } - - public void onNext(Object args) { - onNextFunction.call(args); - } - - }); - } - public Subscription subscribe(final Action1 onNext, final Action1 onError) { return subscribe(new Observer() { @@ -324,36 +232,6 @@ public void onNext(T args) { }); } - @SuppressWarnings({ "rawtypes", "unchecked" }) - public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete) { - // lookup and memoize onNext - if (onNext == null) { - throw new RuntimeException("onNext must be implemented"); - } - final FuncN onNextFunction = Functions.from(onNext); - - return subscribe(new Observer() { - - public void onCompleted() { - if (onComplete != null) { - Functions.from(onComplete).call(); - } - } - - public void onError(Exception e) { - handleError(e); - if (onError != null) { - Functions.from(onError).call(e); - } - } - - public void onNext(Object args) { - onNextFunction.call(args); - } - - }); - } - public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete) { return subscribe(new Observer() { @@ -435,40 +313,6 @@ public void onNext(T args) { } } - /** - * Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated. - *

- * NOTE: This will block even if the Observable is asynchronous. - *

- * This is similar to {@link #subscribe(Observer)} but blocks. Because it blocks it does not need the {@link Observer#onCompleted()} or {@link Observer#onError(Exception)} methods. - * - * @param onNext - * {@link Action1} - * @throws RuntimeException - * if error occurs - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - public void forEach(final Object o) { - if (o instanceof Action1) { - // in case a dynamic language is not correctly handling the overloaded methods and we receive an Action1 just forward to the correct method. - forEach((Action1) o); - } - - // lookup and memoize onNext - if (o == null) { - throw new RuntimeException("onNext must be implemented"); - } - final FuncN onNext = Functions.from(o); - - forEach(new Action1() { - - public void call(Object args) { - onNext.call(args); - } - - }); - } - /** * Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence. * @@ -489,17 +333,6 @@ public T single(Func1 predicate) { return single(this, predicate); } - /** - * Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence. - * - * @param predicate - * A predicate function to evaluate for elements in the sequence. - * @return The single element in the observable sequence. - */ - public T single(Object predicate) { - return single(this, predicate); - } - /** * Returns the only element of an observable sequence, or a default value if the observable sequence is empty. * @@ -524,19 +357,6 @@ public T singleOrDefault(T defaultValue, Func1 predicate) { return singleOrDefault(this, defaultValue, predicate); } - /** - * Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found. - * - * @param defaultValue - * default value for a sequence. - * @param predicate - * A predicate function to evaluate for elements in the sequence. - * @return The single element in the observable sequence, or a default value if no value is found. - */ - public T singleOrDefault(T defaultValue, Object predicate) { - return singleOrDefault(this, defaultValue, predicate); - } - /** * Allow the {@link RxJavaErrorHandler} to receive the exception from onError. * @@ -625,38 +445,6 @@ private static Observable _create(Func1, Subscription> func) return new Observable(func, true); } - /** - * Creates an Observable that will execute the given function when a {@link Observer} subscribes to it. - *

- * This method accept {@link Object} to allow different languages to pass in closures using {@link FunctionLanguageAdaptor}. - *

- * Write the function you pass to create so that it behaves as an Observable - calling the passed-in - * onNext, onError, and onCompleted methods appropriately. - *

- * A well-formed Observable must call either the {@link Observer}'s onCompleted method exactly once or its onError method exactly once. - *

- * See Rx Design Guidelines (PDF) for detailed information. - * - * @param - * the type emitted by the Observable sequence - * @param func - * a function that accepts an Observer and calls its onNext, onError, and onCompleted methods - * as appropriate, and returns a {@link Subscription} to allow canceling the subscription (if applicable) - * @return an Observable that, when an {@link Observer} subscribes to it, will execute the given function - */ - public static Observable create(final Object func) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(func); - return create(new Func1, Subscription>() { - - @Override - public Subscription call(Observer t1) { - return (Subscription) _f.call(t1); - } - - }); - } - /** * Returns an Observable that returns no data to the {@link Observer} and immediately invokes its onCompleted method. *

@@ -699,31 +487,6 @@ public static Observable filter(Observable that, Func1 pre return _create(OperationFilter.filter(that, predicate)); } - /** - * Filters an Observable by discarding any of its emissions that do not meet some test. - *

- * - * - * @param that - * the Observable to filter - * @param function - * a function that evaluates the items emitted by the source Observable, returning true if they pass the filter - * @return an Observable that emits only those items in the original Observable that the filter evaluates as true - */ - public static Observable filter(Observable that, final Object function) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(function); - return filter(that, new Func1() { - - @Override - public Boolean call(T t1) { - return (Boolean) _f.call(t1); - - } - - }); - } - /** * Filters an Observable by discarding any of its emissions that do not meet some test. *

@@ -799,33 +562,6 @@ public static Observable defer(Func0> observableFactory) { return _create(OperationDefer.defer(observableFactory)); } - /** - * Returns an observable sequence that invokes the observable factory whenever a new observer subscribes. - * The Defer operator allows you to defer or delay the creation of the sequence until the time when an observer - * subscribes to the sequence. This is useful to allow an observer to easily obtain an updates or refreshed version - * of the sequence. - * - * @param observableFactory - * the observable factory function to invoke for each observer that subscribes to the resulting sequence. - * @param - * the type of the observable. - * @return the observable sequence whose observers trigger an invocation of the given observable factory function. - */ - public static Observable defer(Object observableFactory) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(observableFactory); - - return _create(OperationDefer.defer(new Func0>() { - - @Override - @SuppressWarnings("unchecked") - public Observable call() { - return (Observable) _f.call(); - } - - })); - } - /** * Returns an Observable that notifies an {@link Observer} of a single value and then completes. *

@@ -879,19 +615,6 @@ public static T last(final Observable that, final Func1 predi return last(that.filter(predicate)); } - /** - * Returns the last element of an observable sequence that matches the predicate. - * - * @param that - * the source Observable - * @param predicate - * a predicate function to evaluate for elements in the sequence. - * @return the last element in the observable sequence. - */ - public static T last(final Observable that, final Object predicate) { - return last(that.filter(predicate)); - } - /** * Returns the last element of an observable sequence, or a default value if no value is found. * @@ -936,31 +659,6 @@ public static T lastOrDefault(Observable source, T defaultValue, Func1 - * the type of source. - * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. - */ - public static T lastOrDefault(Observable source, T defaultValue, Object predicate) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(predicate); - - return lastOrDefault(source, defaultValue, new Func1() { - @Override - public Boolean call(T args) { - return (Boolean) _f.call(args); - } - }); - } - /** * Applies a function of your choosing to every notification emitted by an Observable, and returns * this transformation as a new Observable sequence. @@ -982,37 +680,6 @@ public static Observable map(Observable sequence, Func1 func) return _create(OperationMap.map(sequence, func)); } - /** - * Applies a function of your choosing to every notification emitted by an Observable, and returns - * this transformation as a new Observable sequence. - *

- * - * - * @param sequence - * the source Observable - * @param func - * a function to apply to each item in the sequence emitted by the source Observable - * @param - * the type of items emitted by the the source Observable - * @param - * the type of items returned by map function - * @return an Observable that is the result of applying the transformation function to each item - * in the sequence emitted by the source Observable - */ - public static Observable map(Observable sequence, final Object func) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(func); - return map(sequence, new Func1() { - - @SuppressWarnings("unchecked") - @Override - public R call(T t1) { - return (R) _f.call(t1); - } - - }); - } - /** * Creates a new Observable sequence by applying a function that you supply to each object in the * original Observable sequence, where that function is itself an Observable that emits objects, @@ -1038,41 +705,6 @@ public static Observable mapMany(Observable sequence, Func1 - * - * - * @param sequence - * the source Observable - * @param func - * a function to apply to each item emitted by the source Observable, generating a - * Observable - * @param - * the type emitted by the source Observable - * @param - * the type emitted by the Observables emitted by func - * @return an Observable that emits a sequence that is the result of applying the transformation - * function to each item emitted by the source Observable and merging the results of - * the Observables obtained from this transformation - */ - public static Observable mapMany(Observable sequence, final Object func) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(func); - return mapMany(sequence, new Func1() { - - @SuppressWarnings("unchecked") - @Override - public R call(T t1) { - return (R) _f.call(t1); - } - - }); - } - /** * Materializes the implicit notifications of an observable sequence as explicit notification values. *

@@ -1319,43 +951,6 @@ public static Observable onErrorResumeNext(final Observable that, fina return _create(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(that, resumeFunction)); } - /** - * Instruct an Observable to pass control to another Observable (the return value of a function) - * rather than calling onError if it encounters an error. - *

- * By default, when an Observable encounters an error that prevents it from emitting the expected item to its Observer, - * the Observable calls its {@link Observer}'s onError function, and then quits without calling any more - * of its {@link Observer}'s closures. The onErrorResumeNext method changes this behavior. If you pass a - * function that emits an Observable (resumeFunction) to an Observable's onErrorResumeNext method, - * if the original Observable encounters an error, instead of calling its {@link Observer}'s onError function, it - * will instead relinquish control to this new Observable, which will call the {@link Observer}'s onNext method if - * it is able to do so. In such a case, because no Observable necessarily invokes onError, the Observer may - * never know that an error happened. - *

- * You can use this to prevent errors from propagating or to supply fallback data should errors be encountered. - *

- * - * - * @param that - * the source Observable - * @param resumeFunction - * a function that returns an Observable that will take over if the source Observable - * encounters an error - * @return the source Observable, with its behavior modified as described - */ - public static Observable onErrorResumeNext(final Observable that, final Object resumeFunction) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(resumeFunction); - return onErrorResumeNext(that, new Func1>() { - - @SuppressWarnings("unchecked") - @Override - public Observable call(Exception e) { - return (Observable) _f.call(e); - } - }); - } - /** * Instruct an Observable to pass control to another Observable rather than calling onError if it encounters an error. *

@@ -1398,83 +993,13 @@ public static Observable onErrorResumeNext(final Observable that, fina * * @param that * the source Observable - * @param resumeFunction - * a function that returns a value that will be passed into an {@link Observer}'s onNext function if the Observable encounters an error that would - * otherwise cause it to call onError - * @return the source Observable, with its behavior modified as described - */ - public static Observable onErrorReturn(final Observable that, Func1 resumeFunction) { - return _create(OperationOnErrorReturn.onErrorReturn(that, resumeFunction)); - } - - /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the final result from the final call to your function as its sole - * output. - *

- * This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, has an inject - * method that does a similar operation on lists. - *

- * - * - * @param - * the type item emitted by the source Observable - * @param sequence - * the source Observable - * @param accumulator - * an accumulator function to be invoked on each element from the sequence, whose - * result will be used in the next accumulator call (if applicable) - * - * @return an Observable that emits a single element that is the result of accumulating the - * output from applying the accumulator to the sequence of items emitted by the source - * Observable - * @see MSDN: Observable.Aggregate - * @see Wikipedia: Fold (higher-order function) - */ - public static Observable reduce(Observable sequence, Func2 accumulator) { - return takeLast(_create(OperationScan.scan(sequence, accumulator)), 1); - } - - /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the final result from the final call to your function as its sole - * output. - *

- * This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, has an inject - * method that does a similar operation on lists. - *

- * - * - * @param - * the type item emitted by the source Observable - * @param sequence - * the source Observable - * @param accumulator - * an accumulator function to be invoked on each element from the sequence, whose - * result will be used in the next accumulator call (if applicable) - * - * @return an Observable that emits a single element that is the result of accumulating the - * output from applying the accumulator to the sequence of items emitted by the source - * Observable - * @see MSDN: Observable.Aggregate - * @see Wikipedia: Fold (higher-order function) - */ - public static Observable reduce(final Observable sequence, final Object accumulator) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(accumulator); - return reduce(sequence, new Func2() { - - @SuppressWarnings("unchecked") - @Override - public T call(T t1, T t2) { - return (T) _f.call(t1, t2); - } - - }); + * @param resumeFunction + * a function that returns a value that will be passed into an {@link Observer}'s onNext function if the Observable encounters an error that would + * otherwise cause it to call onError + * @return the source Observable, with its behavior modified as described + */ + public static Observable onErrorReturn(final Observable that, Func1 resumeFunction) { + return _create(OperationOnErrorReturn.onErrorReturn(that, resumeFunction)); } /** @@ -1493,8 +1018,6 @@ public T call(T t1, T t2) { * the type item emitted by the source Observable * @param sequence * the source Observable - * @param initialValue - * a seed passed into the first execution of the accumulator function * @param accumulator * an accumulator function to be invoked on each element from the sequence, whose * result will be used in the next accumulator call (if applicable) @@ -1505,8 +1028,8 @@ public T call(T t1, T t2) { * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ - public static Observable reduce(Observable sequence, T initialValue, Func2 accumulator) { - return takeLast(_create(OperationScan.scan(sequence, initialValue, accumulator)), 1); + public static Observable reduce(Observable sequence, Func2 accumulator) { + return takeLast(_create(OperationScan.scan(sequence, accumulator)), 1); } /** @@ -1536,18 +1059,8 @@ public static Observable reduce(Observable sequence, T initialValue, F * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ - public static Observable reduce(final Observable sequence, final T initialValue, final Object accumulator) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(accumulator); - return reduce(sequence, initialValue, new Func2() { - - @SuppressWarnings("unchecked") - @Override - public T call(T t1, T t2) { - return (T) _f.call(t1, t2); - } - - }); + public static Observable reduce(Observable sequence, T initialValue, Func2 accumulator) { + return takeLast(_create(OperationScan.scan(sequence, initialValue, accumulator)), 1); } /** @@ -1573,39 +1086,6 @@ public static Observable scan(Observable sequence, Func2 accu return _create(OperationScan.scan(sequence, accumulator)); } - /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the result of each of these iterations as its own sequence. - *

- * - * - * @param - * the type item emitted by the source Observable - * @param sequence - * the source Observable - * @param accumulator - * an accumulator function to be invoked on each element from the sequence, whose - * result will be emitted and used in the next accumulator call (if applicable) - * @return an Observable that emits a sequence of items that are the result of accumulating the - * output from the sequence emitted by the source Observable - * @see MSDN: Observable.Scan - */ - public static Observable scan(final Observable sequence, final Object accumulator) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(accumulator); - return scan(sequence, new Func2() { - - @SuppressWarnings("unchecked") - @Override - public T call(T t1, T t2) { - return (T) _f.call(t1, t2); - } - - }); - } - /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -1782,26 +1262,6 @@ public static Observable takeWhile(final Observable items, Func1 Observable takeWhile(final Observable items, Object predicate) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(predicate); - - return takeWhile(items, new Func1() { - @Override - public Boolean call(T t) { - return (Boolean) _f.call(t); - } - }); - } - /** * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. * @@ -1811,10 +1271,6 @@ public Boolean call(T t) { * @return */ public static Observable takeWhileWithIndex(final Observable items, Func2 predicate) { - return create(OperationTake.takeWhileWithIndex(items, predicate)); - } - - public static Observable takeWhileWithIndex(final Observable items, Object predicate) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(predicate); @@ -1935,28 +1391,6 @@ public static T single(Observable that, Func1 predicate) { return single(that.filter(predicate)); } - /** - * Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence. - * - * @param that - * the source Observable - * @param predicate - * A predicate function to evaluate for elements in the sequence. - * @return The single element in the observable sequence. - * @throws IllegalStateException - * if there is not exactly one element in the observable sequence that matches the predicate - */ - public static T single(Observable that, Object predicate) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(predicate); - - return single(that, new Func1() { - @Override - public Boolean call(T t) { - return (Boolean) _f.call(t); - } - }); - } /** * Returns the only element of an observable sequence, or a default value if the observable sequence is empty. @@ -1986,29 +1420,6 @@ public static T singleOrDefault(Observable that, T defaultValue, Func1 T singleOrDefault(Observable that, T defaultValue, Object predicate) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(predicate); - - return singleOrDefault(that, defaultValue, new Func1() { - @Override - public Boolean call(T t) { - return (Boolean) _f.call(t); - } - }); - } - private static T singleOrDefault(Observable that, boolean hasDefault, T defaultVal) { Iterator it = that.toIterable().iterator(); @@ -2136,28 +1547,6 @@ public static Observable> toSortedList(Observable sequence, Func2 return _create(OperationToObservableSortedList.toSortedList(sequence, sortFunction)); } - /** - * Sort T objects using the defined sort function. - *

- * - * - * @param sequence - * @param sortFunction - * @return - */ - public static Observable> toSortedList(Observable sequence, final Object sortFunction) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(sortFunction); - return _create(OperationToObservableSortedList.toSortedList(sequence, new Func2() { - - @Override - public Integer call(T t1, T t2) { - return (Integer) _f.call(t1, t2); - } - - })); - } - /** * Returns an Observable that applies a function of your choosing to the combination of items * emitted, in sequence, by two other Observables, with the results of this function becoming the @@ -2223,145 +1612,17 @@ public static Observable sequenceEqual(Observable first, Observa return zip(first, second, equality); } - /** - * Determines whether two sequences are equal by comparing the elements pairwise using a specified equality function. - * - * @param first - * observable sequence to compare - * @param second - * observable sequence to compare - * @param equality - * a function used to compare elements of both sequences - * @param - * type of sequence - * @return sequence of booleans, true if two sequences are equal by comparing the elements pairwise; otherwise, false. - */ - public static Observable sequenceEqual(Observable first, Observable second, Object equality) { - return zip(first, second, equality); - } - - /** - * Returns an Observable that applies a function of your choosing to the combination of items - * emitted, in sequence, by two other Observables, with the results of this function becoming the - * sequence emitted by the returned Observable. - *

- * zip applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by - * w0 - * and the first item emitted by w1; the - * second item emitted by the new Observable will be the result of the function applied to the second item emitted by w0 and the second item emitted by w1; and so forth. - *

- * The resulting Observable returned from zip will call onNext as many times as the number onNext calls of the source Observable with the - * shortest sequence. - *

- * - * - * @param w0 - * one source Observable - * @param w1 - * another source Observable - * @param function - * a function that, when applied to an item emitted by each of the source Observables, - * results in a value that will be emitted by the resulting Observable - * @return an Observable that emits the zipped results - */ - public static Observable zip(Observable w0, Observable w1, final Object function) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(function); - return zip(w0, w1, new Func2() { - - @SuppressWarnings("unchecked") - @Override - public R call(T0 t0, T1 t1) { - return (R) _f.call(t0, t1); - } - - }); - } - - /** - * Returns an Observable that applies a function of your choosing to the combination of items - * emitted, in sequence, by three other Observables, with the results of this function becoming - * the sequence emitted by the returned Observable. - *

- * zip applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by - * w0, - * the first item emitted by w1, and the - * first item emitted by w2; the second item emitted by the new Observable will be the result of the function applied to the second item emitted by w0, the second item - * emitted by w1, and the second item - * emitted by w2; and so forth. - *

- * The resulting Observable returned from zip will call onNext as many times as the number onNext calls of the source Observable with the - * shortest sequence. - *

- * - * - * @param w0 - * one source Observable - * @param w1 - * another source Observable - * @param w2 - * a third source Observable - * @param function - * a function that, when applied to an item emitted by each of the source Observables, - * results in a value that will be emitted by the resulting Observable - * @return an Observable that emits the zipped results - */ - public static Observable zip(Observable w0, Observable w1, Observable w2, Func3 function) { - return _create(OperationZip.zip(w0, w1, w2, function)); - } - /** * Returns an Observable that applies a function of your choosing to the combination of items * emitted, in sequence, by three other Observables, with the results of this function becoming * the sequence emitted by the returned Observable. *

* zip applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by - * w0, - * the first item emitted by w1, and the - * first item emitted by w2; the second item emitted by the new Observable will be the result of the function applied to the second item emitted by w0, the second item - * emitted by w1, and the second item - * emitted by w2; and so forth. - *

- * The resulting Observable returned from zip will call onNext as many times as the number onNext calls of the source Observable with the - * shortest sequence. - *

- * - * - * @param w0 - * one source Observable - * @param w1 - * another source Observable - * @param w2 - * a third source Observable - * @param function - * a function that, when applied to an item emitted by each of the source Observables, - * results in a value that will be emitted by the resulting Observable - * @return an Observable that emits the zipped results - */ - public static Observable zip(Observable w0, Observable w1, Observable w2, final Object function) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(function); - return zip(w0, w1, w2, new Func3() { - - @SuppressWarnings("unchecked") - @Override - public R call(T0 t0, T1 t1, T2 t2) { - return (R) _f.call(t0, t1, t2); - } - - }); - } - - /** - * Returns an Observable that applies a function of your choosing to the combination of items - * emitted, in sequence, by four other Observables, with the results of this function becoming - * the sequence emitted by the returned Observable. - *

- * zip applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by - * w0, - * the first item emitted by w1, the - * first item emitted by w2, and the first item emitted by w3; the second item emitted by the new Observable will be the result of the function applied to the second item - * emitted by each of those Observables; and so forth. + * w0, + * the first item emitted by w1, and the + * first item emitted by w2; the second item emitted by the new Observable will be the result of the function applied to the second item emitted by w0, the second item + * emitted by w1, and the second item + * emitted by w2; and so forth. *

* The resulting Observable returned from zip will call onNext as many times as the number onNext calls of the source Observable with the * shortest sequence. @@ -2374,15 +1635,13 @@ public R call(T0 t0, T1 t1, T2 t2) { * another source Observable * @param w2 * a third source Observable - * @param w3 - * a fourth source Observable - * @param reduceFunction + * @param function * a function that, when applied to an item emitted by each of the source Observables, * results in a value that will be emitted by the resulting Observable * @return an Observable that emits the zipped results */ - public static Observable zip(Observable w0, Observable w1, Observable w2, Observable w3, Func4 reduceFunction) { - return _create(OperationZip.zip(w0, w1, w2, w3, reduceFunction)); + public static Observable zip(Observable w0, Observable w1, Observable w2, Func3 function) { + return _create(OperationZip.zip(w0, w1, w2, function)); } /** @@ -2409,23 +1668,13 @@ public static Observable zip(Observable w0, Observabl * a third source Observable * @param w3 * a fourth source Observable - * @param function + * @param reduceFunction * a function that, when applied to an item emitted by each of the source Observables, * results in a value that will be emitted by the resulting Observable * @return an Observable that emits the zipped results */ - public static Observable zip(Observable w0, Observable w1, Observable w2, Observable w3, final Object function) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(function); - return zip(w0, w1, w2, w3, new Func4() { - - @SuppressWarnings("unchecked") - @Override - public R call(T0 t0, T1 t1, T2 t2, T3 t3) { - return (R) _f.call(t0, t1, t2, t3); - } - - }); + public static Observable zip(Observable w0, Observable w1, Observable w2, Observable w3, Func4 reduceFunction) { + return _create(OperationZip.zip(w0, w1, w2, w3, reduceFunction)); } /** @@ -2443,28 +1692,6 @@ public Observable filter(Func1 predicate) { return filter(this, predicate); } - /** - * Filters an Observable by discarding any of its emissions that do not meet some test. - *

- * - * - * @param callback - * a function that evaluates the items emitted by the source Observable, returning - * true if they pass the filter - * @return an Observable that emits only those items in the original Observable that the filter - * evaluates as "true" - */ - public Observable filter(final Object callback) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(callback); - return filter(this, new Func1() { - - public Boolean call(T t1) { - return (Boolean) _f.call(t1); - } - }); - } - /** * Filters an Observable by discarding any of its emissions that do not meet some test. *

@@ -2497,17 +1724,6 @@ public T last() { * @return the last element in the observable sequence. */ public T last(final Func1 predicate) { - return last(this, predicate); - } - - /** - * Returns the last element of an observable sequence that matches the predicate. - * - * @param predicate - * a predicate function to evaluate for elements in the sequence. - * @return the last element in the observable sequence. - */ - public T last(final Object predicate) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(predicate); @@ -2543,19 +1759,6 @@ public T lastOrDefault(T defaultValue, Func1 predicate) { return lastOrDefault(this, defaultValue, predicate); } - /** - * Returns the last element that matches the predicate, or a default value if no value is found. - * - * @param defaultValue - * a default value that would be returned if observable is empty. - * @param predicate - * a predicate function to evaluate for elements in the sequence. - * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. - */ - public T lastOrDefault(T defaultValue, Object predicate) { - return lastOrDefault(this, defaultValue, predicate); - } - /** * Applies a function of your choosing to every item emitted by an Observable, and returns this * transformation as a new Observable sequence. @@ -2571,29 +1774,6 @@ public Observable map(Func1 func) { return map(this, func); } - /** - * Applies a function of your choosing to every item emitted by an Observable, and returns this - * transformation as a new Observable sequence. - *

- * - * - * @param callback - * a function to apply to each item in the sequence. - * @return an Observable that emits a sequence that is the result of applying the transformation - * function to each item in the sequence emitted by the input Observable. - */ - public Observable map(final Object callback) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(callback); - return map(this, new Func1() { - - @SuppressWarnings("unchecked") - public R call(T t1) { - return (R) _f.call(t1); - } - }); - } - /** * Creates a new Observable sequence by applying a function that you supply to each item in the * original Observable sequence, where that function is itself an Observable that emits items, and @@ -2612,32 +1792,6 @@ public Observable mapMany(Func1> func) { return mapMany(this, func); } - /** - * Creates a new Observable sequence by applying a function that you supply to each item in the - * original Observable sequence, where that function is itself an Observable that emits items, and - * then merges the results of that function applied to every item emitted by the original - * Observable, emitting these merged results as its own sequence. - *

- * - * - * @param callback - * a function to apply to each item in the sequence that returns an Observable. - * @return an Observable that emits a sequence that is the result of applying the transformation' - * function to each item in the input sequence and merging the results of the - * Observables obtained from this transformation. - */ - public Observable mapMany(final Object callback) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(callback); - return mapMany(this, new Func1>() { - - @SuppressWarnings("unchecked") - public Observable call(T t1) { - return (Observable) _f.call(t1); - } - }); - } - /** * Materializes the implicit notifications of this observable sequence as explicit notification values. *

@@ -2672,7 +1826,7 @@ public Observable dematerialize() { * onErrorResumeNext method changes this behavior. If you pass another Observable * (resumeFunction) to an Observable's onErrorResumeNext method, if the * original Observable encounters an error, instead of calling its Observer's - * onErrort function, it will instead relinquish control to + * onError function, it will instead relinquish control to * resumeFunction which will call the Observer's onNext method if it * is able to do so. In such a case, because no Observable necessarily invokes * onError, the Observer may never know that an error happened. @@ -2689,41 +1843,6 @@ public Observable onErrorResumeNext(final Func1> res return onErrorResumeNext(this, resumeFunction); } - /** - * Instruct an Observable to emit a particular item rather than calling onError if - * it encounters an error. - *

- * By default, when an Observable encounters an error that prevents it from emitting the expected - * item to its Observer, the Observable calls its Observer's onError function, and - * then quits without calling any more of its Observer's closures. The - * onErrorResumeNext method changes this behavior. If you pass another Observable - * (resumeFunction) to an Observable's onErrorResumeNext method, if the - * original Observable encounters an error, instead of calling its Observer's - * onError function, it will instead relinquish control to - * resumeFunction which will call the Observer's onNext method if it - * is able to do so. In such a case, because no Observable necessarily invokes - * onError, the Observer may never know that an error happened. - *

- * You can use this to prevent errors from propagating or to supply fallback data should errors - * be encountered. - *

- * - * - * @param resumeFunction - * @return the original Observable with appropriately modified behavior - */ - public Observable onErrorResumeNext(final Object resumeFunction) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(resumeFunction); - return onErrorResumeNext(this, new Func1>() { - - @SuppressWarnings("unchecked") - public Observable call(Exception e) { - return (Observable) _f.call(e); - } - }); - } - /** * Instruct an Observable to pass control to another Observable rather than calling * onError if it encounters an error. @@ -2774,37 +1893,6 @@ public Observable onErrorReturn(Func1 resumeFunction) { return onErrorReturn(this, resumeFunction); } - /** - * Instruct an Observable to emit a particular item rather than calling onError if - * it encounters an error. - *

- * By default, when an Observable encounters an error that prevents it from emitting the expected - * object to its Observer, the Observable calls its Observer's onError function, and - * then quits without calling any more of its Observer's closures. The - * onErrorReturn method changes this behavior. If you pass a function - * (resumeFunction) to an Observable's onErrorReturn method, if the - * original Observable encounters an error, instead of calling its Observer's - * onError function, it will instead call pass the return value of - * resumeFunction to the Observer's onNext method. - *

- * You can use this to prevent errors from propagating or to supply fallback data should errors - * be encountered. - * - * @param resumeFunction - * @return the original Observable with appropriately modified behavior - */ - public Observable onErrorReturn(final Object resumeFunction) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(resumeFunction); - return onErrorReturn(this, new Func1() { - - @SuppressWarnings("unchecked") - public T call(Exception e) { - return (T) _f.call(e); - } - }); - } - /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -2831,32 +1919,6 @@ public Observable reduce(Func2 accumulator) { return reduce(this, accumulator); } - /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the final result from the final call to your function as its sole - * output. - *

- * This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," - * "compress," or "inject" in other programming contexts. Groovy, for instance, has an - * inject method that does a similar operation on lists. - *

- * - * - * @param accumulator - * An accumulator function to be invoked on each element from the sequence, whose result - * will be used in the next accumulator call (if applicable). - * - * @return an Observable that emits a single element from the result of accumulating the output - * from the list of Observables. - * @see MSDN: Observable.Aggregate - * @see Wikipedia: Fold (higher-order function) - */ - public Observable reduce(Object accumulator) { - return reduce(this, accumulator); - } - /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -2885,33 +1947,6 @@ public Observable reduce(T initialValue, Func2 accumulator) { return reduce(this, initialValue, accumulator); } - /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the final result from the final call to your function as its sole - * output. - *

- * This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," - * "compress," or "inject" in other programming contexts. Groovy, for instance, has an - * inject method that does a similar operation on lists. - *

- * - * - * @param initialValue - * The initial (seed) accumulator value. - * @param accumulator - * An accumulator function to be invoked on each element from the sequence, whose - * result will be used in the next accumulator call (if applicable). - * @return an Observable that emits a single element from the result of accumulating the output - * from the list of Observables. - * @see MSDN: Observable.Aggregate - * @see Wikipedia: Fold (higher-order function) - */ - public Observable reduce(T initialValue, Object accumulator) { - return reduce(this, initialValue, accumulator); - } - /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -2934,29 +1969,6 @@ public Observable scan(Func2 accumulator) { return scan(this, accumulator); } - /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the result of each of these iterations. It emits the result of - * each of these iterations as a sequence from the returned Observable. This sort of function is - * sometimes called an accumulator. - *

- * - * - * @param accumulator - * An accumulator function to be invoked on each element from the sequence whose - * result will be sent via onNext and used in the next accumulator call - * (if applicable). - * - * @return an Observable sequence whose elements are the result of accumulating the output from - * the list of Observables. - * @see MSDN: Observable.Scan - */ - public Observable scan(final Object accumulator) { - return scan(this, accumulator); - } - /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -3069,17 +2081,6 @@ public Observable takeWhile(final Func1 predicate) { return takeWhile(this, predicate); } - /** - * Returns a specified number of contiguous values from the start of an observable sequence. - * - * @param predicate - * a function to test each source element for a condition - * @return - */ - public Observable takeWhile(final Object predicate) { - return takeWhile(this, predicate); - } - /** * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. * @@ -3091,17 +2092,6 @@ public Observable takeWhileWithIndex(final Func2 predica return takeWhileWithIndex(this, predicate); } - /** - * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. - * - * @param predicate - * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. - * @return - */ - public Observable takeWhileWithIndex(final Object predicate) { - return takeWhileWithIndex(this, predicate); - } - /** * Returns an Observable that emits the last count items emitted by the source * Observable. @@ -3174,18 +2164,6 @@ public Observable> toSortedList(Func2 sortFunction) { return toSortedList(this, sortFunction); } - /** - * Sort T objects using the defined sort function. - *

- * - * - * @param sortFunction - * @return - */ - public Observable> toSortedList(final Object sortFunction) { - return toSortedList(this, sortFunction); - } - /** * Converts an observable sequence to an Iterable. * diff --git a/rxjava-core/src/main/java/rx/subjects/Subject.java b/rxjava-core/src/main/java/rx/subjects/Subject.java index c9279adb38..025a938075 100644 --- a/rxjava-core/src/main/java/rx/subjects/Subject.java +++ b/rxjava-core/src/main/java/rx/subjects/Subject.java @@ -77,12 +77,12 @@ public static class UnitTest { @Test public void test() { Subject subject = Subject.create(); - final AtomicReference>> actualRef = new AtomicReference>>(); + final AtomicReference>> actualRef = new AtomicReference>>(); Observable>> wNotificationsList = subject.materialize().toList(); - wNotificationsList.subscribe(new Action1>>() { + wNotificationsList.subscribe(new Action1>>() { @Override - public void call(List> actual) { + public void call(List> actual) { actualRef.set(actual); } });