Skip to content

Commit a07c45e

Browse files
authored
3.x: Cleanup addThrowable, "2.x" and null-value error messages (#6639)
1 parent f1441b0 commit a07c45e

File tree

106 files changed

+553
-616
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

106 files changed

+553
-616
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableCreate.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.reactivex.rxjava3.exceptions.Exceptions;
2121
import io.reactivex.rxjava3.functions.Cancellable;
2222
import io.reactivex.rxjava3.internal.disposables.*;
23+
import io.reactivex.rxjava3.internal.util.ExceptionHelper;
2324
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
2425

2526
public final class CompletableCreate extends Completable {
@@ -81,7 +82,7 @@ public void onError(Throwable t) {
8182
@Override
8283
public boolean tryOnError(Throwable t) {
8384
if (t == null) {
84-
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
85+
t = ExceptionHelper.createNullPointerException("onError called with a null Throwable.");
8586
}
8687
if (get() != DisposableHelper.DISPOSED) {
8788
Disposable d = getAndSet(DisposableHelper.DISPOSED);

src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableMerge.java

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
2323
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
2424
import io.reactivex.rxjava3.internal.util.AtomicThrowable;
25-
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
2625

2726
public final class CompletableMerge extends Completable {
2827
final Publisher<? extends CompletableSource> source;
@@ -51,7 +50,7 @@ static final class CompletableMergeSubscriber
5150
final int maxConcurrency;
5251
final boolean delayErrors;
5352

54-
final AtomicThrowable error;
53+
final AtomicThrowable errors;
5554

5655
final CompositeDisposable set;
5756

@@ -62,15 +61,15 @@ static final class CompletableMergeSubscriber
6261
this.maxConcurrency = maxConcurrency;
6362
this.delayErrors = delayErrors;
6463
this.set = new CompositeDisposable();
65-
this.error = new AtomicThrowable();
64+
this.errors = new AtomicThrowable();
6665
lazySet(1);
6766
}
6867

6968
@Override
7069
public void dispose() {
7170
upstream.cancel();
7271
set.dispose();
73-
error.tryTerminateAndReport();
72+
errors.tryTerminateAndReport();
7473
}
7574

7675
@Override
@@ -105,28 +104,24 @@ public void onError(Throwable t) {
105104
if (!delayErrors) {
106105
set.dispose();
107106

108-
if (error.addThrowable(t)) {
107+
if (errors.tryAddThrowableOrReport(t)) {
109108
if (getAndSet(0) > 0) {
110-
error.tryTerminateConsumer(downstream);
109+
errors.tryTerminateConsumer(downstream);
111110
}
112-
} else {
113-
RxJavaPlugins.onError(t);
114111
}
115112
} else {
116-
if (error.addThrowable(t)) {
113+
if (errors.tryAddThrowableOrReport(t)) {
117114
if (decrementAndGet() == 0) {
118-
error.tryTerminateConsumer(downstream);
115+
errors.tryTerminateConsumer(downstream);
119116
}
120-
} else {
121-
RxJavaPlugins.onError(t);
122117
}
123118
}
124119
}
125120

126121
@Override
127122
public void onComplete() {
128123
if (decrementAndGet() == 0) {
129-
error.tryTerminateConsumer(downstream);
124+
errors.tryTerminateConsumer(downstream);
130125
}
131126
}
132127

@@ -136,32 +131,28 @@ void innerError(MergeInnerObserver inner, Throwable t) {
136131
upstream.cancel();
137132
set.dispose();
138133

139-
if (error.addThrowable(t)) {
134+
if (errors.tryAddThrowableOrReport(t)) {
140135
if (getAndSet(0) > 0) {
141-
error.tryTerminateConsumer(downstream);
136+
errors.tryTerminateConsumer(downstream);
142137
}
143-
} else {
144-
RxJavaPlugins.onError(t);
145138
}
146139
} else {
147-
if (error.addThrowable(t)) {
140+
if (errors.tryAddThrowableOrReport(t)) {
148141
if (decrementAndGet() == 0) {
149-
error.tryTerminateConsumer(downstream);
142+
errors.tryTerminateConsumer(downstream);
150143
} else {
151144
if (maxConcurrency != Integer.MAX_VALUE) {
152145
upstream.request(1);
153146
}
154147
}
155-
} else {
156-
RxJavaPlugins.onError(t);
157148
}
158149
}
159150
}
160151

161152
void innerComplete(MergeInnerObserver inner) {
162153
set.delete(inner);
163154
if (decrementAndGet() == 0) {
164-
error.tryTerminateConsumer(downstream);
155+
errors.tryTerminateConsumer(downstream);
165156
} else {
166157
if (maxConcurrency != Integer.MAX_VALUE) {
167158
upstream.request(1);

src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableMergeDelayErrorArray.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import io.reactivex.rxjava3.core.*;
1919
import io.reactivex.rxjava3.disposables.*;
2020
import io.reactivex.rxjava3.internal.util.AtomicThrowable;
21-
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
2221

2322
public final class CompletableMergeDelayErrorArray extends Completable {
2423

@@ -33,8 +32,8 @@ public void subscribeActual(final CompletableObserver observer) {
3332
final CompositeDisposable set = new CompositeDisposable();
3433
final AtomicInteger wip = new AtomicInteger(sources.length + 1);
3534

36-
final AtomicThrowable error = new AtomicThrowable();
37-
set.add(new TryTerminateAndReportDisposable(error));
35+
final AtomicThrowable errors = new AtomicThrowable();
36+
set.add(new TryTerminateAndReportDisposable(errors));
3837

3938
observer.onSubscribe(set);
4039

@@ -45,16 +44,16 @@ public void subscribeActual(final CompletableObserver observer) {
4544

4645
if (c == null) {
4746
Throwable ex = new NullPointerException("A completable source is null");
48-
error.addThrowable(ex);
47+
errors.tryAddThrowableOrReport(ex);
4948
wip.decrementAndGet();
5049
continue;
5150
}
5251

53-
c.subscribe(new MergeInnerCompletableObserver(observer, set, error, wip));
52+
c.subscribe(new MergeInnerCompletableObserver(observer, set, errors, wip));
5453
}
5554

5655
if (wip.decrementAndGet() == 0) {
57-
error.tryTerminateConsumer(observer);
56+
errors.tryTerminateConsumer(observer);
5857
}
5958
}
6059

@@ -79,14 +78,14 @@ static final class MergeInnerCompletableObserver
7978
implements CompletableObserver {
8079
final CompletableObserver downstream;
8180
final CompositeDisposable set;
82-
final AtomicThrowable error;
81+
final AtomicThrowable errors;
8382
final AtomicInteger wip;
8483

8584
MergeInnerCompletableObserver(CompletableObserver observer, CompositeDisposable set, AtomicThrowable error,
8685
AtomicInteger wip) {
8786
this.downstream = observer;
8887
this.set = set;
89-
this.error = error;
88+
this.errors = error;
9089
this.wip = wip;
9190
}
9291

@@ -97,10 +96,8 @@ public void onSubscribe(Disposable d) {
9796

9897
@Override
9998
public void onError(Throwable e) {
100-
if (error.addThrowable(e)) {
99+
if (errors.tryAddThrowableOrReport(e)) {
101100
tryTerminate();
102-
} else {
103-
RxJavaPlugins.onError(e);
104101
}
105102
}
106103

@@ -111,7 +108,7 @@ public void onComplete() {
111108

112109
void tryTerminate() {
113110
if (wip.decrementAndGet() == 0) {
114-
error.tryTerminateConsumer(downstream);
111+
errors.tryTerminateConsumer(downstream);
115112
}
116113
}
117114
}

src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableMergeDelayErrorIterable.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ public void subscribeActual(final CompletableObserver observer) {
4949

5050
final AtomicInteger wip = new AtomicInteger(1);
5151

52-
final AtomicThrowable error = new AtomicThrowable();
53-
set.add(new TryTerminateAndReportDisposable(error));
52+
final AtomicThrowable errors = new AtomicThrowable();
53+
set.add(new TryTerminateAndReportDisposable(errors));
5454

5555
for (;;) {
5656
if (set.isDisposed()) {
@@ -62,7 +62,7 @@ public void subscribeActual(final CompletableObserver observer) {
6262
b = iterator.hasNext();
6363
} catch (Throwable e) {
6464
Exceptions.throwIfFatal(e);
65-
error.addThrowable(e);
65+
errors.tryAddThrowableOrReport(e);
6666
break;
6767
}
6868

@@ -80,7 +80,7 @@ public void subscribeActual(final CompletableObserver observer) {
8080
c = ObjectHelper.requireNonNull(iterator.next(), "The iterator returned a null CompletableSource");
8181
} catch (Throwable e) {
8282
Exceptions.throwIfFatal(e);
83-
error.addThrowable(e);
83+
errors.tryAddThrowableOrReport(e);
8484
break;
8585
}
8686

@@ -90,11 +90,11 @@ public void subscribeActual(final CompletableObserver observer) {
9090

9191
wip.getAndIncrement();
9292

93-
c.subscribe(new MergeInnerCompletableObserver(observer, set, error, wip));
93+
c.subscribe(new MergeInnerCompletableObserver(observer, set, errors, wip));
9494
}
9595

9696
if (wip.decrementAndGet() == 0) {
97-
error.tryTerminateConsumer(observer);
97+
errors.tryTerminateConsumer(observer);
9898
}
9999
}
100100
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableBufferBoundary.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -130,15 +130,13 @@ public void onNext(T t) {
130130

131131
@Override
132132
public void onError(Throwable t) {
133-
if (errors.addThrowable(t)) {
133+
if (errors.tryAddThrowableOrReport(t)) {
134134
subscribers.dispose();
135135
synchronized (this) {
136136
buffers = null;
137137
}
138138
done = true;
139139
drain();
140-
} else {
141-
RxJavaPlugins.onError(t);
142140
}
143141
}
144142

@@ -264,8 +262,7 @@ void drain() {
264262
boolean d = done;
265263
if (d && errors.get() != null) {
266264
q.clear();
267-
Throwable ex = errors.terminate();
268-
a.onError(ex);
265+
errors.tryTerminateConsumer(a);
269266
return;
270267
}
271268

@@ -294,8 +291,7 @@ void drain() {
294291
if (done) {
295292
if (errors.get() != null) {
296293
q.clear();
297-
Throwable ex = errors.terminate();
298-
a.onError(ex);
294+
errors.tryTerminateConsumer(a);
299295
return;
300296
} else if (q.isEmpty()) {
301297
a.onComplete();

0 commit comments

Comments
 (0)