Skip to content

Commit 3853258

Browse files
Subjects & Observers
1 parent f0c8cb9 commit 3853258

File tree

5 files changed

+118
-88
lines changed

5 files changed

+118
-88
lines changed

language-adaptors/rxjava-scala/ReleaseNotes.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ and implement any of the methods that you care about:
3434
}
3535
```
3636
or you can use one of the overloads of the companion `Observer` object by passing in implementations of the `onNext`,
37-
`onError` or `onCompleted` methods. The advantage of this is that you get type inference as in `Observer(println(_))`.
37+
`onError` or `onCompleted` methods.
3838

3939
Note that typically you do not need to create an `Observer` since all of the methods that accept an `Observer[T]`
4040
(for instance `subscribe`) usually come with overloads that accept the individual methods

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ trait Observable[+T]
7979
import rx.lang.scala.observables.BlockingObservable
8080
import rx.lang.scala.ImplicitFunctionConversions._
8181

82-
private [scala] def asJavaObservable: rx.Observable[_ <: T]
82+
private [scala] val asJavaObservable: rx.Observable[_ <: T]
8383

8484
/**
8585
* $subscribeObserverMain

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observer.scala

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.lang.scala
1717

1818
import rx.joins.ObserverBase
19+
import java.util.concurrent.atomic.AtomicBoolean
1920

2021
/**
2122
Provides a mechanism for receiving push-based notifications.
@@ -26,11 +27,7 @@ import rx.joins.ObserverBase
2627
*/
2728
trait Observer[-T] {
2829

29-
private [scala] def asJavaObserver: rx.Observer[_ >: T] = new ObserverBase[T] {
30-
protected def onCompletedCore(): Unit = onCompleted()
31-
protected def onErrorCore(error: Throwable): Unit = onError(error)
32-
protected def onNextCore(value: T): Unit = onNext(value)
33-
}
30+
private [scala] val asJavaObserver: rx.Observer[_ >: T]
3431

3532
/**
3633
* Provides the Observer with new data.
@@ -39,50 +36,51 @@ trait Observer[-T] {
3936
*
4037
* The [[rx.lang.scala.Observable]] will not call this method again after it calls either `onCompleted` or `onError`.
4138
*/
42-
def onNext(value: T): Unit = {}
39+
def onNext(value: T): Unit
4340

4441
/**
4542
* Notifies the Observer that the [[rx.lang.scala.Observable]] has experienced an error condition.
4643
*
4744
* If the [[rx.lang.scala.Observable]] calls this method, it will not thereafter call `onNext` or `onCompleted`.
4845
*/
49-
def onError(error: Throwable): Unit = {}
46+
def onError(error: Throwable): Unit
5047

5148
/**
5249
* Notifies the Observer that the [[rx.lang.scala.Observable]] has finished sending push-based notifications.
5350
*
5451
* The [[rx.lang.scala.Observable]] will not call this method if it calls `onError`.
5552
*/
56-
def onCompleted(): Unit = {}
53+
def onCompleted(): Unit
5754

5855
}
5956

6057
object Observer {
58+
6159
/**
62-
* Assume that the underlying rx.Observer does not need to be wrapped.
60+
* Scala calls XXX; Java receives XXX.
6361
*/
6462
private [scala] def apply[T](observer: rx.Observer[T]) : Observer[T] = {
6563
new Observer[T] {
6664

67-
override def asJavaObserver = observer
68-
69-
override def onNext(value: T): Unit = asJavaObserver.onNext(value)
70-
override def onError(error: Throwable): Unit = asJavaObserver.onError(error)
71-
override def onCompleted(): Unit = asJavaObserver.onCompleted()
65+
val asJavaObserver = observer
7266

67+
def onNext(value: T): Unit = asJavaObserver.onNext(value)
68+
def onError(error: Throwable): Unit = asJavaObserver.onError(error)
69+
def onCompleted(): Unit = asJavaObserver.onCompleted()
7370
}
71+
7472
}
7573

7674
def apply[T]( ): Observer[T] = apply(v=>{}, e=>{}, ()=>{})
7775
def apply[T](onNext: T=>Unit ): Observer[T] = apply(onNext, e=>{}, ()=>{})
7876
def apply[T](onNext: T=>Unit, onError: Throwable=>Unit ): Observer[T] = apply(onNext, onError, ()=>{})
7977
def apply[T](onNext: T=>Unit, onCompleted: ()=>Unit): Observer[T] = apply(onNext, e=>{}, onCompleted)
80-
def apply[T](onNext: T=>Unit, onError: Throwable=>Unit, onCompleted: ()=>Unit): Observer[T] = {
81-
val n = onNext; val e = onError; val c = onCompleted
82-
new Observer[T] {
83-
override def onNext(value: T): Unit = n(value)
84-
override def onError(error: Throwable): Unit = e(error)
85-
override def onCompleted(): Unit = c()
86-
}
78+
def apply[T](n: T=>Unit, e: Throwable=>Unit, c: ()=>Unit): Observer[T] = {
79+
// Java calls XXX; Scala receives XXX.
80+
Observer(new rx.Observer[T]{
81+
def onNext(value: T): Unit = n(value)
82+
def onError(error: Throwable): Unit = e(error)
83+
def onCompleted(): Unit = c()
84+
})
8785
}
8886
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subject.scala

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,13 @@ import rx.joins.ObserverBase
2323
trait Subject[-T, +R] extends Observable[R] with Observer[T] {
2424
private [scala] val asJavaSubject: rx.subjects.Subject[_ >: T, _<: R]
2525

26-
def asJavaObservable: rx.Observable[_ <: R] = asJavaSubject
26+
val asJavaObservable: rx.Observable[_ <: R] = asJavaSubject
2727

28-
// temporary hack to workaround bugs in rx Subjects
29-
override def asJavaObserver: rx.Observer[_ >: T] = new ObserverBase[T] {
30-
protected def onNextCore(value: T) = asJavaSubject.onNext(value)
31-
protected def onErrorCore(error: Throwable) = asJavaSubject.onError(error)
32-
protected def onCompletedCore() = asJavaSubject.onCompleted()
33-
}
28+
val asJavaObserver: rx.Observer[_ >: T] = asJavaSubject
29+
def onNext(value: T): Unit = { asJavaObserver.onNext(value)}
30+
def onError(error: Throwable): Unit = { asJavaObserver.onError(error) }
31+
def onCompleted() { asJavaObserver.onCompleted() }
3432

35-
override def onNext(value: T): Unit = asJavaObserver.onNext(value)
36-
override def onError(error: Throwable): Unit = asJavaObserver.onError(error)
37-
override def onCompleted(): Unit = asJavaObserver.onCompleted()
3833

3934
}
4035

Lines changed: 92 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,95 @@
11
package rx.lang.scala
22

3+
import org.junit.{Assert, Test}
4+
import org.scalatest.junit.JUnitSuite
5+
import scala.concurrent.duration._
6+
import scala.language.postfixOps
7+
import rx.lang.scala.schedulers.TestScheduler
8+
import rx.lang.scala.subjects.BehaviorSubject
9+
import org.mockito.Mockito._
10+
import org.mockito.Matchers._
311

4-
//package rx.lang.scala.examples
5-
//
6-
//import org.junit.{Assert, Test}
7-
//import org.scalatest.junit.JUnitSuite
8-
//import scala.concurrent.duration._
9-
//import scala.language.postfixOps
10-
//import rx.lang.scala.{ Observable, Observer }
11-
//import rx.lang.scala.schedulers.TestScheduler
12-
//import rx.lang.scala.subjects.BehaviorSubject
13-
//import org.mockito.Mockito._
14-
//import org.mockito.Matchers._
15-
//
16-
// @Test def PublishSubjectIsAChannel() {
17-
//
18-
// val channel: BehaviorSubject[Integer] = BehaviorSubject(2013)
19-
// val observerA: Observer[Integer] = mock(classOf[Observer[Integer]])
20-
// val observerB: Observer[Integer] = mock(classOf[Observer[Integer]])
21-
// val observerC: Observer[Integer] = mock(classOf[Observer[Integer]])
22-
//
23-
// val x = inOrder(observerA, observerB, observerC)
24-
//
25-
// val a = channel.subscribe(observerA)
26-
// val b = channel.subscribe(observerB)
27-
//
28-
// x.verify(observerA).onNext(2013)
29-
// x.verify(observerB).onNext(2013)
30-
//
31-
// channel.onNext(42)
32-
//
33-
// x.verify(observerA).onNext(42)
34-
// x.verify(observerB).onNext(42)
35-
//
36-
// a.unsubscribe()
37-
//
38-
// channel.onNext(4711)
39-
//
40-
// x.verify(observerA, never()).onNext(any())
41-
// x.verify(observerB).onNext(4711)
42-
//
43-
// channel.onCompleted()
44-
//
45-
// x.verify(observerA, never()).onCompleted()
46-
// x.verify(observerB).onCompleted()
47-
//
48-
// val c = channel.subscribe(observerC)
49-
//
50-
// x.verify(observerC).onCompleted()
51-
//
52-
// channel.onNext(13)
53-
//
54-
// x.verifyNoMoreInteractions()
55-
//
56-
// }
57-
//
58-
//}
12+
13+
/**
14+
* No fucking clue how to properly mock traits.
15+
* Some old-school imperative code works just as well.
16+
*/
17+
class SubjectTest extends JUnitSuite {
18+
19+
@Test def PublishSubjectIsAChannel() {
20+
21+
var lastA: Integer = null
22+
var errorA: Throwable = null
23+
var completedA: Boolean = false
24+
val observerA = Observer[Integer](
25+
(next: Integer) => { lastA = next },
26+
(error: Throwable) => { errorA = error },
27+
() => { completedA = true }
28+
)
29+
30+
var lastB: Integer = null
31+
var errorB: Throwable = null
32+
var completedB: Boolean = false
33+
val observerB = Observer[Integer](
34+
(next: Integer) => { lastB = next },
35+
(error: Throwable) => { errorB = error },
36+
() => { completedB = true }
37+
)
38+
39+
var lastC: Integer = null
40+
var errorC: Throwable = null
41+
var completedC: Boolean = false
42+
val observerC = Observer[Integer](
43+
(next: Integer) => { lastC = next },
44+
(error: Throwable) => { errorC = error },
45+
() => { completedC = true }
46+
)
47+
48+
val channel: BehaviorSubject[Integer] = BehaviorSubject(2013)
49+
50+
val a = channel.subscribe(observerA)
51+
Assert.assertEquals(2013, lastA)
52+
53+
val b = channel.subscribe(observerB)
54+
Assert.assertEquals(2013, lastB)
55+
56+
channel.onNext(42)
57+
Assert.assertEquals(42, lastA)
58+
Assert.assertEquals(42, lastB)
59+
60+
a.unsubscribe()
61+
channel.onNext(4711)
62+
Assert.assertEquals(42, lastA)
63+
Assert.assertEquals(4711, lastB)
64+
65+
channel.onCompleted()
66+
Assert.assertFalse(completedA)
67+
Assert.assertTrue(completedB)
68+
Assert.assertEquals(42, lastA)
69+
Assert.assertEquals(4711, lastB)
70+
71+
val c = channel.subscribe(observerC)
72+
channel.onNext(13)
73+
74+
Assert.assertEquals(null, lastC)
75+
Assert.assertTrue(completedC)
76+
77+
Assert.assertFalse(completedA)
78+
Assert.assertTrue(completedB)
79+
Assert.assertEquals(42, lastA)
80+
Assert.assertEquals(4711, lastB)
81+
82+
channel.onError(new Exception("!"))
83+
84+
Assert.assertEquals(null, lastC)
85+
Assert.assertTrue(completedC)
86+
87+
Assert.assertFalse(completedA)
88+
Assert.assertTrue(completedB)
89+
Assert.assertEquals(42, lastA)
90+
Assert.assertEquals(4711, lastB)
91+
92+
93+
}
94+
95+
}

0 commit comments

Comments
 (0)