Skip to content

Commit f4be307

Browse files
committed
Merge pull request #3658 from akarnokd/SampleOtherMoreFixes1x
1.x: fix unsubscription and producer issues in sample(other)
2 parents c5a4902 + 4369e1c commit f4be307

File tree

2 files changed

+172
-8
lines changed

2 files changed

+172
-8
lines changed

src/main/java/rx/internal/operators/OperatorSampleWithObservable.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
package rx.internal.operators;
1717

1818
import java.util.concurrent.atomic.AtomicReference;
19-
import rx.Observable;
19+
20+
import rx.*;
2021
import rx.Observable.Operator;
21-
import rx.Subscriber;
2222
import rx.observers.SerializedSubscriber;
2323

2424
/**
@@ -44,7 +44,9 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
4444

4545
final AtomicReference<Object> value = new AtomicReference<Object>(EMPTY_TOKEN);
4646

47-
Subscriber<U> samplerSub = new Subscriber<U>(child) {
47+
final AtomicReference<Subscription> main = new AtomicReference<Subscription>();
48+
49+
final Subscriber<U> samplerSub = new Subscriber<U>() {
4850
@Override
4951
public void onNext(U t) {
5052
Object localValue = value.getAndSet(EMPTY_TOKEN);
@@ -58,15 +60,17 @@ public void onNext(U t) {
5860
@Override
5961
public void onError(Throwable e) {
6062
s.onError(e);
61-
unsubscribe();
63+
// no need to null check, main is assigned before any of the two gets subscribed
64+
main.get().unsubscribe();
6265
}
6366

6467
@Override
6568
public void onCompleted() {
69+
// onNext(null); // emit the very last value?
6670
s.onCompleted();
67-
unsubscribe();
71+
// no need to null check, main is assigned before any of the two gets subscribed
72+
main.get().unsubscribe();
6873
}
69-
7074
};
7175

7276
Subscriber<T> result = new Subscriber<T>() {
@@ -78,17 +82,23 @@ public void onNext(T t) {
7882
@Override
7983
public void onError(Throwable e) {
8084
s.onError(e);
81-
unsubscribe();
85+
86+
samplerSub.unsubscribe();
8287
}
8388

8489
@Override
8590
public void onCompleted() {
91+
// samplerSub.onNext(null); // emit the very last value?
8692
s.onCompleted();
87-
unsubscribe();
93+
94+
samplerSub.unsubscribe();
8895
}
8996
};
9097

98+
main.lazySet(result);
99+
91100
child.add(result);
101+
child.add(samplerSub);
92102

93103
sampler.unsafeSubscribe(samplerSub);
94104

src/test/java/rx/internal/operators/OperatorSampleTest.java

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,18 @@
1919
import static org.mockito.Mockito.*;
2020

2121
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.*;
2223

2324
import org.junit.*;
2425
import org.mockito.InOrder;
2526

2627
import rx.*;
2728
import rx.Observable.OnSubscribe;
2829
import rx.functions.*;
30+
import rx.observers.TestSubscriber;
2931
import rx.schedulers.TestScheduler;
3032
import rx.subjects.PublishSubject;
33+
import rx.subscriptions.Subscriptions;
3134

3235
public class OperatorSampleTest {
3336
private TestScheduler scheduler;
@@ -312,4 +315,155 @@ public void call(Long t) {
312315

313316
Assert.assertEquals(Long.MAX_VALUE, requested[0]);
314317
}
318+
319+
@Test
320+
public void dontUnsubscribeChild1() {
321+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
322+
323+
PublishSubject<Integer> source = PublishSubject.create();
324+
325+
PublishSubject<Integer> sampler = PublishSubject.create();
326+
327+
source.sample(sampler).unsafeSubscribe(ts);
328+
329+
source.onCompleted();
330+
331+
Assert.assertFalse("Source has subscribers?", source.hasObservers());
332+
Assert.assertFalse("Sampler has subscribers?", sampler.hasObservers());
333+
334+
Assert.assertFalse("TS unsubscribed?", ts.isUnsubscribed());
335+
}
336+
337+
@Test
338+
public void dontUnsubscribeChild2() {
339+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
340+
341+
PublishSubject<Integer> source = PublishSubject.create();
342+
343+
PublishSubject<Integer> sampler = PublishSubject.create();
344+
345+
source.sample(sampler).unsafeSubscribe(ts);
346+
347+
sampler.onCompleted();
348+
349+
Assert.assertFalse("Source has subscribers?", source.hasObservers());
350+
Assert.assertFalse("Sampler has subscribers?", sampler.hasObservers());
351+
352+
Assert.assertFalse("TS unsubscribed?", ts.isUnsubscribed());
353+
}
354+
355+
@Test
356+
public void neverSetProducer() {
357+
Observable<Integer> neverBackpressure = Observable.create(new OnSubscribe<Integer>() {
358+
@Override
359+
public void call(Subscriber<? super Integer> t) {
360+
t.setProducer(new Producer() {
361+
@Override
362+
public void request(long n) {
363+
// irrelevant in this test
364+
}
365+
});
366+
}
367+
});
368+
369+
final AtomicInteger count = new AtomicInteger();
370+
371+
neverBackpressure.sample(neverBackpressure).unsafeSubscribe(new Subscriber<Integer>() {
372+
@Override
373+
public void onNext(Integer t) {
374+
// irrelevant
375+
}
376+
377+
@Override
378+
public void onError(Throwable e) {
379+
// irrelevant
380+
}
381+
382+
@Override
383+
public void onCompleted() {
384+
// irrelevant
385+
}
386+
387+
@Override
388+
public void setProducer(Producer p) {
389+
count.incrementAndGet();
390+
}
391+
});
392+
393+
Assert.assertEquals(0, count.get());
394+
}
395+
396+
@Test
397+
public void unsubscribeMainAfterCompleted() {
398+
final AtomicBoolean unsubscribed = new AtomicBoolean();
399+
400+
Observable<Integer> source = Observable.create(new OnSubscribe<Integer>() {
401+
@Override
402+
public void call(Subscriber<? super Integer> t) {
403+
t.add(Subscriptions.create(new Action0() {
404+
@Override
405+
public void call() {
406+
unsubscribed.set(true);
407+
}
408+
}));
409+
}
410+
});
411+
412+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
413+
@Override
414+
public void onCompleted() {
415+
if (unsubscribed.get()) {
416+
onError(new IllegalStateException("Resource unsubscribed!"));
417+
} else {
418+
super.onCompleted();
419+
}
420+
}
421+
};
422+
423+
PublishSubject<Integer> sampler = PublishSubject.create();
424+
425+
source.sample(sampler).unsafeSubscribe(ts);
426+
427+
sampler.onCompleted();
428+
429+
ts.assertNoErrors();
430+
ts.assertCompleted();
431+
}
432+
433+
@Test
434+
public void unsubscribeSamplerAfterCompleted() {
435+
final AtomicBoolean unsubscribed = new AtomicBoolean();
436+
437+
Observable<Integer> source = Observable.create(new OnSubscribe<Integer>() {
438+
@Override
439+
public void call(Subscriber<? super Integer> t) {
440+
t.add(Subscriptions.create(new Action0() {
441+
@Override
442+
public void call() {
443+
unsubscribed.set(true);
444+
}
445+
}));
446+
}
447+
});
448+
449+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
450+
@Override
451+
public void onCompleted() {
452+
if (unsubscribed.get()) {
453+
onError(new IllegalStateException("Resource unsubscribed!"));
454+
} else {
455+
super.onCompleted();
456+
}
457+
}
458+
};
459+
460+
PublishSubject<Integer> sampled = PublishSubject.create();
461+
462+
sampled.sample(source).unsafeSubscribe(ts);
463+
464+
sampled.onCompleted();
465+
466+
ts.assertNoErrors();
467+
ts.assertCompleted();
468+
}
315469
}

0 commit comments

Comments
 (0)