Skip to content

1.x: OnBackpressureBuffer: DROP_LATEST and DROP_OLDEST #3487

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 17, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions src/main/java/rx/BackpressureOverflow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx;

import rx.annotations.Experimental;
import rx.exceptions.MissingBackpressureException;

/**
* Generic strategy and default implementations to deal with backpressure buffer overflows.
*/
@Experimental
public final class BackpressureOverflow {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add @Experimental


public interface Strategy {

/**
* Whether the Backpressure manager should attempt to drop the oldest item, or simply
* drop the item currently causing backpressure.
*
* @return true to request drop of the oldest item, false to drop the newest.
* @throws MissingBackpressureException
*/
boolean mayAttemptDrop() throws MissingBackpressureException;
}

public static final BackpressureOverflow.Strategy ON_OVERFLOW_DEFAULT = Error.INSTANCE;
@SuppressWarnings("unused")
public static final BackpressureOverflow.Strategy ON_OVERFLOW_ERROR = Error.INSTANCE;
@SuppressWarnings("unused")
public static final BackpressureOverflow.Strategy ON_OVERFLOW_DROP_OLDEST = DropOldest.INSTANCE;
@SuppressWarnings("unused")
public static final BackpressureOverflow.Strategy ON_OVERFLOW_DROP_LATEST = DropLatest.INSTANCE;

/**
* Drop oldest items from the buffer making room for newer ones.
*/
static class DropOldest implements BackpressureOverflow.Strategy {
static final DropOldest INSTANCE = new DropOldest();

private DropOldest() {}

@Override
public boolean mayAttemptDrop() {
return true;
}
}

/**
* Drop most recent items, but not {@code onError} nor unsubscribe from source
* (as {code OperatorOnBackpressureDrop}).
*/
static class DropLatest implements BackpressureOverflow.Strategy {
static final DropLatest INSTANCE = new DropLatest();

private DropLatest() {}

@Override
public boolean mayAttemptDrop() {
return false;
}
}

/**
* {@code onError} a MissingBackpressureException and unsubscribe from source.
*/
static class Error implements BackpressureOverflow.Strategy {

static final Error INSTANCE = new Error();

private Error() {}

@Override
public boolean mayAttemptDrop() throws MissingBackpressureException {
throw new MissingBackpressureException("Overflowed buffer");
}
}
}
42 changes: 40 additions & 2 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6399,7 +6399,8 @@ public final Observable<T> onBackpressureBuffer() {
* <dd>{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return the source Observable modified to buffer items up to the given capacity
* @param capacity number of slots available in the buffer.
* @return the source {@code Observable} modified to buffer items up to the given capacity.
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
* @since 1.1.0
*/
Expand All @@ -6419,14 +6420,51 @@ public final Observable<T> onBackpressureBuffer(long capacity) {
* <dd>{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return the source Observable modified to buffer items up to the given capacity
* @param capacity number of slots available in the buffer.
* @param onOverflow action to execute if an item needs to be buffered, but there are no available slots. Null is allowed.
* @return the source {@code Observable} modified to buffer items up to the given capacity
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
* @since 1.1.0
*/
public final Observable<T> onBackpressureBuffer(long capacity, Action0 onOverflow) {
return lift(new OperatorOnBackpressureBuffer<T>(capacity, onOverflow));
}

/**
* Instructs an Observable that is emitting items faster than its observer can consume them to buffer up to
* a given amount of items until they can be emitted. The resulting Observable will behave as determined
* by {@code overflowStrategy} if the buffer capacity is exceeded.
*
* <ul>
* <li>{@code BackpressureOverflow.Strategy.ON_OVERFLOW_ERROR} (default) will {@code onError} dropping all undelivered items,
* unsubscribing from the source, and notifying the producer with {@code onOverflow}. </li>
* <li>{@code BackpressureOverflow.Strategy.ON_OVERFLOW_DROP_LATEST} will drop any new items emitted by the producer while
* the buffer is full, without generating any {@code onError}. Each drop will however invoke {@code onOverflow}
* to signal the overflow to the producer.</li>j
* <li>{@code BackpressureOverflow.Strategy.ON_OVERFLOW_DROP_OLDEST} will drop the oldest items in the buffer in order to make
* room for newly emitted ones. Overflow will not generate an{@code onError}, but each drop will invoke
* {@code onOverflow} to signal the overflow to the producer.</li>
* </ul>
*
* <p>
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.buffer.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, where are @params?!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copypasted from the old ones (which don't have it either). Adding them.

* @param capacity number of slots available in the buffer.
* @param onOverflow action to execute if an item needs to be buffered, but there are no available slots. Null is allowed.
* @param overflowStrategy how should the {@code Observable} react to buffer overflows. Null is not allowed.
* @return the source {@code Observable} modified to buffer items up to the given capacity
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public final Observable<T> onBackpressureBuffer(long capacity, Action0 onOverflow, BackpressureOverflow.Strategy overflowStrategy) {
return lift(new OperatorOnBackpressureBuffer<T>(capacity, onOverflow, overflowStrategy));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Knowing that some RxJava users like to pass null :trollface: in places where RxJava developers do not expect that I'd add null checks for onOverflow and overflowStrategy or at least mention it in javadoc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

/**
* Instructs an Observable that is emitting items faster than its observer can consume them to discard,
* rather than emit, those items that its observer is not prepared to observe.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import rx.BackpressureOverflow;
import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;
Expand All @@ -27,15 +28,18 @@
import rx.functions.Action0;
import rx.internal.util.BackpressureDrainManager;

import static rx.BackpressureOverflow.*;

public class OperatorOnBackpressureBuffer<T> implements Operator<T, T> {

private final Long capacity;
private final Action0 onOverflow;
private final BackpressureOverflow.Strategy overflowStrategy;

private static class Holder {
static final OperatorOnBackpressureBuffer<?> INSTANCE = new OperatorOnBackpressureBuffer<Object>();
}

@SuppressWarnings("unchecked")
public static <T> OperatorOnBackpressureBuffer<T> instance() {
return (OperatorOnBackpressureBuffer<T>) Holder.INSTANCE;
Expand All @@ -44,33 +48,65 @@ public static <T> OperatorOnBackpressureBuffer<T> instance() {
OperatorOnBackpressureBuffer() {
this.capacity = null;
this.onOverflow = null;
this.overflowStrategy = ON_OVERFLOW_DEFAULT;
}

/**
* Construct a new instance that will handle overflows with {@code ON_OVERFLOW_DEFAULT}, providing the
* following behavior config:
*
* @param capacity the max number of items to be admitted in the buffer, must be greater than 0.
*/
public OperatorOnBackpressureBuffer(long capacity) {
this(capacity, null);
this(capacity, null, ON_OVERFLOW_DEFAULT);
}

/**
* Construct a new instance that will handle overflows with {@code ON_OVERFLOW_DEFAULT}, providing the
* following behavior config:
*
* @param capacity the max number of items to be admitted in the buffer, must be greater than 0.
* @param onOverflow the {@code Action0} to execute when the buffer overflows, may be null.
*/
public OperatorOnBackpressureBuffer(long capacity, Action0 onOverflow) {
this(capacity, onOverflow, ON_OVERFLOW_DEFAULT);
}

/**
* Construct a new instance feeding the following behavior config:
*
* @param capacity the max number of items to be admitted in the buffer, must be greater than 0.
* @param onOverflow the {@code Action0} to execute when the buffer overflows, may be null.
* @param overflowStrategy the {@code BackpressureOverflow.Strategy} to handle overflows, it must not be null.
*/
public OperatorOnBackpressureBuffer(long capacity, Action0 onOverflow,
BackpressureOverflow.Strategy overflowStrategy) {
if (capacity <= 0) {
throw new IllegalArgumentException("Buffer capacity must be > 0");
}
if (overflowStrategy == null) {
throw new NullPointerException("The BackpressureOverflow strategy must not be null");
}
this.capacity = capacity;
this.onOverflow = onOverflow;
this.overflowStrategy = overflowStrategy;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {

// don't pass through subscriber as we are async and doing queue draining
// a parent being unsubscribed should not affect the children
BufferSubscriber<T> parent = new BufferSubscriber<T>(child, capacity, onOverflow);
BufferSubscriber<T> parent = new BufferSubscriber<T>(child, capacity, onOverflow,
overflowStrategy);

// if child unsubscribes it should unsubscribe the parent, but not the other way around
child.add(parent);
child.setProducer(parent.manager());

return parent;
}

private static final class BufferSubscriber<T> extends Subscriber<T> implements BackpressureDrainManager.BackpressureQueueCallback {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BufferSubscriber is private here but the public BackpressureOverflowStrategy exposes it, which makes it not extendable outside this package, which is not better than having a BackpressureOverflowStrategy enum in the first place. Plus, leaving BufferSubscriber private generates accessibility bridges.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I replaced with the parent interface implemented by BufferSubscriber, which is public.

// TODO get a different queue implementation
private final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();
Expand All @@ -81,14 +117,18 @@ private static final class BufferSubscriber<T> extends Subscriber<T> implements
private final BackpressureDrainManager manager;
private final NotificationLite<T> on = NotificationLite.instance();
private final Action0 onOverflow;
private final BackpressureOverflow.Strategy overflowStrategy;

public BufferSubscriber(final Subscriber<? super T> child, Long capacity, Action0 onOverflow) {
public BufferSubscriber(final Subscriber<? super T> child, Long capacity, Action0 onOverflow,
BackpressureOverflow.Strategy overflowStrategy) {
this.child = child;
this.baseCapacity = capacity;
this.capacity = capacity != null ? new AtomicLong(capacity) : null;
this.onOverflow = onOverflow;
this.manager = new BackpressureDrainManager(this);
this.overflowStrategy = overflowStrategy;
}

@Override
public void onStart() {
request(Long.MAX_VALUE);
Expand Down Expand Up @@ -141,7 +181,7 @@ public Object poll() {
}
return value;
}

private boolean assertCapacity() {
if (capacity == null) {
return true;
Expand All @@ -151,24 +191,30 @@ private boolean assertCapacity() {
do {
currCapacity = capacity.get();
if (currCapacity <= 0) {
if (saturated.compareAndSet(false, true)) {
unsubscribe();
child.onError(new MissingBackpressureException(
"Overflowed buffer of "
+ baseCapacity));
if (onOverflow != null) {
try {
onOverflow.call();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
manager.terminateAndDrain(e);
// this line not strictly necessary but nice for clarity
// and in case of future changes to code after this catch block
return false;
}
boolean hasCapacity = false;
try {
// ok if we're allowed to drop, and there is indeed an item to discard
hasCapacity = overflowStrategy.mayAttemptDrop() && poll() != null;
} catch (MissingBackpressureException e) {
if (saturated.compareAndSet(false, true)) {
unsubscribe();
child.onError(e);
}
}
return false;
if (onOverflow != null) {
try {
onOverflow.call();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
manager.terminateAndDrain(e);
// this line not strictly necessary but nice for clarity
// and in case of future changes to code after this catch block
return false;
}
}
if (!hasCapacity) {
return false;
}
}
// ensure no other thread stole our slot, or retry
} while (!capacity.compareAndSet(currCapacity, currCapacity - 1));
Expand Down
Loading