Skip to content

Commit 2e0d3b9

Browse files
vanniktechakarnokd
authored andcommitted
2.x: Add @CheckReturnValue to create methods of Subjects + Processors (#4971)
1 parent d173b6d commit 2e0d3b9

10 files changed

+34
-0
lines changed

src/main/java/io/reactivex/processors/AsyncProcessor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
*/
1313
package io.reactivex.processors;
1414

15+
import io.reactivex.annotations.CheckReturnValue;
1516
import java.util.Arrays;
1617
import java.util.concurrent.atomic.AtomicReference;
1718

@@ -49,6 +50,7 @@ public final class AsyncProcessor<T> extends FlowableProcessor<T> {
4950
* @param <T> the value type to be received and emitted
5051
* @return the new AsyncProcessor instance
5152
*/
53+
@CheckReturnValue
5254
public static <T> AsyncProcessor<T> create() {
5355
return new AsyncProcessor<T>();
5456
}

src/main/java/io/reactivex/processors/BehaviorProcessor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex.processors;
1515

16+
import io.reactivex.annotations.CheckReturnValue;
1617
import java.lang.reflect.Array;
1718
import java.util.concurrent.atomic.*;
1819
import java.util.concurrent.locks.*;
@@ -97,6 +98,7 @@ public final class BehaviorProcessor<T> extends FlowableProcessor<T> {
9798
* the type of item the Subject will emit
9899
* @return the constructed {@link BehaviorProcessor}
99100
*/
101+
@CheckReturnValue
100102
public static <T> BehaviorProcessor<T> create() {
101103
return new BehaviorProcessor<T>();
102104
}
@@ -112,6 +114,7 @@ public static <T> BehaviorProcessor<T> create() {
112114
* {@link BehaviorProcessor} has not yet observed any items from its source {@code Observable}
113115
* @return the constructed {@link BehaviorProcessor}
114116
*/
117+
@CheckReturnValue
115118
public static <T> BehaviorProcessor<T> createDefault(T defaultValue) {
116119
ObjectHelper.requireNonNull(defaultValue, "defaultValue is null");
117120
return new BehaviorProcessor<T>(defaultValue);

src/main/java/io/reactivex/processors/PublishProcessor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
*/
1313
package io.reactivex.processors;
1414

15+
import io.reactivex.annotations.CheckReturnValue;
1516
import java.util.concurrent.atomic.*;
1617

1718
import org.reactivestreams.*;
@@ -74,6 +75,7 @@ public final class PublishProcessor<T> extends FlowableProcessor<T> {
7475
* @param <T> the value type
7576
* @return the new PublishProcessor
7677
*/
78+
@CheckReturnValue
7779
public static <T> PublishProcessor<T> create() {
7880
return new PublishProcessor<T>();
7981
}

src/main/java/io/reactivex/processors/ReplayProcessor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex.processors;
1515

16+
import io.reactivex.annotations.CheckReturnValue;
1617
import java.lang.reflect.Array;
1718
import java.util.*;
1819
import java.util.concurrent.TimeUnit;
@@ -89,6 +90,7 @@ public final class ReplayProcessor<T> extends FlowableProcessor<T> {
8990
* the type of items observed and emitted by the ReplayProcessor
9091
* @return the created ReplayProcessor
9192
*/
93+
@CheckReturnValue
9294
public static <T> ReplayProcessor<T> create() {
9395
return new ReplayProcessor<T>(new UnboundedReplayBuffer<T>(16));
9496
}
@@ -108,6 +110,7 @@ public static <T> ReplayProcessor<T> create() {
108110
* the initial buffer capacity
109111
* @return the created subject
110112
*/
113+
@CheckReturnValue
111114
public static <T> ReplayProcessor<T> create(int capacityHint) {
112115
return new ReplayProcessor<T>(new UnboundedReplayBuffer<T>(capacityHint));
113116
}
@@ -132,6 +135,7 @@ public static <T> ReplayProcessor<T> create(int capacityHint) {
132135
* the maximum number of buffered items
133136
* @return the created subject
134137
*/
138+
@CheckReturnValue
135139
public static <T> ReplayProcessor<T> createWithSize(int maxSize) {
136140
return new ReplayProcessor<T>(new SizeBoundReplayBuffer<T>(maxSize));
137141
}
@@ -185,6 +189,7 @@ public static <T> ReplayProcessor<T> createWithSize(int maxSize) {
185189
* the {@link Scheduler} that provides the current time
186190
* @return the created subject
187191
*/
192+
@CheckReturnValue
188193
public static <T> ReplayProcessor<T> createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler) {
189194
return new ReplayProcessor<T>(new SizeAndTimeBoundReplayBuffer<T>(Integer.MAX_VALUE, maxAge, unit, scheduler));
190195
}
@@ -223,6 +228,7 @@ public static <T> ReplayProcessor<T> createWithTime(long maxAge, TimeUnit unit,
223228
* the {@link Scheduler} that provides the current time
224229
* @return the created subject
225230
*/
231+
@CheckReturnValue
226232
public static <T> ReplayProcessor<T> createWithTimeAndSize(long maxAge, TimeUnit unit, Scheduler scheduler, int maxSize) {
227233
return new ReplayProcessor<T>(new SizeAndTimeBoundReplayBuffer<T>(maxSize, maxAge, unit, scheduler));
228234
}

src/main/java/io/reactivex/processors/UnicastProcessor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex.processors;
1515

16+
import io.reactivex.annotations.CheckReturnValue;
1617
import java.util.concurrent.atomic.*;
1718

1819
import org.reactivestreams.*;
@@ -65,6 +66,7 @@ public final class UnicastProcessor<T> extends FlowableProcessor<T> {
6566
* @param <T> the value type
6667
* @return an UnicastSubject instance
6768
*/
69+
@CheckReturnValue
6870
public static <T> UnicastProcessor<T> create() {
6971
return new UnicastProcessor<T>(bufferSize());
7072
}
@@ -75,6 +77,7 @@ public static <T> UnicastProcessor<T> create() {
7577
* @param capacityHint the hint to size the internal unbounded buffer
7678
* @return an UnicastProcessor instance
7779
*/
80+
@CheckReturnValue
7881
public static <T> UnicastProcessor<T> create(int capacityHint) {
7982
return new UnicastProcessor<T>(capacityHint);
8083
}
@@ -91,6 +94,7 @@ public static <T> UnicastProcessor<T> create(int capacityHint) {
9194
* @param onCancelled the non null callback
9295
* @return an UnicastProcessor instance
9396
*/
97+
@CheckReturnValue
9498
public static <T> UnicastProcessor<T> create(int capacityHint, Runnable onCancelled) {
9599
return new UnicastProcessor<T>(capacityHint, onCancelled);
96100
}

src/main/java/io/reactivex/subjects/AsyncSubject.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex.subjects;
1515

16+
import io.reactivex.annotations.CheckReturnValue;
1617
import java.util.Arrays;
1718
import java.util.concurrent.atomic.AtomicReference;
1819

@@ -51,6 +52,7 @@ public final class AsyncSubject<T> extends Subject<T> {
5152
* @param <T> the value type to be received and emitted
5253
* @return the new AsyncProcessor instance
5354
*/
55+
@CheckReturnValue
5456
public static <T> AsyncSubject<T> create() {
5557
return new AsyncSubject<T>();
5658
}

src/main/java/io/reactivex/subjects/BehaviorSubject.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex.subjects;
1515

16+
import io.reactivex.annotations.CheckReturnValue;
1617
import java.lang.reflect.Array;
1718
import java.util.concurrent.atomic.AtomicReference;
1819
import java.util.concurrent.locks.*;
@@ -96,6 +97,7 @@ public final class BehaviorSubject<T> extends Subject<T> {
9697
* the type of item the Subject will emit
9798
* @return the constructed {@link BehaviorSubject}
9899
*/
100+
@CheckReturnValue
99101
public static <T> BehaviorSubject<T> create() {
100102
return new BehaviorSubject<T>();
101103
}
@@ -111,6 +113,7 @@ public static <T> BehaviorSubject<T> create() {
111113
* {@link BehaviorSubject} has not yet observed any items from its source {@code Observable}
112114
* @return the constructed {@link BehaviorSubject}
113115
*/
116+
@CheckReturnValue
114117
public static <T> BehaviorSubject<T> createDefault(T defaultValue) {
115118
return new BehaviorSubject<T>(defaultValue);
116119
}

src/main/java/io/reactivex/subjects/PublishSubject.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex.subjects;
1515

16+
import io.reactivex.annotations.CheckReturnValue;
1617
import java.util.concurrent.atomic.*;
1718

1819
import io.reactivex.Observer;
@@ -63,6 +64,7 @@ public final class PublishSubject<T> extends Subject<T> {
6364
* @param <T> the value type
6465
* @return the new PublishSubject
6566
*/
67+
@CheckReturnValue
6668
public static <T> PublishSubject<T> create() {
6769
return new PublishSubject<T>();
6870
}

src/main/java/io/reactivex/subjects/ReplaySubject.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import io.reactivex.Observer;
2222
import io.reactivex.Scheduler;
23+
import io.reactivex.annotations.CheckReturnValue;
2324
import io.reactivex.disposables.Disposable;
2425
import io.reactivex.internal.functions.ObjectHelper;
2526
import io.reactivex.internal.util.NotificationLite;
@@ -74,6 +75,7 @@ public final class ReplaySubject<T> extends Subject<T> {
7475
* the type of items observed and emitted by the Subject
7576
* @return the created subject
7677
*/
78+
@CheckReturnValue
7779
public static <T> ReplaySubject<T> create() {
7880
return new ReplaySubject<T>(new UnboundedReplayBuffer<T>(16));
7981
}
@@ -93,6 +95,7 @@ public static <T> ReplaySubject<T> create() {
9395
* the initial buffer capacity
9496
* @return the created subject
9597
*/
98+
@CheckReturnValue
9699
public static <T> ReplaySubject<T> create(int capacityHint) {
97100
return new ReplaySubject<T>(new UnboundedReplayBuffer<T>(capacityHint));
98101
}
@@ -117,6 +120,7 @@ public static <T> ReplaySubject<T> create(int capacityHint) {
117120
* the maximum number of buffered items
118121
* @return the created subject
119122
*/
123+
@CheckReturnValue
120124
public static <T> ReplaySubject<T> createWithSize(int maxSize) {
121125
return new ReplaySubject<T>(new SizeBoundReplayBuffer<T>(maxSize));
122126
}
@@ -170,6 +174,7 @@ public static <T> ReplaySubject<T> createWithSize(int maxSize) {
170174
* the {@link Scheduler} that provides the current time
171175
* @return the created subject
172176
*/
177+
@CheckReturnValue
173178
public static <T> ReplaySubject<T> createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler) {
174179
return new ReplaySubject<T>(new SizeAndTimeBoundReplayBuffer<T>(Integer.MAX_VALUE, maxAge, unit, scheduler));
175180
}
@@ -208,6 +213,7 @@ public static <T> ReplaySubject<T> createWithTime(long maxAge, TimeUnit unit, Sc
208213
* the {@link Scheduler} that provides the current time
209214
* @return the created subject
210215
*/
216+
@CheckReturnValue
211217
public static <T> ReplaySubject<T> createWithTimeAndSize(long maxAge, TimeUnit unit, Scheduler scheduler, int maxSize) {
212218
return new ReplaySubject<T>(new SizeAndTimeBoundReplayBuffer<T>(maxSize, maxAge, unit, scheduler));
213219
}

src/main/java/io/reactivex/subjects/UnicastSubject.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.concurrent.atomic.*;
1818

1919
import io.reactivex.Observer;
20+
import io.reactivex.annotations.CheckReturnValue;
2021
import io.reactivex.disposables.Disposable;
2122
import io.reactivex.internal.disposables.EmptyDisposable;
2223
import io.reactivex.internal.functions.ObjectHelper;
@@ -73,6 +74,7 @@ public final class UnicastSubject<T> extends Subject<T> {
7374
* @param <T> the value type
7475
* @return an UnicastSubject instance
7576
*/
77+
@CheckReturnValue
7678
public static <T> UnicastSubject<T> create() {
7779
return new UnicastSubject<T>(bufferSize());
7880
}
@@ -83,6 +85,7 @@ public static <T> UnicastSubject<T> create() {
8385
* @param capacityHint the hint to size the internal unbounded buffer
8486
* @return an UnicastSubject instance
8587
*/
88+
@CheckReturnValue
8689
public static <T> UnicastSubject<T> create(int capacityHint) {
8790
return new UnicastSubject<T>(capacityHint);
8891
}
@@ -99,6 +102,7 @@ public static <T> UnicastSubject<T> create(int capacityHint) {
99102
* @param onCancelled the non null callback
100103
* @return an UnicastSubject instance
101104
*/
105+
@CheckReturnValue
102106
public static <T> UnicastSubject<T> create(int capacityHint, Runnable onCancelled) {
103107
return new UnicastSubject<T>(capacityHint, onCancelled);
104108
}

0 commit comments

Comments
 (0)