Skip to content

Publish ClusterDescriptionChangedEvent on topology close #1738

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
/**
* A cluster closed event.
*
* <p>This event is synonymous with TopologyClosedEvent</p>
*
* @since 3.3
*/
public final class ClusterClosedEvent {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
/**
* An event signifying that the cluster description has changed.
*
* <p>This event is synonymous with TopologyDescriptionChangedEvent</p>
*
* @since 3.3
*/
public final class ClusterDescriptionChangedEvent {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
/**
* A cluster opening event.
*
* <p>This event is synonymous with TopologyOpeningEvent</p>
*
* @since 3.3
*/
public final class ClusterOpeningEvent {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.ClusterId;
import com.mongodb.connection.ClusterSettings;
import com.mongodb.connection.ClusterType;
import com.mongodb.connection.ServerDescription;
import com.mongodb.event.ClusterClosedEvent;
import com.mongodb.event.ClusterDescriptionChangedEvent;
Expand All @@ -50,7 +49,6 @@
import com.mongodb.selector.CompositeServerSelector;
import com.mongodb.selector.ServerSelector;

import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
Expand All @@ -64,6 +62,7 @@
import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.isTrue;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.connection.ClusterType.UNKNOWN;
import static com.mongodb.connection.ServerDescription.MAX_DRIVER_WIRE_VERSION;
import static com.mongodb.connection.ServerDescription.MIN_DRIVER_SERVER_VERSION;
import static com.mongodb.connection.ServerDescription.MIN_DRIVER_WIRE_VERSION;
Expand All @@ -72,6 +71,7 @@
import static com.mongodb.internal.connection.EventHelper.wouldDescriptionsGenerateEquivalentEvents;
import static com.mongodb.internal.event.EventListenerHelper.singleClusterListener;
import static com.mongodb.internal.logging.LogMessage.Component.SERVER_SELECTION;
import static com.mongodb.internal.logging.LogMessage.Component.TOPOLOGY;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.FAILURE;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.OPERATION;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.OPERATION_ID;
Expand All @@ -80,11 +80,16 @@
import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_HOST;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_PORT;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.TOPOLOGY_DESCRIPTION;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.TOPOLOGY_ID;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.TOPOLOGY_NEW_DESCRIPTION;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.TOPOLOGY_PREVIOUS_DESCRIPTION;
import static com.mongodb.internal.logging.LogMessage.Level.DEBUG;
import static com.mongodb.internal.logging.LogMessage.Level.INFO;
import static com.mongodb.internal.time.Timeout.ZeroSemantics.ZERO_DURATION_MEANS_EXPIRED;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.stream.Collectors.toList;
Expand All @@ -111,8 +116,10 @@ abstract class BaseCluster implements Cluster {
this.settings = notNull("settings", settings);
this.serverFactory = notNull("serverFactory", serverFactory);
this.clusterListener = singleClusterListener(settings);
clusterListener.clusterOpening(new ClusterOpeningEvent(clusterId));
description = new ClusterDescription(settings.getMode(), ClusterType.UNKNOWN, Collections.emptyList(),
ClusterOpeningEvent clusterOpeningEvent = new ClusterOpeningEvent(clusterId);
clusterListener.clusterOpening(clusterOpeningEvent);
logTopologyOpening(clusterId, clusterOpeningEvent);
description = new ClusterDescription(settings.getMode(), UNKNOWN, emptyList(),
settings, serverFactory.getSettings());
}

Expand Down Expand Up @@ -210,7 +217,11 @@ public void close() {
if (!isClosed()) {
isClosed = true;
phase.get().countDown();
clusterListener.clusterClosed(new ClusterClosedEvent(clusterId));
fireChangeEvent(new ClusterDescription(settings.getMode(), UNKNOWN, emptyList(), settings, serverFactory.getSettings()),
description);
ClusterClosedEvent clusterClosedEvent = new ClusterClosedEvent(clusterId);
clusterListener.clusterClosed(clusterClosedEvent);
logTopologyClosedEvent(clusterId, clusterClosedEvent);
stopWaitQueueHandler();
}
}
Expand All @@ -237,8 +248,9 @@ protected void updateDescription(final ClusterDescription newDescription) {
*/
protected void fireChangeEvent(final ClusterDescription newDescription, final ClusterDescription previousDescription) {
if (!wouldDescriptionsGenerateEquivalentEvents(newDescription, previousDescription)) {
clusterListener.clusterDescriptionChanged(
new ClusterDescriptionChangedEvent(getClusterId(), newDescription, previousDescription));
ClusterDescriptionChangedEvent changedEvent = new ClusterDescriptionChangedEvent(getClusterId(), newDescription, previousDescription);
clusterListener.clusterDescriptionChanged(changedEvent);
logTopologyDescriptionChanged(getClusterId(), changedEvent);
}
}

Expand Down Expand Up @@ -619,4 +631,43 @@ static void logServerSelectionSucceeded(
+ " Selector: {}, topology description: {}"));
}
}

static void logTopologyOpening(
final ClusterId clusterId,
final ClusterOpeningEvent clusterOpeningEvent) {
if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) {
STRUCTURED_LOGGER.log(new LogMessage(
TOPOLOGY, DEBUG, "Starting topology monitoring", clusterId,
singletonList(new Entry(TOPOLOGY_ID, clusterId)),
"Starting monitoring for topology with ID {}"));
}
}

static void logTopologyDescriptionChanged(
final ClusterId clusterId,
final ClusterDescriptionChangedEvent clusterDescriptionChangedEvent) {
if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) {
STRUCTURED_LOGGER.log(new LogMessage(
TOPOLOGY, DEBUG, "Topology description changed", clusterId,
asList(
new Entry(TOPOLOGY_ID, clusterId),
new Entry(TOPOLOGY_PREVIOUS_DESCRIPTION,
clusterDescriptionChangedEvent.getPreviousDescription().getShortDescription()),
new Entry(TOPOLOGY_NEW_DESCRIPTION,
clusterDescriptionChangedEvent.getNewDescription().getShortDescription())),
"Description changed for topology with ID {}. Previous description: {}. New description: {}"));
}
}

