1+ package org .example ;
2+
3+ import com .mongodb .ConnectionString ;
4+ import com .mongodb .MongoClientSettings ;
5+
6+ import com .mongodb .event .*;
7+ import com .mongodb .reactivestreams .client .*;
8+ import org .bson .Document ;
9+ import reactor .core .publisher .Flux ;
10+
11+ import java .util .HashMap ;
12+ import java .util .Map ;
13+
14+ public class Monitoring {
15+
16+ private static final String COLLECTION = "test_collection" ;
17+ private static final String DATABASE = "test_db" ;
18+ private static final ConnectionString URI = new ConnectionString ("<connection string URI>" );
19+
20+ public static void main (String [] args ) {
21+ Monitoring examples = new Monitoring ();
22+ System .out .println ("\n ---Command Event---\n " );
23+ examples .monitorCommandEvent ();
24+ System .out .println ("\n ---Cluster Event---\n " );
25+ examples .monitorClusterEvent ();
26+ System .out .println ("\n ---Connection Pool Event---\n " );
27+ examples .monitorConnectionPoolEvent ();
28+ }
29+
30+ private void monitorCommandEvent () {
31+ // start-monitor-command-example
32+ MongoClientSettings settings = MongoClientSettings .builder ()
33+ .applyConnectionString (URI )
34+ .addCommandListener (new CommandCounter ())
35+ .build ();
36+
37+ try (MongoClient mongoClient = MongoClients .create (settings )) {
38+ MongoDatabase database = mongoClient .getDatabase (DATABASE );
39+ MongoCollection <Document > collection = database .getCollection (COLLECTION );
40+
41+ // Run some commands to test the counter
42+ FindPublisher <Document > findPublisher1 = collection .find ();
43+ FindPublisher <Document > findPublisher2 = collection .find ();
44+ Flux .from (findPublisher1 ).blockLast ();
45+ Flux .from (findPublisher2 ).blockLast ();
46+ }
47+ // end-monitor-command-example
48+ }
49+
50+ private void monitorClusterEvent () {
51+ // start-monitor-cluster-example
52+ MongoClientSettings settings = MongoClientSettings .builder ()
53+ .applyConnectionString (URI )
54+ .applyToClusterSettings (builder ->
55+ builder .addClusterListener (new IsWritable ()))
56+ .build ();
57+
58+ try (MongoClient mongoClient = MongoClients .create (settings )) {
59+ MongoDatabase database = mongoClient .getDatabase (DATABASE );
60+ MongoCollection <Document > collection = database .getCollection (COLLECTION );
61+
62+ // Run a command to trigger a ClusterDescriptionChangedEvent
63+ FindPublisher <Document > findPublisher = collection .find ();
64+ Flux .from (findPublisher ).blockLast ();
65+ }
66+ // end-monitor-cluster-example
67+ }
68+
69+ private void monitorConnectionPoolEvent () {
70+ // start-monitor-connection-pool-example
71+ MongoClientSettings settings = MongoClientSettings .builder ()
72+ .applyConnectionString (URI )
73+ .applyToConnectionPoolSettings (builder ->
74+ builder .addConnectionPoolListener (new ConnectionPoolLibrarian ()))
75+ .build ();
76+
77+ try (MongoClient mongoClient = MongoClients .create (settings )) {
78+ MongoDatabase database = mongoClient .getDatabase (DATABASE );
79+ MongoCollection <Document > collection = database .getCollection (COLLECTION );
80+
81+ // Run a command to trigger connection pool events
82+ FindPublisher <Document > findPublisher = collection .find ();
83+ Flux .from (findPublisher ).blockLast ();
84+ }
85+ // end-monitor-connection-pool-example
86+ }
87+ }
88+
89+ // start-command-listener
90+ class CommandCounter implements CommandListener {
91+ private final Map <String , Integer > commands = new HashMap <String , Integer >();
92+
93+ @ Override
94+ public synchronized void commandSucceeded (final CommandSucceededEvent event ) {
95+ String commandName = event .getCommandName ();
96+ int count = commands .getOrDefault (commandName , 0 );
97+ commands .put (commandName , count + 1 );
98+ System .out .println (commands );
99+ }
100+
101+ @ Override
102+ public void commandFailed (final CommandFailedEvent event ) {
103+ System .out .printf ("Failed execution of command '%s' with id %s%n" ,
104+ event .getCommandName (),
105+ event .getRequestId ());
106+ }
107+ }
108+ // end-command-listener
109+
110+ // start-cluster-listener
111+ class IsWritable implements ClusterListener {
112+ private boolean isWritable ;
113+
114+ @ Override
115+ public synchronized void clusterDescriptionChanged (final ClusterDescriptionChangedEvent event ) {
116+ if (!isWritable ) {
117+ if (event .getNewDescription ().hasWritableServer ()) {
118+ isWritable = true ;
119+ System .out .println ("Able to write to server" );
120+ }
121+ } else if (!event .getNewDescription ().hasWritableServer ()) {
122+ isWritable = false ;
123+ System .out .println ("Unable to write to server" );
124+ }
125+ }
126+ }
127+ // end-cluster-listener
128+
129+ // start-connection-pool-listener
130+ class ConnectionPoolLibrarian implements ConnectionPoolListener {
131+ @ Override
132+ public void connectionCheckedOut (final ConnectionCheckedOutEvent event ) {
133+ System .out .printf ("Fetching the connection with id %s...%n" ,
134+ event .getConnectionId ().getLocalValue ());
135+ }
136+
137+ @ Override
138+ public void connectionCheckOutFailed (final ConnectionCheckOutFailedEvent event ) {
139+ System .out .println ("Something went wrong! Failed to checkout connection." );
140+ }
141+ }
142+ // end-connection-pool-listener
0 commit comments