Skip to content

Commit 002074d

Browse files
committed
1.x: fix using() resource cleanup when factory throws or being non-eager
1 parent 1512c10 commit 002074d

File tree

2 files changed

+116
-35
lines changed

2 files changed

+116
-35
lines changed

src/main/java/rx/internal/operators/OnSubscribeUsing.java

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626

2727
/**
2828
* Constructs an observable sequence that depends on a resource object.
29+
*
30+
* @param <T> the output value type
31+
* @param <Resource> the resource type
2932
*/
3033
public final class OnSubscribeUsing<T, Resource> implements OnSubscribe<T> {
3134

@@ -56,22 +59,43 @@ public void call(final Subscriber<? super T> subscriber) {
5659
// dispose on unsubscription
5760
subscriber.add(disposeOnceOnly);
5861
// create the observable
59-
final Observable<? extends T> source = observableFactory
60-
// create the observable
61-
.call(resource);
62+
final Observable<? extends T> source;
63+
64+
try {
65+
source = observableFactory
66+
// create the observable
67+
.call(resource);
68+
} catch (Throwable e) {
69+
Throwable disposeError = dispose(disposeOnceOnly);
70+
Exceptions.throwIfFatal(e);
71+
Exceptions.throwIfFatal(disposeError);
72+
if (disposeError != null) {
73+
subscriber.onError(new CompositeException(Arrays.asList(e, disposeError)));
74+
} else {
75+
// propagate error
76+
subscriber.onError(e);
77+
}
78+
return;
79+
}
80+
6281
final Observable<? extends T> observable;
6382
// supplement with on termination disposal if requested
64-
if (disposeEagerly)
83+
if (disposeEagerly) {
6584
observable = source
6685
// dispose on completion or error
6786
.doOnTerminate(disposeOnceOnly);
68-
else
69-
observable = source;
87+
} else {
88+
// dispose after the terminal signals were sent out
89+
observable = source
90+
.doAfterTerminate(disposeOnceOnly)
91+
;
92+
}
93+
7094
try {
7195
// start
7296
observable.unsafeSubscribe(Subscribers.wrap(subscriber));
7397
} catch (Throwable e) {
74-
Throwable disposeError = disposeEagerlyIfRequested(disposeOnceOnly);
98+
Throwable disposeError = dispose(disposeOnceOnly);
7599
Exceptions.throwIfFatal(e);
76100
Exceptions.throwIfFatal(disposeError);
77101
if (disposeError != null)
@@ -86,16 +110,13 @@ public void call(final Subscriber<? super T> subscriber) {
86110
}
87111
}
88112

89-
private Throwable disposeEagerlyIfRequested(final Action0 disposeOnceOnly) {
90-
if (disposeEagerly)
91-
try {
92-
disposeOnceOnly.call();
93-
return null;
94-
} catch (Throwable e) {
95-
return e;
96-
}
97-
else
113+
private Throwable dispose(final Action0 disposeOnceOnly) {
114+
try {
115+
disposeOnceOnly.call();
98116
return null;
117+
} catch (Throwable e) {
118+
return e;
119+
}
99120
}
100121

101122
private static final class DisposeAction<Resource> extends AtomicBoolean implements Action0,

src/test/java/rx/internal/operators/OnSubscribeUsingTest.java

Lines changed: 79 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,31 +15,22 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import static org.junit.Assert.assertEquals;
19-
import static org.junit.Assert.fail;
20-
import static org.mockito.Mockito.inOrder;
21-
import static org.mockito.Mockito.mock;
22-
import static org.mockito.Mockito.times;
23-
import static org.mockito.Mockito.verify;
24-
import static org.mockito.Mockito.when;
25-
26-
import java.util.ArrayList;
27-
import java.util.Arrays;
28-
import java.util.List;
29-
30-
import org.junit.Test;
18+
import static org.junit.Assert.*;
19+
import static org.mockito.Mockito.*;
20+
21+
import java.util.*;
22+
import java.util.concurrent.atomic.AtomicInteger;
23+
24+
import org.junit.*;
3125
import org.mockito.InOrder;
3226

27+
import rx.*;
3328
import rx.Observable;
3429
import rx.Observable.OnSubscribe;
3530
import rx.Observer;
36-
import rx.Subscriber;
37-
import rx.Subscription;
3831
import rx.exceptions.TestException;
39-
import rx.functions.Action0;
40-
import rx.functions.Action1;
41-
import rx.functions.Func0;
42-
import rx.functions.Func1;
32+
import rx.functions.*;
33+
import rx.observers.TestSubscriber;
4334
import rx.subscriptions.Subscriptions;
4435

4536
public class OnSubscribeUsingTest {
@@ -432,4 +423,73 @@ public void call() {
432423
};
433424
}
434425

426+
@Test
427+
public void factoryThrows() {
428+
429+
TestSubscriber<Integer> ts = TestSubscriber.create();
430+
431+
final AtomicInteger count = new AtomicInteger();
432+
433+
Observable.<Integer, Integer>using(
434+
new Func0<Integer>() {
435+
@Override
436+
public Integer call() {
437+
return 1;
438+
}
439+
},
440+
new Func1<Integer, Observable<Integer>>() {
441+
@Override
442+
public Observable<Integer> call(Integer v) {
443+
throw new TestException("forced failure");
444+
}
445+
},
446+
new Action1<Integer>() {
447+
@Override
448+
public void call(Integer c) {
449+
count.incrementAndGet();
450+
}
451+
}
452+
)
453+
.unsafeSubscribe(ts);
454+
455+
ts.assertError(TestException.class);
456+
457+
Assert.assertEquals(1, count.get());
458+
}
459+
460+
@Test
461+
public void nonEagerTermination() {
462+
463+
TestSubscriber<Integer> ts = TestSubscriber.create();
464+
465+
final AtomicInteger count = new AtomicInteger();
466+
467+
Observable.<Integer, Integer>using(
468+
new Func0<Integer>() {
469+
@Override
470+
public Integer call() {
471+
return 1;
472+
}
473+
},
474+
new Func1<Integer, Observable<Integer>>() {
475+
@Override
476+
public Observable<Integer> call(Integer v) {
477+
return Observable.just(v);
478+
}
479+
},
480+
new Action1<Integer>() {
481+
@Override
482+
public void call(Integer c) {
483+
count.incrementAndGet();
484+
}
485+
}, false
486+
)
487+
.unsafeSubscribe(ts);
488+
489+
ts.assertValue(1);
490+
ts.assertNoErrors();
491+
ts.assertCompleted();
492+
493+
Assert.assertEquals(1, count.get());
494+
}
435495
}

0 commit comments

Comments
 (0)