
Commit 7e62acf

Chris Cho authored and schmalliso committed
DOCSP-12653: add additional source params (#61)
* DOCSP-12653: add new settings for source connector
1 parent 4b91947 commit 7e62acf


source/kafka-source.txt

Lines changed: 112 additions & 3 deletions
@@ -126,19 +126,21 @@ an example source connector configuration file, see
    * - database
      - string
      - | Name of the database to watch for changes. If not set, all databases are watched.
+       |
        | **Default**: ""
        | **Accepted Values**: A single database name

    * - collection
      - string
-     - | Name of the collection in the database to watch for changes.
-       | The collection in the database to watch. If not set then all collections will be watched.
+     - | Name of the collection in the database to watch for changes. If not set then all collections will be watched.
+       |
        | **Default**: ""
        | **Accepted Values**: A single collection name

    * - publish.full.document.only
      - boolean
      - | Only publish the changed document instead of the full change stream document. Sets the ``change.stream.full.document=updateLookup`` automatically so updated documents will be included.
+       |
        | **Default**: false
        | **Accepted Values**: ``true`` or ``false``

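For illustration only (not part of this commit), the three settings above might be combined in a source connector properties file like this; the database and collection names are placeholders, and required settings such as the connection URI are omitted:

    # watch a single collection and publish only the changed documents
    database=exampleDb
    collection=exampleCollection
    publish.full.document.only=true
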
@@ -162,58 +164,131 @@ an example source connector configuration file, see
    * - collation
      - string
      - | A JSON :manual:`collation document </reference/collation/#collation-document>` that contains options to use for the change stream. Append ``.asDocument().toJson()`` to the collation document to create the JSON representation.
+       |
        | **Default**: ""
        | **Accepted Values**: A valid JSON document representing a collation

    * - output.format.key
      - string
      - | Determines which data format the source connector outputs for the key document.
+       |
        | **Default**: ``json``
        | **Accepted Values**: ``bson``, ``json``, ``schema``

    * - output.format.value
      - string
      - | Determines which data format the source connector outputs for the value document.
+       |
        | **Default**: ``json``
        | **Accepted Values**: ``bson``, ``json``, ``schema``

    * - output.json.formatter
      - string
      - | Full class name of the JSON formatter.
+       |
        | **Default**: ``com.mongodb.kafka.connect.source.json.formatter.ExtendedJson``
-       | **Accepted Values**:
+       | **Accepted Values**:
        | - ``com.mongodb.kafka.connect.source.json.formatter.DefaultJson``
        | - ``com.mongodb.kafka.connect.source.json.formatter.ExtendedJson``
        | - ``com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson``
        | - Or other user-provided class name

+   * - output.schema.key
+     - string
+     - | The `Avro schema <https://avro.apache.org/docs/current/spec.html#schemas>`__ definition for the key document of the SourceRecord.
+       | **Default**:
+
+       .. code-block:: json
+
+          {
+            "type": "record",
+            "name": "keySchema",
+            "fields" : [ { "name": "_id", "type": "string" } ]
+          }
+
+       | **Accepted Values**: A valid JSON object
+
+   * - output.schema.value
+     - string
+     - | The `Avro schema <https://avro.apache.org/docs/current/spec.html#schemas>`__ definition for the value document of the SourceRecord.
+       |
+       | **Default**:
+
+       .. code-block:: json
+
+          {
+            "name": "ChangeStream",
+            "type": "record",
+            "fields": [
+              { "name": "_id", "type": "string" },
+              { "name": "operationType", "type": ["string", "null"] },
+              { "name": "fullDocument", "type": ["string", "null"] },
+              { "name": "ns",
+                "type": [{"name": "ns", "type": "record", "fields": [
+                  {"name": "db", "type": "string"},
+                  {"name": "coll", "type": ["string", "null"] } ]
+                }, "null" ] },
+              { "name": "to",
+                "type": [{"name": "to", "type": "record", "fields": [
+                  {"name": "db", "type": "string"},
+                  {"name": "coll", "type": ["string", "null"] } ]
+                }, "null" ] },
+              { "name": "documentKey", "type": ["string", "null"] },
+              { "name": "updateDescription",
+                "type": [{"name": "updateDescription", "type": "record", "fields": [
+                  {"name": "updatedFields", "type": ["string", "null"]},
+                  {"name": "removedFields",
+                    "type": [{"type": "array", "items": "string"}, "null"]
+                  }] }, "null"] },
+              { "name": "clusterTime", "type": ["string", "null"] },
+              { "name": "txnNumber", "type": ["long", "null"]},
+              { "name": "lsid", "type": [{"name": "lsid", "type": "record",
+                "fields": [ {"name": "id", "type": "string"},
+                  {"name": "uid", "type": "string"}] }, "null"] }
+            ]
+          }
+
+       | **Accepted Values**: A valid JSON object
+
    * - output.schema.infer.value
      - boolean
      - | Whether the connector should infer the schema for the value. Since each document is processed in isolation, multiple schemas may result. Only valid when ``schema`` is specified in the ``output.format.value`` setting.
+       |
        | **Default**: ``false``
        | **Accepted Values**: ``true`` or ``false``

+   * - offset.partition.name
+     - string
+     - | Custom partition name in which to store the offset values. The offset value stores information on where to resume processing if there is an issue that requires you to restart the connector. By choosing a new partition name, you can start processing without using a resume token. This can make it easier to restart the connector without reconfiguring the Kafka Connect service or manually deleting the old offset. The offset partition is automatically created if it does not exist.
+       |
+       | **Default**: ""
+       | **Accepted Values**: A string
+
    * - batch.size
      - int
      - | The cursor batch size.
+       |
        | **Default**: 0
        | **Accepted Values**: An integer

    * - change.stream.full.document
      - string
      - | Determines what to return for update operations when using a Change Stream. When set to 'updateLookup', the change stream for partial updates will include both a delta describing the changes to the document as well as a copy of the entire document that was changed from *some point in time* after the change occurred.
+       |
        | **Default**: ""
        | **Accepted Values**: "" or ``default`` or ``updateLookup``

    * - poll.await.time.ms
      - long
      - | The amount of time to wait before checking for new results on the change stream.
+       |
        | **Default**: 5000
        | **Accepted Values**: An integer

    * - poll.max.batch.size
      - int
      - | Maximum number of change stream documents to include in a single batch when polling for new data. This setting can be used to limit the amount of data buffered internally in the connector.
+       |
        | **Default**: 1000
        | **Accepted Values**: An integer

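As an illustrative sketch (not part of the diff), the output-format and polling settings above could be combined in a connector properties file as follows; every value shown is an example, and ``source-offsets-v2`` is a hypothetical partition name:

    # emit keys and values as schema'd records and infer the value schema
    output.format.key=schema
    output.format.value=schema
    output.schema.infer.value=true

    # resume bookkeeping and batching behavior
    offset.partition.name=source-offsets-v2
    poll.await.time.ms=5000
    poll.max.batch.size=1000
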
@@ -229,12 +304,32 @@ an example source connector configuration file, see
    * - copy.existing
      - boolean
      - | Copy existing data from source collections and convert them to Change Stream events on their respective topics. Any changes to the data that occur during the copy process are applied once the copy is completed.
+       |
        | **Default**: false
        | **Accepted Values**: ``true`` or ``false``

+   * - copy.existing.namespace.regex
+     - string
+     - | Regular expression that matches the namespaces from which to copy
+         data. A namespace describes the database name and collection
+         separated by a period, e.g. ``databaseName.collectionName``.
+
+       .. example::
+
+          In the following example, the setting matches all collections
+          that start with "page" in the "stats" database.
+
+          .. code-block:: none
+
+             copy.existing.namespace.regex=stats\.page.*
+
+       | **Default**: ""
+       | **Accepted Values**: A valid regular expression
+
    * - copy.existing.max.threads
      - int
      - | The number of threads to use when performing the data copy. Defaults to the number of processors.
+       |
        | **Default**: defaults to the number of processors
        | **Accepted Values**: An integer

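For illustration only, the copy settings above might be combined like this; the regular expression reuses the example from the diff, and the thread count is a placeholder:

    # copy matching namespaces before streaming changes
    copy.existing=true
    copy.existing.namespace.regex=stats\.page.*
    copy.existing.max.threads=4
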
@@ -307,6 +402,20 @@ an example source connector configuration file, see
        | **Default:** ""
        | **Accepted Values**: A valid partition name

+   * - heartbeat.interval.ms
+     - int
+     - | The length of time in milliseconds between sending heartbeat messages to record a post batch resume token when no source records have been published. This can improve the resumability of the connector for low volume namespaces. Use ``0`` to disable.
+       |
+       | **Default**: ``0``
+       | **Accepted Values**: An integer
+
+   * - heartbeat.topic.name
+     - string
+     - | The name of the topic to write heartbeat messages to.
+       |
+       | **Default**: ``__mongodb_heartbeats``
+       | **Accepted Values**: A valid Kafka topic name
+
 .. note::

    The default maximum size for Kafka messages is 1MB. Update the

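To show how the two new heartbeat settings introduced in this commit fit together, here is a minimal illustrative snippet; the 10-second interval is an example value, and the topic name shown is simply the documented default:

    # record a post-batch resume token every 10 seconds on quiet namespaces
    heartbeat.interval.ms=10000
    heartbeat.topic.name=__mongodb_heartbeats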