diff --git a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java index eda2fddbec..fb840d9ad0 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java @@ -120,12 +120,12 @@ abstract class BaseCluster implements Cluster { this.settings = notNull("settings", settings); this.serverFactory = notNull("serverFactory", serverFactory); this.clusterListener = singleClusterListener(settings); - ClusterOpeningEvent clusterOpeningEvent = new ClusterOpeningEvent(clusterId); - this.clusterListener.clusterOpening(clusterOpeningEvent); - logTopologyOpening(clusterId, clusterOpeningEvent); this.description = new ClusterDescription(settings.getMode(), UNKNOWN, emptyList(), settings, serverFactory.getSettings()); this.clientMetadata = clientMetadata; + logTopologyMonitoringStarting(clusterId); + ClusterOpeningEvent clusterOpeningEvent = new ClusterOpeningEvent(clusterId); + clusterListener.clusterOpening(clusterOpeningEvent); } @Override @@ -229,9 +229,9 @@ public void close() { phase.get().countDown(); fireChangeEvent(new ClusterDescription(settings.getMode(), UNKNOWN, emptyList(), settings, serverFactory.getSettings()), description); + logTopologyMonitoringStopping(clusterId); ClusterClosedEvent clusterClosedEvent = new ClusterClosedEvent(clusterId); clusterListener.clusterClosed(clusterClosedEvent); - logTopologyClosedEvent(clusterId, clusterClosedEvent); stopWaitQueueHandler(); } } @@ -259,8 +259,8 @@ protected void updateDescription(final ClusterDescription newDescription) { protected void fireChangeEvent(final ClusterDescription newDescription, final ClusterDescription previousDescription) { if (!wouldDescriptionsGenerateEquivalentEvents(newDescription, previousDescription)) { ClusterDescriptionChangedEvent changedEvent = new ClusterDescriptionChangedEvent(getClusterId(), newDescription, previousDescription); - clusterListener.clusterDescriptionChanged(changedEvent); logTopologyDescriptionChanged(getClusterId(), changedEvent); + clusterListener.clusterDescriptionChanged(changedEvent); } } @@ -642,9 +642,7 @@ static void logServerSelectionSucceeded( } } - static void logTopologyOpening( - final ClusterId clusterId, - final ClusterOpeningEvent clusterOpeningEvent) { + static void logTopologyMonitoringStarting(final ClusterId clusterId) { if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) { STRUCTURED_LOGGER.log(new LogMessage( TOPOLOGY, DEBUG, "Starting topology monitoring", clusterId, @@ -669,9 +667,7 @@ static void logTopologyDescriptionChanged( } } - static void logTopologyClosedEvent( - final ClusterId clusterId, - final ClusterClosedEvent clusterClosedEvent) { + static void logTopologyMonitoringStopping(final ClusterId clusterId) { if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) { STRUCTURED_LOGGER.log(new LogMessage( TOPOLOGY, DEBUG, "Stopped topology monitoring", clusterId, diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java index b8de03e93e..80b1a5c0c2 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java @@ -75,7 +75,6 @@ class DefaultServer implements ClusterableServer { this.connectionPool = notNull("connectionPool", connectionPool); this.serverId = serverId; - serverListener.serverOpening(new ServerOpeningEvent(this.serverId)); this.serverMonitor = serverMonitor; diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java index cd6dcd769d..4842a6c8a2 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java @@ -22,6 +22,7 @@ import com.mongodb.ServerApi; import com.mongodb.annotations.ThreadSafe; import com.mongodb.connection.ClusterConnectionMode; +import com.mongodb.connection.ConnectionDescription; import com.mongodb.connection.ServerDescription; import com.mongodb.connection.ServerId; import com.mongodb.connection.ServerSettings; @@ -34,6 +35,8 @@ import com.mongodb.internal.diagnostics.logging.Logger; import com.mongodb.internal.diagnostics.logging.Loggers; import com.mongodb.internal.inject.Provider; +import com.mongodb.internal.logging.LogMessage; +import com.mongodb.internal.logging.StructuredLogger; import com.mongodb.internal.validator.NoOpFieldNameValidator; import com.mongodb.lang.Nullable; import org.bson.BsonBoolean; @@ -63,7 +66,19 @@ import static com.mongodb.internal.connection.DescriptionHelper.createServerDescription; import static com.mongodb.internal.connection.ServerDescriptionHelper.unknownConnectingServerDescription; import static com.mongodb.internal.event.EventListenerHelper.singleServerMonitorListener; +import static com.mongodb.internal.logging.LogMessage.Component.TOPOLOGY; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.AWAITED; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.DRIVER_CONNECTION_ID; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.DURATION_MS; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.FAILURE; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.REPLY; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_CONNECTION_ID; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_HOST; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_PORT; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.TOPOLOGY_ID; +import static com.mongodb.internal.logging.LogMessage.Level.DEBUG; import static java.lang.String.format; +import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -71,6 +86,7 @@ class DefaultServerMonitor implements ServerMonitor { private static final Logger LOGGER = Loggers.getLogger("cluster"); + private static final StructuredLogger STRUCTURED_LOGGER = new StructuredLogger("cluster"); private final ServerId serverId; private final ServerMonitorListener serverMonitorListener; @@ -116,6 +132,7 @@ class DefaultServerMonitor implements ServerMonitor { @Override public void start() { + logStartedServerMonitoring(serverId); monitor.start(); } @@ -137,6 +154,9 @@ public void connect() { @SuppressWarnings("try") public void close() { withLock(lock, () -> { + if (!isClosed) { + logStoppedServerMonitoring(serverId); + } isClosed = true; //noinspection EmptyTryBlock try (ServerMonitor ignoredAutoClosed = monitor; @@ -160,6 +180,7 @@ class ServerMonitor extends Thread implements AutoCloseable { private volatile InternalConnection connection = null; private volatile boolean alreadyLoggedHeartBeatStarted = false; private volatile boolean currentCheckCancelled; + private volatile long lookupStartTimeNanos; ServerMonitor() { super("cluster-" + serverId.getClusterId() + "-" + serverId.getAddress()); @@ -221,66 +242,25 @@ public void run() { private ServerDescription lookupServerDescription(final ServerDescription currentServerDescription) { try { - boolean shouldStreamResponses = shouldStreamResponses(currentServerDescription); - if (connection == null || connection.isClosed()) { - alreadyLoggedHeartBeatStarted = true; - currentCheckCancelled = false; - InternalConnection newConnection = internalConnectionFactory.create(serverId); - serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( - newConnection.getDescription().getConnectionId(), shouldStreamResponses)); - newConnection.open(operationContextFactory.create()); - connection = newConnection; - roundTripTimeSampler.addSample(connection.getInitialServerDescription().getRoundTripTimeNanos()); - return connection.getInitialServerDescription(); - } - if (LOGGER.isDebugEnabled()) { LOGGER.debug(format("Checking status of %s", serverId.getAddress())); } - if (!alreadyLoggedHeartBeatStarted) { - serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( - connection.getDescription().getConnectionId(), shouldStreamResponses)); - } - alreadyLoggedHeartBeatStarted = false; - - long start = System.nanoTime(); - try { - OperationContext operationContext = operationContextFactory.create(); - if (!connection.hasMoreToCome()) { - BsonDocument helloDocument = new BsonDocument(getHandshakeCommandName(currentServerDescription), new BsonInt32(1)) - .append("helloOk", BsonBoolean.TRUE); - if (shouldStreamResponses) { - helloDocument.append("topologyVersion", assertNotNull(currentServerDescription.getTopologyVersion()).asDocument()); - helloDocument.append("maxAwaitTimeMS", new BsonInt64(serverSettings.getHeartbeatFrequency(MILLISECONDS))); - } - connection.send(createCommandMessage(helloDocument, connection, currentServerDescription), new BsonDocumentCodec(), - operationContext); - } + boolean shouldStreamResponses = shouldStreamResponses(currentServerDescription); + lookupStartTimeNanos = System.nanoTime(); - BsonDocument helloResult; - if (shouldStreamResponses) { - helloResult = connection.receive(new BsonDocumentCodec(), operationContextWithAdditionalTimeout(operationContext)); - } else { - helloResult = connection.receive(new BsonDocumentCodec(), operationContext); - } + // Handle connection setup + if (connection == null || connection.isClosed()) { + return setupNewConnectionAndGetInitialDescription(shouldStreamResponses); + } - long elapsedTimeNanos = System.nanoTime() - start; - if (!shouldStreamResponses) { - roundTripTimeSampler.addSample(elapsedTimeNanos); - } - serverMonitorListener.serverHeartbeatSucceeded( - new ServerHeartbeatSucceededEvent(connection.getDescription().getConnectionId(), helloResult, - elapsedTimeNanos, shouldStreamResponses)); - - return createServerDescription(serverId.getAddress(), helloResult, roundTripTimeSampler.getAverage(), - roundTripTimeSampler.getMin()); - } catch (Exception e) { - serverMonitorListener.serverHeartbeatFailed( - new ServerHeartbeatFailedEvent(connection.getDescription().getConnectionId(), System.nanoTime() - start, - shouldStreamResponses, e)); - throw e; + // Log heartbeat started if it hasn't been logged yet + if (!alreadyLoggedHeartBeatStarted) { + logAndNotifyHeartbeatStarted(shouldStreamResponses); } + + // Get existing connection + return doHeartbeat(currentServerDescription, shouldStreamResponses); } catch (Throwable t) { roundTripTimeSampler.reset(); InternalConnection localConnection = withLock(lock, () -> { @@ -295,6 +275,85 @@ private ServerDescription lookupServerDescription(final ServerDescription curren } } + private ServerDescription setupNewConnectionAndGetInitialDescription(final boolean shouldStreamResponses) { + connection = internalConnectionFactory.create(serverId); + logAndNotifyHeartbeatStarted(shouldStreamResponses); + + try { + connection.open(operationContextFactory.create()); + roundTripTimeSampler.addSample(connection.getInitialServerDescription().getRoundTripTimeNanos()); + return connection.getInitialServerDescription(); + } catch (Exception e) { + logAndNotifyHeartbeatFailed(shouldStreamResponses, e); + throw e; + } + } + + /** + * Run hello command to get the server description. + */ + private ServerDescription doHeartbeat(final ServerDescription currentServerDescription, + final boolean shouldStreamResponses) { + try { + OperationContext operationContext = operationContextFactory.create(); + if (!connection.hasMoreToCome()) { + BsonDocument helloDocument = new BsonDocument(getHandshakeCommandName(currentServerDescription), new BsonInt32(1)) + .append("helloOk", BsonBoolean.TRUE); + if (shouldStreamResponses) { + helloDocument.append("topologyVersion", assertNotNull(currentServerDescription.getTopologyVersion()).asDocument()); + helloDocument.append("maxAwaitTimeMS", new BsonInt64(serverSettings.getHeartbeatFrequency(MILLISECONDS))); + } + connection.send(createCommandMessage(helloDocument, connection, currentServerDescription), new BsonDocumentCodec(), + operationContext); + } + + BsonDocument helloResult; + if (shouldStreamResponses) { + helloResult = connection.receive(new BsonDocumentCodec(), operationContextWithAdditionalTimeout(operationContext)); + } else { + helloResult = connection.receive(new BsonDocumentCodec(), operationContext); + } + logAndNotifyHeartbeatSucceeded(shouldStreamResponses, helloResult); + return createServerDescription(serverId.getAddress(), helloResult, roundTripTimeSampler.getAverage(), + roundTripTimeSampler.getMin()); + } catch (Exception e) { + logAndNotifyHeartbeatFailed(shouldStreamResponses, e); + throw e; + } + } + + private void logAndNotifyHeartbeatStarted(final boolean shouldStreamResponses) { + alreadyLoggedHeartBeatStarted = true; + logHeartbeatStarted(serverId, connection.getDescription(), shouldStreamResponses); + serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( + connection.getDescription().getConnectionId(), shouldStreamResponses)); + } + + private void logAndNotifyHeartbeatSucceeded(final boolean shouldStreamResponses, final BsonDocument helloResult) { + alreadyLoggedHeartBeatStarted = false; + long elapsedTimeNanos = getElapsedTimeNanos(); + if (!shouldStreamResponses) { + roundTripTimeSampler.addSample(elapsedTimeNanos); + } + logHeartbeatSucceeded(serverId, connection.getDescription(), shouldStreamResponses, elapsedTimeNanos, helloResult); + serverMonitorListener.serverHeartbeatSucceeded( + new ServerHeartbeatSucceededEvent(connection.getDescription().getConnectionId(), helloResult, + elapsedTimeNanos, shouldStreamResponses)); + } + + private void logAndNotifyHeartbeatFailed(final boolean shouldStreamResponses, final Exception e) { + alreadyLoggedHeartBeatStarted = false; + long elapsedTimeNanos = getElapsedTimeNanos(); + logHeartbeatFailed(serverId, connection.getDescription(), shouldStreamResponses, elapsedTimeNanos, e); + serverMonitorListener.serverHeartbeatFailed( + new ServerHeartbeatFailedEvent(connection.getDescription().getConnectionId(), elapsedTimeNanos, + shouldStreamResponses, e)); + } + + private long getElapsedTimeNanos() { + return System.nanoTime() - lookupStartTimeNanos; + } + private OperationContext operationContextWithAdditionalTimeout(final OperationContext originalOperationContext) { TimeoutContext newTimeoutContext = originalOperationContext.getTimeoutContext() .withAdditionalReadTimeout(Math.toIntExact(serverSettings.getHeartbeatFrequency(MILLISECONDS))); @@ -515,4 +574,94 @@ private void waitForNext() throws InterruptedException { private String getHandshakeCommandName(final ServerDescription serverDescription) { return serverDescription.isHelloOk() ? HELLO : LEGACY_HELLO; } + + private static void logHeartbeatStarted( + final ServerId serverId, + final ConnectionDescription connectionDescription, + final boolean awaited) { + if (STRUCTURED_LOGGER.isRequired(DEBUG, serverId.getClusterId())) { + STRUCTURED_LOGGER.log(new LogMessage( + TOPOLOGY, DEBUG, "Server heartbeat started", serverId.getClusterId(), + asList( + new LogMessage.Entry(SERVER_HOST, serverId.getAddress().getHost()), + new LogMessage.Entry(SERVER_PORT, serverId.getAddress().getPort()), + new LogMessage.Entry(DRIVER_CONNECTION_ID, connectionDescription.getConnectionId().getLocalValue()), + new LogMessage.Entry(SERVER_CONNECTION_ID, connectionDescription.getConnectionId().getServerValue()), + new LogMessage.Entry(TOPOLOGY_ID, serverId.getClusterId()), + new LogMessage.Entry(AWAITED, awaited)), + "Heartbeat started for {}:{} on connection with driver-generated ID {} and server-generated ID {} " + + "in topology with ID {}. Awaited: {}")); + } + } + + private static void logHeartbeatSucceeded( + final ServerId serverId, + final ConnectionDescription connectionDescription, + final boolean awaited, + final long elapsedTimeNanos, + final BsonDocument reply) { + if (STRUCTURED_LOGGER.isRequired(DEBUG, serverId.getClusterId())) { + STRUCTURED_LOGGER.log(new LogMessage( + TOPOLOGY, DEBUG, "Server heartbeat succeeded", serverId.getClusterId(), + asList( + new LogMessage.Entry(DURATION_MS, MILLISECONDS.convert(elapsedTimeNanos, NANOSECONDS)), + new LogMessage.Entry(SERVER_HOST, serverId.getAddress().getHost()), + new LogMessage.Entry(SERVER_PORT, serverId.getAddress().getPort()), + new LogMessage.Entry(DRIVER_CONNECTION_ID, connectionDescription.getConnectionId().getLocalValue()), + new LogMessage.Entry(SERVER_CONNECTION_ID, connectionDescription.getConnectionId().getServerValue()), + new LogMessage.Entry(TOPOLOGY_ID, serverId.getClusterId()), + new LogMessage.Entry(AWAITED, awaited), + new LogMessage.Entry(REPLY, reply.toJson())), + "Heartbeat succeeded in {} ms for {}:{} on connection with driver-generated ID {} and server-generated ID {} " + + "in topology with ID {}. Awaited: {}. Reply: {}")); + } + } + + private static void logHeartbeatFailed( + final ServerId serverId, + final ConnectionDescription connectionDescription, + final boolean awaited, + final long elapsedTimeNanos, + final Exception failure) { + if (STRUCTURED_LOGGER.isRequired(DEBUG, serverId.getClusterId())) { + STRUCTURED_LOGGER.log(new LogMessage( + TOPOLOGY, DEBUG, "Server heartbeat failed", serverId.getClusterId(), + asList( + new LogMessage.Entry(DURATION_MS, MILLISECONDS.convert(elapsedTimeNanos, NANOSECONDS)), + new LogMessage.Entry(SERVER_HOST, serverId.getAddress().getHost()), + new LogMessage.Entry(SERVER_PORT, serverId.getAddress().getPort()), + new LogMessage.Entry(DRIVER_CONNECTION_ID, connectionDescription.getConnectionId().getLocalValue()), + new LogMessage.Entry(SERVER_CONNECTION_ID, connectionDescription.getConnectionId().getServerValue()), + new LogMessage.Entry(TOPOLOGY_ID, serverId.getClusterId()), + new LogMessage.Entry(AWAITED, awaited), + new LogMessage.Entry(FAILURE, failure.getMessage())), + "Heartbeat failed in {} ms for {}:{} on connection with driver-generated ID {} and server-generated ID {} " + + "in topology with ID {}. Awaited: {}. Failure: {}")); + } + } + + + private static void logStartedServerMonitoring(final ServerId serverId) { + if (STRUCTURED_LOGGER.isRequired(DEBUG, serverId.getClusterId())) { + STRUCTURED_LOGGER.log(new LogMessage( + TOPOLOGY, DEBUG, "Starting server monitoring", serverId.getClusterId(), + asList( + new LogMessage.Entry(SERVER_HOST, serverId.getAddress().getHost()), + new LogMessage.Entry(SERVER_PORT, serverId.getAddress().getPort()), + new LogMessage.Entry(TOPOLOGY_ID, serverId.getClusterId())), + "Starting monitoring for server {}:{} in topology with ID {}")); + } + } + + private static void logStoppedServerMonitoring(final ServerId serverId) { + if (STRUCTURED_LOGGER.isRequired(DEBUG, serverId.getClusterId())) { + STRUCTURED_LOGGER.log(new LogMessage( + TOPOLOGY, DEBUG, "Stopped server monitoring", serverId.getClusterId(), + asList( + new LogMessage.Entry(SERVER_HOST, serverId.getAddress().getHost()), + new LogMessage.Entry(SERVER_PORT, serverId.getAddress().getPort()), + new LogMessage.Entry(TOPOLOGY_ID, serverId.getClusterId())), + "Stopped monitoring for server {}:{} in topology with ID {}")); + } + } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java index b177bcb12d..588bd9f609 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java @@ -63,7 +63,7 @@ import static com.mongodb.connection.ServerConnectionState.CONNECTING; import static com.mongodb.internal.connection.BaseCluster.logServerSelectionStarted; import static com.mongodb.internal.connection.BaseCluster.logServerSelectionSucceeded; -import static com.mongodb.internal.connection.BaseCluster.logTopologyClosedEvent; +import static com.mongodb.internal.connection.BaseCluster.logTopologyMonitoringStopping; import static com.mongodb.internal.event.EventListenerHelper.singleClusterListener; import static java.lang.String.format; import static java.util.Collections.emptyList; @@ -281,9 +281,9 @@ public void close() { if (localServer != null) { localServer.close(); } + logTopologyMonitoringStopping(clusterId); ClusterClosedEvent clusterClosedEvent = new ClusterClosedEvent(clusterId); clusterListener.clusterClosed(clusterClosedEvent); - logTopologyClosedEvent(clusterId, clusterClosedEvent); } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/SingleServerCluster.java b/driver-core/src/main/com/mongodb/internal/connection/SingleServerCluster.java index c21205559e..87b55e3f64 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/SingleServerCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/SingleServerCluster.java @@ -59,9 +59,9 @@ public SingleServerCluster(final ClusterId clusterId, final ClusterSettings sett // synchronized in the constructor because the change listener is re-entrant to this instance. // In other words, we are leaking a reference to "this" from the constructor. withLock(() -> { - server.set(createServer(settings.getHosts().get(0))); publishDescription(ServerDescription.builder().state(CONNECTING).address(settings.getHosts().get(0)) .build()); + server.set(createServer(settings.getHosts().get(0))); }); } diff --git a/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java b/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java index cfd97f713e..ec769e4f7a 100644 --- a/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java +++ b/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java @@ -108,6 +108,7 @@ public enum Name { * Not supported. */ OPERATION("operation"), + AWAITED("awaited"), SERVICE_ID("serviceId"), SERVER_CONNECTION_ID("serverConnectionId"), DRIVER_CONNECTION_ID("driverConnectionId"), diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java index 40d607bb70..aad3df381d 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java @@ -17,7 +17,6 @@ package com.mongodb.reactivestreams.client.unified; import org.junit.jupiter.params.provider.Arguments; - import java.util.Collection; final class UnifiedServerDiscoveryAndMonitoringTest extends UnifiedReactiveStreamsTest { diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java b/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java index 0d5729a678..e81ebc025d 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java @@ -495,7 +495,7 @@ public String toString() { + new BsonDocument("messages", expectedMessages).toJson(JsonWriterSettings.builder().indent(true).build()) + "\n" + " actualMessages=" + new BsonDocument("messages", new BsonArray(actualMessages.stream() - .map(LogMatcher::asDocument).collect(Collectors.toList()))) + .map(LogMatcher::logMessageAsDocument).collect(Collectors.toList()))) .toJson(JsonWriterSettings.builder().indent(true).build()) + "\n"; } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java index 4c80ec66a7..61a418ad3a 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java @@ -336,12 +336,12 @@ public void waitForServerMonitorEvents(final String client, final Class e BsonDocument expectedEventContents = getEventContents(expectedEvent); try { serverMonitorListener.waitForEvents(expectedEventType, - event -> serverMonitorEventMatches(expectedEventContents, event, null), count, Duration.ofSeconds(10)); + event -> serverMonitorEventMatches(expectedEventContents, event, null), count, Duration.ofSeconds(15)); context.pop(); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (TimeoutException e) { - fail(context.getMessage("Timed out waiting for server monitor events")); + fail(context.getMessage(e.getMessage())); } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/LogMatcher.java b/driver-sync/src/test/functional/com/mongodb/client/unified/LogMatcher.java index a4410262b7..b1c56e50fa 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/LogMatcher.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/LogMatcher.java @@ -33,6 +33,7 @@ import java.util.Collection; import java.util.List; +import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -47,15 +48,20 @@ final class LogMatcher { this.context = context; } - void assertLogMessageEquality(final String client, final boolean ignoreExtraMessages, final BsonArray expectedMessages, + void assertLogMessageEquality(final String client, final BsonArray ignoreMessages, + final boolean ignoreExtraMessages, final BsonArray expectedMessages, final List actualMessages, final Iterable tweaks) { context.push(ContextElement.ofLogMessages(client, expectedMessages, actualMessages)); + List logMessages = actualMessages.stream() + .filter(logMessage -> !ignoreMessages.contains(logMessageAsIgnoreMessageDocument(logMessage))) + .collect(Collectors.toList()); + if (ignoreExtraMessages) { assertTrue(context.getMessage("Number of messages must be greater than or equal to the expected number of messages"), - actualMessages.size() >= expectedMessages.size()); + logMessages.size() >= expectedMessages.size()); } else { - assertEquals(context.getMessage("Number of log messages must be the same"), expectedMessages.size(), actualMessages.size()); + assertEquals(context.getMessage("Number of log messages must be the same"), expectedMessages.size(), logMessages.size()); } for (int i = 0; i < expectedMessages.size(); i++) { @@ -64,14 +70,22 @@ void assertLogMessageEquality(final String client, final boolean ignoreExtraMess expectedMessage = tweak.apply(expectedMessage); } if (expectedMessage != null) { - valueMatcher.assertValuesMatch(expectedMessage, asDocument(actualMessages.get(i))); + valueMatcher.assertValuesMatch(expectedMessage, logMessageAsDocument(logMessages.get(i))); } } context.pop(); } - static BsonDocument asDocument(final LogMessage message) { + private static BsonDocument logMessageAsIgnoreMessageDocument(final LogMessage message) { + BsonDocument document = new BsonDocument(); + document.put("level", new BsonString(message.getLevel().name().toLowerCase())); + document.put("component", new BsonString(message.getComponent().getValue())); + document.put("data", new BsonDocument("message", new BsonString(message.getMessageId()))); + return document; + } + + static BsonDocument logMessageAsDocument(final LogMessage message) { BsonDocument document = new BsonDocument(); document.put("component", new BsonString(message.getComponent().getValue())); document.put("level", new BsonString(message.getLevel().name().toLowerCase())); diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java index 008d49a314..0be87ee341 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java @@ -481,10 +481,11 @@ private void compareLogMessages(final UnifiedTestContext rootContext, final Bson for (BsonValue cur : definition.getArray("expectLogMessages")) { BsonDocument curLogMessagesForClient = cur.asDocument(); boolean ignoreExtraMessages = curLogMessagesForClient.getBoolean("ignoreExtraMessages", BsonBoolean.FALSE).getValue(); + BsonArray ignoreMessages = curLogMessagesForClient.getArray("ignoreMessages", new BsonArray()); String clientId = curLogMessagesForClient.getString("client").getValue(); TestLoggingInterceptor loggingInterceptor = entities.getClientLoggingInterceptor(clientId); - rootContext.getLogMatcher().assertLogMessageEquality(clientId, ignoreExtraMessages, + rootContext.getLogMatcher().assertLogMessageEquality(clientId, ignoreMessages, ignoreExtraMessages, curLogMessagesForClient.getArray("messages"), loggingInterceptor.getMessages(), tweaks); } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java index 0f6233c5f7..8f43b58b7d 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java @@ -233,11 +233,6 @@ public static void applyCustomizations(final TestDef def) { def.skipJira("https://jira.mongodb.org/browse/JAVA-5230") .test("server-discovery-and-monitoring", "serverMonitoringMode", "connect with serverMonitoringMode=auto >=4.4") .test("server-discovery-and-monitoring", "serverMonitoringMode", "connect with serverMonitoringMode=stream >=4.4"); - def.skipJira("https://jira.mongodb.org/browse/JAVA-4770") - .file("server-discovery-and-monitoring", "standalone-logging") - .file("server-discovery-and-monitoring", "replicaset-logging") - .file("server-discovery-and-monitoring", "sharded-logging") - .file("server-discovery-and-monitoring", "loadbalanced-logging"); def.skipJira("https://jira.mongodb.org/browse/JAVA-5564") .test("server-discovery-and-monitoring", "serverMonitoringMode", "poll waits after successful heartbeat"); def.skipJira("https://jira.mongodb.org/browse/JAVA-4536")