From 8e94d554a057747eddc2c771061695b393efada2 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Mon, 16 Jun 2025 14:33:31 +0100 Subject: [PATCH 01/15] Added logging specification tests JAVA-4770 --- .../internal/connection/BaseCluster.java | 12 +- .../internal/connection/DefaultServer.java | 3 +- .../connection/DefaultServerMonitor.java | 117 +++++++++++++++++- .../connection/LoadBalancedCluster.java | 4 +- .../connection/SingleServerCluster.java | 2 +- .../mongodb/internal/logging/LogMessage.java | 1 + ...ifiedServerDiscoveryAndMonitoringTest.java | 1 - .../client/unified/ContextElement.java | 2 +- .../mongodb/client/unified/LogMatcher.java | 24 +++- .../mongodb/client/unified/UnifiedTest.java | 3 +- .../unified/UnifiedTestModifications.java | 5 - 11 files changed, 148 insertions(+), 26 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java index 8cdc9951293..9e640e8dcba 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java @@ -118,9 +118,9 @@ abstract class BaseCluster implements Cluster { this.clusterListener = singleClusterListener(settings); ClusterOpeningEvent clusterOpeningEvent = new ClusterOpeningEvent(clusterId); clusterListener.clusterOpening(clusterOpeningEvent); - logTopologyOpening(clusterId, clusterOpeningEvent); description = new ClusterDescription(settings.getMode(), UNKNOWN, emptyList(), settings, serverFactory.getSettings()); + logTopologyMonitoringStarting(clusterId); } @Override @@ -221,7 +221,7 @@ public void close() { description); ClusterClosedEvent clusterClosedEvent = new ClusterClosedEvent(clusterId); clusterListener.clusterClosed(clusterClosedEvent); - logTopologyClosedEvent(clusterId, clusterClosedEvent); + logTopologyMonitoringStopping(clusterId); stopWaitQueueHandler(); } } @@ -632,9 +632,7 @@ static void logServerSelectionSucceeded( } } - static void logTopologyOpening( - final ClusterId clusterId, - final ClusterOpeningEvent clusterOpeningEvent) { + static void logTopologyMonitoringStarting(final ClusterId clusterId) { if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) { STRUCTURED_LOGGER.log(new LogMessage( TOPOLOGY, DEBUG, "Starting topology monitoring", clusterId, @@ -659,9 +657,7 @@ static void logTopologyDescriptionChanged( } } - static void logTopologyClosedEvent( - final ClusterId clusterId, - final ClusterClosedEvent clusterClosedEvent) { + static void logTopologyMonitoringStopping(final ClusterId clusterId) { if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) { STRUCTURED_LOGGER.log(new LogMessage( TOPOLOGY, DEBUG, "Stopped topology monitoring", clusterId, diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java index b8de03e93ea..b4aa3af1e06 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java @@ -31,6 +31,7 @@ import com.mongodb.internal.connection.SdamServerDescriptionManager.SdamIssue; import com.mongodb.internal.diagnostics.logging.Logger; import com.mongodb.internal.diagnostics.logging.Loggers; +import com.mongodb.internal.logging.StructuredLogger; import com.mongodb.internal.session.SessionContext; import com.mongodb.lang.Nullable; import org.bson.BsonDocument; @@ -48,6 +49,7 @@ class DefaultServer implements ClusterableServer { private static final Logger LOGGER = Loggers.getLogger("connection"); + private static final StructuredLogger STRUCTURED_LOGGER = new StructuredLogger("connection"); private final ServerId serverId; private final ConnectionPool connectionPool; private final ClusterConnectionMode clusterConnectionMode; @@ -75,7 +77,6 @@ class DefaultServer implements ClusterableServer { this.connectionPool = notNull("connectionPool", connectionPool); this.serverId = serverId; - serverListener.serverOpening(new ServerOpeningEvent(this.serverId)); this.serverMonitor = serverMonitor; diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java index cd6dcd769dd..2bfcc67279f 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java @@ -22,6 +22,7 @@ import com.mongodb.ServerApi; import com.mongodb.annotations.ThreadSafe; import com.mongodb.connection.ClusterConnectionMode; +import com.mongodb.connection.ConnectionDescription; import com.mongodb.connection.ServerDescription; import com.mongodb.connection.ServerId; import com.mongodb.connection.ServerSettings; @@ -34,6 +35,8 @@ import com.mongodb.internal.diagnostics.logging.Logger; import com.mongodb.internal.diagnostics.logging.Loggers; import com.mongodb.internal.inject.Provider; +import com.mongodb.internal.logging.LogMessage; +import com.mongodb.internal.logging.StructuredLogger; import com.mongodb.internal.validator.NoOpFieldNameValidator; import com.mongodb.lang.Nullable; import org.bson.BsonBoolean; @@ -63,7 +66,19 @@ import static com.mongodb.internal.connection.DescriptionHelper.createServerDescription; import static com.mongodb.internal.connection.ServerDescriptionHelper.unknownConnectingServerDescription; import static com.mongodb.internal.event.EventListenerHelper.singleServerMonitorListener; +import static com.mongodb.internal.logging.LogMessage.Component.TOPOLOGY; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.AWAITED; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.DRIVER_CONNECTION_ID; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.DURATION_MS; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.FAILURE; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.REPLY; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_CONNECTION_ID; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_HOST; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_PORT; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.TOPOLOGY_ID; +import static com.mongodb.internal.logging.LogMessage.Level.DEBUG; import static java.lang.String.format; +import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -71,6 +86,7 @@ class DefaultServerMonitor implements ServerMonitor { private static final Logger LOGGER = Loggers.getLogger("cluster"); + private static final StructuredLogger STRUCTURED_LOGGER = new StructuredLogger("cluster"); private final ServerId serverId; private final ServerMonitorListener serverMonitorListener; @@ -116,6 +132,7 @@ class DefaultServerMonitor implements ServerMonitor { @Override public void start() { + logStartedServerMonitoring(serverId); monitor.start(); } @@ -137,6 +154,9 @@ public void connect() { @SuppressWarnings("try") public void close() { withLock(lock, () -> { + if (!isClosed) { + logStoppedServerMonitoring(serverId); + } isClosed = true; //noinspection EmptyTryBlock try (ServerMonitor ignoredAutoClosed = monitor; @@ -226,6 +246,7 @@ private ServerDescription lookupServerDescription(final ServerDescription curren alreadyLoggedHeartBeatStarted = true; currentCheckCancelled = false; InternalConnection newConnection = internalConnectionFactory.create(serverId); + logHeartbeatStarted(serverId, newConnection.getDescription(), shouldStreamResponses); serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( newConnection.getDescription().getConnectionId(), shouldStreamResponses)); newConnection.open(operationContextFactory.create()); @@ -238,6 +259,7 @@ private ServerDescription lookupServerDescription(final ServerDescription curren LOGGER.debug(format("Checking status of %s", serverId.getAddress())); } if (!alreadyLoggedHeartBeatStarted) { + logHeartbeatStarted(serverId, connection.getDescription(), shouldStreamResponses); serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( connection.getDescription().getConnectionId(), shouldStreamResponses)); } @@ -269,6 +291,7 @@ private ServerDescription lookupServerDescription(final ServerDescription curren if (!shouldStreamResponses) { roundTripTimeSampler.addSample(elapsedTimeNanos); } + logHeartbeatSucceeded(serverId, connection.getDescription(), shouldStreamResponses, elapsedTimeNanos, helloResult); serverMonitorListener.serverHeartbeatSucceeded( new ServerHeartbeatSucceededEvent(connection.getDescription().getConnectionId(), helloResult, elapsedTimeNanos, shouldStreamResponses)); @@ -276,8 +299,10 @@ private ServerDescription lookupServerDescription(final ServerDescription curren return createServerDescription(serverId.getAddress(), helloResult, roundTripTimeSampler.getAverage(), roundTripTimeSampler.getMin()); } catch (Exception e) { + long elapsedTimeNanos = System.nanoTime() - start; + logHeartbeatFailed(serverId, connection.getDescription(), shouldStreamResponses, elapsedTimeNanos, e); serverMonitorListener.serverHeartbeatFailed( - new ServerHeartbeatFailedEvent(connection.getDescription().getConnectionId(), System.nanoTime() - start, + new ServerHeartbeatFailedEvent(connection.getDescription().getConnectionId(), elapsedTimeNanos, shouldStreamResponses, e)); throw e; } @@ -515,4 +540,94 @@ private void waitForNext() throws InterruptedException { private String getHandshakeCommandName(final ServerDescription serverDescription) { return serverDescription.isHelloOk() ? HELLO : LEGACY_HELLO; } + + private static void logHeartbeatStarted( + final ServerId serverId, + final ConnectionDescription connectionDescription, + final boolean awaited) { + if (STRUCTURED_LOGGER.isRequired(DEBUG, serverId.getClusterId())) { + STRUCTURED_LOGGER.log(new LogMessage( + TOPOLOGY, DEBUG, "Server heartbeat started", serverId.getClusterId(), + asList( + new LogMessage.Entry(SERVER_HOST, serverId.getAddress().getHost()), + new LogMessage.Entry(SERVER_PORT, serverId.getAddress().getPort()), + new LogMessage.Entry(DRIVER_CONNECTION_ID, connectionDescription.getConnectionId().getLocalValue()), + new LogMessage.Entry(SERVER_CONNECTION_ID, connectionDescription.getConnectionId().getServerValue()), + new LogMessage.Entry(TOPOLOGY_ID, serverId.getClusterId()), + new LogMessage.Entry(AWAITED, awaited)), + "Heartbeat started for {}:{} on connection with driver-generated ID {} and server-generated ID {} " + + "in topology with ID {}. Awaited: {}")); + } + } + + private static void logHeartbeatSucceeded( + final ServerId serverId, + final ConnectionDescription connectionDescription, + final boolean awaited, + final long elapsedTimeNanos, + final BsonDocument reply) { + if (STRUCTURED_LOGGER.isRequired(DEBUG, serverId.getClusterId())) { + STRUCTURED_LOGGER.log(new LogMessage( + TOPOLOGY, DEBUG, "Server heartbeat succeeded", serverId.getClusterId(), + asList( + new LogMessage.Entry(DURATION_MS, MILLISECONDS.convert(elapsedTimeNanos, NANOSECONDS)), + new LogMessage.Entry(SERVER_HOST, serverId.getAddress().getHost()), + new LogMessage.Entry(SERVER_PORT, serverId.getAddress().getPort()), + new LogMessage.Entry(DRIVER_CONNECTION_ID, connectionDescription.getConnectionId().getLocalValue()), + new LogMessage.Entry(SERVER_CONNECTION_ID, connectionDescription.getConnectionId().getServerValue()), + new LogMessage.Entry(TOPOLOGY_ID, serverId.getClusterId()), + new LogMessage.Entry(AWAITED, awaited), + new LogMessage.Entry(REPLY, reply.toJson())), + "Heartbeat succeeded in {} ms for {}:{} on connection with driver-generated ID {} and server-generated ID {} " + + "in topology with ID {}. Awaited: {}. Reply: {}")); + } + } + + private static void logHeartbeatFailed( + final ServerId serverId, + final ConnectionDescription connectionDescription, + final boolean awaited, + final long elapsedTimeNanos, + final Exception failure) { + if (STRUCTURED_LOGGER.isRequired(DEBUG, serverId.getClusterId())) { + STRUCTURED_LOGGER.log(new LogMessage( + TOPOLOGY, DEBUG, "Server heartbeat failed", serverId.getClusterId(), + asList( + new LogMessage.Entry(DURATION_MS, MILLISECONDS.convert(elapsedTimeNanos, NANOSECONDS)), + new LogMessage.Entry(SERVER_HOST, serverId.getAddress().getHost()), + new LogMessage.Entry(SERVER_PORT, serverId.getAddress().getPort()), + new LogMessage.Entry(DRIVER_CONNECTION_ID, connectionDescription.getConnectionId().getLocalValue()), + new LogMessage.Entry(SERVER_CONNECTION_ID, connectionDescription.getConnectionId().getServerValue()), + new LogMessage.Entry(TOPOLOGY_ID, serverId.getClusterId()), + new LogMessage.Entry(AWAITED, awaited), + new LogMessage.Entry(FAILURE, failure.getMessage())), + "Heartbeat failed in {} ms for {}:{} on connection with driver-generated ID {} and server-generated ID {} " + + "in topology with ID {}. Awaited: {}. Failure: {}")); + } + } + + + private static void logStartedServerMonitoring(final ServerId serverId) { + if (STRUCTURED_LOGGER.isRequired(DEBUG, serverId.getClusterId())) { + STRUCTURED_LOGGER.log(new LogMessage( + TOPOLOGY, DEBUG, "Starting server monitoring", serverId.getClusterId(), + asList( + new LogMessage.Entry(SERVER_HOST, serverId.getAddress().getHost()), + new LogMessage.Entry(SERVER_PORT, serverId.getAddress().getPort()), + new LogMessage.Entry(TOPOLOGY_ID, serverId.getClusterId())), + "Starting monitoring for server {}:{} in topology with ID {}")); + } + } + + private static void logStoppedServerMonitoring(final ServerId serverId) { + if (STRUCTURED_LOGGER.isRequired(DEBUG, serverId.getClusterId())) { + STRUCTURED_LOGGER.log(new LogMessage( + TOPOLOGY, DEBUG, "Stopped server monitoring", serverId.getClusterId(), + asList( + new LogMessage.Entry(SERVER_HOST, serverId.getAddress().getHost()), + new LogMessage.Entry(SERVER_PORT, serverId.getAddress().getPort()), + new LogMessage.Entry(TOPOLOGY_ID, serverId.getClusterId())), + "Stopped monitoring for server {}:{} in topology with ID {}")); + } + } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java index 9eac751943c..e1f57222708 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java @@ -63,7 +63,7 @@ import static com.mongodb.connection.ServerConnectionState.CONNECTING; import static com.mongodb.internal.connection.BaseCluster.logServerSelectionStarted; import static com.mongodb.internal.connection.BaseCluster.logServerSelectionSucceeded; -import static com.mongodb.internal.connection.BaseCluster.logTopologyClosedEvent; +import static com.mongodb.internal.connection.BaseCluster.logTopologyMonitoringStopping; import static com.mongodb.internal.event.EventListenerHelper.singleClusterListener; import static java.lang.String.format; import static java.util.Collections.emptyList; @@ -275,7 +275,7 @@ public void close() { } ClusterClosedEvent clusterClosedEvent = new ClusterClosedEvent(clusterId); clusterListener.clusterClosed(clusterClosedEvent); - logTopologyClosedEvent(clusterId, clusterClosedEvent); + logTopologyMonitoringStopping(clusterId); } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/SingleServerCluster.java b/driver-core/src/main/com/mongodb/internal/connection/SingleServerCluster.java index daeb67be54d..3968f84f554 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/SingleServerCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/SingleServerCluster.java @@ -58,9 +58,9 @@ public SingleServerCluster(final ClusterId clusterId, final ClusterSettings sett // synchronized in the constructor because the change listener is re-entrant to this instance. // In other words, we are leaking a reference to "this" from the constructor. withLock(() -> { - server.set(createServer(settings.getHosts().get(0))); publishDescription(ServerDescription.builder().state(CONNECTING).address(settings.getHosts().get(0)) .build()); + server.set(createServer(settings.getHosts().get(0))); }); } diff --git a/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java b/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java index cfd97f713e2..ec769e4f7a6 100644 --- a/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java +++ b/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java @@ -108,6 +108,7 @@ public enum Name { * Not supported. */ OPERATION("operation"), + AWAITED("awaited"), SERVICE_ID("serviceId"), SERVER_CONNECTION_ID("serverConnectionId"), DRIVER_CONNECTION_ID("driverConnectionId"), diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java index 40d607bb706..aad3df381d2 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java @@ -17,7 +17,6 @@ package com.mongodb.reactivestreams.client.unified; import org.junit.jupiter.params.provider.Arguments; - import java.util.Collection; final class UnifiedServerDiscoveryAndMonitoringTest extends UnifiedReactiveStreamsTest { diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java b/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java index 0d5729a6781..e81ebc025df 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java @@ -495,7 +495,7 @@ public String toString() { + new BsonDocument("messages", expectedMessages).toJson(JsonWriterSettings.builder().indent(true).build()) + "\n" + " actualMessages=" + new BsonDocument("messages", new BsonArray(actualMessages.stream() - .map(LogMatcher::asDocument).collect(Collectors.toList()))) + .map(LogMatcher::logMessageAsDocument).collect(Collectors.toList()))) .toJson(JsonWriterSettings.builder().indent(true).build()) + "\n"; } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/LogMatcher.java b/driver-sync/src/test/functional/com/mongodb/client/unified/LogMatcher.java index a4410262b79..b1c56e50fac 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/LogMatcher.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/LogMatcher.java @@ -33,6 +33,7 @@ import java.util.Collection; import java.util.List; +import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -47,15 +48,20 @@ final class LogMatcher { this.context = context; } - void assertLogMessageEquality(final String client, final boolean ignoreExtraMessages, final BsonArray expectedMessages, + void assertLogMessageEquality(final String client, final BsonArray ignoreMessages, + final boolean ignoreExtraMessages, final BsonArray expectedMessages, final List actualMessages, final Iterable tweaks) { context.push(ContextElement.ofLogMessages(client, expectedMessages, actualMessages)); + List logMessages = actualMessages.stream() + .filter(logMessage -> !ignoreMessages.contains(logMessageAsIgnoreMessageDocument(logMessage))) + .collect(Collectors.toList()); + if (ignoreExtraMessages) { assertTrue(context.getMessage("Number of messages must be greater than or equal to the expected number of messages"), - actualMessages.size() >= expectedMessages.size()); + logMessages.size() >= expectedMessages.size()); } else { - assertEquals(context.getMessage("Number of log messages must be the same"), expectedMessages.size(), actualMessages.size()); + assertEquals(context.getMessage("Number of log messages must be the same"), expectedMessages.size(), logMessages.size()); } for (int i = 0; i < expectedMessages.size(); i++) { @@ -64,14 +70,22 @@ void assertLogMessageEquality(final String client, final boolean ignoreExtraMess expectedMessage = tweak.apply(expectedMessage); } if (expectedMessage != null) { - valueMatcher.assertValuesMatch(expectedMessage, asDocument(actualMessages.get(i))); + valueMatcher.assertValuesMatch(expectedMessage, logMessageAsDocument(logMessages.get(i))); } } context.pop(); } - static BsonDocument asDocument(final LogMessage message) { + private static BsonDocument logMessageAsIgnoreMessageDocument(final LogMessage message) { + BsonDocument document = new BsonDocument(); + document.put("level", new BsonString(message.getLevel().name().toLowerCase())); + document.put("component", new BsonString(message.getComponent().getValue())); + document.put("data", new BsonDocument("message", new BsonString(message.getMessageId()))); + return document; + } + + static BsonDocument logMessageAsDocument(final LogMessage message) { BsonDocument document = new BsonDocument(); document.put("component", new BsonString(message.getComponent().getValue())); document.put("level", new BsonString(message.getLevel().name().toLowerCase())); diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java index b47f396f535..c089231e9e3 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java @@ -481,10 +481,11 @@ private void compareLogMessages(final UnifiedTestContext rootContext, final Bson for (BsonValue cur : definition.getArray("expectLogMessages")) { BsonDocument curLogMessagesForClient = cur.asDocument(); boolean ignoreExtraMessages = curLogMessagesForClient.getBoolean("ignoreExtraMessages", BsonBoolean.FALSE).getValue(); + BsonArray ignoreMessages = curLogMessagesForClient.getArray("ignoreMessages", new BsonArray()); String clientId = curLogMessagesForClient.getString("client").getValue(); TestLoggingInterceptor loggingInterceptor = entities.getClientLoggingInterceptor(clientId); - rootContext.getLogMatcher().assertLogMessageEquality(clientId, ignoreExtraMessages, + rootContext.getLogMatcher().assertLogMessageEquality(clientId, ignoreMessages, ignoreExtraMessages, curLogMessagesForClient.getArray("messages"), loggingInterceptor.getMessages(), tweaks); } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java index 3d54e23efa6..48038552b4b 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java @@ -238,11 +238,6 @@ public static void applyCustomizations(final TestDef def) { def.skipJira("https://jira.mongodb.org/browse/JAVA-5230") .test("server-discovery-and-monitoring", "serverMonitoringMode", "connect with serverMonitoringMode=auto >=4.4") .test("server-discovery-and-monitoring", "serverMonitoringMode", "connect with serverMonitoringMode=stream >=4.4"); - def.skipJira("https://jira.mongodb.org/browse/JAVA-4770") - .file("server-discovery-and-monitoring", "standalone-logging") - .file("server-discovery-and-monitoring", "replicaset-logging") - .file("server-discovery-and-monitoring", "sharded-logging") - .file("server-discovery-and-monitoring", "loadbalanced-logging"); def.skipJira("https://jira.mongodb.org/browse/JAVA-5564") .test("server-discovery-and-monitoring", "serverMonitoringMode", "poll waits after successful heartbeat"); def.skipJira("https://jira.mongodb.org/browse/JAVA-4536") From cde8e0b2c0f5f1df19e6db2fd3a659a6ddd48b3a Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Mon, 16 Jun 2025 16:11:41 +0100 Subject: [PATCH 02/15] Retry flaky / racy failing heartbeat tests --- .../mongodb/client/unified/UnifiedTestModifications.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java index 48038552b4b..d7f712bc46e 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java @@ -235,6 +235,13 @@ public static void applyCustomizations(final TestDef def) { // server-discovery-and-monitoring (SDAM) + // These tests can be flaky, due to a race waiting for the failed heartbeat. + def.modify(RETRY) + .test("server-discovery-and-monitoring", "standalone-logging", "Failing heartbeat") + .test("server-discovery-and-monitoring", "replicaset-logging", "Failing heartbeat") + .test("server-discovery-and-monitoring", "sharded-logging", "Failing heartbeat") + .test("server-discovery-and-monitoring", "loadbalanced-logging", "Failing heartbeat"); + def.skipJira("https://jira.mongodb.org/browse/JAVA-5230") .test("server-discovery-and-monitoring", "serverMonitoringMode", "connect with serverMonitoringMode=auto >=4.4") .test("server-discovery-and-monitoring", "serverMonitoringMode", "connect with serverMonitoringMode=stream >=4.4"); From 0b57fd805563b40a3cd919d7231529bbcb90a92b Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Mon, 16 Jun 2025 16:55:36 +0100 Subject: [PATCH 03/15] Provide reason for the retry --- .../com/mongodb/client/unified/UnifiedTestModifications.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java index d7f712bc46e..d4224684533 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java @@ -235,8 +235,7 @@ public static void applyCustomizations(final TestDef def) { // server-discovery-and-monitoring (SDAM) - // These tests can be flaky, due to a race waiting for the failed heartbeat. - def.modify(RETRY) + def.retry("Flaky test,due to a race waiting for the failed heartbeat.") .test("server-discovery-and-monitoring", "standalone-logging", "Failing heartbeat") .test("server-discovery-and-monitoring", "replicaset-logging", "Failing heartbeat") .test("server-discovery-and-monitoring", "sharded-logging", "Failing heartbeat") From 1513156c26d5d570434c3328d1cb0d2aed1bb2a5 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Tue, 17 Jun 2025 09:35:48 +0100 Subject: [PATCH 04/15] Bump timeout for server monitor events --- .../functional/com/mongodb/client/unified/EventMatcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java index 4c80ec66a78..0548e11a29b 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java @@ -336,7 +336,7 @@ public void waitForServerMonitorEvents(final String client, final Class e BsonDocument expectedEventContents = getEventContents(expectedEvent); try { serverMonitorListener.waitForEvents(expectedEventType, - event -> serverMonitorEventMatches(expectedEventContents, event, null), count, Duration.ofSeconds(10)); + event -> serverMonitorEventMatches(expectedEventContents, event, null), count, Duration.ofSeconds(30)); context.pop(); } catch (InterruptedException e) { throw new RuntimeException(e); From 17d20d14c5077016ce5f0e9acd6685bbf9fa37ee Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Thu, 19 Jun 2025 16:58:14 +0100 Subject: [PATCH 05/15] Update EventMatcher.java Changed to 15 to ensure servermonitor runs at least once. --- .../functional/com/mongodb/client/unified/EventMatcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java index 0548e11a29b..8e79324fcc7 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java @@ -336,7 +336,7 @@ public void waitForServerMonitorEvents(final String client, final Class e BsonDocument expectedEventContents = getEventContents(expectedEvent); try { serverMonitorListener.waitForEvents(expectedEventType, - event -> serverMonitorEventMatches(expectedEventContents, event, null), count, Duration.ofSeconds(30)); + event -> serverMonitorEventMatches(expectedEventContents, event, null), count, Duration.ofSeconds(15)); context.pop(); } catch (InterruptedException e) { throw new RuntimeException(e); From c3815861cdf2e80e2ec6fe352cfdc8a49d75e1bc Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Thu, 19 Jun 2025 17:00:13 +0100 Subject: [PATCH 06/15] Update DefaultServerMonitor.java Nit removed --- .../com/mongodb/internal/connection/DefaultServerMonitor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java index 2bfcc67279f..d0bc49c81fb 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java @@ -556,7 +556,7 @@ private static void logHeartbeatStarted( new LogMessage.Entry(TOPOLOGY_ID, serverId.getClusterId()), new LogMessage.Entry(AWAITED, awaited)), "Heartbeat started for {}:{} on connection with driver-generated ID {} and server-generated ID {} " - + "in topology with ID {}. Awaited: {}")); + + "in topology with ID {}. Awaited: {}")); } } From 4212fcd1120f2ab2b5e9ad39a6199d7c7b140090 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Tue, 24 Jun 2025 13:07:20 +0100 Subject: [PATCH 07/15] Log messages before publishing events --- .../main/com/mongodb/internal/connection/BaseCluster.java | 6 +++--- .../mongodb/internal/connection/LoadBalancedCluster.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java index 9e640e8dcba..811b66bfeed 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java @@ -116,11 +116,11 @@ abstract class BaseCluster implements Cluster { this.settings = notNull("settings", settings); this.serverFactory = notNull("serverFactory", serverFactory); this.clusterListener = singleClusterListener(settings); + logTopologyMonitoringStarting(clusterId); ClusterOpeningEvent clusterOpeningEvent = new ClusterOpeningEvent(clusterId); clusterListener.clusterOpening(clusterOpeningEvent); description = new ClusterDescription(settings.getMode(), UNKNOWN, emptyList(), settings, serverFactory.getSettings()); - logTopologyMonitoringStarting(clusterId); } @Override @@ -219,9 +219,9 @@ public void close() { phase.get().countDown(); fireChangeEvent(new ClusterDescription(settings.getMode(), UNKNOWN, emptyList(), settings, serverFactory.getSettings()), description); + logTopologyMonitoringStopping(clusterId); ClusterClosedEvent clusterClosedEvent = new ClusterClosedEvent(clusterId); clusterListener.clusterClosed(clusterClosedEvent); - logTopologyMonitoringStopping(clusterId); stopWaitQueueHandler(); } } @@ -249,8 +249,8 @@ protected void updateDescription(final ClusterDescription newDescription) { protected void fireChangeEvent(final ClusterDescription newDescription, final ClusterDescription previousDescription) { if (!wouldDescriptionsGenerateEquivalentEvents(newDescription, previousDescription)) { ClusterDescriptionChangedEvent changedEvent = new ClusterDescriptionChangedEvent(getClusterId(), newDescription, previousDescription); - clusterListener.clusterDescriptionChanged(changedEvent); logTopologyDescriptionChanged(getClusterId(), changedEvent); + clusterListener.clusterDescriptionChanged(changedEvent); } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java index e1f57222708..04ec1a123aa 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java @@ -273,9 +273,9 @@ public void close() { if (localServer != null) { localServer.close(); } + logTopologyMonitoringStopping(clusterId); ClusterClosedEvent clusterClosedEvent = new ClusterClosedEvent(clusterId); clusterListener.clusterClosed(clusterClosedEvent); - logTopologyMonitoringStopping(clusterId); } } From 9f5a5a8ed9878a331916e7272401e4d8b898726c Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Tue, 24 Jun 2025 15:00:18 +0100 Subject: [PATCH 08/15] Temp remove flaky tests to see outcome --- .../mongodb/client/unified/UnifiedTestModifications.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java index d4224684533..48038552b4b 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java @@ -235,12 +235,6 @@ public static void applyCustomizations(final TestDef def) { // server-discovery-and-monitoring (SDAM) - def.retry("Flaky test,due to a race waiting for the failed heartbeat.") - .test("server-discovery-and-monitoring", "standalone-logging", "Failing heartbeat") - .test("server-discovery-and-monitoring", "replicaset-logging", "Failing heartbeat") - .test("server-discovery-and-monitoring", "sharded-logging", "Failing heartbeat") - .test("server-discovery-and-monitoring", "loadbalanced-logging", "Failing heartbeat"); - def.skipJira("https://jira.mongodb.org/browse/JAVA-5230") .test("server-discovery-and-monitoring", "serverMonitoringMode", "connect with serverMonitoringMode=auto >=4.4") .test("server-discovery-and-monitoring", "serverMonitoringMode", "connect with serverMonitoringMode=stream >=4.4"); From 31cc6f2a5799b9843174e90e05735d156f3e9a97 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Tue, 24 Jun 2025 15:45:57 +0100 Subject: [PATCH 09/15] Output all seen events when timing out --- .../unit/com/mongodb/event/TestServerMonitorListener.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/driver-core/src/test/unit/com/mongodb/event/TestServerMonitorListener.java b/driver-core/src/test/unit/com/mongodb/event/TestServerMonitorListener.java index b009b5094f0..bdd6960d385 100644 --- a/driver-core/src/test/unit/com/mongodb/event/TestServerMonitorListener.java +++ b/driver-core/src/test/unit/com/mongodb/event/TestServerMonitorListener.java @@ -42,6 +42,7 @@ public final class TestServerMonitorListener implements ServerMonitorListener { private final Lock lock; private final Condition condition; private final List events; + private final List allEvents; public TestServerMonitorListener(final Iterable listenableEventTypes) { this.listenableEventTypes = unmodifiableSet(stream(listenableEventTypes.spliterator(), false) @@ -51,6 +52,7 @@ public TestServerMonitorListener(final Iterable listenableEventTypes) { lock = new ReentrantLock(); condition = lock.newCondition(); events = new ArrayList<>(); + allEvents = new ArrayList<>(); } public void serverHearbeatStarted(final ServerHeartbeatStartedEvent event) { @@ -74,8 +76,8 @@ public void waitForEvents(final Class type, final Predicate ma long observedCount = countEvents(type, matcher); while (observedCount < count) { if (remainingNanos <= 0) { - throw new TimeoutException(String.format("Timed out waiting for %d %s events. The observed count is %d.", - count, type.getSimpleName(), observedCount)); + throw new TimeoutException(String.format("Timed out waiting for %d %s events. The observed count is %d. Seen: %s", + count, type.getSimpleName(), observedCount, allEvents)); } remainingNanos = condition.awaitNanos(remainingNanos); observedCount = countEvents(type, matcher); @@ -135,6 +137,7 @@ private boolean listenable(final Class eventType) { } private void register(final Object event) { + allEvents.add(event); if (!listenable(event.getClass())) { return; } From 963a9e9425600f3a4bb787d3374175f522f93569 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Tue, 24 Jun 2025 17:33:10 +0100 Subject: [PATCH 10/15] Propogate the error message --- .../functional/com/mongodb/client/unified/EventMatcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java index 8e79324fcc7..0abb874ddcd 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java @@ -341,7 +341,7 @@ public void waitForServerMonitorEvents(final String client, final Class e } catch (InterruptedException e) { throw new RuntimeException(e); } catch (TimeoutException e) { - fail(context.getMessage("Timed out waiting for server monitor events")); + fail(context.getMessage("Timed out waiting for server monitor events: " + e.getMessage())); } } From 158c072b982b83b69f9835bfff398fd2688c4de7 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Wed, 25 Jun 2025 13:21:09 +0100 Subject: [PATCH 11/15] Added more debugging. Previous failing Failing heartbeat test showed that multiple heartbeat started calls but no success or failure calls. --- .../internal/connection/DefaultServerMonitor.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java index d0bc49c81fb..8fabada34a1 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java @@ -258,15 +258,16 @@ private ServerDescription lookupServerDescription(final ServerDescription curren if (LOGGER.isDebugEnabled()) { LOGGER.debug(format("Checking status of %s", serverId.getAddress())); } - if (!alreadyLoggedHeartBeatStarted) { - logHeartbeatStarted(serverId, connection.getDescription(), shouldStreamResponses); - serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( - connection.getDescription().getConnectionId(), shouldStreamResponses)); - } - alreadyLoggedHeartBeatStarted = false; long start = System.nanoTime(); try { + if (!alreadyLoggedHeartBeatStarted) { + logHeartbeatStarted(serverId, connection.getDescription(), shouldStreamResponses); + serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( + connection.getDescription().getConnectionId(), shouldStreamResponses)); + } + alreadyLoggedHeartBeatStarted = false; + OperationContext operationContext = operationContextFactory.create(); if (!connection.hasMoreToCome()) { BsonDocument helloDocument = new BsonDocument(getHandshakeCommandName(currentServerDescription), new BsonInt32(1)) @@ -385,6 +386,7 @@ private long waitForSignalOrTimeout() throws InterruptedException { } public void cancelCurrentCheck() { + System.out.println("Canceling current check for server " + serverId.getAddress()); InternalConnection localConnection = withLock(lock, () -> { if (connection != null && !currentCheckCancelled) { InternalConnection result = connection; From cd7e72652029b7fcb79d0790ff7b74c7db7364f2 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Wed, 25 Jun 2025 17:33:07 +0100 Subject: [PATCH 12/15] Fix heartbeat bug and remove debugging logic When openining a socket that throws an exception the heartbeat started message is created but no heartbeat failed message. --- .../connection/DefaultServerMonitor.java | 28 +++++++++++++------ .../event/TestServerMonitorListener.java | 7 ++--- .../mongodb/client/unified/EventMatcher.java | 2 +- 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java index 8fabada34a1..9d3ba4f926f 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java @@ -249,7 +249,18 @@ private ServerDescription lookupServerDescription(final ServerDescription curren logHeartbeatStarted(serverId, newConnection.getDescription(), shouldStreamResponses); serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( newConnection.getDescription().getConnectionId(), shouldStreamResponses)); - newConnection.open(operationContextFactory.create()); + long start = System.nanoTime(); + try { + newConnection.open(operationContextFactory.create()); + } catch (Exception e) { + alreadyLoggedHeartBeatStarted = false; + long elapsedTimeNanos = System.nanoTime() - start; + logHeartbeatFailed(serverId, connection.getDescription(), shouldStreamResponses, elapsedTimeNanos, e); + serverMonitorListener.serverHeartbeatFailed( + new ServerHeartbeatFailedEvent(connection.getDescription().getConnectionId(), elapsedTimeNanos, + shouldStreamResponses, e)); + throw e; + } connection = newConnection; roundTripTimeSampler.addSample(connection.getInitialServerDescription().getRoundTripTimeNanos()); return connection.getInitialServerDescription(); @@ -259,15 +270,15 @@ private ServerDescription lookupServerDescription(final ServerDescription curren LOGGER.debug(format("Checking status of %s", serverId.getAddress())); } + if (!alreadyLoggedHeartBeatStarted) { + logHeartbeatStarted(serverId, connection.getDescription(), shouldStreamResponses); + serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( + connection.getDescription().getConnectionId(), shouldStreamResponses)); + } + alreadyLoggedHeartBeatStarted = false; + long start = System.nanoTime(); try { - if (!alreadyLoggedHeartBeatStarted) { - logHeartbeatStarted(serverId, connection.getDescription(), shouldStreamResponses); - serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( - connection.getDescription().getConnectionId(), shouldStreamResponses)); - } - alreadyLoggedHeartBeatStarted = false; - OperationContext operationContext = operationContextFactory.create(); if (!connection.hasMoreToCome()) { BsonDocument helloDocument = new BsonDocument(getHandshakeCommandName(currentServerDescription), new BsonInt32(1)) @@ -386,7 +397,6 @@ private long waitForSignalOrTimeout() throws InterruptedException { } public void cancelCurrentCheck() { - System.out.println("Canceling current check for server " + serverId.getAddress()); InternalConnection localConnection = withLock(lock, () -> { if (connection != null && !currentCheckCancelled) { InternalConnection result = connection; diff --git a/driver-core/src/test/unit/com/mongodb/event/TestServerMonitorListener.java b/driver-core/src/test/unit/com/mongodb/event/TestServerMonitorListener.java index bdd6960d385..b009b5094f0 100644 --- a/driver-core/src/test/unit/com/mongodb/event/TestServerMonitorListener.java +++ b/driver-core/src/test/unit/com/mongodb/event/TestServerMonitorListener.java @@ -42,7 +42,6 @@ public final class TestServerMonitorListener implements ServerMonitorListener { private final Lock lock; private final Condition condition; private final List events; - private final List allEvents; public TestServerMonitorListener(final Iterable listenableEventTypes) { this.listenableEventTypes = unmodifiableSet(stream(listenableEventTypes.spliterator(), false) @@ -52,7 +51,6 @@ public TestServerMonitorListener(final Iterable listenableEventTypes) { lock = new ReentrantLock(); condition = lock.newCondition(); events = new ArrayList<>(); - allEvents = new ArrayList<>(); } public void serverHearbeatStarted(final ServerHeartbeatStartedEvent event) { @@ -76,8 +74,8 @@ public void waitForEvents(final Class type, final Predicate ma long observedCount = countEvents(type, matcher); while (observedCount < count) { if (remainingNanos <= 0) { - throw new TimeoutException(String.format("Timed out waiting for %d %s events. The observed count is %d. Seen: %s", - count, type.getSimpleName(), observedCount, allEvents)); + throw new TimeoutException(String.format("Timed out waiting for %d %s events. The observed count is %d.", + count, type.getSimpleName(), observedCount)); } remainingNanos = condition.awaitNanos(remainingNanos); observedCount = countEvents(type, matcher); @@ -137,7 +135,6 @@ private boolean listenable(final Class eventType) { } private void register(final Object event) { - allEvents.add(event); if (!listenable(event.getClass())) { return; } diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java index 0abb874ddcd..61a418ad3a9 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java @@ -341,7 +341,7 @@ public void waitForServerMonitorEvents(final String client, final Class e } catch (InterruptedException e) { throw new RuntimeException(e); } catch (TimeoutException e) { - fail(context.getMessage("Timed out waiting for server monitor events: " + e.getMessage())); + fail(context.getMessage(e.getMessage())); } } From e7dfa0c4f49a88099a3942d592e8218ec9c4bcb2 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Wed, 25 Jun 2025 18:22:35 +0100 Subject: [PATCH 13/15] Fix time reporting for heartbeats --- .../connection/DefaultServerMonitor.java | 69 ++++++++++--------- ...ifiedServerDiscoveryAndMonitoringTest.java | 6 +- 2 files changed, 43 insertions(+), 32 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java index 9d3ba4f926f..fc75297c267 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java @@ -180,6 +180,7 @@ class ServerMonitor extends Thread implements AutoCloseable { private volatile InternalConnection connection = null; private volatile boolean alreadyLoggedHeartBeatStarted = false; private volatile boolean currentCheckCancelled; + private volatile long lookupStartTimeNanos; ServerMonitor() { super("cluster-" + serverId.getClusterId() + "-" + serverId.getAddress()); @@ -241,43 +242,24 @@ public void run() { private ServerDescription lookupServerDescription(final ServerDescription currentServerDescription) { try { - boolean shouldStreamResponses = shouldStreamResponses(currentServerDescription); - if (connection == null || connection.isClosed()) { - alreadyLoggedHeartBeatStarted = true; - currentCheckCancelled = false; - InternalConnection newConnection = internalConnectionFactory.create(serverId); - logHeartbeatStarted(serverId, newConnection.getDescription(), shouldStreamResponses); - serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( - newConnection.getDescription().getConnectionId(), shouldStreamResponses)); - long start = System.nanoTime(); - try { - newConnection.open(operationContextFactory.create()); - } catch (Exception e) { - alreadyLoggedHeartBeatStarted = false; - long elapsedTimeNanos = System.nanoTime() - start; - logHeartbeatFailed(serverId, connection.getDescription(), shouldStreamResponses, elapsedTimeNanos, e); - serverMonitorListener.serverHeartbeatFailed( - new ServerHeartbeatFailedEvent(connection.getDescription().getConnectionId(), elapsedTimeNanos, - shouldStreamResponses, e)); - throw e; - } - connection = newConnection; - roundTripTimeSampler.addSample(connection.getInitialServerDescription().getRoundTripTimeNanos()); - return connection.getInitialServerDescription(); - } - if (LOGGER.isDebugEnabled()) { LOGGER.debug(format("Checking status of %s", serverId.getAddress())); } + boolean shouldStreamResponses = shouldStreamResponses(currentServerDescription); + if (connection == null || connection.isClosed()) { + lookupStartTimeNanos = System.nanoTime(); + return openNewConnectionAndGetInitialDescription(shouldStreamResponses); + } + if (!alreadyLoggedHeartBeatStarted) { + lookupStartTimeNanos = System.nanoTime(); logHeartbeatStarted(serverId, connection.getDescription(), shouldStreamResponses); serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( connection.getDescription().getConnectionId(), shouldStreamResponses)); } alreadyLoggedHeartBeatStarted = false; - long start = System.nanoTime(); try { OperationContext operationContext = operationContextFactory.create(); if (!connection.hasMoreToCome()) { @@ -287,7 +269,6 @@ private ServerDescription lookupServerDescription(final ServerDescription curren helloDocument.append("topologyVersion", assertNotNull(currentServerDescription.getTopologyVersion()).asDocument()); helloDocument.append("maxAwaitTimeMS", new BsonInt64(serverSettings.getHeartbeatFrequency(MILLISECONDS))); } - connection.send(createCommandMessage(helloDocument, connection, currentServerDescription), new BsonDocumentCodec(), operationContext); } @@ -298,8 +279,7 @@ private ServerDescription lookupServerDescription(final ServerDescription curren } else { helloResult = connection.receive(new BsonDocumentCodec(), operationContext); } - - long elapsedTimeNanos = System.nanoTime() - start; + long elapsedTimeNanos = getElapsedTimeNanos(); if (!shouldStreamResponses) { roundTripTimeSampler.addSample(elapsedTimeNanos); } @@ -307,11 +287,10 @@ private ServerDescription lookupServerDescription(final ServerDescription curren serverMonitorListener.serverHeartbeatSucceeded( new ServerHeartbeatSucceededEvent(connection.getDescription().getConnectionId(), helloResult, elapsedTimeNanos, shouldStreamResponses)); - return createServerDescription(serverId.getAddress(), helloResult, roundTripTimeSampler.getAverage(), roundTripTimeSampler.getMin()); } catch (Exception e) { - long elapsedTimeNanos = System.nanoTime() - start; + long elapsedTimeNanos = getElapsedTimeNanos(); logHeartbeatFailed(serverId, connection.getDescription(), shouldStreamResponses, elapsedTimeNanos, e); serverMonitorListener.serverHeartbeatFailed( new ServerHeartbeatFailedEvent(connection.getDescription().getConnectionId(), elapsedTimeNanos, @@ -332,6 +311,34 @@ private ServerDescription lookupServerDescription(final ServerDescription curren } } + private ServerDescription openNewConnectionAndGetInitialDescription(final boolean shouldStreamResponses) { + currentCheckCancelled = false; + alreadyLoggedHeartBeatStarted = true; + InternalConnection newConnection = internalConnectionFactory.create(serverId); + logHeartbeatStarted(serverId, newConnection.getDescription(), shouldStreamResponses); + serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( + newConnection.getDescription().getConnectionId(), shouldStreamResponses)); + + try { + newConnection.open(operationContextFactory.create()); + connection = newConnection; + roundTripTimeSampler.addSample(connection.getInitialServerDescription().getRoundTripTimeNanos()); + return connection.getInitialServerDescription(); + } catch (Exception e) { + alreadyLoggedHeartBeatStarted = false; + long elapsedTimeNanos = getElapsedTimeNanos(); + logHeartbeatFailed(serverId, newConnection.getDescription(), shouldStreamResponses, elapsedTimeNanos, e); + serverMonitorListener.serverHeartbeatFailed( + new ServerHeartbeatFailedEvent(newConnection.getDescription().getConnectionId(), elapsedTimeNanos, + shouldStreamResponses, e)); + throw e; + } + } + + private long getElapsedTimeNanos() { + return System.nanoTime() - lookupStartTimeNanos; + } + private OperationContext operationContextWithAdditionalTimeout(final OperationContext originalOperationContext) { TimeoutContext newTimeoutContext = originalOperationContext.getTimeoutContext() .withAdditionalReadTimeout(Math.toIntExact(serverSettings.getHeartbeatFrequency(MILLISECONDS))); diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java index 83a3e75d956..696ced0e2df 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java @@ -19,8 +19,12 @@ import org.junit.jupiter.params.provider.Arguments; import java.util.Collection; +import static java.util.stream.Collectors.toList; + public final class UnifiedServerDiscoveryAndMonitoringTest extends UnifiedSyncTest { private static Collection data() { - return getTestData("server-discovery-and-monitoring"); + return getTestData("server-discovery-and-monitoring") + .stream().filter(arguments -> arguments.get()[1].toString().endsWith("serverMonitoringMode")) + .collect(toList()); } } From bec47fdaa0f34ec545db783ba69c245ac55f6068 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Thu, 26 Jun 2025 17:43:52 +0100 Subject: [PATCH 14/15] PR fixes --- .../main/com/mongodb/internal/connection/DefaultServer.java | 2 -- .../unified/UnifiedServerDiscoveryAndMonitoringTest.java | 6 +----- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java index b4aa3af1e06..80b1a5c0c27 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java @@ -31,7 +31,6 @@ import com.mongodb.internal.connection.SdamServerDescriptionManager.SdamIssue; import com.mongodb.internal.diagnostics.logging.Logger; import com.mongodb.internal.diagnostics.logging.Loggers; -import com.mongodb.internal.logging.StructuredLogger; import com.mongodb.internal.session.SessionContext; import com.mongodb.lang.Nullable; import org.bson.BsonDocument; @@ -49,7 +48,6 @@ class DefaultServer implements ClusterableServer { private static final Logger LOGGER = Loggers.getLogger("connection"); - private static final StructuredLogger STRUCTURED_LOGGER = new StructuredLogger("connection"); private final ServerId serverId; private final ConnectionPool connectionPool; private final ClusterConnectionMode clusterConnectionMode; diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java index 696ced0e2df..83a3e75d956 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java @@ -19,12 +19,8 @@ import org.junit.jupiter.params.provider.Arguments; import java.util.Collection; -import static java.util.stream.Collectors.toList; - public final class UnifiedServerDiscoveryAndMonitoringTest extends UnifiedSyncTest { private static Collection data() { - return getTestData("server-discovery-and-monitoring") - .stream().filter(arguments -> arguments.get()[1].toString().endsWith("serverMonitoringMode")) - .collect(toList()); + return getTestData("server-discovery-and-monitoring"); } } From 7c0efa360fbc59a189f61ed41a01692e1444db51 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Mon, 30 Jun 2025 11:06:44 +0100 Subject: [PATCH 15/15] Copilot based refactor / DRY of lookupServerDescription --- .../connection/DefaultServerMonitor.java | 133 ++++++++++-------- 1 file changed, 74 insertions(+), 59 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java index fc75297c267..4842a6c8a27 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java @@ -247,56 +247,20 @@ private ServerDescription lookupServerDescription(final ServerDescription curren } boolean shouldStreamResponses = shouldStreamResponses(currentServerDescription); + lookupStartTimeNanos = System.nanoTime(); + + // Handle connection setup if (connection == null || connection.isClosed()) { - lookupStartTimeNanos = System.nanoTime(); - return openNewConnectionAndGetInitialDescription(shouldStreamResponses); + return setupNewConnectionAndGetInitialDescription(shouldStreamResponses); } + // Log heartbeat started if it hasn't been logged yet if (!alreadyLoggedHeartBeatStarted) { - lookupStartTimeNanos = System.nanoTime(); - logHeartbeatStarted(serverId, connection.getDescription(), shouldStreamResponses); - serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( - connection.getDescription().getConnectionId(), shouldStreamResponses)); + logAndNotifyHeartbeatStarted(shouldStreamResponses); } - alreadyLoggedHeartBeatStarted = false; - - try { - OperationContext operationContext = operationContextFactory.create(); - if (!connection.hasMoreToCome()) { - BsonDocument helloDocument = new BsonDocument(getHandshakeCommandName(currentServerDescription), new BsonInt32(1)) - .append("helloOk", BsonBoolean.TRUE); - if (shouldStreamResponses) { - helloDocument.append("topologyVersion", assertNotNull(currentServerDescription.getTopologyVersion()).asDocument()); - helloDocument.append("maxAwaitTimeMS", new BsonInt64(serverSettings.getHeartbeatFrequency(MILLISECONDS))); - } - connection.send(createCommandMessage(helloDocument, connection, currentServerDescription), new BsonDocumentCodec(), - operationContext); - } - BsonDocument helloResult; - if (shouldStreamResponses) { - helloResult = connection.receive(new BsonDocumentCodec(), operationContextWithAdditionalTimeout(operationContext)); - } else { - helloResult = connection.receive(new BsonDocumentCodec(), operationContext); - } - long elapsedTimeNanos = getElapsedTimeNanos(); - if (!shouldStreamResponses) { - roundTripTimeSampler.addSample(elapsedTimeNanos); - } - logHeartbeatSucceeded(serverId, connection.getDescription(), shouldStreamResponses, elapsedTimeNanos, helloResult); - serverMonitorListener.serverHeartbeatSucceeded( - new ServerHeartbeatSucceededEvent(connection.getDescription().getConnectionId(), helloResult, - elapsedTimeNanos, shouldStreamResponses)); - return createServerDescription(serverId.getAddress(), helloResult, roundTripTimeSampler.getAverage(), - roundTripTimeSampler.getMin()); - } catch (Exception e) { - long elapsedTimeNanos = getElapsedTimeNanos(); - logHeartbeatFailed(serverId, connection.getDescription(), shouldStreamResponses, elapsedTimeNanos, e); - serverMonitorListener.serverHeartbeatFailed( - new ServerHeartbeatFailedEvent(connection.getDescription().getConnectionId(), elapsedTimeNanos, - shouldStreamResponses, e)); - throw e; - } + // Get existing connection + return doHeartbeat(currentServerDescription, shouldStreamResponses); } catch (Throwable t) { roundTripTimeSampler.reset(); InternalConnection localConnection = withLock(lock, () -> { @@ -311,30 +275,81 @@ private ServerDescription lookupServerDescription(final ServerDescription curren } } - private ServerDescription openNewConnectionAndGetInitialDescription(final boolean shouldStreamResponses) { - currentCheckCancelled = false; - alreadyLoggedHeartBeatStarted = true; - InternalConnection newConnection = internalConnectionFactory.create(serverId); - logHeartbeatStarted(serverId, newConnection.getDescription(), shouldStreamResponses); - serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( - newConnection.getDescription().getConnectionId(), shouldStreamResponses)); + private ServerDescription setupNewConnectionAndGetInitialDescription(final boolean shouldStreamResponses) { + connection = internalConnectionFactory.create(serverId); + logAndNotifyHeartbeatStarted(shouldStreamResponses); try { - newConnection.open(operationContextFactory.create()); - connection = newConnection; + connection.open(operationContextFactory.create()); roundTripTimeSampler.addSample(connection.getInitialServerDescription().getRoundTripTimeNanos()); return connection.getInitialServerDescription(); } catch (Exception e) { - alreadyLoggedHeartBeatStarted = false; - long elapsedTimeNanos = getElapsedTimeNanos(); - logHeartbeatFailed(serverId, newConnection.getDescription(), shouldStreamResponses, elapsedTimeNanos, e); - serverMonitorListener.serverHeartbeatFailed( - new ServerHeartbeatFailedEvent(newConnection.getDescription().getConnectionId(), elapsedTimeNanos, - shouldStreamResponses, e)); + logAndNotifyHeartbeatFailed(shouldStreamResponses, e); throw e; } } + /** + * Run hello command to get the server description. + */ + private ServerDescription doHeartbeat(final ServerDescription currentServerDescription, + final boolean shouldStreamResponses) { + try { + OperationContext operationContext = operationContextFactory.create(); + if (!connection.hasMoreToCome()) { + BsonDocument helloDocument = new BsonDocument(getHandshakeCommandName(currentServerDescription), new BsonInt32(1)) + .append("helloOk", BsonBoolean.TRUE); + if (shouldStreamResponses) { + helloDocument.append("topologyVersion", assertNotNull(currentServerDescription.getTopologyVersion()).asDocument()); + helloDocument.append("maxAwaitTimeMS", new BsonInt64(serverSettings.getHeartbeatFrequency(MILLISECONDS))); + } + connection.send(createCommandMessage(helloDocument, connection, currentServerDescription), new BsonDocumentCodec(), + operationContext); + } + + BsonDocument helloResult; + if (shouldStreamResponses) { + helloResult = connection.receive(new BsonDocumentCodec(), operationContextWithAdditionalTimeout(operationContext)); + } else { + helloResult = connection.receive(new BsonDocumentCodec(), operationContext); + } + logAndNotifyHeartbeatSucceeded(shouldStreamResponses, helloResult); + return createServerDescription(serverId.getAddress(), helloResult, roundTripTimeSampler.getAverage(), + roundTripTimeSampler.getMin()); + } catch (Exception e) { + logAndNotifyHeartbeatFailed(shouldStreamResponses, e); + throw e; + } + } + + private void logAndNotifyHeartbeatStarted(final boolean shouldStreamResponses) { + alreadyLoggedHeartBeatStarted = true; + logHeartbeatStarted(serverId, connection.getDescription(), shouldStreamResponses); + serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( + connection.getDescription().getConnectionId(), shouldStreamResponses)); + } + + private void logAndNotifyHeartbeatSucceeded(final boolean shouldStreamResponses, final BsonDocument helloResult) { + alreadyLoggedHeartBeatStarted = false; + long elapsedTimeNanos = getElapsedTimeNanos(); + if (!shouldStreamResponses) { + roundTripTimeSampler.addSample(elapsedTimeNanos); + } + logHeartbeatSucceeded(serverId, connection.getDescription(), shouldStreamResponses, elapsedTimeNanos, helloResult); + serverMonitorListener.serverHeartbeatSucceeded( + new ServerHeartbeatSucceededEvent(connection.getDescription().getConnectionId(), helloResult, + elapsedTimeNanos, shouldStreamResponses)); + } + + private void logAndNotifyHeartbeatFailed(final boolean shouldStreamResponses, final Exception e) { + alreadyLoggedHeartBeatStarted = false; + long elapsedTimeNanos = getElapsedTimeNanos(); + logHeartbeatFailed(serverId, connection.getDescription(), shouldStreamResponses, elapsedTimeNanos, e); + serverMonitorListener.serverHeartbeatFailed( + new ServerHeartbeatFailedEvent(connection.getDescription().getConnectionId(), elapsedTimeNanos, + shouldStreamResponses, e)); + } + private long getElapsedTimeNanos() { return System.nanoTime() - lookupStartTimeNanos; }