+ *
* @since 3.3
*/
public final class ClusterClosedEvent {
diff --git a/driver-core/src/main/com/mongodb/event/ClusterDescriptionChangedEvent.java b/driver-core/src/main/com/mongodb/event/ClusterDescriptionChangedEvent.java
index 49debf5438a..c92dc864826 100644
--- a/driver-core/src/main/com/mongodb/event/ClusterDescriptionChangedEvent.java
+++ b/driver-core/src/main/com/mongodb/event/ClusterDescriptionChangedEvent.java
@@ -24,6 +24,8 @@
/**
* An event signifying that the cluster description has changed.
*
+ *
This event is synonymous with TopologyDescriptionChangedEvent
+ *
* @since 3.3
*/
public final class ClusterDescriptionChangedEvent {
diff --git a/driver-core/src/main/com/mongodb/event/ClusterOpeningEvent.java b/driver-core/src/main/com/mongodb/event/ClusterOpeningEvent.java
index d4a6aac2239..55c9b4ee82e 100644
--- a/driver-core/src/main/com/mongodb/event/ClusterOpeningEvent.java
+++ b/driver-core/src/main/com/mongodb/event/ClusterOpeningEvent.java
@@ -23,6 +23,8 @@
/**
* A cluster opening event.
*
+ *
This event is synonymous with TopologyOpeningEvent
+ *
* @since 3.3
*/
public final class ClusterOpeningEvent {
diff --git a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java
index df3e4d1c1fe..8cdc9951293 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java
@@ -27,7 +27,6 @@
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.ClusterId;
import com.mongodb.connection.ClusterSettings;
-import com.mongodb.connection.ClusterType;
import com.mongodb.connection.ServerDescription;
import com.mongodb.event.ClusterClosedEvent;
import com.mongodb.event.ClusterDescriptionChangedEvent;
@@ -50,7 +49,6 @@
import com.mongodb.selector.CompositeServerSelector;
import com.mongodb.selector.ServerSelector;
-import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
@@ -64,6 +62,7 @@
import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.isTrue;
import static com.mongodb.assertions.Assertions.notNull;
+import static com.mongodb.connection.ClusterType.UNKNOWN;
import static com.mongodb.connection.ServerDescription.MAX_DRIVER_WIRE_VERSION;
import static com.mongodb.connection.ServerDescription.MIN_DRIVER_SERVER_VERSION;
import static com.mongodb.connection.ServerDescription.MIN_DRIVER_WIRE_VERSION;
@@ -72,6 +71,7 @@
import static com.mongodb.internal.connection.EventHelper.wouldDescriptionsGenerateEquivalentEvents;
import static com.mongodb.internal.event.EventListenerHelper.singleClusterListener;
import static com.mongodb.internal.logging.LogMessage.Component.SERVER_SELECTION;
+import static com.mongodb.internal.logging.LogMessage.Component.TOPOLOGY;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.FAILURE;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.OPERATION;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.OPERATION_ID;
@@ -80,11 +80,16 @@
import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_HOST;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_PORT;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.TOPOLOGY_DESCRIPTION;
+import static com.mongodb.internal.logging.LogMessage.Entry.Name.TOPOLOGY_ID;
+import static com.mongodb.internal.logging.LogMessage.Entry.Name.TOPOLOGY_NEW_DESCRIPTION;
+import static com.mongodb.internal.logging.LogMessage.Entry.Name.TOPOLOGY_PREVIOUS_DESCRIPTION;
import static com.mongodb.internal.logging.LogMessage.Level.DEBUG;
import static com.mongodb.internal.logging.LogMessage.Level.INFO;
import static com.mongodb.internal.time.Timeout.ZeroSemantics.ZERO_DURATION_MEANS_EXPIRED;
import static java.lang.String.format;
import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.stream.Collectors.toList;
@@ -111,8 +116,10 @@ abstract class BaseCluster implements Cluster {
this.settings = notNull("settings", settings);
this.serverFactory = notNull("serverFactory", serverFactory);
this.clusterListener = singleClusterListener(settings);
- clusterListener.clusterOpening(new ClusterOpeningEvent(clusterId));
- description = new ClusterDescription(settings.getMode(), ClusterType.UNKNOWN, Collections.emptyList(),
+ ClusterOpeningEvent clusterOpeningEvent = new ClusterOpeningEvent(clusterId);
+ clusterListener.clusterOpening(clusterOpeningEvent);
+ logTopologyOpening(clusterId, clusterOpeningEvent);
+ description = new ClusterDescription(settings.getMode(), UNKNOWN, emptyList(),
settings, serverFactory.getSettings());
}
@@ -210,7 +217,11 @@ public void close() {
if (!isClosed()) {
isClosed = true;
phase.get().countDown();
- clusterListener.clusterClosed(new ClusterClosedEvent(clusterId));
+ fireChangeEvent(new ClusterDescription(settings.getMode(), UNKNOWN, emptyList(), settings, serverFactory.getSettings()),
+ description);
+ ClusterClosedEvent clusterClosedEvent = new ClusterClosedEvent(clusterId);
+ clusterListener.clusterClosed(clusterClosedEvent);
+ logTopologyClosedEvent(clusterId, clusterClosedEvent);
stopWaitQueueHandler();
}
}
@@ -237,8 +248,9 @@ protected void updateDescription(final ClusterDescription newDescription) {
*/
protected void fireChangeEvent(final ClusterDescription newDescription, final ClusterDescription previousDescription) {
if (!wouldDescriptionsGenerateEquivalentEvents(newDescription, previousDescription)) {
- clusterListener.clusterDescriptionChanged(
- new ClusterDescriptionChangedEvent(getClusterId(), newDescription, previousDescription));
+ ClusterDescriptionChangedEvent changedEvent = new ClusterDescriptionChangedEvent(getClusterId(), newDescription, previousDescription);
+ clusterListener.clusterDescriptionChanged(changedEvent);
+ logTopologyDescriptionChanged(getClusterId(), changedEvent);
}
}
@@ -619,4 +631,43 @@ static void logServerSelectionSucceeded(
+ " Selector: {}, topology description: {}"));
}
}
+
+ static void logTopologyOpening(
+ final ClusterId clusterId,
+ final ClusterOpeningEvent clusterOpeningEvent) {
+ if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) {
+ STRUCTURED_LOGGER.log(new LogMessage(
+ TOPOLOGY, DEBUG, "Starting topology monitoring", clusterId,
+ singletonList(new Entry(TOPOLOGY_ID, clusterId)),
+ "Starting monitoring for topology with ID {}"));
+ }
+ }
+
+ static void logTopologyDescriptionChanged(
+ final ClusterId clusterId,
+ final ClusterDescriptionChangedEvent clusterDescriptionChangedEvent) {
+ if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) {
+ STRUCTURED_LOGGER.log(new LogMessage(
+ TOPOLOGY, DEBUG, "Topology description changed", clusterId,
+ asList(
+ new Entry(TOPOLOGY_ID, clusterId),
+ new Entry(TOPOLOGY_PREVIOUS_DESCRIPTION,
+ clusterDescriptionChangedEvent.getPreviousDescription().getShortDescription()),
+ new Entry(TOPOLOGY_NEW_DESCRIPTION,
+ clusterDescriptionChangedEvent.getNewDescription().getShortDescription())),
+ "Description changed for topology with ID {}. Previous description: {}. New description: {}"));
+ }
+ }
+
+ static void logTopologyClosedEvent(
+ final ClusterId clusterId,
+ final ClusterClosedEvent clusterClosedEvent) {
+ if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) {
+ STRUCTURED_LOGGER.log(new LogMessage(
+ TOPOLOGY, DEBUG, "Stopped topology monitoring", clusterId,
+ singletonList(new Entry(TOPOLOGY_ID, clusterId)),
+ "Stopped monitoring for topology with ID {}"));
+ }
+ }
+
}
diff --git a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java
index ba47236cf4f..9eac751943c 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java
@@ -63,6 +63,7 @@
import static com.mongodb.connection.ServerConnectionState.CONNECTING;
import static com.mongodb.internal.connection.BaseCluster.logServerSelectionStarted;
import static com.mongodb.internal.connection.BaseCluster.logServerSelectionSucceeded;
+import static com.mongodb.internal.connection.BaseCluster.logTopologyClosedEvent;
import static com.mongodb.internal.event.EventListenerHelper.singleClusterListener;
import static java.lang.String.format;
import static java.util.Collections.emptyList;
@@ -272,7 +273,9 @@ public void close() {
if (localServer != null) {
localServer.close();
}
- clusterListener.clusterClosed(new ClusterClosedEvent(clusterId));
+ ClusterClosedEvent clusterClosedEvent = new ClusterClosedEvent(clusterId);
+ clusterListener.clusterClosed(clusterClosedEvent);
+ logTopologyClosedEvent(clusterId, clusterClosedEvent);
}
}
diff --git a/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java b/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java
index 214e58b9d59..cfd97f713e2 100644
--- a/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java
+++ b/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java
@@ -48,7 +48,8 @@ public final class LogMessage {
public enum Component {
COMMAND("command"),
CONNECTION("connection"),
- SERVER_SELECTION("serverSelection");
+ SERVER_SELECTION("serverSelection"),
+ TOPOLOGY("topology");
private static final Map INDEX;
@@ -124,7 +125,10 @@ public enum Name {
WAIT_QUEUE_TIMEOUT_MS("waitQueueTimeoutMS"),
SELECTOR("selector"),
TOPOLOGY_DESCRIPTION("topologyDescription"),
- REMAINING_TIME_MS("remainingTimeMS");
+ REMAINING_TIME_MS("remainingTimeMS"),
+ TOPOLOGY_ID("topologyId"),
+ TOPOLOGY_PREVIOUS_DESCRIPTION("previousDescription"),
+ TOPOLOGY_NEW_DESCRIPTION("newDescription");
private final String value;
diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/TestClusterListener.java b/driver-core/src/test/unit/com/mongodb/internal/connection/TestClusterListener.java
index 89ca0088a77..7a11b360046 100644
--- a/driver-core/src/test/unit/com/mongodb/internal/connection/TestClusterListener.java
+++ b/driver-core/src/test/unit/com/mongodb/internal/connection/TestClusterListener.java
@@ -25,6 +25,8 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -42,6 +44,7 @@ public final class TestClusterListener implements ClusterListener {
private final ArrayList clusterDescriptionChangedEvents = new ArrayList<>();
private final ReentrantLock lock = new ReentrantLock();
private final Condition newClusterDescriptionChangedEventCondition = lock.newCondition();
+ private final CountDownLatch closedLatch = new CountDownLatch(1);
@Override
public void clusterOpening(final ClusterOpeningEvent event) {
@@ -52,6 +55,7 @@ public void clusterOpening(final ClusterOpeningEvent event) {
@Override
public void clusterClosed(final ClusterClosedEvent event) {
isTrue("clusterClosingEvent is null", clusterClosingEvent == null);
+ closedLatch.countDown();
clusterClosingEvent = event;
}
@@ -110,6 +114,17 @@ public void waitForClusterDescriptionChangedEvents(
}
}
+ /**
+ * Waits for the cluster to be closed, which is signaled by a {@link ClusterClosedEvent}.
+ */
+ public void waitForClusterClosedEvent(final Duration duration)
+ throws InterruptedException, TimeoutException {
+ boolean await = closedLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS);
+ if (!await) {
+ throw new TimeoutException("Timed out waiting for cluster to close");
+ }
+ }
+
/**
* Must be guarded by {@link #lock}.
*/
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java b/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java
index 403d112e3a5..0d5729a6781 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java
@@ -17,6 +17,11 @@
package com.mongodb.client.unified;
import com.mongodb.MongoNamespace;
+import com.mongodb.connection.ClusterDescription;
+import com.mongodb.connection.ServerDescription;
+import com.mongodb.event.ClusterClosedEvent;
+import com.mongodb.event.ClusterDescriptionChangedEvent;
+import com.mongodb.event.ClusterOpeningEvent;
import com.mongodb.event.CommandEvent;
import com.mongodb.event.CommandFailedEvent;
import com.mongodb.event.CommandStartedEvent;
@@ -130,6 +135,47 @@ public static ContextElement ofClusterDescriptionChangedEventCount(final String
return new EventCountContext("Cluster Description Changed Event Count", client, event, count);
}
+ public static ContextElement ofWaitForClusterClosedEvent(final String client) {
+ return new ContextElement() {
+ @Override
+ public String toString() {
+ return "Event MatchingContext\n"
+ + " client: " + client + "\n"
+ + " expected event: ClusterClosedEvent\n";
+ }
+ };
+ }
+
+ public static ContextElement ofTopologyEvents(final String client, final BsonArray expectedEvents,
+ final List> actualEvents) {
+ return new ContextElement() {
+ @Override
+ public String toString() {
+ return "Events MatchingContext: \n"
+ + " client: '" + client + "'\n"
+ + " Expected events:\n"
+ + new BsonDocument("events", expectedEvents).toJson(JsonWriterSettings.builder().indent(true).build()) + "\n"
+ + " Actual events:\n"
+ + new BsonDocument("events",
+ new BsonArray(actualEvents.stream().map(ContextElement::topologyEventToDocument).collect(Collectors.toList())))
+ .toJson(JsonWriterSettings.builder().indent(true).build())
+ + "\n";
+ }
+ };
+ }
+
+ public static ContextElement ofTopologyEvent(final BsonDocument expected, final Object actual, final int eventPosition) {
+ return new ContextElement() {
+ @Override
+ public String toString() {
+ return "Event Matching Context\n"
+ + " event position: " + eventPosition + "\n"
+ + " expected event: " + expected + "\n"
+ + " actual event: " + topologyEventToDocument(actual) + "\n";
+ }
+ };
+ }
+
public static ContextElement ofWaitForServerMonitorEvents(final String client, final BsonDocument event, final int count) {
return new EventCountContext("Wait For Server Monitor Events", client, event, count);
}
@@ -152,11 +198,6 @@ public String toString() {
.toJson(JsonWriterSettings.builder().indent(true).build())
+ "\n";
}
-
- private BsonDocument toDocument(final Object event) {
- return new BsonDocument(EventMatcher.getEventType(event.getClass()),
- new BsonDocument("awaited", BsonBoolean.valueOf(EventMatcher.getAwaitedFromServerMonitorEvent(event))));
- }
};
}
@@ -467,4 +508,37 @@ private static BsonDocument serverMonitorEventToDocument(final Object event) {
return new BsonDocument(EventMatcher.getEventType(event.getClass()),
new BsonDocument("awaited", BsonBoolean.valueOf(EventMatcher.getAwaitedFromServerMonitorEvent(event))));
}
+
+ static BsonDocument topologyEventToDocument(final Object event) {
+ if (event != null && !(event instanceof ClusterOpeningEvent || event instanceof ClusterDescriptionChangedEvent || event instanceof ClusterClosedEvent)) {
+ throw new UnsupportedOperationException("Unsupported topology event: " + event.getClass().getName());
+ }
+ BsonDocument eventDocument = new BsonDocument();
+ if (event instanceof ClusterDescriptionChangedEvent) {
+ ClusterDescriptionChangedEvent changedEvent = (ClusterDescriptionChangedEvent) event;
+ eventDocument.put("previousDescription",
+ new BsonDocument("type", new BsonString(clusterDescriptionToString(changedEvent.getPreviousDescription()))));
+ eventDocument.put("newDescription",
+ new BsonDocument("type", new BsonString(clusterDescriptionToString(changedEvent.getNewDescription()))));
+ }
+ return new BsonDocument(EventMatcher.getEventType(event.getClass()), eventDocument);
+ }
+
+ static String clusterDescriptionToString(final ClusterDescription clusterDescription) {
+ switch (clusterDescription.getType()) {
+ case STANDALONE:
+ return "Single";
+ case REPLICA_SET:
+ return clusterDescription.getServerDescriptions().stream()
+ .anyMatch(ServerDescription::isPrimary) ? "ReplicaSetWithPrimary" : "ReplicaSetNoPrimary";
+ case SHARDED:
+ return "Sharded";
+ case LOAD_BALANCED:
+ return "LoadBalancer";
+ case UNKNOWN:
+ return "Unknown";
+ default:
+ throw new UnsupportedOperationException("Unexpected value: " + clusterDescription.getShortDescription());
+ }
+ }
}
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java
index 41ada275a67..4c80ec66a78 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java
@@ -18,7 +18,9 @@
import com.mongodb.assertions.Assertions;
import com.mongodb.connection.ServerType;
+import com.mongodb.event.ClusterClosedEvent;
import com.mongodb.event.ClusterDescriptionChangedEvent;
+import com.mongodb.event.ClusterOpeningEvent;
import com.mongodb.event.CommandEvent;
import com.mongodb.event.CommandFailedEvent;
import com.mongodb.event.CommandStartedEvent;
@@ -49,6 +51,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import static com.mongodb.client.unified.ContextElement.clusterDescriptionToString;
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static org.junit.Assert.assertEquals;
@@ -272,7 +275,19 @@ public void waitForClusterDescriptionChangedEvents(final String client, final Bs
BsonDocument expectedEventContents = getEventContents(expectedEvent);
try {
clusterListener.waitForClusterDescriptionChangedEvents(
- event -> clusterDescriptionChangedEventMatches(expectedEventContents, event), count, Duration.ofSeconds(10));
+ event -> clusterDescriptionChangedEventMatches(expectedEventContents, event, context), count, Duration.ofSeconds(10));
+ context.pop();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (TimeoutException e) {
+ fail(context.getMessage("Timed out waiting for cluster description changed events"));
+ }
+ }
+
+ public void waitForClusterClosedEvent(final String client, final TestClusterListener clusterListener) {
+ context.push(ContextElement.ofWaitForClusterClosedEvent(client));
+ try {
+ clusterListener.waitForClusterClosedEvent(Duration.ofSeconds(10));
context.pop();
} catch (InterruptedException e) {
throw new RuntimeException(e);
@@ -285,11 +300,36 @@ public void assertClusterDescriptionChangeEventCount(final String client, final
final List events) {
BsonDocument expectedEventContents = getEventContents(expectedEvent);
context.push(ContextElement.ofClusterDescriptionChangedEventCount(client, expectedEvent, count));
- long matchCount = events.stream().filter(event -> clusterDescriptionChangedEventMatches(expectedEventContents, event)).count();
+ long matchCount =
+ events.stream().filter(event -> clusterDescriptionChangedEventMatches(expectedEventContents, event, context)).count();
assertEquals(context.getMessage("Expected cluster description changed event counts to match"), count, matchCount);
context.pop();
}
+ public void assertTopologyEventsEquality(
+ final String client,
+ final boolean ignoreExtraEvents,
+ final BsonArray expectedEventDocuments,
+ final List> events) {
+ context.push(ContextElement.ofTopologyEvents(client, expectedEventDocuments, events));
+ if (ignoreExtraEvents) {
+ assertTrue(context.getMessage("Number of events must be greater than or equal to the expected number of events"),
+ events.size() >= expectedEventDocuments.size());
+ } else {
+ assertEquals(context.getMessage("Number of events must be the same"), expectedEventDocuments.size(), events.size());
+ }
+ for (int i = 0; i < expectedEventDocuments.size(); i++) {
+ Object actualEvent = events.get(i);
+ BsonDocument expectedEventDocument = expectedEventDocuments.get(i).asDocument();
+ String expectedEventType = expectedEventDocument.getFirstKey();
+ context.push(ContextElement.ofTopologyEvent(expectedEventDocument, actualEvent, i));
+ assertEquals(context.getMessage("Expected event type to match"), expectedEventType, getEventType(actualEvent.getClass()));
+ assertTopologyEventEquality(expectedEventType, expectedEventDocument, actualEvent, context);
+ context.pop();
+ }
+ context.pop();
+ }
+
public void waitForServerMonitorEvents(final String client, final Class expectedEventType, final BsonDocument expectedEvent,
final int count, final TestServerMonitorListener serverMonitorListener) {
context.push(ContextElement.ofWaitForServerMonitorEvents(client, expectedEvent, count));
@@ -354,6 +394,7 @@ private BsonDocument getEventContents(final BsonDocument expectedEvent) {
if (expectedEventContents.isEmpty()) {
return expectedEventContents;
}
+
HashSet emptyEventTypes = new HashSet<>(singleton("topologyDescriptionChangedEvent"));
if (emptyEventTypes.contains(expectedEventType)) {
throw new UnsupportedOperationException("Contents of " + expectedEventType + " must be empty");
@@ -383,7 +424,7 @@ private static boolean serverDescriptionChangedEventMatches(final BsonDocument e
}
private static boolean clusterDescriptionChangedEventMatches(final BsonDocument expectedEventContents,
- final ClusterDescriptionChangedEvent event) {
+ final ClusterDescriptionChangedEvent event, @Nullable final AssertionContext context) {
if (!expectedEventContents.isEmpty()) {
throw new UnsupportedOperationException(
"Contents of " + ClusterDescriptionChangedEvent.class.getSimpleName() + " must be empty");
@@ -391,6 +432,42 @@ private static boolean clusterDescriptionChangedEventMatches(final BsonDocument
return true;
}
+ private static void assertTopologyEventEquality(
+ final String expectedEventType,
+ final BsonDocument expectedEventDocument,
+ final T actualEvent,
+ final AssertionContext context) {
+
+ switch (expectedEventType) {
+ case "topologyOpeningEvent":
+ assertTrue(context.getMessage("Expected ClusterOpeningEvent"), actualEvent instanceof ClusterOpeningEvent);
+ break;
+ case "topologyClosedEvent":
+ assertTrue(context.getMessage("Expected ClusterClosedEvent"), actualEvent instanceof ClusterClosedEvent);
+ break;
+ case "topologyDescriptionChangedEvent":
+ assertTrue(context.getMessage("Expected ClusterDescriptionChangedEvent"), actualEvent instanceof ClusterDescriptionChangedEvent);
+ ClusterDescriptionChangedEvent event = (ClusterDescriptionChangedEvent) actualEvent;
+ BsonDocument topologyChangeDocument = expectedEventDocument.getDocument(expectedEventType, new BsonDocument());
+
+ if (!topologyChangeDocument.isEmpty()) {
+ if (topologyChangeDocument.containsKey("previousDescription")) {
+ String previousDescription = topologyChangeDocument.getDocument("previousDescription").getString("type").getValue();
+ assertEquals(context.getMessage("Expected ClusterDescriptionChangedEvent with previousDescription: " + previousDescription),
+ previousDescription, clusterDescriptionToString(event.getPreviousDescription()));
+ }
+ if (topologyChangeDocument.containsKey("newDescription")) {
+ String newDescription = topologyChangeDocument.getDocument("newDescription").getString("type").getValue();
+ assertEquals(context.getMessage("Expected ClusterDescriptionChangedEvent with newDescription: " + newDescription),
+ newDescription, clusterDescriptionToString(event.getNewDescription()));
+ }
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported topology event: " + expectedEventType);
+ }
+ }
+
/**
* @param context Not {@code null} iff mismatch must result in an error, that is, this method works as an assertion.
*/
@@ -427,7 +504,9 @@ static boolean getAwaitedFromServerMonitorEvent(final Object event) {
static String getEventType(final Class> eventClass) {
String eventClassName = eventClass.getSimpleName();
- if (eventClassName.startsWith("ConnectionPool")) {
+ if (eventClassName.startsWith("Cluster")) {
+ return eventClassName.replace("Cluster", "topology");
+ } else if (eventClassName.startsWith("ConnectionPool")) {
return eventClassName.replace("ConnectionPool", "pool");
} else if (eventClassName.startsWith("Connection")) {
return eventClassName.replace("Connection", "connection");
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java
index 11686d45410..83a3e75d956 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java
@@ -17,7 +17,6 @@
package com.mongodb.client.unified;
import org.junit.jupiter.params.provider.Arguments;
-
import java.util.Collection;
public final class UnifiedServerDiscoveryAndMonitoringTest extends UnifiedSyncTest {
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java
index 84eb40b4e29..b47f396f535 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java
@@ -36,6 +36,7 @@
import com.mongodb.event.CommandEvent;
import com.mongodb.event.CommandStartedEvent;
import com.mongodb.event.TestServerMonitorListener;
+import com.mongodb.internal.connection.TestClusterListener;
import com.mongodb.internal.connection.TestCommandListener;
import com.mongodb.internal.connection.TestConnectionPoolListener;
import com.mongodb.internal.logging.LogMessage;
@@ -69,6 +70,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -88,6 +90,7 @@
import static com.mongodb.client.unified.UnifiedTestModifications.applyCustomizations;
import static com.mongodb.client.unified.UnifiedTestModifications.testDef;
import static java.lang.String.format;
+import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -114,6 +117,10 @@ public abstract class UnifiedTest {
.map(Integer::parseInt)
.collect(Collectors.toList());
+ private static final String TOPOLOGY_CLOSED_EVENT = "topologyClosedEvent";
+ private static final List TOPOLOGY_EVENT_NAMES = asList("topologyOpeningEvent", "topologyDescriptionChangedEvent",
+ TOPOLOGY_CLOSED_EVENT);
+
public static final int RETRY_ATTEMPTS = 3;
public static final int FORCE_FLAKY_ATTEMPTS = 10;
private static final Set ATTEMPTED_TESTS_TO_HENCEFORTH_IGNORE = new HashSet<>();
@@ -414,8 +421,34 @@ private void compareEvents(final UnifiedTestContext context, final BsonDocument
context.getEventMatcher().assertConnectionPoolEventsEquality(client, ignoreExtraEvents, expectedEvents,
listener.getEvents());
} else if (eventType.equals("sdam")) {
- TestServerMonitorListener listener = entities.getServerMonitorListener(client);
- context.getEventMatcher().assertServerMonitorEventsEquality(client, ignoreExtraEvents, expectedEvents, listener.getEvents());
+
+ // SDAM tests also include topology events, so we need to separate them to be able to assert them separately.
+ // Partition the expected events into two lists with the key being if it's a topology based event or not.
+ Map> partitionedEventsMap = expectedEvents.stream()
+ .map(BsonValue::asDocument)
+ .collect(Collectors.partitioningBy(doc -> TOPOLOGY_EVENT_NAMES.stream().anyMatch(doc::containsKey)));
+
+ BsonArray expectedTopologyEvents = new BsonArray(partitionedEventsMap.get(true));
+ if (!expectedTopologyEvents.isEmpty()) {
+ TestClusterListener clusterListener = entities.getClusterListener(client);
+ // Unfortunately, some tests expect the cluster to be closed, but do not define it as a waitForEvent in the spec -
+ // causing a race condition in the test.
+ if (expectedTopologyEvents.stream().anyMatch(doc -> doc.asDocument().containsKey(TOPOLOGY_CLOSED_EVENT))) {
+ context.getEventMatcher().waitForClusterClosedEvent(client, clusterListener);
+ }
+
+ List