static void logTopologyClosedEvent(
final ClusterId clusterId,
final ClusterClosedEvent clusterClosedEvent) {
if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) {
STRUCTURED_LOGGER.log(new LogMessage(
TOPOLOGY, DEBUG, "Stopped topology monitoring", clusterId,
singletonList(new Entry(TOPOLOGY_ID, clusterId)),
"Stopped monitoring for topology with ID {}"));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import static com.mongodb.connection.ServerConnectionState.CONNECTING;
import static com.mongodb.internal.connection.BaseCluster.logServerSelectionStarted;
import static com.mongodb.internal.connection.BaseCluster.logServerSelectionSucceeded;
import static com.mongodb.internal.connection.BaseCluster.logTopologyClosedEvent;
import static com.mongodb.internal.event.EventListenerHelper.singleClusterListener;
import static java.lang.String.format;
import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -272,7 +273,9 @@ public void close() {
if (localServer != null) {
localServer.close();
}
clusterListener.clusterClosed(new ClusterClosedEvent(clusterId));
ClusterClosedEvent clusterClosedEvent = new ClusterClosedEvent(clusterId);
clusterListener.clusterClosed(clusterClosedEvent);
logTopologyClosedEvent(clusterId, clusterClosedEvent);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public final class LogMessage {
public enum Component {
COMMAND("command"),
CONNECTION("connection"),
SERVER_SELECTION("serverSelection");
SERVER_SELECTION("serverSelection"),
TOPOLOGY("topology");

private static final Map<String, Component> INDEX;

Expand Down Expand Up @@ -124,7 +125,10 @@ public enum Name {
WAIT_QUEUE_TIMEOUT_MS("waitQueueTimeoutMS"),
SELECTOR("selector"),
TOPOLOGY_DESCRIPTION("topologyDescription"),
REMAINING_TIME_MS("remainingTimeMS");
REMAINING_TIME_MS("remainingTimeMS"),
TOPOLOGY_ID("topologyId"),
TOPOLOGY_PREVIOUS_DESCRIPTION("previousDescription"),
TOPOLOGY_NEW_DESCRIPTION("newDescription");

private final String value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -42,6 +44,7 @@ public final class TestClusterListener implements ClusterListener {
private final ArrayList<ClusterDescriptionChangedEvent> clusterDescriptionChangedEvents = new ArrayList<>();
private final ReentrantLock lock = new ReentrantLock();
private final Condition newClusterDescriptionChangedEventCondition = lock.newCondition();
private final CountDownLatch closedLatch = new CountDownLatch(1);

@Override
public void clusterOpening(final ClusterOpeningEvent event) {
Expand All @@ -52,6 +55,7 @@ public void clusterOpening(final ClusterOpeningEvent event) {
@Override
public void clusterClosed(final ClusterClosedEvent event) {
isTrue("clusterClosingEvent is null", clusterClosingEvent == null);
closedLatch.countDown();
clusterClosingEvent = event;
}

Expand Down Expand Up @@ -110,6 +114,17 @@ public void waitForClusterDescriptionChangedEvents(
}
}

/**
* Waits for the cluster to be closed, which is signaled by a {@link ClusterClosedEvent}.
*/
public void waitForClusterClosedEvent(final Duration duration)
throws InterruptedException, TimeoutException {
boolean await = closedLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS);
if (!await) {
throw new TimeoutException("Timed out waiting for cluster to close");
}
}

/**
* Must be guarded by {@link #lock}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
package com.mongodb.client.unified;

import com.mongodb.MongoNamespace;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.ServerDescription;
import com.mongodb.event.ClusterClosedEvent;
import com.mongodb.event.ClusterDescriptionChangedEvent;
import com.mongodb.event.ClusterOpeningEvent;
import com.mongodb.event.CommandEvent;
import com.mongodb.event.CommandFailedEvent;
import com.mongodb.event.CommandStartedEvent;
Expand Down Expand Up @@ -130,6 +135,47 @@ public static ContextElement ofClusterDescriptionChangedEventCount(final String
return new EventCountContext("Cluster Description Changed Event Count", client, event, count);
}

public static ContextElement ofWaitForClusterClosedEvent(final String client) {
return new ContextElement() {
@Override
public String toString() {
return "Event MatchingContext\n"
+ " client: " + client + "\n"
+ " expected event: ClusterClosedEvent\n";
}
};
}

public static ContextElement ofTopologyEvents(final String client, final BsonArray expectedEvents,
final List<?> actualEvents) {
return new ContextElement() {
@Override
public String toString() {
return "Events MatchingContext: \n"
+ " client: '" + client + "'\n"
+ " Expected events:\n"
+ new BsonDocument("events", expectedEvents).toJson(JsonWriterSettings.builder().indent(true).build()) + "\n"
+ " Actual events:\n"
+ new BsonDocument("events",
new BsonArray(actualEvents.stream().map(ContextElement::topologyEventToDocument).collect(Collectors.toList())))
.toJson(JsonWriterSettings.builder().indent(true).build())
+ "\n";
}
};
}

public static ContextElement ofTopologyEvent(final BsonDocument expected, final Object actual, final int eventPosition) {
return new ContextElement() {
@Override
public String toString() {
return "Event Matching Context\n"
+ " event position: " + eventPosition + "\n"
+ " expected event: " + expected + "\n"
+ " actual event: " + topologyEventToDocument(actual) + "\n";
}
};
}

public static ContextElement ofWaitForServerMonitorEvents(final String client, final BsonDocument event, final int count) {
return new EventCountContext("Wait For Server Monitor Events", client, event, count);
}
Expand All @@ -152,11 +198,6 @@ public String toString() {
.toJson(JsonWriterSettings.builder().indent(true).build())
+ "\n";
}

private BsonDocument toDocument(final Object event) {
return new BsonDocument(EventMatcher.getEventType(event.getClass()),
new BsonDocument("awaited", BsonBoolean.valueOf(EventMatcher.getAwaitedFromServerMonitorEvent(event))));
}
};
}

Expand Down Expand Up @@ -467,4 +508,37 @@ private static BsonDocument serverMonitorEventToDocument(final Object event) {
return new BsonDocument(EventMatcher.getEventType(event.getClass()),
new BsonDocument("awaited", BsonBoolean.valueOf(EventMatcher.getAwaitedFromServerMonitorEvent(event))));
}

static BsonDocument topologyEventToDocument(final Object event) {
if (event != null && !(event instanceof ClusterOpeningEvent || event instanceof ClusterDescriptionChangedEvent || event instanceof ClusterClosedEvent)) {
throw new UnsupportedOperationException("Unsupported topology event: " + event.getClass().getName());
}
BsonDocument eventDocument = new BsonDocument();
if (event instanceof ClusterDescriptionChangedEvent) {
ClusterDescriptionChangedEvent changedEvent = (ClusterDescriptionChangedEvent) event;
eventDocument.put("previousDescription",
new BsonDocument("type", new BsonString(clusterDescriptionToString(changedEvent.getPreviousDescription()))));
eventDocument.put("newDescription",
new BsonDocument("type", new BsonString(clusterDescriptionToString(changedEvent.getNewDescription()))));
}
return new BsonDocument(EventMatcher.getEventType(event.getClass()), eventDocument);
}

static String clusterDescriptionToString(final ClusterDescription clusterDescription) {
switch (clusterDescription.getType()) {
case STANDALONE:
return "Single";
case REPLICA_SET:
return clusterDescription.getServerDescriptions().stream()
.anyMatch(ServerDescription::isPrimary) ? "ReplicaSetWithPrimary" : "ReplicaSetNoPrimary";
case SHARDED:
return "Sharded";
case LOAD_BALANCED:
return "LoadBalancer";
case UNKNOWN:
return "Unknown";
default:
throw new UnsupportedOperationException("Unexpected value: " + clusterDescription.getShortDescription());
}
}
}
Loading