Skip to content

Operators: Throttle and Debounce #368

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 179 additions & 2 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
*/
package rx;

import static rx.util.functions.Functions.not;
import static rx.util.functions.Functions.*;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -64,6 +63,8 @@
import rx.operators.OperationTakeLast;
import rx.operators.OperationTakeUntil;
import rx.operators.OperationTakeWhile;
import rx.operators.OperationThrottleFirst;
import rx.operators.OperationDebounce;
import rx.operators.OperationTimestamp;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationToObservableIterable;
Expand Down Expand Up @@ -1809,6 +1810,182 @@ public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler
return create(OperationInterval.interval(interval, unit, scheduler));
}

/**
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
* <p>
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/debounce.png">
* <p>
* Information on debounce vs throttle:
* <p>
* <ul>
* <li>http://drupalmotion.com/article/debounce-and-throttle-visual-explanation</li>
* <li>http://unscriptable.com/2009/03/20/debouncing-javascript-methods/</li>
* <li>http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/</li>
* </ul>
*
* @param timeout
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
* @param unit
* The {@link TimeUnit} for the timeout.
*
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
* @see {@link #throttleWithTimeout};
*/
public Observable<T> debounce(long timeout, TimeUnit unit) {
return create(OperationDebounce.debounce(this, timeout, unit));
}

/**
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
* <p>
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/debounce.png">
* <p>
* Information on debounce vs throttle:
* <p>
* <ul>
* <li>http://drupalmotion.com/article/debounce-and-throttle-visual-explanation</li>
* <li>http://unscriptable.com/2009/03/20/debouncing-javascript-methods/</li>
* <li>http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/</li>
* </ul>
*
* @param timeout
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
* @param unit
* The unit of time for the specified timeout.
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
* @return Observable which performs the throttle operation.
* @see {@link #throttleWithTimeout};
*/
public Observable<T> debounce(long timeout, TimeUnit unit, Scheduler scheduler) {
return create(OperationDebounce.debounce(this, timeout, unit));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scheduler has no effect here...

}

/**
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
* <p>
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleWithTimeout.png">
* <p>
* Information on debounce vs throttle:
* <p>
* <ul>
* <li>http://drupalmotion.com/article/debounce-and-throttle-visual-explanation</li>
* <li>http://unscriptable.com/2009/03/20/debouncing-javascript-methods/</li>
* <li>http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/</li>
* </ul>
*
* @param timeout
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
* @param unit
* The {@link TimeUnit} for the timeout.
*
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
* @see {@link #debounce}
*/
public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
return create(OperationDebounce.debounce(this, timeout, unit));
}

/**
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
* <p>
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleWithTimeout.png">
*
* @param timeout
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
* @param unit
* The unit of time for the specified timeout.
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
* @return Observable which performs the throttle operation.
* @see {@link #debounce}
*/
public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) {
return create(OperationDebounce.debounce(this, timeout, unit, scheduler));
}

/**
* Throttles by skipping value until `skipDuration` passes and then emits the next received value.
* <p>
* This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleFirst.png">
*
* @param skipDuration
* Time to wait before sending another value after emitting last value.
* @param unit
* The unit of time for the specified timeout.
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scheduler does not appear in parameter list of this method...

* @return Observable which performs the throttle operation.
*/
public Observable<T> throttleFirst(long windowDuration, TimeUnit unit) {
return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit));
}

/**
* Throttles by skipping value until `skipDuration` passes and then emits the next received value.
* <p>
* This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleFirst.png">
*
* @param skipDuration
* Time to wait before sending another value after emitting last value.
* @param unit
* The unit of time for the specified timeout.
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
* @return Observable which performs the throttle operation.
*/
public Observable<T> throttleFirst(long skipDuration, TimeUnit unit, Scheduler scheduler) {
return create(OperationThrottleFirst.throttleFirst(this, skipDuration, unit, scheduler));
}

/**
* Throttles by returning the last value of each interval defined by 'intervalDuration'.
* <p>
* This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas {@link #throttleFirst} does not tick, it just tracks passage of time.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleLast.png">
*
* @param intervalDuration
* Duration of windows within with the last value will be chosen.
* @param unit
* The unit of time for the specified interval.
* @return Observable which performs the throttle operation.
* @see {@link #sample(long, TimeUnit)}
*/
public Observable<T> throttleLast(long intervalDuration, TimeUnit unit) {
return sample(intervalDuration, unit);
}

/**
* Throttles by returning the last value of each interval defined by 'intervalDuration'.
* <p>
* This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas {@link #throttleFirst} does not tick, it just tracks passage of time.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleLast.png">
*
* @param intervalDuration
* Duration of windows within with the last value will be chosen.
* @param unit
* The unit of time for the specified interval.
* @return Observable which performs the throttle operation.
* @see {@link #sample(long, TimeUnit, Scheduler)}
*/
public Observable<T> throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler) {
return sample(intervalDuration, unit, scheduler);
}

/**
* Wraps each item emitted by a source Observable in a {@link Timestamped} object.
* <p>
Expand Down
27 changes: 22 additions & 5 deletions rxjava-core/src/main/java/rx/concurrency/TestScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,22 @@
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func2;

public class TestScheduler extends Scheduler {
private final Queue<TimedAction<?>> queue = new PriorityQueue<TimedAction<?>>(11, new CompareActionsByTime());

private static class TimedAction<T> {

private final long time;
private final Func2<? super Scheduler, ? super T, ? extends Subscription> action;
private final T state;
private final TestScheduler scheduler;
private final AtomicBoolean isCancelled = new AtomicBoolean(false);

private TimedAction(TestScheduler scheduler, long time, Func2<? super Scheduler, ? super T, ? extends Subscription> action, T state) {
this.time = time;
Expand All @@ -41,6 +43,10 @@ private TimedAction(TestScheduler scheduler, long time, Func2<? super Scheduler,
this.scheduler = scheduler;
}

public void cancel() {
isCancelled.set(true);
}

@Override
public String toString() {
return String.format("TimedAction(time = %d, action = %s)", time, action.toString());
Expand Down Expand Up @@ -84,8 +90,12 @@ private void triggerActions(long targetTimeInNanos) {
}
time = current.time;
queue.remove();
// because the queue can have wildcards we have to ignore the type T for the state
((Func2<Scheduler, Object, Subscription>) current.action).call(current.scheduler, current.state);

// Only execute if the TimedAction has not yet been cancelled
if (!current.isCancelled.get()) {
// because the queue can have wildcards we have to ignore the type T for the state
((Func2<Scheduler, Object, Subscription>) current.action).call(current.scheduler, current.state);
}
}
time = targetTimeInNanos;
}
Expand All @@ -97,7 +107,14 @@ public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ?

@Override
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
queue.add(new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state));
return Subscriptions.empty();
final TimedAction<T> timedAction = new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state);
queue.add(timedAction);

return new Subscription() {
@Override
public void unsubscribe() {
timedAction.cancel();
}
};
}
}
Loading