Skip to content

Commit 0ea2c95

Browse files
authored
2.x: Improve coverage and fix small mistakes/untaken paths in operators (#5883)
1 parent f6f6d82 commit 0ea2c95

File tree

46 files changed

+2713
-178
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+2713
-178
lines changed

src/main/java/io/reactivex/Maybe.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -888,7 +888,7 @@ public static <T> Flowable<T> merge(Publisher<? extends MaybeSource<? extends T>
888888
public static <T> Flowable<T> merge(Publisher<? extends MaybeSource<? extends T>> sources, int maxConcurrency) {
889889
ObjectHelper.requireNonNull(sources, "source is null");
890890
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
891-
return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, MaybeToPublisher.instance(), false, maxConcurrency, Flowable.bufferSize()));
891+
return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, MaybeToPublisher.instance(), false, maxConcurrency, 1));
892892
}
893893

894894
/**
@@ -1222,12 +1222,11 @@ public static <T> Flowable<T> mergeDelayError(Iterable<? extends MaybeSource<? e
12221222
* {@code source} Publisher
12231223
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
12241224
*/
1225-
@SuppressWarnings({ "unchecked", "rawtypes" })
12261225
@BackpressureSupport(BackpressureKind.FULL)
12271226
@CheckReturnValue
12281227
@SchedulerSupport(SchedulerSupport.NONE)
12291228
public static <T> Flowable<T> mergeDelayError(Publisher<? extends MaybeSource<? extends T>> sources) {
1230-
return Flowable.fromPublisher(sources).flatMap((Function)MaybeToPublisher.instance(), true);
1229+
return mergeDelayError(sources, Integer.MAX_VALUE);
12311230
}
12321231

12331232

@@ -1267,7 +1266,9 @@ public static <T> Flowable<T> mergeDelayError(Publisher<? extends MaybeSource<?
12671266
@SchedulerSupport(SchedulerSupport.NONE)
12681267
@Experimental
12691268
public static <T> Flowable<T> mergeDelayError(Publisher<? extends MaybeSource<? extends T>> sources, int maxConcurrency) {
1270-
return Flowable.fromPublisher(sources).flatMap((Function)MaybeToPublisher.instance(), true, maxConcurrency);
1269+
ObjectHelper.requireNonNull(sources, "source is null");
1270+
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
1271+
return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, MaybeToPublisher.instance(), true, maxConcurrency, 1));
12711272
}
12721273

