Skip to content

Commit 27c63b6

Browse files
authored
2.x: Improve Completable.delay operator internals (#6096)
1 parent 1aeac06 commit 27c63b6

File tree

2 files changed

+117
-37
lines changed

2 files changed

+117
-37
lines changed

src/main/java/io/reactivex/internal/operators/completable/CompletableDelay.java

Lines changed: 48 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414
package io.reactivex.internal.operators.completable;
1515

1616
import java.util.concurrent.TimeUnit;
17+
import java.util.concurrent.atomic.AtomicReference;
1718

1819
import io.reactivex.*;
19-
import io.reactivex.disposables.*;
20+
import io.reactivex.disposables.Disposable;
21+
import io.reactivex.internal.disposables.DisposableHelper;
2022

2123
public final class CompletableDelay extends Completable {
2224

@@ -40,54 +42,70 @@ public CompletableDelay(CompletableSource source, long delay, TimeUnit unit, Sch
4042

4143
@Override
4244
protected void subscribeActual(final CompletableObserver s) {
43-
final CompositeDisposable set = new CompositeDisposable();
44-
45-
source.subscribe(new Delay(set, s));
45+
source.subscribe(new Delay(s, delay, unit, scheduler, delayError));
4646
}
4747

48-
final class Delay implements CompletableObserver {
48+
static final class Delay extends AtomicReference<Disposable>
49+
implements CompletableObserver, Runnable, Disposable {
50+
51+
private static final long serialVersionUID = 465972761105851022L;
52+
53+
final CompletableObserver downstream;
54+
55+
final long delay;
56+
57+
final TimeUnit unit;
4958

50-
private final CompositeDisposable set;
51-
final CompletableObserver s;
59+
final Scheduler scheduler;
5260

53-
Delay(CompositeDisposable set, CompletableObserver s) {
54-
this.set = set;
55-
this.s = s;
61+
final boolean delayError;
62+
63+
Throwable error;
64+
65+
Delay(CompletableObserver downstream, long delay, TimeUnit unit, Scheduler scheduler, boolean delayError) {
66+
this.downstream = downstream;
67+
this.delay = delay;
68+
this.unit = unit;
69+
this.scheduler = scheduler;
70+
this.delayError = delayError;
71+
}
72+
73+
@Override
74+
public void onSubscribe(Disposable d) {
75+
if (DisposableHelper.setOnce(this, d)) {
76+
downstream.onSubscribe(this);
77+
}
5678
}
5779

5880
@Override
5981
public void onComplete() {
60-
set.add(scheduler.scheduleDirect(new OnComplete(), delay, unit));
82+
DisposableHelper.replace(this, scheduler.scheduleDirect(this, delay, unit));
6183
}
6284

6385
@Override
6486
public void onError(final Throwable e) {
65-
set.add(scheduler.scheduleDirect(new OnError(e), delayError ? delay : 0, unit));
87+
error = e;
88+
DisposableHelper.replace(this, scheduler.scheduleDirect(this, delayError ? delay : 0, unit));
6689
}
6790

6891
@Override
69-
public void onSubscribe(Disposable d) {
70-
set.add(d);
71-
s.onSubscribe(set);
92+
public void dispose() {
93+
DisposableHelper.dispose(this);
7294
}
7395

74-
final class OnComplete implements Runnable {
75-
@Override
76-
public void run() {
77-
s.onComplete();
78-
}
96+
@Override
97+
public boolean isDisposed() {
98+
return DisposableHelper.isDisposed(get());
7999
}
80100

81-
final class OnError implements Runnable {
82-
private final Throwable e;
83-
84-
OnError(Throwable e) {
85-
this.e = e;
86-
}
87-
88-
@Override
89-
public void run() {
90-
s.onError(e);
101+
@Override
102+
public void run() {
103+
Throwable e = error;
104+
error = null;
105+
if (e != null) {
106+
downstream.onError(e);
107+
} else {
108+
downstream.onComplete();
91109
}
92110
}
93111
}

src/test/java/io/reactivex/internal/operators/completable/CompletableDelayTest.java

Lines changed: 69 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,18 @@
1313

1414
package io.reactivex.internal.operators.completable;
1515

16-
import java.util.concurrent.CountDownLatch;
17-
import java.util.concurrent.TimeUnit;
16+
import static org.junit.Assert.assertNotEquals;
17+
18+
import java.util.concurrent.*;
1819
import java.util.concurrent.atomic.AtomicReference;
1920

20-
import io.reactivex.functions.Consumer;
2121
import org.junit.Test;
2222

23-
import io.reactivex.Completable;
24-
import io.reactivex.schedulers.Schedulers;
25-
26-
import static org.junit.Assert.assertNotEquals;
23+
import io.reactivex.*;
24+
import io.reactivex.exceptions.TestException;
25+
import io.reactivex.functions.*;
26+
import io.reactivex.observers.TestObserver;
27+
import io.reactivex.schedulers.*;
2728

2829
public class CompletableDelayTest {
2930

@@ -58,4 +59,65 @@ public void accept(Throwable throwable) throws Exception {
5859
assertNotEquals(Thread.currentThread(), thread.get());
5960
}
6061

62+
@Test
63+
public void disposed() {
64+
TestHelper.checkDisposed(Completable.never().delay(1, TimeUnit.MINUTES));
65+
}
66+
67+
@Test
68+
public void doubleOnSubscribe() {
69+
TestHelper.checkDoubleOnSubscribeCompletable(new Function<Completable, CompletableSource>() {
70+
@Override
71+
public CompletableSource apply(Completable c) throws Exception {
72+
return c.delay(1, TimeUnit.MINUTES);
73+
}
74+
});
75+
}
76+
77+
@Test
78+
public void normal() {
79+
Completable.complete()
80+
.delay(1, TimeUnit.MILLISECONDS)
81+
.test()
82+
.awaitDone(5, TimeUnit.SECONDS)
83+
.assertResult();
84+
}
85+
86+
@Test
87+
public void errorNotDelayed() {
88+
TestScheduler scheduler = new TestScheduler();
89+
90+
TestObserver<Void> to = Completable.error(new TestException())
91+
.delay(100, TimeUnit.MILLISECONDS, scheduler, false)
92+
.test();
93+
94+
to.assertEmpty();
95+
96+
scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
97+
98+
to.assertFailure(TestException.class);
99+
100+
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
101+
102+
to.assertFailure(TestException.class);
103+
}
104+
105+
@Test
106+
public void errorDelayed() {
107+
TestScheduler scheduler = new TestScheduler();
108+
109+
TestObserver<Void> to = Completable.error(new TestException())
110+
.delay(100, TimeUnit.MILLISECONDS, scheduler, true)
111+
.test();
112+
113+
to.assertEmpty();
114+
115+
scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
116+
117+
to.assertEmpty();
118+
119+
scheduler.advanceTimeBy(99, TimeUnit.MILLISECONDS);
120+
121+
to.assertFailure(TestException.class);
122+
}
61123
}

0 commit comments

Comments
 (0)