Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
import io.grpc.Status;
import java.io.IOException;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.function.Supplier;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CancellationException;
Expand All @@ -43,7 +46,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -196,7 +198,7 @@ private int pickEntryIndexLeastInFlight() {

for (int i = 0; i < localEntries.size(); i++) {
Entry entry = localEntries.get(i);
int rpcs = entry.outstandingRpcs.get();
int rpcs = entry.adjustedOutstandingRpcs();
if (rpcs < minRpcs) {
minRpcs = rpcs;
candidates.clear();
Expand All @@ -222,7 +224,9 @@ private int pickEntryIndexPowerOfTwoLeastInFlight() {

Entry entry1 = localEntries.get(choice1);
Entry entry2 = localEntries.get(choice2);
return entry1.outstandingRpcs.get() < entry2.outstandingRpcs.get() ? choice1 : choice2;
return entry1.adjustedOutstandingRpcs() < entry2.adjustedOutstandingRpcs()
? choice1
: choice2;
}

Channel getChannel(int index) {
Expand Down Expand Up @@ -334,7 +338,7 @@ void resize() {
List<Entry> localEntries = entries.get();
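// Estimate the current load by summing each channel's adjusted outstanding RPCs (in-flight RPCs
// plus recently failed RPCs that are still being penalized).
// NOTE(review): this is an instantaneous sample taken when resize() runs, not the peak observed
// over the last interval - confirm that is the intended sizing signal.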
int actualOutstandingRpcs =
localEntries.stream().mapToInt(Entry::getAndResetMaxOutstanding).sum();
localEntries.stream().mapToInt(Entry::adjustedOutstandingRpcs).sum();

// Number of channels if each channel operated at max capacity
int minChannels =
Expand Down Expand Up @@ -527,7 +531,29 @@ static class Entry {
*/
@VisibleForTesting final AtomicInteger outstandingRpcs = new AtomicInteger(0);

private final AtomicInteger maxOutstanding = new AtomicInteger();
/*
 * Server errors that are likely to succeed on a different server and fail within a short period
 * of time - these errors add a latency penalty to adjustedOutstandingRpcs. Note that
 * DEADLINE_EXCEEDED is not included as it's typically caused by a slow request.
 *
 * Declared static final: this is a constant shared by every Entry, so it must not be
 * re-allocated per instance or be reassignable. Treat the set as read-only.
 */
static final EnumSet<Status.Code> FAST_SERVER_ERRORS =
    EnumSet.of(
        Status.Code.UNKNOWN,
        Status.Code.UNIMPLEMENTED,
        Status.Code.INTERNAL,
        Status.Code.UNAVAILABLE,
        Status.Code.DATA_LOSS);

/*
 * How long an RPC that finished with one of FAST_SERVER_ERRORS keeps being counted as
 * outstanding after it actually completed.
 */
private static final java.time.Duration FAST_SERVER_ERROR_PENALTY =
    java.time.Duration.ofSeconds(1);

/*
 * Sometimes it's useful to pretend that failed requests take longer than they do.
 * delayedOutstandingRequests keeps track of the simulated end time of these requests, in roughly
 * chronological order: an end time is appended when a penalized RPC is released, and end times
 * that have already passed are removed from the head.
 */
private final ConcurrentLinkedQueue<Instant> delayedOutstandingRequests =
new ConcurrentLinkedQueue<>();

/*
 * Mirrors the number of entries in delayedOutstandingRequests so that readers do not have to
 * call ConcurrentLinkedQueue.size(), which is not a constant-time operation. The counter is
 * updated separately from the queue, so the two may disagree momentarily.
 */
private final AtomicInteger delayedOutstandingRequestsSize = new AtomicInteger(0);

/** Queue storing the last 5 minutes of probe results */
@VisibleForTesting
Expand Down Expand Up @@ -555,8 +581,26 @@ ManagedChannel getManagedChannel() {
return this.channel;
}

int getAndResetMaxOutstanding() {
return maxOutstanding.getAndSet(outstandingRpcs.get());
/**
 * Evicts every simulated end time that has already passed from the head of
 * {@code delayedOutstandingRequests}, decrementing {@code delayedOutstandingRequestsSize} once
 * for each entry this call actually removed.
 */
private void drainAdjustedOutstandingRpcs() {
  Instant now = Instant.now();

  for (Instant oldest = delayedOutstandingRequests.peek();
      oldest != null && oldest.isBefore(now);
      oldest = delayedOutstandingRequests.peek()) {
    // Remove the element we inspected (by value) instead of blindly polling the head. If another
    // thread evicted this entry between peek() and the removal, a poll() here would drop
    // whatever is at the head now - possibly an entry that has not expired yet. remove() returns
    // false in that case and the loop simply re-inspects the new head.
    if (delayedOutstandingRequests.remove(oldest)) {
      delayedOutstandingRequestsSize.decrementAndGet();
    }
  }
}

/**
 * Returns the load figure used for load balancing: the RPCs currently in flight plus the RPCs
 * that recently finished with one of the {@link #FAST_SERVER_ERRORS} and are still being counted
 * for {@link #FAST_SERVER_ERROR_PENALTY} after their real completion.
 *
 * <p>Expired penalties are dropped before the figure is computed.
 */
int adjustedOutstandingRpcs() {
  drainAdjustedOutstandingRpcs();

  int inFlight = outstandingRpcs.get();
  int stillPenalized = delayedOutstandingRequestsSize.get();
  return inFlight + stillPenalized;
}

/**
Expand All @@ -567,17 +611,11 @@ int getAndResetMaxOutstanding() {
*/
private boolean retain() {
// register desire to start RPC
int currentOutstanding = outstandingRpcs.incrementAndGet();

// Rough bookkeeping
int prevMax = maxOutstanding.get();
if (currentOutstanding > prevMax) {
maxOutstanding.incrementAndGet();
}
outstandingRpcs.incrementAndGet();

// abort if the channel is closing
if (shutdownRequested.get()) {
release();
release(Status.CANCELLED);
return false;
}
return true;
Expand All @@ -587,11 +625,15 @@ private boolean retain() {
* Notify the channel that the number of outstanding RPCs has decreased. If shutdown has been
* previously requested, this method will shut down the channel if it's the last outstanding RPC.
*/
private void release() {
private void release(Status status) {
int newCount = outstandingRpcs.decrementAndGet();
if (newCount < 0) {
LOG.log(Level.WARNING, "Bug! Reference count is negative (" + newCount + ")!");
}
if (FAST_SERVER_ERRORS.contains(status.getCode())) {
delayedOutstandingRequests.add(Instant.now().plus(FAST_SERVER_ERROR_PENALTY));
delayedOutstandingRequestsSize.incrementAndGet();
}

// Must check outstandingRpcs after shutdownRequested (in reverse order of retain()) to ensure
// mutual exclusion.
Expand Down Expand Up @@ -673,7 +715,7 @@ public void onClose(Status status, Metadata trailers) {
super.onClose(status, trailers);
} finally {
if (wasReleased.compareAndSet(false, true)) {
entry.release();
entry.release(status);
} else {
LOG.log(
Level.WARNING,
Expand All @@ -687,7 +729,7 @@ public void onClose(Status status, Metadata trailers) {
} catch (Exception e) {
// In case start failed, make sure to release
if (wasReleased.compareAndSet(false, true)) {
entry.release();
entry.release(Status.fromThrowable(e));
} else {
LOG.log(
Level.WARNING,
Expand Down
Loading