Skip to content

Commit 71330c0

Browse files
authored
2.x: add sample() overload that can emit the very last buffered item (#4955)
1 parent 479f89f commit 71330c0

File tree

8 files changed

+801
-38
lines changed

8 files changed

+801
-38
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 114 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11530,6 +11530,41 @@ public final Flowable<T> sample(long period, TimeUnit unit) {
1153011530
return sample(period, unit, Schedulers.computation());
1153111531
}
1153211532

11533+
/**
11534+
* Returns a Flowable that emits the most recently emitted item (if any) emitted by the source Publisher
11535+
* within periodic time intervals and optionally emit the very last upstream item when the upstream completes.
11536+
* <p>
11537+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.png" alt="">
11538+
* <dl>
11539+
* <dt><b>Backpressure:</b></dt>
11540+
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
11541+
* <dt><b>Scheduler:</b></dt>
11542+
* <dd>{@code sample} operates by default on the {@code computation} {@link Scheduler}.</dd>
11543+
* </dl>
11544+
*
11545+
* @param period
11546+
* the sampling rate
11547+
* @param unit
11548+
* the {@link TimeUnit} in which {@code period} is defined
11549+
* @param emitLast
11550+
* if true and the upstream completes while there is still an unsampled item available,
11551+
* that item is emitted to downstream before completion
11552+
* if false, an unsampled last item is ignored.
11553+
* @return a Flowable that emits the results of sampling the items emitted by the source Publisher at
11554+
* the specified time interval
11555+
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
11556+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
11557+
* @see #throttleLast(long, TimeUnit)
11558+
* @since 2.0.5 - experimental
11559+
*/
11560+
@CheckReturnValue
11561+
@BackpressureSupport(BackpressureKind.ERROR)
11562+
@SchedulerSupport(SchedulerSupport.COMPUTATION)
11563+
@Experimental
11564+
public final Flowable<T> sample(long period, TimeUnit unit, boolean emitLast) {
11565+
return sample(period, unit, Schedulers.computation(), emitLast);
11566+
}
11567+
1153311568
/**
1153411569
* Returns a Flowable that emits the most recently emitted item (if any) emitted by the source Publisher
1153511570
* within periodic time intervals, where the intervals are defined on a particular Scheduler.
@@ -11560,7 +11595,47 @@ public final Flowable<T> sample(long period, TimeUnit unit) {
1156011595
public final Flowable<T> sample(long period, TimeUnit unit, Scheduler scheduler) {
1156111596
ObjectHelper.requireNonNull(unit, "unit is null");
1156211597
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
11563-
return RxJavaPlugins.onAssembly(new FlowableSampleTimed<T>(this, period, unit, scheduler));
11598+
return RxJavaPlugins.onAssembly(new FlowableSampleTimed<T>(this, period, unit, scheduler, false));
11599+
}
11600+
11601+
/**
11602+
* Returns a Flowable that emits the most recently emitted item (if any) emitted by the source Publisher
11603+
* within periodic time intervals, where the intervals are defined on a particular Scheduler
11604+
* and optionally emit the very last upstream item when the upstream completes.
11605+
* <p>
11606+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.s.png" alt="">
11607+
* <dl>
11608+
* <dt><b>Backpressure:</b></dt>
11609+
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
11610+
* <dt><b>Scheduler:</b></dt>
11611+
* <dd>You specify which {@link Scheduler} this operator will use</dd>
11612+
* </dl>
11613+
*
11614+
* @param period
11615+
* the sampling rate
11616+
* @param unit
11617+
* the {@link TimeUnit} in which {@code period} is defined
11618+
* @param scheduler
11619+
* the {@link Scheduler} to use when sampling
11620+
* @param emitLast
11621+
* if true and the upstream completes while there is still an unsampled item available,
11622+
* that item is emitted to downstream before completion
11623+
* if false, an unsampled last item is ignored.
11624+
* @return a Flowable that emits the results of sampling the items emitted by the source Publisher at
11625+
* the specified time interval
11626+
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
11627+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
11628+
* @see #throttleLast(long, TimeUnit, Scheduler)
11629+
* @since 2.0.5 - experimental
11630+
*/
11631+
@CheckReturnValue
11632+
@BackpressureSupport(BackpressureKind.ERROR)
11633+
@SchedulerSupport(SchedulerSupport.CUSTOM)
11634+
@Experimental
11635+
public final Flowable<T> sample(long period, TimeUnit unit, Scheduler scheduler, boolean emitLast) {
11636+
ObjectHelper.requireNonNull(unit, "unit is null");
11637+
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
11638+
return RxJavaPlugins.onAssembly(new FlowableSampleTimed<T>(this, period, unit, scheduler, emitLast));
1156411639
}
1156511640

1156611641
/**
@@ -11590,7 +11665,44 @@ public final Flowable<T> sample(long period, TimeUnit unit, Scheduler scheduler)
1159011665
@SchedulerSupport(SchedulerSupport.NONE)
1159111666
public final <U> Flowable<T> sample(Publisher<U> sampler) {
1159211667
ObjectHelper.requireNonNull(sampler, "sampler is null");
11593-
return RxJavaPlugins.onAssembly(new FlowableSamplePublisher<T>(this, sampler));
11668+
return RxJavaPlugins.onAssembly(new FlowableSamplePublisher<T>(this, sampler, false));
11669+
}
11670+
11671+
/**
11672+
* Returns a Flowable that, when the specified {@code sampler} Publisher emits an item or completes,
11673+
* emits the most recently emitted item (if any) emitted by the source Publisher since the previous
11674+
* emission from the {@code sampler} Publisher
11675+
* and optionally emit the very last upstream item when the upstream or other Publisher complete.
11676+
* <p>
11677+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.o.png" alt="">
11678+
* <dl>
11679+
* <dt><b>Backpressure:</b></dt>
11680+
* <dd>This operator does not support backpressure as it uses the emissions of the {@code sampler}
11681+
* Publisher to control data flow.</dd>
11682+
* <dt><b>Scheduler:</b></dt>
11683+
* <dd>This version of {@code sample} does not operate by default on a particular {@link Scheduler}.</dd>
11684+
* </dl>
11685+
*
11686+
* @param <U> the element type of the sampler Publisher
11687+
* @param sampler
11688+
* the Publisher to use for sampling the source Publisher
11689+
* @param emitLast
11690+
* if true and the upstream completes while there is still an unsampled item available,
11691+
* that item is emitted to downstream before completion
11692+
* if false, an unsampled last item is ignored.
11693+
* @return a Flowable that emits the results of sampling the items emitted by this Publisher whenever
11694+
* the {@code sampler} Publisher emits an item or completes
11695+
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
11696+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
11697+
* @since 2.0.5 - experimental
11698+
*/
11699+
@CheckReturnValue
11700+
@BackpressureSupport(BackpressureKind.ERROR)
11701+
@SchedulerSupport(SchedulerSupport.NONE)
11702+
@Experimental
11703+
public final <U> Flowable<T> sample(Publisher<U> sampler, boolean emitLast) {
11704+
ObjectHelper.requireNonNull(sampler, "sampler is null");
11705+
return RxJavaPlugins.onAssembly(new FlowableSamplePublisher<T>(this, sampler, emitLast));
1159411706
}
1159511707

1159611708
/**

src/main/java/io/reactivex/Observable.java

Lines changed: 102 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9610,6 +9610,37 @@ public final Observable<T> sample(long period, TimeUnit unit) {
96109610
return sample(period, unit, Schedulers.computation());
96119611
}
96129612

9613+
/**
9614+
* Returns an Observable that emits the most recently emitted item (if any) emitted by the source ObservableSource
9615+
* within periodic time intervals and optionally emit the very last upstream item when the upstream completes.
9616+
* <p>
9617+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.png" alt="">
9618+
* <dl>
9619+
* <dt><b>Scheduler:</b></dt>
9620+
* <dd>{@code sample} operates by default on the {@code computation} {@link Scheduler}.</dd>
9621+
* </dl>
9622+
*
9623+
* @param period
9624+
* the sampling rate
9625+
* @param unit
9626+
* the {@link TimeUnit} in which {@code period} is defined
9627+
* @return an Observable that emits the results of sampling the items emitted by the source ObservableSource at
9628+
* the specified time interval
9629+
* @param emitLast
9630+
* if true and the upstream completes while there is still an unsampled item available,
9631+
* that item is emitted to downstream before completion
9632+
* if false, an unsampled last item is ignored.
9633+
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
9634+
* @see #throttleLast(long, TimeUnit)
9635+
* @since 2.0.5 - experimental
9636+
*/
9637+
@CheckReturnValue
9638+
@SchedulerSupport(SchedulerSupport.COMPUTATION)
9639+
@Experimental
9640+
public final Observable<T> sample(long period, TimeUnit unit, boolean emitLast) {
9641+
return sample(period, unit, Schedulers.computation(), emitLast);
9642+
}
9643+
96139644
/**
96149645
* Returns an Observable that emits the most recently emitted item (if any) emitted by the source ObservableSource
96159646
* within periodic time intervals, where the intervals are defined on a particular Scheduler.
@@ -9636,7 +9667,43 @@ public final Observable<T> sample(long period, TimeUnit unit) {
96369667
public final Observable<T> sample(long period, TimeUnit unit, Scheduler scheduler) {
96379668
ObjectHelper.requireNonNull(unit, "unit is null");
96389669
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
9639-
return RxJavaPlugins.onAssembly(new ObservableSampleTimed<T>(this, period, unit, scheduler));
9670+
return RxJavaPlugins.onAssembly(new ObservableSampleTimed<T>(this, period, unit, scheduler, false));
9671+
}
9672+
9673+
/**
9674+
* Returns an Observable that emits the most recently emitted item (if any) emitted by the source ObservableSource
9675+
* within periodic time intervals, where the intervals are defined on a particular Scheduler
9676+
* and optionally emit the very last upstream item when the upstream completes.
9677+
* <p>
9678+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.s.png" alt="">
9679+
* <dl>
9680+
* <dt><b>Scheduler:</b></dt>
9681+
* <dd>You specify which {@link Scheduler} this operator will use</dd>
9682+
* </dl>
9683+
*
9684+
* @param period
9685+
* the sampling rate
9686+
* @param unit
9687+
* the {@link TimeUnit} in which {@code period} is defined
9688+
* @param scheduler
9689+
* the {@link Scheduler} to use when sampling
9690+
* @param emitLast
9691+
* if true and the upstream completes while there is still an unsampled item available,
9692+
* that item is emitted to downstream before completion
9693+
* if false, an unsampled last item is ignored.
9694+
* @return an Observable that emits the results of sampling the items emitted by the source ObservableSource at
9695+
* the specified time interval
9696+
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
9697+
* @see #throttleLast(long, TimeUnit, Scheduler)
9698+
* @since 2.0.5 - experimental
9699+
*/
9700+
@CheckReturnValue
9701+
@SchedulerSupport(SchedulerSupport.CUSTOM)
9702+
@Experimental
9703+
public final Observable<T> sample(long period, TimeUnit unit, Scheduler scheduler, boolean emitLast) {
9704+
ObjectHelper.requireNonNull(unit, "unit is null");
9705+
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
9706+
return RxJavaPlugins.onAssembly(new ObservableSampleTimed<T>(this, period, unit, scheduler, emitLast));
96409707
}
96419708

96429709
/**
@@ -9662,7 +9729,40 @@ public final Observable<T> sample(long period, TimeUnit unit, Scheduler schedule
96629729
@SchedulerSupport(SchedulerSupport.NONE)
96639730
public final <U> Observable<T> sample(ObservableSource<U> sampler) {
96649731
ObjectHelper.requireNonNull(sampler, "sampler is null");
9665-
return RxJavaPlugins.onAssembly(new ObservableSampleWithObservable<T>(this, sampler));
9732+
return RxJavaPlugins.onAssembly(new ObservableSampleWithObservable<T>(this, sampler, false));
9733+
}
9734+
9735+
/**
9736+
* Returns an Observable that, when the specified {@code sampler} ObservableSource emits an item or completes,
9737+
* emits the most recently emitted item (if any) emitted by the source ObservableSource since the previous
9738+
* emission from the {@code sampler} ObservableSource
9739+
* and optionally emit the very last upstream item when the upstream or other ObservableSource complete.
9740+
* <p>
9741+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.o.png" alt="">
9742+
* <dl>
9743+
* ObservableSource to control data flow.</dd>
9744+
* <dt><b>Scheduler:</b></dt>
9745+
* <dd>This version of {@code sample} does not operate by default on a particular {@link Scheduler}.</dd>
9746+
* </dl>
9747+
*
9748+
* @param <U> the element type of the sampler ObservableSource
9749+
* @param sampler
9750+
* the ObservableSource to use for sampling the source ObservableSource
9751+
* @param emitLast
9752+
* if true and the upstream completes while there is still an unsampled item available,
9753+
* that item is emitted to downstream before completion
9754+
* if false, an unsampled last item is ignored.
9755+
* @return an Observable that emits the results of sampling the items emitted by this ObservableSource whenever
9756+
* the {@code sampler} ObservableSource emits an item or completes
9757+
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
9758+
* @since 2.0.5 - experimental
9759+
*/
9760+
@CheckReturnValue
9761+
@SchedulerSupport(SchedulerSupport.NONE)
9762+
@Experimental
9763+
public final <U> Observable<T> sample(ObservableSource<U> sampler, boolean emitLast) {
9764+
ObjectHelper.requireNonNull(sampler, "sampler is null");
9765+
return RxJavaPlugins.onAssembly(new ObservableSampleWithObservable<T>(this, sampler, emitLast));
96669766
}
96679767

96689768
/**

0 commit comments

Comments
 (0)