diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java index 9efc488657b..56f540d623f 100644 --- a/core/src/main/java/io/grpc/internal/AbstractStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractStream.java @@ -27,6 +27,8 @@ import io.perfmark.PerfMark; import io.perfmark.TaskCloseable; import java.io.InputStream; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.concurrent.GuardedBy; /** @@ -34,6 +36,8 @@ * application thread. */ public abstract class AbstractStream implements Stream { + private static final Logger log = Logger.getLogger(AbstractStream.class.getName()); + /** The framer to use for sending messages. */ protected abstract Framer framer(); @@ -371,6 +375,12 @@ private void notifyIfReady() { boolean doNotify; synchronized (onReadyLock) { doNotify = isReady(); + if (!doNotify && log.isLoggable(Level.FINEST)) { + log.log(Level.FINEST, + "Stream not ready so skip notifying listener.\n" + + "details: allocated/deallocated:{0}/{3}, sent queued: {1}, ready thresh: {2}", + new Object[] {allocated, numSentBytesQueued, onReadyThreshold, deallocated}); + } } if (doNotify) { listener().onReady(); diff --git a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java index 9c7d744816a..a16ff26e5d4 100644 --- a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java +++ b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java @@ -449,16 +449,6 @@ private void handleRpcStreamClosed(Status status) { stopwatch.reset(); } - // FakeClock in tests isn't thread-safe. Schedule the retry timer before notifying callbacks - // to avoid TSAN races, since tests may wait until callbacks are called but then would run - // concurrently with the stopwatch and schedule. - - long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); - long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed); - - rpcRetryTimer = - syncContext.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService); - Status newStatus = status; if (responseReceived) { // A closed ADS stream after a successful response is not considered an error. Servers may @@ -486,9 +476,17 @@ private void handleRpcStreamClosed(Status status) { newStatus.getCode(), newStatus.getDescription(), newStatus.getCause()); } - closed = true; + close(newStatus.asException()); + + // FakeClock in tests isn't thread-safe. Schedule the retry timer before notifying callbacks + // to avoid TSAN races, since tests may wait until callbacks are called but then would run + // concurrently with the stopwatch and schedule. + long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); + long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed); + rpcRetryTimer = + syncContext.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService); + xdsResponseHandler.handleStreamClosed(newStatus, !responseReceived); - cleanUp(); } private void close(Exception error) { diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java index 791ba3cc62d..4304d1d9e6f 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java @@ -444,6 +444,9 @@ private Set getResourceKeys(XdsResourceType xdsResourceType) { // cpcForThisStream is null when doing shutdown private void cleanUpResourceTimers(ControlPlaneClient cpcForThisStream) { Collection authoritiesForCpc = getActiveAuthorities(cpcForThisStream); + String target = cpcForThisStream == null ? "null" : cpcForThisStream.getServerInfo().target(); + logger.log(XdsLogLevel.DEBUG, "Cleaning up resource timers for CPC {0}, authorities {1}", + target, authoritiesForCpc); for (Map> subscriberMap : resourceSubscribers.values()) { for (ResourceSubscriber subscriber : subscriberMap.values()) { @@ -957,6 +960,8 @@ public void handleStreamClosed(Status status, boolean shouldTryFallback) { ControlPlaneClient cpcClosed = serverCpClientMap.get(serverInfo); if (cpcClosed == null) { + logger.log(XdsLogLevel.DEBUG, + "Couldn't find closing CPC for {0}, so skipping cleanup and reporting", serverInfo); return; }