Skip to content

Commit 1b0cd2a

Browse files
authored
2.x: inline disposability in Obs.concatMap(Completable) (#5652)
1 parent 3abd86a commit 1b0cd2a

File tree

2 files changed

+33
-32
lines changed

2 files changed

+33
-32
lines changed

src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMap.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
package io.reactivex.internal.operators.observable;
1414

1515
import java.util.concurrent.Callable;
16-
import java.util.concurrent.atomic.AtomicInteger;
16+
import java.util.concurrent.atomic.*;
1717

1818
import io.reactivex.*;
1919
import io.reactivex.disposables.Disposable;
@@ -59,9 +59,8 @@ static final class SourceObserver<T, U> extends AtomicInteger implements Observe
5959

6060
private static final long serialVersionUID = 8828587559905699186L;
6161
final Observer<? super U> actual;
62-
final SequentialDisposable sa;
6362
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
64-
final Observer<U> inner;
63+
final InnerObserver<U> inner;
6564
final int bufferSize;
6665

6766
SimpleQueue<T> queue;
@@ -82,7 +81,6 @@ static final class SourceObserver<T, U> extends AtomicInteger implements Observe
8281
this.mapper = mapper;
8382
this.bufferSize = bufferSize;
8483
this.inner = new InnerObserver<U>(actual, this);
85-
this.sa = new SequentialDisposable();
8684
}
8785
@Override
8886
public void onSubscribe(Disposable s) {
@@ -161,18 +159,14 @@ public boolean isDisposed() {
161159
@Override
162160
public void dispose() {
163161
disposed = true;
164-
sa.dispose();
162+
inner.dispose();
165163
s.dispose();
166164

167165
if (getAndIncrement() == 0) {
168166
queue.clear();
169167
}
170168
}
171169

172-
void innerSubscribe(Disposable s) {
173-
sa.update(s);
174-
}
175-
176170
void drain() {
177171
if (getAndIncrement() != 0) {
178172
return;
@@ -231,7 +225,10 @@ void drain() {
231225
}
232226
}
233227

234-
static final class InnerObserver<U> implements Observer<U> {
228+
static final class InnerObserver<U> extends AtomicReference<Disposable> implements Observer<U> {
229+
230+
private static final long serialVersionUID = -7449079488798789337L;
231+
235232
final Observer<? super U> actual;
236233
final SourceObserver<?, ?> parent;
237234

@@ -242,7 +239,7 @@ static final class InnerObserver<U> implements Observer<U> {
242239

243240
@Override
244241
public void onSubscribe(Disposable s) {
245-
parent.innerSubscribe(s);
242+
DisposableHelper.set(this, s);
246243
}
247244

248245
@Override
@@ -258,6 +255,10 @@ public void onError(Throwable t) {
258255
public void onComplete() {
259256
parent.innerComplete();
260257
}
258+
259+
void dispose() {
260+
DisposableHelper.dispose(this);
261+
}
261262
}
262263
}
263264

@@ -278,8 +279,6 @@ static final class ConcatMapDelayErrorObserver<T, R>
278279

279280
final DelayErrorInnerObserver<R> observer;
280281

281-
final SequentialDisposable arbiter;
282-
283282
final boolean tillTheEnd;
284283

285284
SimpleQueue<T> queue;
@@ -303,7 +302,6 @@ static final class ConcatMapDelayErrorObserver<T, R>
303302
this.tillTheEnd = tillTheEnd;
304303
this.error = new AtomicThrowable();
305304
this.observer = new DelayErrorInnerObserver<R>(actual, this);
306-
this.arbiter = new SequentialDisposable();
307305
}
308306

309307
@Override
@@ -375,7 +373,7 @@ public boolean isDisposed() {
375373
public void dispose() {
376374
cancelled = true;
377375
d.dispose();
378-
arbiter.dispose();
376+
observer.dispose();
379377
}
380378

381379
@SuppressWarnings("unchecked")
@@ -479,7 +477,9 @@ void drain() {
479477
}
480478
}
481479

482-
static final class DelayErrorInnerObserver<R> implements Observer<R> {
480+
static final class DelayErrorInnerObserver<R> extends AtomicReference<Disposable> implements Observer<R> {
481+
482+
private static final long serialVersionUID = 2620149119579502636L;
483483

484484
final Observer<? super R> actual;
485485

@@ -492,7 +492,7 @@ static final class DelayErrorInnerObserver<R> implements Observer<R> {
492492

493493
@Override
494494
public void onSubscribe(Disposable d) {
495-
parent.arbiter.replace(d);
495+
DisposableHelper.replace(this, d);
496496
}
497497

498498
@Override
@@ -520,6 +520,10 @@ public void onComplete() {
520520
p.active = false;
521521
p.drain();
522522
}
523+
524+
void dispose() {
525+
DisposableHelper.dispose(this);
526+
}
523527
}
524528
}
525529
}

src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapCompletable.java

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,18 @@
1212
*/
1313
package io.reactivex.internal.operators.observable;
1414

15+
import java.util.concurrent.atomic.*;
16+
1517
import io.reactivex.*;
1618
import io.reactivex.disposables.Disposable;
1719
import io.reactivex.exceptions.Exceptions;
1820
import io.reactivex.functions.Function;
1921
import io.reactivex.internal.disposables.DisposableHelper;
20-
import io.reactivex.internal.disposables.SequentialDisposable;
2122
import io.reactivex.internal.functions.ObjectHelper;
22-
import io.reactivex.internal.fuseable.QueueDisposable;
23-
import io.reactivex.internal.fuseable.SimpleQueue;
23+
import io.reactivex.internal.fuseable.*;
2424
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
2525
import io.reactivex.plugins.RxJavaPlugins;
2626

27-
import java.util.concurrent.atomic.AtomicInteger;
28-
2927
public final class ObservableConcatMapCompletable<T> extends Completable {
3028

3129
final ObservableSource<T> source;
@@ -47,9 +45,8 @@ static final class SourceObserver<T> extends AtomicInteger implements Observer<T
4745

4846
private static final long serialVersionUID = 6893587405571511048L;
4947
final CompletableObserver actual;
50-
final SequentialDisposable sa;
5148
final Function<? super T, ? extends CompletableSource> mapper;
52-
final CompletableObserver inner;
49+
final InnerObserver inner;
5350
final int bufferSize;
5451

5552
SimpleQueue<T> queue;
@@ -70,7 +67,6 @@ static final class SourceObserver<T> extends AtomicInteger implements Observer<T
7067
this.mapper = mapper;
7168
this.bufferSize = bufferSize;
7269
this.inner = new InnerObserver(actual, this);
73-
this.sa = new SequentialDisposable();
7470
}
7571
@Override
7672
public void onSubscribe(Disposable s) {
@@ -149,18 +145,14 @@ public boolean isDisposed() {
149145
@Override
150146
public void dispose() {
151147
disposed = true;
152-
sa.dispose();
148+
inner.dispose();
153149
s.dispose();
154150

155151
if (getAndIncrement() == 0) {
156152
queue.clear();
157153
}
158154
}
159155

160-
void innerSubscribe(Disposable s) {
161-
sa.update(s);
162-
}
163-
164156
void drain() {
165157
if (getAndIncrement() != 0) {
166158
return;
@@ -219,7 +211,8 @@ void drain() {
219211
}
220212
}
221213

222-
static final class InnerObserver implements CompletableObserver {
214+
static final class InnerObserver extends AtomicReference<Disposable> implements CompletableObserver {
215+
private static final long serialVersionUID = -5987419458390772447L;
223216
final CompletableObserver actual;
224217
final SourceObserver<?> parent;
225218

@@ -230,7 +223,7 @@ static final class InnerObserver implements CompletableObserver {
230223

231224
@Override
232225
public void onSubscribe(Disposable s) {
233-
parent.innerSubscribe(s);
226+
DisposableHelper.set(this, s);
234227
}
235228

236229
@Override
@@ -242,6 +235,10 @@ public void onError(Throwable t) {
242235
public void onComplete() {
243236
parent.innerComplete();
244237
}
238+
239+
void dispose() {
240+
DisposableHelper.dispose(this);
241+
}
245242
}
246243
}
247244
}

0 commit comments

Comments
 (0)