12731274
/**

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,10 +185,10 @@ boolean addInner(InnerSubscriber<T, U> inner) {
185185
void removeInner(InnerSubscriber<T, U> inner) {
186186
for (;;) {
187187
InnerSubscriber<?, ?>[] a = subscribers.get();
188-
if (a == CANCELLED || a == EMPTY) {
188+
int n = a.length;
189+
if (n == 0) {
189190
return;
190191
}
191-
int n = a.length;
192192
int j = -1;
193193
for (int i = 0; i < n; i++) {
194194
if (a[i] == inner) {

src/main/java/io/reactivex/internal/operators/flowable/FlowableReduceSeedSingle.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.reactivex.functions.BiFunction;
2222
import io.reactivex.internal.functions.ObjectHelper;
2323
import io.reactivex.internal.subscriptions.SubscriptionHelper;
24+
import io.reactivex.plugins.RxJavaPlugins;
2425

2526
/**
2627
* Reduce a sequence of values, starting from a seed value and by using
@@ -78,28 +79,36 @@ public void onSubscribe(Subscription s) {
7879
@Override
7980
public void onNext(T value) {
8081
R v = this.value;
81-
try {
82-
this.value = ObjectHelper.requireNonNull(reducer.apply(v, value), "The reducer returned a null value");
83-
} catch (Throwable ex) {
84-
Exceptions.throwIfFatal(ex);
85-
s.cancel();
86-
onError(ex);
82+
if (v != null) {
83+
try {
84+
this.value = ObjectHelper.requireNonNull(reducer.apply(v, value), "The reducer returned a null value");
85+
} catch (Throwable ex) {
86+
Exceptions.throwIfFatal(ex);
87+
s.cancel();
88+
onError(ex);
89+
}
8790
}
8891
}
8992

9093
@Override
9194
public void onError(Throwable e) {
92-
value = null;
93-
s = SubscriptionHelper.CANCELLED;
94-
actual.onError(e);
95+
if (value != null) {
96+
value = null;
97+
s = SubscriptionHelper.CANCELLED;
98+
actual.onError(e);
99+
} else {
100+
RxJavaPlugins.onError(e);
101+
}
95102
}
96103

97104
@Override
98105
public void onComplete() {
99106
R v = value;
100-
value = null;
101-
s = SubscriptionHelper.CANCELLED;
102-
actual.onSuccess(v);
107+
if (v != null) {
108+
value = null;
109+
s = SubscriptionHelper.CANCELLED;
110+
actual.onSuccess(v);
111+
}
103112
}
104113

105114
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public final class FlowableReplay<T> extends ConnectableFlowable<T> implements H
5757
public static <U, R> Flowable<R> multicastSelector(
5858
final Callable<? extends ConnectableFlowable<U>> connectableFactory,
5959
final Function<? super Flowable<U>, ? extends Publisher<R>> selector) {
60-
return Flowable.unsafeCreate(new MultiCastPublisher<R, U>(connectableFactory, selector));
60+
return new MulticastFlowable<R, U>(connectableFactory, selector);
6161
}
6262

6363
/**
@@ -1100,17 +1100,17 @@ Node getHead() {
11001100
}
11011101
}
11021102

1103-
static final class MultiCastPublisher<R, U> implements Publisher<R> {
1103+
static final class MulticastFlowable<R, U> extends Flowable<R> {
11041104
private final Callable<? extends ConnectableFlowable<U>> connectableFactory;
11051105
private final Function<? super Flowable<U>, ? extends Publisher<R>> selector;
11061106

1107-
MultiCastPublisher(Callable<? extends ConnectableFlowable<U>> connectableFactory, Function<? super Flowable<U>, ? extends Publisher<R>> selector) {
1107+
MulticastFlowable(Callable<? extends ConnectableFlowable<U>> connectableFactory, Function<? super Flowable<U>, ? extends Publisher<R>> selector) {
11081108
this.connectableFactory = connectableFactory;
11091109
this.selector = selector;
11101110
}
11111111

11121112
@Override
1113-
public void subscribe(Subscriber<? super R> child) {
1113+
protected void subscribeActual(Subscriber<? super R> child) {
11141114
ConnectableFlowable<U> co;
11151115
try {
11161116
co = ObjectHelper.requireNonNull(connectableFactory.call(), "The connectableFactory returned null");

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundary.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ public void onSubscribe(Subscription s) {
9393
produced(1);
9494
}
9595
} else {
96+
s.cancel();
9697
a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests"));
9798
return;
9899
}
@@ -254,11 +255,6 @@ void next() {
254255
}
255256
}
256257

257-
@Override
258-
public boolean accept(Subscriber<? super Flowable<T>> a, Object v) {
259-
// not used by this operator
260-
return false;
261-
}
262258
}
263259

264260
static final class WindowBoundaryInnerSubscriber<T, B> extends DisposableSubscriber<B> {

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -323,36 +323,22 @@ static final class WindowOperation<T, B> {
323323
static final class OperatorWindowBoundaryOpenSubscriber<T, B> extends DisposableSubscriber<B> {
324324
final WindowBoundaryMainSubscriber<T, B, ?> parent;
325325

326-
boolean done;
327-
328326
OperatorWindowBoundaryOpenSubscriber(WindowBoundaryMainSubscriber<T, B, ?> parent) {
329327
this.parent = parent;
330328
}
331329

332330
@Override
333331
public void onNext(B t) {
334-
if (done) {
335-
return;
336-
}
337332
parent.open(t);
338333
}
339334

340335
@Override
341336
public void onError(Throwable t) {
342-
if (done) {
343-
RxJavaPlugins.onError(t);
344-
return;
345-
}
346-
done = true;
347337
parent.error(t);
348338
}
349339

350340
@Override
351341
public void onComplete() {
352-
if (done) {
353-
return;
354-
}
355-
done = true;
356342
parent.onComplete();
357343
}
358344
}
@@ -370,12 +356,8 @@ static final class OperatorWindowBoundaryCloseSubscriber<T, V> extends Disposabl
370356

371357
@Override
372358
public void onNext(V t) {
373-
if (done) {
374-
return;
375-
}
376-
done = true;
377359
cancel();
378-
parent.close(this);
360+
onComplete();
379361
}
380362

381363
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,6 @@ public void onComplete() {
326326
}
327327
done = true;
328328
parent.onComplete();
329-
// parent.next();
330329
}
331330
}
332331
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -390,9 +390,7 @@ public void onNext(T t) {
390390
tm.dispose();
391391
Disposable task = worker.schedulePeriodically(
392392
new ConsumerIndexHolder(producerIndex, this), timespan, timespan, unit);
393-
if (!timer.compareAndSet(tm, task)) {
394-
task.dispose();
395-
}
393+
timer.replace(task);
396394
}
397395
} else {
398396
window = null;
@@ -549,9 +547,7 @@ void drainLoop() {
549547

550548
Disposable task = worker.schedulePeriodically(
551549
new ConsumerIndexHolder(producerIndex, this), timespan, timespan, unit);
552-
if (!timer.compareAndSet(tm, task)) {
553-
task.dispose();
554-
}
550+
timer.replace(task);
555551
}
556552

557553
} else {

src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java

Lines changed: 0 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -199,24 +199,6 @@ public Object apply(Object t) throws Exception {
199199
}
200200
}
201201

202-
static final class RepeatWhenOuterHandler
203-
implements Function<Observable<Notification<Object>>, ObservableSource<?>> {
204-
private final Function<? super Observable<Object>, ? extends ObservableSource<?>> handler;
205-
206-
RepeatWhenOuterHandler(Function<? super Observable<Object>, ? extends ObservableSource<?>> handler) {
207-
this.handler = handler;
208-
}
209-
210-
@Override
211-
public ObservableSource<?> apply(Observable<Notification<Object>> no) throws Exception {
212-
return handler.apply(no.map(MapToInt.INSTANCE));
213-
}
214-
}
215-
216-
public static Function<Observable<Notification<Object>>, ObservableSource<?>> repeatWhenHandler(final Function<? super Observable<Object>, ? extends ObservableSource<?>> handler) {
217-
return new RepeatWhenOuterHandler(handler);
218-
}
219-
220202
public static <T> Callable<ConnectableObservable<T>> replayCallable(final Observable<T> parent) {
221203
return new ReplayCallable<T>(parent);
222204
}
@@ -237,42 +219,6 @@ public static <T, R> Function<Observable<T>, ObservableSource<R>> replayFunction
237219
return new ReplayFunction<T, R>(selector, scheduler);
238220
}
239221

240-
enum ErrorMapperFilter implements Function<Notification<Object>, Throwable>, Predicate<Notification<Object>> {
241-
INSTANCE;
242-
243-
@Override
244-
public Throwable apply(Notification<Object> t) throws Exception {
245-
return t.getError();
246-
}
247-
248-
@Override
249-
public boolean test(Notification<Object> t) throws Exception {
250-
return t.isOnError();
251-
}
252-
}
253-
254-
static final class RetryWhenInner
255-
implements Function<Observable<Notification<Object>>, ObservableSource<?>> {
256-
private final Function<? super Observable<Throwable>, ? extends ObservableSource<?>> handler;
257-
258-
RetryWhenInner(
259-
Function<? super Observable<Throwable>, ? extends ObservableSource<?>> handler) {
260-
this.handler = handler;
261-
}
262-
263-
@Override
264-
public ObservableSource<?> apply(Observable<Notification<Object>> no) throws Exception {
265-
Observable<Throwable> map = no
266-
.takeWhile(ErrorMapperFilter.INSTANCE)
267-
.map(ErrorMapperFilter.INSTANCE);
268-
return handler.apply(map);
269-
}
270-
}
271-
272-
public static <T> Function<Observable<Notification<Object>>, ObservableSource<?>> retryWhenHandler(final Function<? super Observable<Throwable>, ? extends ObservableSource<?>> handler) {
273-
return new RetryWhenInner(handler);
274-
}
275-
276222
static final class ZipIterableFunction<T, R>
277223
implements Function<List<ObservableSource<? extends T>>, ObservableSource<? extends R>> {
278224
private final Function<? super Object[], ? extends R> zipper;

src/main/java/io/reactivex/internal/operators/observable/ObservableReduceSeedSingle.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,8 @@ public void onNext(T value) {
8989
@Override
9090
public void onError(Throwable e) {
9191
R v = value;
92-
value = null;
9392
if (v != null) {
93+
value = null;
9494
actual.onError(e);
9595
} else {
9696
RxJavaPlugins.onError(e);
@@ -100,8 +100,8 @@ public void onError(Throwable e) {
100100
@Override
101101
public void onComplete() {
102102
R v = value;
103-
value = null;
104103
if (v != null) {
104+
value = null;
105105
actual.onSuccess(v);
106106
}
107107
}

0 commit comments

Comments
 (0)