Skip to content

Commit 852e052

Browse files
authored
3.x: Fix takeLast(time) last events time window calculation. (#6648)
1 parent 9a36930 commit 852e052

File tree

4 files changed

+59
-7
lines changed

4 files changed

+59
-7
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTakeLastTimed.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ void drain() {
139139
final Observer<? super T> a = downstream;
140140
final SpscLinkedArrayQueue<Object> q = queue;
141141
final boolean delayError = this.delayError;
142+
final long timestampLimit = scheduler.now(unit) - time;
142143

143144
for (;;) {
144145
if (cancelled) {
@@ -171,7 +172,7 @@ void drain() {
171172
@SuppressWarnings("unchecked")
172173
T o = (T)q.poll();
173174

174-
if ((Long)ts < scheduler.now(unit) - time) {
175+
if ((Long)ts < timestampLimit) {
175176
continue;
176177
}
177178

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTakeLastTimedTest.java

+24-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import io.reactivex.rxjava3.processors.PublishProcessor;
2929
import io.reactivex.rxjava3.schedulers.*;
3030
import io.reactivex.rxjava3.subscribers.TestSubscriber;
31-
import io.reactivex.rxjava3.testsupport.TestHelper;
31+
import io.reactivex.rxjava3.testsupport.*;
3232

3333
public class FlowableTakeLastTimedTest extends RxJavaTest {
3434

@@ -338,4 +338,27 @@ public Publisher<Object> apply(Flowable<Object> f) throws Exception {
338338
public void badRequest() {
339339
TestHelper.assertBadRequestReported(PublishProcessor.create().takeLast(1, TimeUnit.SECONDS));
340340
}
341+
342+
@Test
343+
public void lastWindowIsFixedInTime() {
344+
TimesteppingScheduler scheduler = new TimesteppingScheduler();
345+
scheduler.stepEnabled = false;
346+
347+
PublishProcessor<Integer> pp = PublishProcessor.create();
348+
349+
TestSubscriber<Integer> ts = pp
350+
.takeLast(2, TimeUnit.SECONDS, scheduler)
351+
.test();
352+
353+
pp.onNext(1);
354+
pp.onNext(2);
355+
pp.onNext(3);
356+
pp.onNext(4);
357+
358+
scheduler.stepEnabled = true;
359+
360+
pp.onComplete();
361+
362+
ts.assertResult(1, 2, 3, 4);
363+
}
341364
}

src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTakeLastTimedTest.java

+24-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import io.reactivex.rxjava3.observers.TestObserver;
2727
import io.reactivex.rxjava3.schedulers.*;
2828
import io.reactivex.rxjava3.subjects.PublishSubject;
29-
import io.reactivex.rxjava3.testsupport.TestHelper;
29+
import io.reactivex.rxjava3.testsupport.*;
3030

3131
public class ObservableTakeLastTimedTest extends RxJavaTest {
3232

@@ -277,4 +277,27 @@ public void run() {
277277
TestHelper.race(r1, r2);
278278
}
279279
}
280+
281+
@Test
282+
public void lastWindowIsFixedInTime() {
283+
TimesteppingScheduler scheduler = new TimesteppingScheduler();
284+
scheduler.stepEnabled = false;
285+
286+
PublishSubject<Integer> ps = PublishSubject.create();
287+
288+
TestObserver<Integer> to = ps
289+
.takeLast(2, TimeUnit.SECONDS, scheduler)
290+
.test();
291+
292+
ps.onNext(1);
293+
ps.onNext(2);
294+
ps.onNext(3);
295+
ps.onNext(4);
296+
297+
scheduler.stepEnabled = true;
298+
299+
ps.onComplete();
300+
301+
to.assertResult(1, 2, 3, 4);
302+
}
280303
}

src/test/java/io/reactivex/rxjava3/testsupport/TimesteppingScheduler.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
* Basic scheduler that produces an ever increasing {@link #now(TimeUnit)} value.
2323
* Use this scheduler only as a time source!
2424
*/
25-
public class TimesteppingScheduler extends Scheduler {
25+
public final class TimesteppingScheduler extends Scheduler {
2626

2727
final class TimesteppingWorker extends Worker {
2828
@Override
@@ -42,11 +42,13 @@ public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
4242

4343
@Override
4444
public long now(TimeUnit unit) {
45-
return time++;
45+
return TimesteppingScheduler.this.now(unit);
4646
}
4747
}
4848

49-
long time;
49+
public long time;
50+
51+
public boolean stepEnabled = true;
5052

5153
@Override
5254
public Worker createWorker() {
@@ -55,6 +57,9 @@ public Worker createWorker() {
5557

5658
@Override
5759
public long now(TimeUnit unit) {
58-
return time++;
60+
if (stepEnabled) {
61+
return time++;
62+
}
63+
return time;
5964
}
6065
}

0 commit comments

Comments
 (0)