
Commit 159f385

biniona-mongodb authored and Chris Cho committed
(DOCSP-18607) Usage Example Schema (#164)
Co-authored-by: Chris Cho <[email protected]>
1 parent bb16cc9 commit 159f385

File tree

10 files changed: +235 −25 lines

snooty.toml

Lines changed: 2 additions & 0 deletions
@@ -8,6 +8,8 @@ mkc = "MongoDB Kafka Connector"
 connector-long = "MongoDB Connector for Apache Kafka"
 ak = "Apache Kafka"
 kc = "Kafka Connect"
+avro = "Apache Avro"
+avro-short = "Avro"
 connector_version="1.6"
 connector_driver_version="4.3"
 connector_driver_url_base="https://docs.mongodb.com/drivers/java/sync/v{+connector_driver_version+}/"
source/includes/source-connector/usage-examples/customer.json

Lines changed: 15 additions & 0 deletions

@@ -0,0 +1,15 @@
+{
+  "name": "Zola",
+  "visits": [
+    {
+      "$date": "2021-07-25T17:30:00.000Z"
+    },
+    {
+      "$date": "2021-10-03T14:06:00.000Z"
+    }
+  ],
+  "goods_purchased": {
+    "apples": 1,
+    "bananas": 10
+  }
+}
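The sample document above uses MongoDB Extended JSON notation: each ``visits`` entry is a ``$date`` wrapper rather than a plain string. A minimal Python sketch, assuming PyMongo's ``bson`` package is installed, shows how that notation parses into native datetime values:

.. code-block:: python

   # Minimal sketch, assuming PyMongo's "bson" package is installed.
   from bson import json_util

   extended_json = """
   {
     "name": "Zola",
     "visits": [
       {"$date": "2021-07-25T17:30:00.000Z"},
       {"$date": "2021-10-03T14:06:00.000Z"}
     ],
     "goods_purchased": {"apples": 1, "bananas": 10}
   }
   """

   # json_util.loads resolves Extended JSON wrappers such as {"$date": ...}
   # into native Python types (timezone-aware datetime objects here).
   doc = json_util.loads(extended_json)
   print(type(doc["visits"][0]))  # <class 'datetime.datetime'>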
source/includes/source-connector/usage-examples/customers.avro

Lines changed: 24 additions & 0 deletions

@@ -0,0 +1,24 @@
+{
+  "type": "record",
+  "name": "Customer",
+  "fields": [{
+    "name": "name",
+    "type": "string"
+  },{
+    "name": "visits",
+    "type": {
+      "type": "array",
+      "items": {
+        "type": "long",
+        "logicalType": "timestamp-millis"
+      }
+    }
+  },{
+    "name": "goods_purchased",
+    "type": {
+      "type": "map",
+      "values": "int"
+    }
+  }
+  ]
+}
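One way to sanity-check that the sample customer document conforms to this schema is with a validation library. A minimal Python sketch follows, assuming the third-party ``fastavro`` package; the ``timestamp-millis`` values are expressed as epoch milliseconds, the underlying ``long`` representation:

.. code-block:: python

   # Minimal sketch, assuming the third-party "fastavro" package.
   from fastavro import parse_schema
   from fastavro.validation import validate

   schema = parse_schema({
       "type": "record",
       "name": "Customer",
       "fields": [
           {"name": "name", "type": "string"},
           {"name": "visits", "type": {
               "type": "array",
               "items": {"type": "long", "logicalType": "timestamp-millis"},
           }},
           {"name": "goods_purchased",
            "type": {"type": "map", "values": "int"}},
       ],
   })

   record = {
       "name": "Zola",
       "visits": [
           1627234200000,  # 2021-07-25T17:30:00Z as epoch milliseconds
           1633269960000,  # 2021-10-03T14:06:00Z as epoch milliseconds
       ],
       "goods_purchased": {"apples": 1, "bananas": 10},
   }

   # Raises fastavro's ValidationError if the record doesn't match the schema.
   validate(record, schema)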
source/includes/source-connector/usage-examples/schema.properties

Lines changed: 10 additions & 0 deletions

@@ -0,0 +1,10 @@
+connector.class="com.mongodb.kafka.connect.MongoSourceConnector"
+connection.uri="<your MongoDB connection URI>"
+database="customers"
+collection="purchases"
+publish.full.document.only=true
+output.format.value="schema"
+output.schema.value="{\"type\": \"record\", \"name\": \"Customer\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"visits\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}}, {\"name\": \"goods_purchased\", \"type\": {\"type\": \"map\", \"values\": \"int\"}}]}"
+value.converter.schemas.enable=true
+value.converter="org.apache.kafka.connect.json.JsonConverter"
+key.converter="org.apache.kafka.connect.storage.StringConverter"
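In standalone mode, a properties file like this can be passed to Kafka Connect directly; in distributed mode, the same settings are typically submitted as a JSON ``config`` object to the Connect REST API. The following minimal Python sketch assumes the third-party ``requests`` package, a Connect worker on ``localhost:8083``, and a made-up connector name; it also shows how ``json.dumps`` produces the escaped single-line ``output.schema.value`` string from a readable schema:

.. code-block:: python

   # Minimal sketch: register the connector through the Kafka Connect REST API.
   # Assumes the third-party "requests" package and a Connect worker listening
   # on localhost:8083; the connector name "customers-schema-source" is made up.
   import json

   import requests

   avro_schema = {
       "type": "record",
       "name": "Customer",
       "fields": [
           {"name": "name", "type": "string"},
           {"name": "visits", "type": {
               "type": "array",
               "items": {"type": "long", "logicalType": "timestamp-millis"},
           }},
           {"name": "goods_purchased",
            "type": {"type": "map", "values": "int"}},
       ],
   }

   config = {
       "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
       "connection.uri": "<your MongoDB connection URI>",
       "database": "customers",
       "collection": "purchases",
       "publish.full.document.only": "true",
       "output.format.value": "schema",
       # json.dumps flattens the schema into the single-line string form that
       # appears (with escaped quotes) in the properties file above.
       "output.schema.value": json.dumps(avro_schema),
       "value.converter.schemas.enable": "true",
       "value.converter": "org.apache.kafka.connect.json.JsonConverter",
       "key.converter": "org.apache.kafka.connect.storage.StringConverter",
   }

   response = requests.post(
       "http://localhost:8083/connectors",
       json={"name": "customers-schema-source", "config": config},
   )
   response.raise_for_status()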

source/introduction/converters.txt

Lines changed: 2 additions & 0 deletions
@@ -1,3 +1,5 @@
+.. _intro-converters:
+
 ==========
 Converters
 ==========

source/introduction/data-formats.txt

Lines changed: 2 additions & 0 deletions
@@ -120,6 +120,8 @@ You use JSON Schema when you apply JSON Schema converters to your connectors.
 For more information, see the official
 `JSON Schema website <https://json-schema.org/>`__.
 
+.. _data-formats-avro:
+
 Avro
 ----
 
source/source-connector/fundamentals/change-streams.txt

Lines changed: 2 additions & 0 deletions
@@ -54,6 +54,8 @@ For a list of aggregation operators you can use with a change stream, see
 the guide on :manual:`Modify Change Stream Output <changeStreams/#modify-change-stream-output>`
 in the MongoDB manual.
 
+.. _source-connector-fundamentals-change-event:
+
 Change Event Structure
 ~~~~~~~~~~~~~~~~~~~~~~
 
source/source-connector/fundamentals/specify-schema.txt

Lines changed: 25 additions & 23 deletions
@@ -1,3 +1,5 @@
+.. _kafka-source-apply-schemas:
+
 =============
 Apply Schemas
 =============
@@ -15,20 +17,20 @@ Overview
 --------
 
 In this guide, you can learn how to apply schemas to incoming
-documents in a {+mkc+} source connector.
+documents in a {+mkc+} source connector.
 
-There are two types of schema in Kafka Connect, **key schema** and
+There are two types of schema in Kafka Connect, **key schema** and
 **value schema**. Kafka Connect sends messages to Apache Kafka containing both
 your value and a key. A key schema enforces a structure for keys in messages
 sent to Apache Kafka. A value schema enforces a structure for values in messages
-sent to Apache Kafka.
+sent to Apache Kafka.
 
 .. important:: Note on Terminology
 
    The word "key" has a slightly different meaning in the context
    of BSON and Apache Kafka. In BSON, a "key" is a unique string identifier for
-   a field in a document.
-
+   a field in a document.
+
    In Apache Kafka, a "key" is a byte array sent in a message used to determine
    what partition of a topic to write the message to. Kafka keys can be
    duplicates of other keys or ``null``.
@@ -49,7 +51,7 @@ following combinations of schemas:
 To see full properties files for specifying a schema, see our specify a schema
 usage example. <TODO: link to example>
 
-To learn more about keys and values in Apache Kafka, see the
+To learn more about keys and values in Apache Kafka, see the
 `official Apache Kafka introduction <http://kafka.apache.org/intro#intro_concepts_and_terms>`__.
 
 Default Schemas
@@ -60,10 +62,10 @@ The {+mkc+} provides two default schemas:
 - :ref:`A key schema for the _id field of MongoDB change event documents. <source-default-key-schema>`
 - :ref:`A value schema for MongoDB change event documents. <source-default-value-schema>`
 
-To learn more about change events, see our
+To learn more about change events, see our
 :doc:`guide on change streams </source-connector/fundamentals/change-streams>`.
 
-To learn more about default schemas, see the default schemas
+To learn more about default schemas, see the default schemas
 :github:`here in the MongoDB Kafka Connector source code <mongodb/mongo-kafka/blob/master/src/main/java/com/mongodb/kafka/connect/source/schema/AvroSchemaDefaults.java>`.
 
 .. _source-default-key-schema:
@@ -73,16 +75,16 @@ Key Schema
 
 The {+mkc+} provides a default key schema for the ``_id`` field of change
 event documents. You should use the default key schema unless you remove the
-``_id`` field from your change event document using either of the transformations
-:ref:`described in this guide here <source-schema-for-modified-document>`.
+``_id`` field from your change event document using either of the transformations
+:ref:`described in this guide here <source-schema-for-modified-document>`.
 
 If you specify either of these transformations and would like to use a key
 schema for your incoming documents, you must specify a key schema
-:ref:`as described in the specify a schema section of this guide <source-specify-avro-schema>`.
+:ref:`as described in the specify a schema section of this guide <source-specify-avro-schema>`.
 
-You can enable the default key schema with the following option:
+You can enable the default key schema with the following option:
 
-.. code-block:: java
+.. code-block:: java
 
    output.format.key=schema
 
@@ -93,16 +95,16 @@ Value Schema
 
 The {+mkc+} provides a default value schema for change event documents. You
 should use the default value schema unless you transform your change event
-documents
-:ref:`as described in this guide here <source-schema-for-modified-document>`.
+documents
+:ref:`as described in this guide here <source-schema-for-modified-document>`.
 
 If you specify either of these transformations and would like to use a value schema for your
-incoming documents, you must use one of the mechanisms described in the
-:ref:`schemas for transformed documents section of this guide <source-schema-for-modified-document>`.
+incoming documents, you must use one of the mechanisms described in the
+:ref:`schemas for transformed documents section of this guide <source-schema-for-modified-document>`.
 
-You can enable the default value schema with the following option:
+You can enable the default value schema with the following option:
 
-.. code-block:: properties
+.. code-block:: properties
 
    output.format.value=schema
 
@@ -115,7 +117,7 @@ There are two ways you can transform your change event documents in a
 source connector:
 
 - The ``publish.full.document.only=true`` option
-- An aggregation pipeline that modifies the structure of change event documents
+- An aggregation pipeline that modifies the structure of change event documents
 
 If you transform your MongoDB change event documents,
 you must do the following to apply schemas:
@@ -164,7 +166,7 @@ Infer a Schema
 
 You can have your source connector infer a schema for incoming documents. This
 option works well for development and for data sources that do not
-frequently change structure, but for most production deployments we recommend that you
+frequently change structure, but for most production deployments we recommend that you
 :ref:`specify a schema <source-specify-avro-schema>`.
 
 You can have the MongoDB Kafka Connector infer a schema by specifying the
@@ -179,8 +181,8 @@ following options:
 
 The {+mkc+} does not support key schema inference. If you want to use a key
 schema and transform your MongoDB change event documents, you must specify a
-key schema as described in
-:ref:`the specify schemas section of this guide <source-specify-avro-schema>`.
+key schema as described in
+:ref:`the specify schemas section of this guide <source-specify-avro-schema>`.
 
 Properties Files
 ----------------

source/source-connector/usage-examples.txt

Lines changed: 1 addition & 2 deletions
@@ -10,6 +10,5 @@ Source Connector Usage Examples
    Multiple Sources </source-connector/usage-examples/multiple-sources>
    Topic Naming </source-connector/usage-examples/topic-naming>
    Copy Existing Data </source-connector/usage-examples/copy-existing-data>
-
+   Specify a Schema </source-connector/usage-examples/schema>
 
-asdf
source/source-connector/usage-examples/schema.txt

Lines changed: 152 additions & 0 deletions

@@ -0,0 +1,152 @@
+.. _source-usage-example-schema:
+
+================
+Specify a Schema
+================
+
+.. default-domain:: mongodb
+
+This usage example demonstrates how you can configure your MongoDB Kafka
+source connector to apply a custom **schema** to your data. A schema is a
+definition that specifies the structure and type information about data in an
+{+ak+} topic. Use a schema when you need to ensure the data on the topic populated
+by your source connector has a consistent structure.
+
+To learn more about using schemas with the {+mkc+}, see the
+:ref:`Apply Schemas <kafka-source-apply-schemas>` guide.
+
+Example
+-------
+
+Suppose your application keeps track of customer data in a MongoDB
+collection, and you need to publish this data to a Kafka topic. You want
+the subscribers of the customer data to receive consistently formatted data.
+You choose to apply a schema to your data.
+
+Your requirements and your solutions are as follows:
+
+.. list-table::
+   :header-rows: 1
+   :widths: 50 50
+
+   * - Requirement
+     - Solution
+
+   * - Receive customer data from a MongoDB collection
+     - | Configure a MongoDB source connector to receive updates to data
+         from a specific database and collection.
+       | See :ref:`<usage-example-schema-read-collection>`.
+
+   * - Provide the customer data schema
+     - | Specify a schema that corresponds to the structure and data types of
+         the customer data.
+       | See :ref:`<usage-example-schema-custom-schema>`.
+
+   * - Omit Kafka metadata from the customer data
+     - | Include only the data from the ``fullDocument`` field.
+       | See :ref:`<usage-example-schema-omit-metadata>`.
+
+For the full configuration file that meets the requirements above, see
+:ref:`<usage-example-schema-config>`.
+
+.. _usage-example-schema-read-collection:
+
+Receive Data from a Collection
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+To configure your source connector to receive data from a MongoDB collection,
+specify the database and collection name. For this example, you can
+configure the connector to read from the ``purchases`` collection in the
+``customers`` database as follows:
+
+.. code-block:: ini
+
+   database="customers"
+   collection="purchases"
+
+.. _usage-example-schema-custom-schema:
+
+Create a Custom Schema
+~~~~~~~~~~~~~~~~~~~~~~
+
+A sample customer data document from your collection contains the following
+information:
+
+.. literalinclude:: /includes/source-connector/usage-examples/customer.json
+   :language: json
+
+From the sample document, you decide your schema should present the fields
+using the following data types:
+
+.. list-table::
+   :header-rows: 1
+   :widths: 25 30 45
+
+   * - Field name
+     - Data types
+     - Description
+
+   * - **name**
+     - `string <https://avro.apache.org/docs/current/spec.html#schema_primitive>`__
+     - | Name of the customer
+
+   * - **visits**
+     - `array <https://avro.apache.org/docs/current/spec.html#Arrays>`__
+       of `timestamps <https://avro.apache.org/docs/current/spec.html#Timestamp+%28millisecond+precision%29>`__
+     - Dates the customer visited
+
+   * - **goods_purchased**
+     - `map <https://avro.apache.org/docs/current/spec.html#Maps>`__
+       of string (the assumed type) to
+       `integer <https://avro.apache.org/docs/current/spec.html#schema_primitive>`__
+       values
+     - Names of goods and quantity of each item the customer purchased
+
+You can describe your data using the {+avro+} schema format as shown in
+the example schema below:
+
+.. literalinclude:: /includes/source-connector/usage-examples/customers.avro
+   :language: json
+
+.. important:: Converters
+
+   If you want to send your data through {+ak+} with {+avro-short+} binary encoding,
+   you must use an {+avro-short+} converter. For more information, see the guide on
+   :ref:`Converters <intro-converters>`.
+
+.. _usage-example-schema-omit-metadata:
+
+Omit Metadata from Published Records
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The connector publishes the customer data documents and metadata
+that describes the document to a Kafka topic. You can set the connector to
+include only the document data contained in the ``fullDocument`` field of the
+record using the following setting:
+
+.. code-block:: ini
+
+   publish.full.document.only=true
+
+For more information on the ``fullDocument`` field, see the
+:ref:`Change Streams <source-connector-fundamentals-change-event>` guide.
+
+.. _usage-example-schema-config:
+
+Custom Schema Configuration
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Your custom schema connector configuration should resemble the following:
+
+.. literalinclude:: /includes/source-connector/usage-examples/schema.properties
+   :language: properties
+   :emphasize-lines: 3,4,5,6,7
+
+.. note:: Embedded Schema
+
+   In the preceding configuration, the JSON Schema converter embeds the custom
+   schema in your messages. To learn more about the JSON Schema converter, see the
+   :ref:`Converters <json-schema-converter-sample-properties>` guide.
+
+For more information on specifying schemas, see the :ref:`Apply
+Schemas <kafka-source-apply-schemas>` guide.
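Because the configuration pairs the ``JsonConverter`` with ``value.converter.schemas.enable=true``, each published message value is a JSON envelope containing a ``schema`` field and a ``payload`` field. The following minimal consumer sketch assumes the third-party ``kafka-python`` package, a broker on ``localhost:9092``, and the source connector's default ``<database>.<collection>`` topic naming (``customers.purchases`` here):

.. code-block:: python

   # Minimal sketch, assuming the third-party "kafka-python" package and a
   # Kafka broker on localhost:9092; the topic name assumes the connector's
   # default "<database>.<collection>" naming.
   import json

   from kafka import KafkaConsumer

   consumer = KafkaConsumer(
       "customers.purchases",
       bootstrap_servers="localhost:9092",
       auto_offset_reset="earliest",
       value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
   )

   for message in consumer:
       envelope = message.value
       # With schemas.enable=true, the JsonConverter wraps each record in an
       # envelope: {"schema": {...}, "payload": {...}}.
       customer = envelope["payload"]
       print(customer["name"], customer["goods_purchased"])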
