Skip to content

Commit c5a4902

Browse files
committed
Merge pull request #3653 from akarnokd/SampleUnboundedFix1x
1.x: fix sample(Observable) not requesting Long.MAX_VALUE
2 parents be493f1 + 995d3f1 commit c5a4902

File tree

2 files changed

+40
-9
lines changed

2 files changed

+40
-9
lines changed

src/main/java/rx/internal/operators/OperatorSampleWithObservable.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void onCompleted() {
6969

7070
};
7171

72-
Subscriber<T> result = new Subscriber<T>(child) {
72+
Subscriber<T> result = new Subscriber<T>() {
7373
@Override
7474
public void onNext(T t) {
7575
value.set(t);
@@ -88,6 +88,8 @@ public void onCompleted() {
8888
}
8989
};
9090

91+
child.add(result);
92+
9193
sampler.unsafeSubscribe(samplerSub);
9294

9395
return result;

src/test/java/rx/internal/operators/OperatorSampleTest.java

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,16 @@
1616
package rx.internal.operators;
1717

1818
import static org.mockito.Matchers.any;
19-
import static org.mockito.Mockito.inOrder;
20-
import static org.mockito.Mockito.mock;
21-
import static org.mockito.Mockito.never;
22-
import static org.mockito.Mockito.times;
23-
import static org.mockito.Mockito.verify;
19+
import static org.mockito.Mockito.*;
2420

2521
import java.util.concurrent.TimeUnit;
2622

27-
import org.junit.Before;
28-
import org.junit.Test;
23+
import org.junit.*;
2924
import org.mockito.InOrder;
3025

3126
import rx.*;
3227
import rx.Observable.OnSubscribe;
33-
import rx.functions.Action0;
28+
import rx.functions.*;
3429
import rx.schedulers.TestScheduler;
3530
import rx.subjects.PublishSubject;
3631

@@ -283,4 +278,38 @@ public void call(Subscriber<? super Integer> subscriber) {
283278
o.throttleLast(1, TimeUnit.MILLISECONDS).subscribe().unsubscribe();
284279
verify(s).unsubscribe();
285280
}
281+
282+
@Test
283+
public void testSampleOtherUnboundedIn() {
284+
285+
final long[] requested = { -1 };
286+
287+
PublishSubject.create()
288+
.doOnRequest(new Action1<Long>() {
289+
@Override
290+
public void call(Long t) {
291+
requested[0] = t;
292+
}
293+
})
294+
.sample(PublishSubject.create()).subscribe();
295+
296+
Assert.assertEquals(Long.MAX_VALUE, requested[0]);
297+
}
298+
299+
@Test
300+
public void testSampleTimedUnboundedIn() {
301+
302+
final long[] requested = { -1 };
303+
304+
PublishSubject.create()
305+
.doOnRequest(new Action1<Long>() {
306+
@Override
307+
public void call(Long t) {
308+
requested[0] = t;
309+
}
310+
})
311+
.sample(1, TimeUnit.SECONDS).subscribe().unsubscribe();
312+
313+
Assert.assertEquals(Long.MAX_VALUE, requested[0]);
314+
}
286315
}

0 commit comments

Comments
 (0)