Skip to content

Commit ba6f392

Browse files
authored
2.x: Observable.repeatWhen fix for onError (#4819)
1 parent 6df4ea3 commit ba6f392

File tree

4 files changed

+50
-7
lines changed

4 files changed

+50
-7
lines changed

src/main/java/io/reactivex/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8566,7 +8566,7 @@ public final Observable<T> repeatUntil(BooleanSupplier stop) {
85668566
@SchedulerSupport(SchedulerSupport.NONE)
85678567
public final Observable<T> repeatWhen(final Function<? super Observable<Object>, ? extends ObservableSource<?>> handler) {
85688568
ObjectHelper.requireNonNull(handler, "handler is null");
8569-
return RxJavaPlugins.onAssembly(new ObservableRedo<T>(this, ObservableInternalHelper.repeatWhenHandler(handler)));
8569+
return RxJavaPlugins.onAssembly(new ObservableRedo<T>(this, ObservableInternalHelper.repeatWhenHandler(handler), false));
85708570
}
85718571

85728572
/**
@@ -9219,7 +9219,7 @@ public final Observable<T> retryUntil(final BooleanSupplier stop) {
92199219
public final Observable<T> retryWhen(
92209220
final Function<? super Observable<Throwable>, ? extends ObservableSource<?>> handler) {
92219221
ObjectHelper.requireNonNull(handler, "handler is null");
9222-
return RxJavaPlugins.onAssembly(new ObservableRedo<T>(this, ObservableInternalHelper.retryWhenHandler(handler)));
9222+
return RxJavaPlugins.onAssembly(new ObservableRedo<T>(this, ObservableInternalHelper.retryWhenHandler(handler), true));
92239223
}
92249224

92259225
/**

src/main/java/io/reactivex/internal/operators/observable/ObservableRedo.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,22 @@
2727
public final class ObservableRedo<T> extends AbstractObservableWithUpstream<T, T> {
2828
final Function<? super Observable<Notification<Object>>, ? extends ObservableSource<?>> manager;
2929

30+
final boolean retryMode;
31+
3032
public ObservableRedo(ObservableSource<T> source,
31-
Function<? super Observable<Notification<Object>>, ? extends ObservableSource<?>> manager) {
33+
Function<? super Observable<Notification<Object>>, ? extends ObservableSource<?>> manager,
34+
boolean retryMode) {
3235
super(source);
3336
this.manager = manager;
37+
this.retryMode = retryMode;
3438
}
3539

3640
@Override
3741
public void subscribeActual(Observer<? super T> s) {
3842

3943
Subject<Notification<Object>> subject = BehaviorSubject.<Notification<Object>>create().toSerialized();
4044

41-
final RedoObserver<T> parent = new RedoObserver<T>(s, subject, source);
45+
final RedoObserver<T> parent = new RedoObserver<T>(s, subject, source, retryMode);
4246

4347
ToNotificationObserver<Object> actionObserver = new ToNotificationObserver<Object>(new Consumer<Notification<Object>>() {
4448
@Override
@@ -73,13 +77,16 @@ static final class RedoObserver<T> extends AtomicBoolean implements Observer<T>
7377
final ObservableSource<? extends T> source;
7478
final SequentialDisposable arbiter;
7579

80+
final boolean retryMode;
81+
7682
final AtomicInteger wip = new AtomicInteger();
7783

78-
RedoObserver(Observer<? super T> actual, Subject<Notification<Object>> subject, ObservableSource<? extends T> source) {
84+
RedoObserver(Observer<? super T> actual, Subject<Notification<Object>> subject, ObservableSource<? extends T> source, boolean retryMode) {
7985
this.actual = actual;
8086
this.subject = subject;
8187
this.source = source;
8288
this.arbiter = new SequentialDisposable();
89+
this.retryMode = retryMode;
8390
this.lazySet(true);
8491
}
8592

@@ -96,14 +103,22 @@ public void onNext(T t) {
96103
@Override
97104
public void onError(Throwable t) {
98105
if (compareAndSet(false, true)) {
99-
subject.onNext(Notification.createOnError(t));
106+
if (retryMode) {
107+
subject.onNext(Notification.createOnError(t));
108+
} else {
109+
subject.onError(t);
110+
}
100111
}
101112
}
102113

103114
@Override
104115
public void onComplete() {
105116
if (compareAndSet(false, true)) {
106-
subject.onNext(Notification.createOnComplete());
117+
if (retryMode) {
118+
subject.onComplete();
119+
} else {
120+
subject.onNext(Notification.createOnComplete());
121+
}
107122
}
108123
}
109124

src/test/java/io/reactivex/internal/operators/flowable/FlowableRepeatTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,4 +329,18 @@ public Flowable<Object> apply(Object ignore) throws Exception {
329329
disposable.dispose();
330330
assertFalse(subject.hasSubscribers());
331331
}
332+
333+
@Test
334+
public void testRepeatWhen() {
335+
Flowable.error(new TestException())
336+
.repeatWhen(new Function<Flowable<Object>, Flowable<Object>>() {
337+
@Override
338+
public Flowable<Object> apply(Flowable<Object> v) throws Exception {
339+
return v.delay(10, TimeUnit.SECONDS);
340+
}
341+
})
342+
.test()
343+
.awaitDone(5, TimeUnit.SECONDS)
344+
.assertFailure(TestException.class);
345+
}
332346
}

src/test/java/io/reactivex/internal/operators/observable/ObservableRepeatTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,4 +280,18 @@ public ObservableSource<Object> apply(Object ignore) throws Exception {
280280
disposable.dispose();
281281
assertFalse(subject.hasObservers());
282282
}
283+
284+
@Test
285+
public void testRepeatWhen() {
286+
Observable.error(new TestException())
287+
.repeatWhen(new Function<Observable<Object>, ObservableSource<Object>>() {
288+
@Override
289+
public ObservableSource<Object> apply(Observable<Object> v) throws Exception {
290+
return v.delay(10, TimeUnit.SECONDS);
291+
}
292+
})
293+
.test()
294+
.awaitDone(5, TimeUnit.SECONDS)
295+
.assertFailure(TestException.class);
296+
}
283297
}

0 commit comments

Comments
 (0)