From 3d75f3524243c9aeda022a3c7caaf14b49ec6e7f Mon Sep 17 00:00:00 2001 From: Aaron He Date: Wed, 21 Mar 2018 22:51:40 -0700 Subject: [PATCH] Add @Nullable annotations to Processors --- src/main/java/io/reactivex/processors/AsyncProcessor.java | 2 ++ src/main/java/io/reactivex/processors/BehaviorProcessor.java | 2 ++ src/main/java/io/reactivex/processors/PublishProcessor.java | 1 + src/main/java/io/reactivex/processors/ReplayProcessor.java | 4 ++++ .../java/io/reactivex/processors/SerializedProcessor.java | 2 ++ src/main/java/io/reactivex/processors/UnicastProcessor.java | 1 + 6 files changed, 12 insertions(+) diff --git a/src/main/java/io/reactivex/processors/AsyncProcessor.java b/src/main/java/io/reactivex/processors/AsyncProcessor.java index 57ad766e5a..b03bef985d 100644 --- a/src/main/java/io/reactivex/processors/AsyncProcessor.java +++ b/src/main/java/io/reactivex/processors/AsyncProcessor.java @@ -138,6 +138,7 @@ public boolean hasComplete() { } @Override + @Nullable public Throwable getThrowable() { return subscribers.get() == TERMINATED ? error : null; } @@ -244,6 +245,7 @@ public boolean hasValue() { *

The method is thread-safe. * @return a single value the Subject currently has or null if no such value exists */ + @Nullable public T getValue() { return subscribers.get() == TERMINATED ? value : null; } diff --git a/src/main/java/io/reactivex/processors/BehaviorProcessor.java b/src/main/java/io/reactivex/processors/BehaviorProcessor.java index bc0d416160..91aa77b942 100644 --- a/src/main/java/io/reactivex/processors/BehaviorProcessor.java +++ b/src/main/java/io/reactivex/processors/BehaviorProcessor.java @@ -348,6 +348,7 @@ public boolean hasSubscribers() { } @Override + @Nullable public Throwable getThrowable() { Object o = value.get(); if (NotificationLite.isError(o)) { @@ -361,6 +362,7 @@ public Throwable getThrowable() { *

The method is thread-safe. * @return a single value the BehaviorProcessor currently has or null if no such value exists */ + @Nullable public T getValue() { Object o = value.get(); if (NotificationLite.isComplete(o) || NotificationLite.isError(o)) { diff --git a/src/main/java/io/reactivex/processors/PublishProcessor.java b/src/main/java/io/reactivex/processors/PublishProcessor.java index 92af846040..d299bffea2 100644 --- a/src/main/java/io/reactivex/processors/PublishProcessor.java +++ b/src/main/java/io/reactivex/processors/PublishProcessor.java @@ -261,6 +261,7 @@ public boolean hasSubscribers() { } @Override + @Nullable public Throwable getThrowable() { if (subscribers.get() == TERMINATED) { return error; diff --git a/src/main/java/io/reactivex/processors/ReplayProcessor.java b/src/main/java/io/reactivex/processors/ReplayProcessor.java index 069e6628fd..839170fa27 100644 --- a/src/main/java/io/reactivex/processors/ReplayProcessor.java +++ b/src/main/java/io/reactivex/processors/ReplayProcessor.java @@ -354,6 +354,7 @@ public boolean hasSubscribers() { } @Override + @Nullable public Throwable getThrowable() { ReplayBuffer b = buffer; if (b.isDone()) { @@ -510,6 +511,7 @@ interface ReplayBuffer { int size(); + @Nullable T getValue(); T[] getValues(T[] array); @@ -598,6 +600,7 @@ public void trimHead() { } @Override + @Nullable public T getValue() { int s = size; if (s == 0) { @@ -1091,6 +1094,7 @@ public void complete() { } @Override + @Nullable public T getValue() { TimedNode h = head; diff --git a/src/main/java/io/reactivex/processors/SerializedProcessor.java b/src/main/java/io/reactivex/processors/SerializedProcessor.java index 7a755500b9..8c0a37e02a 100644 --- a/src/main/java/io/reactivex/processors/SerializedProcessor.java +++ b/src/main/java/io/reactivex/processors/SerializedProcessor.java @@ -13,6 +13,7 @@ package io.reactivex.processors; +import io.reactivex.annotations.Nullable; import org.reactivestreams.*; import io.reactivex.internal.util.*; @@ -187,6 +188,7 @@ public boolean hasThrowable() { } @Override + @Nullable public Throwable getThrowable() { return actual.getThrowable(); } diff --git a/src/main/java/io/reactivex/processors/UnicastProcessor.java b/src/main/java/io/reactivex/processors/UnicastProcessor.java index e51dcc4dcc..847060feb8 100644 --- a/src/main/java/io/reactivex/processors/UnicastProcessor.java +++ b/src/main/java/io/reactivex/processors/UnicastProcessor.java @@ -457,6 +457,7 @@ public boolean hasSubscribers() { } @Override + @Nullable public Throwable getThrowable() { if (done) { return error;