diff --git a/src/main/java/io/reactivex/subjects/AsyncSubject.java b/src/main/java/io/reactivex/subjects/AsyncSubject.java index e447ee08ac..519811a803 100644 --- a/src/main/java/io/reactivex/subjects/AsyncSubject.java +++ b/src/main/java/io/reactivex/subjects/AsyncSubject.java @@ -13,6 +13,7 @@ package io.reactivex.subjects; +import io.reactivex.annotations.Nullable; import java.util.Arrays; import java.util.concurrent.atomic.AtomicReference; @@ -85,22 +86,22 @@ * AsyncSubject<Object> subject = AsyncSubject.create(); * * TestObserver<Object> to1 = subject.test(); - * + * * to1.assertEmpty(); - * + * * subject.onNext(1); - * + * * // AsyncSubject only emits when onComplete was called. * to1.assertEmpty(); * * subject.onNext(2); * subject.onComplete(); - * + * * // onComplete triggers the emission of the last cached item and the onComplete event. * to1.assertResult(2); - * + * * TestObserver<Object> to2 = subject.test(); - * + * * // late Observers receive the last cached item too * to2.assertResult(2); * @@ -313,6 +314,7 @@ public boolean hasValue() { *
The method is thread-safe. * @return a single value the Subject currently has or null if no such value exists */ + @Nullable public T getValue() { return subscribers.get() == TERMINATED ? value : null; } diff --git a/src/main/java/io/reactivex/subjects/BehaviorSubject.java b/src/main/java/io/reactivex/subjects/BehaviorSubject.java index d69d444ffb..3c006beb53 100644 --- a/src/main/java/io/reactivex/subjects/BehaviorSubject.java +++ b/src/main/java/io/reactivex/subjects/BehaviorSubject.java @@ -14,6 +14,7 @@ package io.reactivex.subjects; import io.reactivex.annotations.CheckReturnValue; +import io.reactivex.annotations.Nullable; import java.lang.reflect.Array; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.*; @@ -63,19 +64,19 @@ * observable.onNext(1); * // this will "clear" the cache * observable.onNext(EMPTY); - * + * * TestObserver<Integer> to2 = observable.test(); - * + * * subject.onNext(2); * subject.onComplete(); - * + * * // to1 received both non-empty items * to1.assertResult(1, 2); - * + * * // to2 received only 2 even though the current item was EMPTY * // when it got subscribed * to2.assertResult(2); - * + * * // Observers coming after the subject was terminated receive * // no items and only the onComplete event in this case. * observable.test().assertResult(); @@ -300,6 +301,7 @@ public boolean hasObservers() { } @Override + @Nullable public Throwable getThrowable() { Object o = value.get(); if (NotificationLite.isError(o)) { @@ -313,6 +315,7 @@ public Throwable getThrowable() { *
The method is thread-safe. * @return a single value the Subject currently has or null if no such value exists */ + @Nullable public T getValue() { Object o = value.get(); if (NotificationLite.isComplete(o) || NotificationLite.isError(o)) { diff --git a/src/main/java/io/reactivex/subjects/CompletableSubject.java b/src/main/java/io/reactivex/subjects/CompletableSubject.java index 69e25be91d..93c626baa0 100644 --- a/src/main/java/io/reactivex/subjects/CompletableSubject.java +++ b/src/main/java/io/reactivex/subjects/CompletableSubject.java @@ -13,6 +13,7 @@ package io.reactivex.subjects; +import io.reactivex.annotations.Nullable; import java.util.concurrent.atomic.*; import io.reactivex.*; @@ -65,12 +66,12 @@ * Example usage: *
* CompletableSubject subject = CompletableSubject.create();
- *
+ *
* TestObserver<Void> to1 = subject.test();
*
* // a fresh CompletableSubject is empty
* to1.assertEmpty();
- *
+ *
* subject.onComplete();
*
* // a CompletableSubject is always void of items
@@ -213,6 +214,7 @@ void remove(CompletableDisposable inner) {
* Returns the terminal error if this CompletableSubject has been terminated with an error, null otherwise.
* @return the terminal error or null if not terminated or not with an error
*/
+ @Nullable
public Throwable getThrowable() {
if (observers.get() == TERMINATED) {
return error;
diff --git a/src/main/java/io/reactivex/subjects/MaybeSubject.java b/src/main/java/io/reactivex/subjects/MaybeSubject.java
index 629e63312b..1fec546d7c 100644
--- a/src/main/java/io/reactivex/subjects/MaybeSubject.java
+++ b/src/main/java/io/reactivex/subjects/MaybeSubject.java
@@ -72,20 +72,20 @@
* Example usage:
*
* MaybeSubject<Integer> subject1 = MaybeSubject.create();
- *
+ *
* TestObserver<Integer> to1 = subject1.test();
- *
+ *
* // MaybeSubjects are empty by default
* to1.assertEmpty();
- *
+ *
* subject1.onSuccess(1);
- *
+ *
* // onSuccess is a terminal event with MaybeSubjects
* // TestObserver converts onSuccess into onNext + onComplete
* to1.assertResult(1);
*
* TestObserver<Integer> to2 = subject1.test();
- *
+ *
* // late Observers receive the terminal signal (onSuccess) too
* to2.assertResult(1);
*
@@ -94,14 +94,14 @@
* MaybeSubject<Integer> subject2 = MaybeSubject.create();
*
* TestObserver<Integer> to3 = subject2.test();
- *
+ *
* subject2.onComplete();
- *
+ *
* // a completed MaybeSubject completes its MaybeObservers
* to3.assertResult();
*
* TestObserver<Integer> to4 = subject1.test();
- *
+ *
* // late Observers receive the terminal signal (onComplete) too
* to4.assertResult();
*
@@ -263,6 +263,7 @@ void remove(MaybeDisposable inner) {
* Returns the success value if this MaybeSubject was terminated with a success value.
* @return the success value or null
*/
+ @Nullable
public T getValue() {
if (observers.get() == TERMINATED) {
return value;
@@ -282,6 +283,7 @@ public boolean hasValue() {
* Returns the terminal error if this MaybeSubject has been terminated with an error, null otherwise.
* @return the terminal error or null if not terminated or not with an error
*/
+ @Nullable
public Throwable getThrowable() {
if (observers.get() == TERMINATED) {
return error;
diff --git a/src/main/java/io/reactivex/subjects/PublishSubject.java b/src/main/java/io/reactivex/subjects/PublishSubject.java
index 6a13ffdd2a..7d22a81d2d 100644
--- a/src/main/java/io/reactivex/subjects/PublishSubject.java
+++ b/src/main/java/io/reactivex/subjects/PublishSubject.java
@@ -14,6 +14,7 @@
package io.reactivex.subjects;
import io.reactivex.annotations.CheckReturnValue;
+import io.reactivex.annotations.Nullable;
import java.util.concurrent.atomic.*;
import io.reactivex.Observer;
@@ -263,6 +264,7 @@ public boolean hasObservers() {
}
@Override
+ @Nullable
public Throwable getThrowable() {
if (subscribers.get() == TERMINATED) {
return error;
diff --git a/src/main/java/io/reactivex/subjects/ReplaySubject.java b/src/main/java/io/reactivex/subjects/ReplaySubject.java
index fe89540733..29634ff1c4 100644
--- a/src/main/java/io/reactivex/subjects/ReplaySubject.java
+++ b/src/main/java/io/reactivex/subjects/ReplaySubject.java
@@ -13,6 +13,7 @@
package io.reactivex.subjects;
+import io.reactivex.annotations.Nullable;
import java.lang.reflect.Array;
import java.util.*;
import java.util.concurrent.TimeUnit;
@@ -395,6 +396,7 @@ public boolean hasObservers() {
}
@Override
+ @Nullable
public Throwable getThrowable() {
Object o = buffer.get();
if (NotificationLite.isError(o)) {
@@ -408,6 +410,7 @@ public Throwable getThrowable() {
* The method is thread-safe.
* @return a single value the Subject currently has or null if no such value exists
*/
+ @Nullable
public T getValue() {
return buffer.getValue();
}
@@ -542,6 +545,7 @@ interface ReplayBuffer {
int size();
+ @Nullable
T getValue();
T[] getValues(T[] array);
@@ -620,6 +624,7 @@ public void addFinal(Object notificationLite) {
}
@Override
+ @Nullable
@SuppressWarnings("unchecked")
public T getValue() {
int s = size;
@@ -838,6 +843,7 @@ public void addFinal(Object notificationLite) {
}
@Override
+ @Nullable
@SuppressWarnings("unchecked")
public T getValue() {
Node