From 08e1eb3c2979ecaa082cae4715bc10248a4b2865 Mon Sep 17 00:00:00 2001
From: Neil
Date: Mon, 22 Sep 2025 21:48:39 +0100
Subject: [PATCH] feat: Add penalty to channel selection for fast server errors

This change modifies the channel selection logic in BigtableChannelPool to
add a penalty for channels that have recently experienced server errors that
are likely to be transient.

The new logic uses an "adjusted" outstanding RPC count, which includes a
penalty for errors like UNKNOWN, UNIMPLEMENTED, INTERNAL, UNAVAILABLE, and
DATA_LOSS. This will help to route traffic away from temporarily misbehaving
servers and improve load balancing.
---
 .../gaxx/grpc/BigtableChannelPool.java        | 81 ++++++++++++++++---
 1 file changed, 75 insertions(+), 6 deletions(-)

diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java
index 173722f2f4..860a893de3 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java
@@ -32,7 +32,9 @@
 import io.grpc.Status;
 import java.io.IOException;
 import java.time.Clock;
+import java.time.Instant;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.CancellationException;
@@ -196,7 +198,7 @@ private int pickEntryIndexLeastInFlight() {
 
     for (int i = 0; i < localEntries.size(); i++) {
       Entry entry = localEntries.get(i);
-      int rpcs = entry.outstandingRpcs.get();
+      int rpcs = entry.adjustedOutstandingRpcs();
       if (rpcs < minRpcs) {
         minRpcs = rpcs;
         candidates.clear();
@@ -222,7 +224,9 @@ private int pickEntryIndexPowerOfTwoLeastInFlight() {
 
     Entry entry1 = localEntries.get(choice1);
     Entry entry2 = localEntries.get(choice2);
-    return entry1.outstandingRpcs.get() < entry2.outstandingRpcs.get() ? choice1 : choice2;
+    return entry1.adjustedOutstandingRpcs() < entry2.adjustedOutstandingRpcs()
+        ? choice1
+        : choice2;
   }
 
   Channel getChannel(int index) {
@@ -527,6 +531,34 @@ static class Entry {
      */
     @VisibleForTesting final AtomicInteger outstandingRpcs = new AtomicInteger(0);
 
+    /**
+     * Status codes for server errors that tend to fail quickly and are likely to succeed on a
+     * different server. A release with one of these codes adds a latency penalty to {@link
+     * #adjustedOutstandingRpcs()}. DEADLINE_EXCEEDED is deliberately excluded because it is
+     * typically caused by a slow request rather than a fast failure.
+     */
+    private static final EnumSet<Status.Code> FAST_SERVER_ERRORS =
+        EnumSet.of(
+            Status.Code.UNKNOWN,
+            Status.Code.UNIMPLEMENTED,
+            Status.Code.INTERNAL,
+            Status.Code.UNAVAILABLE,
+            Status.Code.DATA_LOSS);
+
+    /** How long an RPC that failed with a fast server error keeps counting as outstanding. */
+    private static final java.time.Duration FAST_SERVER_ERROR_PENALTY =
+        java.time.Duration.ofSeconds(1);
+
+    /**
+     * Simulated end times of RPCs that failed with a fast server error, in roughly the order they
+     * were released. Pretending these RPCs are still in flight steers load away from the channel.
+     */
+    private final ConcurrentLinkedQueue<Instant> delayedOutstandingRequests =
+        new ConcurrentLinkedQueue<>();
+
+    /** Tracks delayedOutstandingRequests.size(), which is not a constant-time operation. */
+    private final AtomicInteger delayedOutstandingRequestsSize = new AtomicInteger(0);
+
     private final AtomicInteger maxOutstanding = new AtomicInteger();
 
     /** Queue storing the last 5 minutes of probe results */
@@ -555,6 +587,34 @@ ManagedChannel getManagedChannel() {
       return this.channel;
     }
 
+    /**
+     * Removes penalty entries whose simulated end time has passed, keeping {@code
+     * delayedOutstandingRequestsSize} in step with the queue.
+     */
+    private void drainAdjustedOutstandingRpcs() {
+      Instant now = Instant.now();
+      Instant oldest = delayedOutstandingRequests.peek();
+
+      while (oldest != null && oldest.isBefore(now)) {
+        // Remove the exact entry that was checked. A bare poll() could race with another thread
+        // that already drained it and evict a newer entry whose penalty has not yet expired.
+        if (delayedOutstandingRequests.remove(oldest)) {
+          delayedOutstandingRequestsSize.decrementAndGet();
+        }
+        oldest = delayedOutstandingRequests.peek();
+      }
+    }
+
+    /**
+     * Returns the number of RPCs that would be outstanding if every RPC that failed with one of
+     * the {@code FAST_SERVER_ERRORS} had taken an extra {@code FAST_SERVER_ERROR_PENALTY} to
+     * complete. Used to steer channel selection away from channels that are failing fast.
+     */
+    int adjustedOutstandingRpcs() {
+      drainAdjustedOutstandingRpcs();
+      return outstandingRpcs.get() + delayedOutstandingRequestsSize.get();
+    }
+
     int getAndResetMaxOutstanding() {
       return maxOutstanding.getAndSet(outstandingRpcs.get());
     }
@@ -577,7 +637,7 @@ private boolean retain() {
 
       // abort if the channel is closing
       if (shutdownRequested.get()) {
-        release();
+        release(Status.CANCELLED);
         return false;
       }
       return true;
@@ -587,11 +647,20 @@ private boolean retain() {
      * Notify the channel that the number of outstanding RPCs has decreased. If shutdown has been
      * previously requested, this method will shutdown the channel if its the last outstanding RPC.
+     *
+     * <p>A {@code status} in {@code FAST_SERVER_ERRORS} also records a temporary penalty that
+     * keeps the RPC counted by {@link #adjustedOutstandingRpcs()} for a little longer.
      */
-    private void release() {
+    private void release(Status status) {
       int newCount = outstandingRpcs.decrementAndGet();
       if (newCount < 0) {
         LOG.log(Level.WARNING, "Bug! Reference count is negative (" + newCount + ")!");
       }
+      if (FAST_SERVER_ERRORS.contains(status.getCode())) {
+        // Count the penalty before publishing it so that a concurrent drain can never decrement
+        // the counter for an entry that has not been counted yet.
+        delayedOutstandingRequestsSize.incrementAndGet();
+        delayedOutstandingRequests.add(Instant.now().plus(FAST_SERVER_ERROR_PENALTY));
+      }
 
       // Must check outstandingRpcs after shutdownRequested (in reverse order of retain()) to ensure
       // mutual exclusion.
@@ -673,7 +742,7 @@ public void onClose(Status status, Metadata trailers) {
                   super.onClose(status, trailers);
                 } finally {
                   if (wasReleased.compareAndSet(false, true)) {
-                    entry.release();
+                    entry.release(status);
                   } else {
                     LOG.log(
                         Level.WARNING,
@@ -687,7 +756,7 @@ public void onClose(Status status, Metadata trailers) {
       } catch (Exception e) {
         // In case start failed, make sure to release
         if (wasReleased.compareAndSet(false, true)) {
-          entry.release();
+          entry.release(Status.fromThrowable(e));
         } else {
           LOG.log(
               Level.WARNING,