@@ -12,25 +12,89 @@ Kafka Sink Change Data Capture
    :depth: 1
    :class: singlecols

-Change Data Capture Mode
-------------------------
+Overview
+--------

 Change data capture (CDC) is an architecture that converts changes in
-a database into event streams. The MongoDB Kafka sink connector can
-process event streams using `Debezium <https://debezium.io/>`_ as an event
-producer for the following source databases:
+a source database into event streams. You can capture CDC events with the
+MongoDB Kafka sink connector and perform the corresponding insert, update,
+and delete operations on a destination MongoDB cluster.

-* `MongoDB <http://debezium.io/docs/connectors/mongodb/>`_
-* `MySQL <http://debezium.io/docs/connectors/mysql/>`_
-* `PostgreSQL <http://debezium.io/docs/connectors/postgresql/>`_
+You can also handle CDC events using the following event producers:

-CDC Handler Configuration
--------------------------
+- :ref:`Debezium <cdc-debezium>`
+- Qlik Replicate (coming soon)
+
+Change Data Capture Using the MongoDB Sink Connector
+----------------------------------------------------
+
+To configure your MongoDB Kafka sink connector to handle CDC events from
+a Kafka topic, update your configuration to include the following:
+
+.. code-block:: properties
+
+   change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler
+
+The ``ChangeStreamHandler`` class instructs the sink connector to process
+change events that are in the :manual:`change stream response document format </reference/change-events/#change-stream-output>`.
+You can use a :doc:`MongoDB Kafka source connector </kafka-source>` to
+configure the change stream data that you want to publish to specific topics.
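+
+For context, a change event in this format resembles the following
+abridged sketch (the field values shown are illustrative placeholders,
+not output from a specific deployment):
+
+.. code-block:: json
+
+   {
+     "_id": { "_data": "<resume token>" },
+     "operationType": "insert",
+     "ns": { "db": "kafkaconnect", "coll": "mongosrc" },
+     "documentKey": { "_id": 1 },
+     "fullDocument": { "_id": 1, "name": "example" }
+   }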
+
+Remember to specify the topic to read from and the destination database
+and collection to write to in the following configuration properties:
+
+- ``topics``
+- ``connection.uri``
+- ``database``
+- ``collection``
+
+For more information on these properties, see our guide on
+:doc:`Kafka Sink Connector Configuration Properties </kafka-sink-properties>`.
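+
+For example, a minimal configuration (all hostnames and namespace values
+below are placeholders) might set these properties as follows:
+
+.. code-block:: properties
+
+   topics=myreplset.kafkaconnect.mongosrc
+   connection.uri=mongodb://<hostname>:27017
+   database=kafkaconnect
+   collection=mongosink
+   change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler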
+
+ChangeStreamHandler Example
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The following sample JSON payload instantiates a new connector that uses
+the ``ChangeStreamHandler`` with a specified CDC configuration when posted
+to the `Kafka Connect REST endpoint <https://docs.confluent.io/current/connect/references/restapi.html>`__:
+
+.. code-block:: json
+
+   {
+     "name": "mongo-sink-changestreamhandler-cdc",
+     "config": {
+       "connection.uri": "mongodb://<hostname>:27017/kafkaconnect?w=1&journal=true",
+       "topics": "myreplset.kafkaconnect.mongosrc",
+       "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
+       "connector.class": "com.mongodb.kafka.connect.sink.MongoSinkConnector",
+       "key.converter": "io.confluent.connect.avro.AvroConverter",
+       "key.converter.schema.registry.url": "http://<schema-registry-hostname>:8081",
+       "value.converter": "io.confluent.connect.avro.AvroConverter",
+       "value.converter.schema.registry.url": "http://<schema-registry-hostname>:8081",
+       "collection": "mongosink"
+     }
+   }
+
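+For example, you can save the payload to a file (here, a hypothetical
+``mongo-sink-changestreamhandler-cdc.json``) and post it with ``curl``,
+assuming Kafka Connect's REST API is listening on its default port 8083:
+
+.. code-block:: shell
+
+   curl -X POST \
+        -H "Content-Type: application/json" \
+        --data @mongo-sink-changestreamhandler-cdc.json \
+        http://localhost:8083/connectors
+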
+.. include:: /includes/externalize-secrets.rst
+
+.. _cdc-debezium:
+
+Change Data Capture Using Debezium
+----------------------------------
+
+The MongoDB Kafka sink connector can also process event streams using
+`Debezium <https://debezium.io/>`__ as an event producer for the following
+source databases:
+
+* `MongoDB <http://debezium.io/docs/connectors/mongodb/>`__
+* `MySQL <http://debezium.io/docs/connectors/mysql/>`__
+* `PostgreSQL <http://debezium.io/docs/connectors/postgresql/>`__

 You can configure the sink connector to process data from a CDC stream
 using one of the included handlers for Debezium or a custom handler that
 extends the abstract class `CdcHandler
-<https://github.com/mongodb/mongo-kafka/blob/master/src/main/java/com/mongodb/kafka/connect/sink/cdc/CdcHandler.java>`_ .
+<https://github.com/mongodb/mongo-kafka/blob/master/src/main/java/com/mongodb/kafka/connect/sink/cdc/CdcHandler.java>`__.

 To create a CDC handler, specify the following configuration information:

@@ -40,31 +104,32 @@ To create a CDC handler, specify the following configuration information:
 - Topics on which the connector should listen for data in the ``topics``
   property.
 - MongoDB collection to write data to in the ``collection`` property.
-- The :doc:`converters </kafka-sink-data-formats>` required to
-  handle data formats in the ``[key|value].converter`` properties. Both
-  JSON + Schema and AVRO formats are supported.
-- Any :doc:`post processors </kafka-sink-postprocessors>` necessary
-  to modify the record before saving it to MongoDB.

-The following sample JSON payload instantiates a new connector with
-a specified CDC configuration when posted to the `Kafka Connect REST endpoint
-<https://docs.confluent.io/current/connect/references/restapi.html>`_:
+.. _cdc-debezium-example:
+
+Debezium Example
+~~~~~~~~~~~~~~~~
+
+The following sample JSON payload instantiates a new connector using
+Debezium with a specified CDC configuration when posted to the
+`Kafka Connect REST endpoint <https://docs.confluent.io/current/connect/references/restapi.html>`__:

 .. code-block:: json

    {
      "name": "mongo-sink-debezium-cdc",
      "config": {
-       "connection.uri": "mongodb://mongodb:27017/kafkaconnect?w=1&journal=true",
+       "connection.uri": "mongodb://<mongodb-hostname>:27017/kafkaconnect?w=1&journal=true",
        "topics": "myreplset.kafkaconnect.mongosrc",
        "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler",
        "connector.class": "com.mongodb.kafka.connect.sink.MongoSinkConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
-       "key.converter.schema.registry.url": "http://localhost:8081",
+       "key.converter.schema.registry.url": "http://<schema-registry-hostname>:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
-       "value.converter.schema.registry.url": "http://localhost:8081",
+       "value.converter.schema.registry.url": "http://<schema-registry-hostname>:8081",
        "collection": "mongosink"
      }
    }

 .. include:: /includes/externalize-secrets.rst
+
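+For reference, a simplified Debezium MongoDB change event value for an
+insert operation resembles the following sketch (field contents are
+placeholders, and the exact envelope depends on your Debezium version and
+converter settings):
+
+.. code-block:: json
+
+   {
+     "op": "c",
+     "after": "{\"_id\": 1, \"name\": \"example\"}",
+     "patch": null,
+     "source": { "rs": "myreplset", "db": "kafkaconnect", "collection": "mongosrc" },
+     "ts_ms": 1571840222000
+   }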