Skip to content

Commit 1a1ba81

Browse files
committed
1.x: fix unsubscription and producer issues in sample(other)
1 parent c5a4902 commit 1a1ba81

File tree

2 files changed

+95
-8
lines changed

2 files changed

+95
-8
lines changed

src/main/java/rx/internal/operators/OperatorSampleWithObservable.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
package rx.internal.operators;
1717

1818
import java.util.concurrent.atomic.AtomicReference;
19-
import rx.Observable;
19+
20+
import rx.*;
2021
import rx.Observable.Operator;
21-
import rx.Subscriber;
2222
import rx.observers.SerializedSubscriber;
2323

2424
/**
@@ -44,7 +44,9 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
4444

4545
final AtomicReference<Object> value = new AtomicReference<Object>(EMPTY_TOKEN);
4646

47-
Subscriber<U> samplerSub = new Subscriber<U>(child) {
47+
final AtomicReference<Subscription> main = new AtomicReference<Subscription>();
48+
49+
final Subscriber<U> samplerSub = new Subscriber<U>() {
4850
@Override
4951
public void onNext(U t) {
5052
Object localValue = value.getAndSet(EMPTY_TOKEN);
@@ -57,16 +59,18 @@ public void onNext(U t) {
5759

5860
@Override
5961
public void onError(Throwable e) {
62+
// no need to null check, main is assigned before any of the two gets subscribed
63+
main.get().unsubscribe();
6064
s.onError(e);
61-
unsubscribe();
6265
}
6366

6467
@Override
6568
public void onCompleted() {
69+
// no need to null check, main is assigned before any of the two gets subscribed
70+
main.get().unsubscribe();
71+
// onNext(null); // emit the very last value?
6672
s.onCompleted();
67-
unsubscribe();
6873
}
69-
7074
};
7175

7276
Subscriber<T> result = new Subscriber<T>() {
@@ -77,18 +81,22 @@ public void onNext(T t) {
7781

7882
@Override
7983
public void onError(Throwable e) {
84+
samplerSub.unsubscribe();
8085
s.onError(e);
81-
unsubscribe();
8286
}
8387

8488
@Override
8589
public void onCompleted() {
90+
samplerSub.unsubscribe();
91+
// samplerSub.onNext(null); // emit the very last value?
8692
s.onCompleted();
87-
unsubscribe();
8893
}
8994
};
9095

96+
main.lazySet(result);
97+
9198
child.add(result);
99+
child.add(samplerSub);
92100

93101
sampler.unsafeSubscribe(samplerSub);
94102

src/test/java/rx/internal/operators/OperatorSampleTest.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
import static org.mockito.Mockito.*;
2020

2121
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicInteger;
2223

2324
import org.junit.*;
2425
import org.mockito.InOrder;
2526

2627
import rx.*;
2728
import rx.Observable.OnSubscribe;
2829
import rx.functions.*;
30+
import rx.observers.TestSubscriber;
2931
import rx.schedulers.TestScheduler;
3032
import rx.subjects.PublishSubject;
3133

@@ -312,4 +314,81 @@ public void call(Long t) {
312314

313315
Assert.assertEquals(Long.MAX_VALUE, requested[0]);
314316
}
317+
318+
@Test
319+
public void dontUnsubscribeChild1() {
320+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
321+
322+
PublishSubject<Integer> source = PublishSubject.create();
323+
324+
PublishSubject<Integer> sampler = PublishSubject.create();
325+
326+
source.sample(sampler).unsafeSubscribe(ts);
327+
328+
source.onCompleted();
329+
330+
Assert.assertFalse("Source has subscribers?", source.hasObservers());
331+
Assert.assertFalse("Sampler has subscribers?", sampler.hasObservers());
332+
333+
Assert.assertFalse("TS unsubscribed?", ts.isUnsubscribed());
334+
}
335+
336+
@Test
337+
public void dontUnsubscribeChild2() {
338+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
339+
340+
PublishSubject<Integer> source = PublishSubject.create();
341+
342+
PublishSubject<Integer> sampler = PublishSubject.create();
343+
344+
source.sample(sampler).unsafeSubscribe(ts);
345+
346+
sampler.onCompleted();
347+
348+
Assert.assertFalse("Source has subscribers?", source.hasObservers());
349+
Assert.assertFalse("Sampler has subscribers?", sampler.hasObservers());
350+
351+
Assert.assertFalse("TS unsubscribed?", ts.isUnsubscribed());
352+
}
353+
354+
@Test
355+
public void neverSetProducer() {
356+
Observable<Integer> neverBackpressure = Observable.create(new OnSubscribe<Integer>() {
357+
@Override
358+
public void call(Subscriber<? super Integer> t) {
359+
t.setProducer(new Producer() {
360+
@Override
361+
public void request(long n) {
362+
// irrelevant in this test
363+
}
364+
});
365+
}
366+
});
367+
368+
final AtomicInteger count = new AtomicInteger();
369+
370+
neverBackpressure.sample(neverBackpressure).unsafeSubscribe(new Subscriber<Integer>() {
371+
@Override
372+
public void onNext(Integer t) {
373+
// irrelevant
374+
}
375+
376+
@Override
377+
public void onError(Throwable e) {
378+
// irrelevant
379+
}
380+
381+
@Override
382+
public void onCompleted() {
383+
// irrelevant
384+
}
385+
386+
@Override
387+
public void setProducer(Producer p) {
388+
count.incrementAndGet();
389+
}
390+
});
391+
392+
Assert.assertEquals(0, count.get());
393+
}
315394
}

0 commit comments

Comments
 (0)