Skip to content

Commit 421c5bb

Browse files
authored
2.x: improve coverage, remove unused code (#5119)
1 parent 4a32963 commit 421c5bb

24 files changed

+1122
-841
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import io.reactivex.internal.functions.*;
2626
import io.reactivex.internal.fuseable.*;
2727
import io.reactivex.internal.operators.flowable.*;
28-
import io.reactivex.internal.operators.flowable.FlowableStrict.StrictSubscriber;
2928
import io.reactivex.internal.operators.observable.ObservableFromPublisher;
3029
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
3130
import io.reactivex.internal.subscribers.*;

src/main/java/io/reactivex/internal/observers/QueueDrainObserver.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import io.reactivex.Observer;
1919
import io.reactivex.disposables.Disposable;
20-
import io.reactivex.internal.fuseable.SimpleQueue;
20+
import io.reactivex.internal.fuseable.*;
2121
import io.reactivex.internal.util.*;
2222

2323
/**
@@ -30,14 +30,14 @@
3030
*/
3131
public abstract class QueueDrainObserver<T, U, V> extends QueueDrainSubscriberPad2 implements Observer<T>, ObservableQueueDrain<U, V> {
3232
protected final Observer<? super V> actual;
33-
protected final SimpleQueue<U> queue;
33+
protected final SimplePlainQueue<U> queue;
3434

3535
protected volatile boolean cancelled;
3636

3737
protected volatile boolean done;
3838
protected Throwable error;
3939

40-
public QueueDrainObserver(Observer<? super V> actual, SimpleQueue<U> queue) {
40+
public QueueDrainObserver(Observer<? super V> actual, SimplePlainQueue<U> queue) {
4141
this.actual = actual;
4242
this.queue = queue;
4343
}
@@ -63,7 +63,7 @@ public final boolean fastEnter() {
6363

6464
protected final void fastPathEmit(U value, boolean delayError, Disposable dispose) {
6565
final Observer<? super V> s = actual;
66-
final SimpleQueue<U> q = queue;
66+
final SimplePlainQueue<U> q = queue;
6767

6868
if (wip.get() == 0 && wip.compareAndSet(0, 1)) {
6969
accept(s, value);
@@ -87,7 +87,7 @@ protected final void fastPathEmit(U value, boolean delayError, Disposable dispos
8787
*/
8888
protected final void fastPathOrderedEmit(U value, boolean delayError, Disposable disposable) {
8989
final Observer<? super V> s = actual;
90-
final SimpleQueue<U> q = queue;
90+
final SimplePlainQueue<U> q = queue;
9191

9292
if (wip.get() == 0 && wip.compareAndSet(0, 1)) {
9393
if (q.isEmpty()) {
@@ -117,12 +117,6 @@ public final int leave(int m) {
117117
return wip.addAndGet(m);
118118
}
119119

120-
public void drain(boolean delayError, Disposable dispose) {
121-
if (enter()) {
122-
QueueDrainHelper.drainLoop(queue, actual, delayError, dispose, this);
123-
}
124-
}
125-
126120
@Override
127121
public void accept(Observer<? super V> a, U v) {
128122
// ignored by default

src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import io.reactivex.exceptions.Exceptions;
2525
import io.reactivex.functions.Function;
2626
import io.reactivex.internal.functions.ObjectHelper;
27-
import io.reactivex.internal.fuseable.SimpleQueue;
27+
import io.reactivex.internal.fuseable.SimplePlainQueue;
2828
import io.reactivex.internal.queue.MpscLinkedQueue;
2929
import io.reactivex.internal.subscribers.QueueDrainSubscriber;
3030
import io.reactivex.internal.subscriptions.SubscriptionHelper;
@@ -128,7 +128,7 @@ void complete() {
128128
buffers.clear();
129129
}
130130

131-
SimpleQueue<U> q = queue;
131+
SimplePlainQueue<U> q = queue;
132132
for (U u : list) {
133133
q.offer(u);
134134
}

0 commit comments

Comments
 (0)