Skip to content

Commit cb05a26

Browse files
authored
2.x: Improve coverage & related cleanup 03/05 (#5891)
* 2.x: Improve coverage & related cleanup 03/05 * Fix camelCase local variable naming errors in tests.
1 parent 51dd03b commit cb05a26

File tree

93 files changed

+2523
-433
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

93 files changed

+2523
-433
lines changed

src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterable.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,7 @@ public T next() {
125125

126126
@Override
127127
public void onSubscribe(Subscription s) {
128-
if (SubscriptionHelper.setOnce(this, s)) {
129-
s.request(batchSize);
130-
}
128+
SubscriptionHelper.setOnce(this, s, batchSize);
131129
}
132130

133131
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -327,9 +327,7 @@ static final class BufferOpenSubscriber<Open>
327327

328328
@Override
329329
public void onSubscribe(Subscription s) {
330-
if (SubscriptionHelper.setOnce(this, s)) {
331-
s.request(Long.MAX_VALUE);
332-
}
330+
SubscriptionHelper.setOnce(this, s, Long.MAX_VALUE);
333331
}
334332

335333
@Override
@@ -378,9 +376,7 @@ static final class BufferCloseSubscriber<T, C extends Collection<? super T>>
378376

379377
@Override
380378
public void onSubscribe(Subscription s) {
381-
if (SubscriptionHelper.setOnce(this, s)) {
382-
s.request(Long.MAX_VALUE);
383-
}
379+
SubscriptionHelper.setOnce(this, s, Long.MAX_VALUE);
384380
}
385381

386382
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,9 +180,7 @@ public void removeChild(ReplaySubscription<T> p) {
180180

181181
@Override
182182
public void onSubscribe(Subscription s) {
183-
if (SubscriptionHelper.setOnce(connection, s)) {
184-
s.request(Long.MAX_VALUE);
185-
}
183+
SubscriptionHelper.setOnce(connection, s, Long.MAX_VALUE);
186184
}
187185

188186
/**

src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -516,9 +516,7 @@ static final class CombineLatestInnerSubscriber<T>
516516

517517
@Override
518518
public void onSubscribe(Subscription s) {
519-
if (SubscriptionHelper.setOnce(this, s)) {
520-
s.request(prefetch);
521-
}
519+
SubscriptionHelper.setOnce(this, s, prefetch);
522520
}
523521

524522
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupJoin.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -417,9 +417,7 @@ public boolean isDisposed() {
417417

418418
@Override
419419
public void onSubscribe(Subscription s) {
420-
if (SubscriptionHelper.setOnce(this, s)) {
421-
s.request(Long.MAX_VALUE);
422-
}
420+
SubscriptionHelper.setOnce(this, s, Long.MAX_VALUE);
423421
}
424422

425423
@Override
@@ -470,9 +468,7 @@ public boolean isDisposed() {
470468

471469
@Override
472470
public void onSubscribe(Subscription s) {
473-
if (SubscriptionHelper.setOnce(this, s)) {
474-
s.request(Long.MAX_VALUE);
475-
}
471+
SubscriptionHelper.setOnce(this, s, Long.MAX_VALUE);
476472
}
477473

478474
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableMergeWithMaybe.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,8 @@ static final class MergeWithObserver<T> extends AtomicInteger
9898
}
9999

100100
@Override
101-
public void onSubscribe(Subscription d) {
102-
if (SubscriptionHelper.setOnce(mainSubscription, d)) {
103-
d.request(prefetch);
104-
}
101+
public void onSubscribe(Subscription s) {
102+
SubscriptionHelper.setOnce(mainSubscription, s, prefetch);
105103
}
106104

107105
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableMergeWithSingle.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,8 @@ static final class MergeWithObserver<T> extends AtomicInteger
9898
}
9999

100100
@Override
101-
public void onSubscribe(Subscription d) {
102-
if (SubscriptionHelper.setOnce(mainSubscription, d)) {
103-
d.request(prefetch);
104-
}
101+
public void onSubscribe(Subscription s) {
102+
SubscriptionHelper.setOnce(mainSubscription, s, prefetch);
105103
}
106104

107105
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java

Lines changed: 10 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -526,36 +526,16 @@ static final class InnerSubscription<T> extends AtomicLong implements Subscripti
526526
public void request(long n) {
527527
// ignore negative requests
528528
if (SubscriptionHelper.validate(n)) {
529-
// In general, RxJava doesn't prevent concurrent requests (with each other or with
530-
// a cancel) so we need a CAS-loop, but we need to handle
531-
// request overflow and cancelled/not requested state as well.
532-
for (;;) {
533-
// get the current request amount
534-
long r = get();
535-
// if child called cancel() do nothing
536-
if (r == CANCELLED) {
537-
return;
538-
}
539-
// ignore zero requests except any first that sets in zero
540-
if (r >= 0L && n == 0) {
541-
return;
542-
}
543-
// otherwise, increase the request count
544-
long u = BackpressureHelper.addCap(r, n);
545-
546-
// try setting the new request value
547-
if (compareAndSet(r, u)) {
548-
// increment the total request counter
549-
BackpressureHelper.add(totalRequested, n);
550-
// if successful, notify the parent dispatcher this child can receive more
551-
// elements
552-
parent.manageRequests();
553-
554-
parent.buffer.replay(this);
555-
return;
556-
}
557-
// otherwise, someone else changed the state (perhaps a concurrent
558-
// request or cancellation) so retry
529+
// add to the current requested and cap it at MAX_VALUE
530+
// except when there was a concurrent cancellation
531+
if (BackpressureHelper.addCancel(this, n) != CANCELLED) {
532+
// increment the total request counter
533+
BackpressureHelper.add(totalRequested, n);
534+
// if successful, notify the parent dispatcher this child can receive more
535+
// elements
536+
parent.manageRequests();
537+
// try replaying any cached content
538+
parent.buffer.replay(this);
559539
}
560540
}
561541
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableSamplePublisher.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,8 @@ public void onComplete() {
9393
completeMain();
9494
}
9595

96-
boolean setOther(Subscription o) {
97-
return SubscriptionHelper.setOnce(other, o);
96+
void setOther(Subscription o) {
97+
SubscriptionHelper.setOnce(other, o, Long.MAX_VALUE);
9898
}
9999

100100
@Override
@@ -150,9 +150,7 @@ static final class SamplerSubscriber<T> implements FlowableSubscriber<Object> {
150150

151151
@Override
152152
public void onSubscribe(Subscription s) {
153-
if (parent.setOther(s)) {
154-
s.request(Long.MAX_VALUE);
155-
}
153+
parent.setOther(s);
156154
}
157155

158156
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableSkipUntil.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,7 @@ final class OtherSubscriber extends AtomicReference<Subscription>
114114

115115
@Override
116116
public void onSubscribe(Subscription s) {
117-
if (SubscriptionHelper.setOnce(this, s)) {
118-
s.request(Long.MAX_VALUE);
119-
}
117+
SubscriptionHelper.setOnce(this, s, Long.MAX_VALUE);
120118
}
121119

122120
@Override

0 commit comments

Comments
 (0)