Skip to content

Commit 90bca55

Browse files
authored
2.x: Improve coverage, fix operator logic 03/12 (#5910)
1 parent ab52050 commit 90bca55

24 files changed

+895
-86
lines changed

src/main/java/io/reactivex/exceptions/CompositeException.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ public int size() {
278278
* @param e the {@link Throwable} {@code e}.
279279
* @return The root cause of {@code e}. If {@code e.getCause()} returns {@code null} or {@code e}, just return {@code e} itself.
280280
*/
281-
private Throwable getRootCause(Throwable e) {
281+
/*private */Throwable getRootCause(Throwable e) {
282282
Throwable root = e.getCause();
283283
if (root == null || cause == root) {
284284
return e;

src/main/java/io/reactivex/internal/functions/ObjectHelper.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public static int compare(int v1, int v2) {
7171
}
7272

7373
/**
74-
* Compares two integer values similar to Long.compare.
74+
* Compares two long values similar to Long.compare.
7575
* @param v1 the first value
7676
* @param v2 the second value
7777
* @return the comparison result

src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java

+11-15
Original file line numberDiff line numberDiff line change
@@ -196,24 +196,20 @@ void next() {
196196

197197
BufferBoundarySubscriber<T, U, B> bs = new BufferBoundarySubscriber<T, U, B>(this);
198198

199-
Disposable o = other.get();
200-
201-
if (!other.compareAndSet(o, bs)) {
202-
return;
203-
}
204-
205-
U b;
206-
synchronized (this) {
207-
b = buffer;
208-
if (b == null) {
209-
return;
199+
if (DisposableHelper.replace(other, bs)) {
200+
U b;
201+
synchronized (this) {
202+
b = buffer;
203+
if (b == null) {
204+
return;
205+
}
206+
buffer = next;
210207
}
211-
buffer = next;
212-
}
213208

214-
boundary.subscribe(bs);
209+
boundary.subscribe(bs);
215210

216-
fastPathEmitMax(b, false, this);
211+
fastPathEmitMax(b, false, this);
212+
}
217213
}
218214

219215
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java

+6-9
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,8 @@ public void request(long n) {
177177

178178
@Override
179179
public void cancel() {
180+
cancelled = true;
180181
s.cancel();
181-
182182
DisposableHelper.dispose(timer);
183183
}
184184

@@ -199,14 +199,10 @@ public void run() {
199199

200200
synchronized (this) {
201201
current = buffer;
202-
if (current != null) {
203-
buffer = next;
202+
if (current == null) {
203+
return;
204204
}
205-
}
206-
207-
if (current == null) {
208-
DisposableHelper.dispose(timer);
209-
return;
205+
buffer = next;
210206
}
211207

212208
fastPathEmitMax(current, false, this);
@@ -324,9 +320,10 @@ public void request(long n) {
324320

325321
@Override
326322
public void cancel() {
327-
clear();
323+
cancelled = true;
328324
s.cancel();
329325
w.dispose();
326+
clear();
330327
}
331328

332329
void clear() {

src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounceTimed.java

+20-17
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ static final class DebounceTimedSubscriber<T> extends AtomicLong
5858

5959
Subscription s;
6060

61-
final SequentialDisposable timer = new SequentialDisposable();
61+
Disposable timer;
6262

6363
volatile long index;
6464

@@ -88,17 +88,15 @@ public void onNext(T t) {
8888
long idx = index + 1;
8989
index = idx;
9090

91-
Disposable d = timer.get();
91+
Disposable d = timer;
9292
if (d != null) {
9393
d.dispose();
9494
}
9595

9696
DebounceEmitter<T> de = new DebounceEmitter<T>(t, idx, this);
97-
if (timer.replace(de)) {
98-
d = worker.schedule(de, timeout, unit);
99-
100-
de.setResource(d);
101-
}
97+
timer = de;
98+
d = worker.schedule(de, timeout, unit);
99+
de.setResource(d);
102100
}
103101

104102
@Override
@@ -108,6 +106,10 @@ public void onError(Throwable t) {
108106
return;
109107
}
110108
done = true;
109+
Disposable d = timer;
110+
if (d != null) {
111+
d.dispose();
112+
}
111113
actual.onError(t);
112114
worker.dispose();
113115
}
@@ -119,17 +121,18 @@ public void onComplete() {
119121
}
120122
done = true;
121123

122-
Disposable d = timer.get();
123-
if (!DisposableHelper.isDisposed(d)) {
124-
@SuppressWarnings("unchecked")
125-
DebounceEmitter<T> de = (DebounceEmitter<T>)d;
126-
if (de != null) {
127-
de.emit();
128-
}
129-
DisposableHelper.dispose(timer);
130-
actual.onComplete();
131-
worker.dispose();
124+
Disposable d = timer;
125+
if (d != null) {
126+
d.dispose();
127+
}
128+
@SuppressWarnings("unchecked")
129+
DebounceEmitter<T> de = (DebounceEmitter<T>)d;
130+
if (de != null) {
131+
de.emit();
132132
}
133+
134+
actual.onComplete();
135+
worker.dispose();
133136
}
134137

135138
@Override

src/main/java/io/reactivex/internal/operators/observable/ObservableBuffer.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -127,11 +127,13 @@ public void onError(Throwable t) {
127127
@Override
128128
public void onComplete() {
129129
U b = buffer;
130-
buffer = null;
131-
if (b != null && !b.isEmpty()) {
132-
actual.onNext(b);
130+
if (b != null) {
131+
buffer = null;
132+
if (!b.isEmpty()) {
133+
actual.onNext(b);
134+
}
135+
actual.onComplete();
133136
}
134-
actual.onComplete();
135137
}
136138
}
137139

src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java

+11-15
Original file line numberDiff line numberDiff line change
@@ -190,24 +190,20 @@ void next() {
190190

191191
BufferBoundaryObserver<T, U, B> bs = new BufferBoundaryObserver<T, U, B>(this);
192192

193-
Disposable o = other.get();
194-
195-
if (!other.compareAndSet(o, bs)) {
196-
return;
197-
}
198-
199-
U b;
200-
synchronized (this) {
201-
b = buffer;
202-
if (b == null) {
203-
return;
193+
if (DisposableHelper.replace(other, bs)) {
194+
U b;
195+
synchronized (this) {
196+
b = buffer;
197+
if (b == null) {
198+
return;
199+
}
200+
buffer = next;
204201
}
205-
buffer = next;
206-
}
207202

208-
boundary.subscribe(bs);
203+
boundary.subscribe(bs);
209204

210-
fastPathEmit(b, false, this);
205+
fastPathEmit(b, false, this);
206+
}
211207
}
212208

213209
@Override

src/main/java/io/reactivex/internal/operators/observable/ObservableDebounceTimed.java

+19-17
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ static final class DebounceTimedObserver<T>
5151

5252
Disposable s;
5353

54-
final AtomicReference<Disposable> timer = new AtomicReference<Disposable>();
54+
Disposable timer;
5555

5656
volatile long index;
5757

@@ -80,18 +80,15 @@ public void onNext(T t) {
8080
long idx = index + 1;
8181
index = idx;
8282

83-
Disposable d = timer.get();
83+
Disposable d = timer;
8484
if (d != null) {
8585
d.dispose();
8686
}
8787

8888
DebounceEmitter<T> de = new DebounceEmitter<T>(t, idx, this);
89-
if (timer.compareAndSet(d, de)) {
90-
d = worker.schedule(de, timeout, unit);
91-
92-
de.setResource(d);
93-
}
94-
89+
timer = de;
90+
d = worker.schedule(de, timeout, unit);
91+
de.setResource(d);
9592
}
9693

9794
@Override
@@ -100,6 +97,10 @@ public void onError(Throwable t) {
10097
RxJavaPlugins.onError(t);
10198
return;
10299
}
100+
Disposable d = timer;
101+
if (d != null) {
102+
d.dispose();
103+
}
103104
done = true;
104105
actual.onError(t);
105106
worker.dispose();
@@ -112,16 +113,17 @@ public void onComplete() {
112113
}
113114
done = true;
114115

115-
Disposable d = timer.get();
116-
if (d != DisposableHelper.DISPOSED) {
117-
@SuppressWarnings("unchecked")
118-
DebounceEmitter<T> de = (DebounceEmitter<T>)d;
119-
if (de != null) {
120-
de.run();
121-
}
122-
actual.onComplete();
123-
worker.dispose();
116+
Disposable d = timer;
117+
if (d != null) {
118+
d.dispose();
124119
}
120+
@SuppressWarnings("unchecked")
121+
DebounceEmitter<T> de = (DebounceEmitter<T>)d;
122+
if (de != null) {
123+
de.run();
124+
}
125+
actual.onComplete();
126+
worker.dispose();
125127
}
126128

127129
@Override

src/main/java/io/reactivex/internal/operators/single/SingleTakeUntil.java

+2-5
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,9 @@ public void onSubscribe(Disposable d) {
8585
public void onSuccess(T value) {
8686
other.dispose();
8787

88-
Disposable a = get();
88+
Disposable a = getAndSet(DisposableHelper.DISPOSED);
8989
if (a != DisposableHelper.DISPOSED) {
90-
a = getAndSet(DisposableHelper.DISPOSED);
91-
if (a != DisposableHelper.DISPOSED) {
92-
actual.onSuccess(value);
93-
}
90+
actual.onSuccess(value);
9491
}
9592
}
9693

src/main/java/io/reactivex/internal/util/MergerBiFunction.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,7 @@ public List<T> apply(List<T> a, List<T> b) throws Exception {
5858
while (at.hasNext()) {
5959
both.add(at.next());
6060
}
61-
} else
62-
if (s2 != null) {
61+
} else {
6362
both.add(s2);
6463
while (bt.hasNext()) {
6564
both.add(bt.next());

src/test/java/io/reactivex/exceptions/CompositeExceptionTest.java

+16
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,22 @@ public void badException() {
349349
assertSame(e, new CompositeException(e).getCause().getCause());
350350
assertSame(e, new CompositeException(new RuntimeException(e)).getCause().getCause().getCause());
351351
}
352+
353+
@Test
354+
public void rootCauseEval() {
355+
final TestException ex0 = new TestException();
356+
Throwable throwable = new Throwable() {
357+
358+
private static final long serialVersionUID = 3597694032723032281L;
359+
360+
@Override
361+
public synchronized Throwable getCause() {
362+
return ex0;
363+
}
364+
};
365+
CompositeException ex = new CompositeException(throwable);
366+
assertSame(ex, ex.getRootCause(ex));
367+
}
352368
}
353369

354370
final class BadException extends Throwable {

src/test/java/io/reactivex/internal/functions/ObjectHelperTest.java

+7
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,11 @@ public void compare() {
5858
assertEquals(0, ObjectHelper.compare(0, 0));
5959
assertEquals(1, ObjectHelper.compare(2, 0));
6060
}
61+
62+
@Test
63+
public void compareLong() {
64+
assertEquals(-1, ObjectHelper.compare(0L, 2L));
65+
assertEquals(0, ObjectHelper.compare(0L, 0L));
66+
assertEquals(1, ObjectHelper.compare(2L, 0L));
67+
}
6168
}

0 commit comments

Comments
 (0)