
Commit 902ef08

(DOCSP-21085) Structured streaming (#90)
1 parent a9344ea commit 902ef08

8 files changed: +336 -48 lines changed

snooty.toml

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@ intersphinx = ["https://www.mongodb.com/docs/manual/objects.inv"]
  toc_landing_pages = ["configuration"]

  [constants]
- current-version = "10"
+ current-version = "10.0"
  spark-core-version = "3.0.1"
  spark-sql-version = "3.0.1"
  scala-version = "2.12"

source/getting-started.txt

Lines changed: 1 addition & 1 deletion
@@ -50,4 +50,4 @@ Tutorials

  - :doc:`write-to-mongodb`
  - :doc:`read-from-mongodb`
- - :doc:`streaming`
+ - :doc:`structured-streaming`

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
.. important::

   `Spark Structured Streaming <https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html>`__ and `Spark Streaming with DStreams <https://spark.apache.org/docs/latest/streaming-programming-guide.html>`__ are different.

source/index.txt

Lines changed: 1 addition & 2 deletions
@@ -107,8 +107,7 @@ versions of Apache Spark and MongoDB:
  getting-started
  write-to-mongodb
  read-from-mongodb
- streaming
- tutorials
+ structured-streaming
  faq
  release-notes
  API Docs <https://www.javadoc.io/doc/org.mongodb.spark/mongo-spark-connector_{+scala-version+}/{+current-version+}>

source/scala/streaming.txt

Lines changed: 3 additions & 1 deletion
@@ -1,9 +1,11 @@
+ .. include:: includes/streaming-distinction.rst
+
  Spark Streaming allows on-the-fly analysis of live data streams with
  MongoDB. See the `Apache documentation
  <http://spark.apache.org/docs/latest/streaming-programming-guide.html>`_
  for a detailed description of Spark Streaming functionality.

- This tutorial uses the Spark Shell.For more information about starting
+ This tutorial uses the Spark Shell. For more information about starting
  the Spark Shell and configuring it for use with MongoDB, see
  :ref:`Getting Started <scala-getting-started>`.

source/streaming.txt

Lines changed: 0 additions & 29 deletions
This file was deleted.

source/structured-streaming.txt

Lines changed: 327 additions & 0 deletions
@@ -0,0 +1,327 @@
.. _spark-structured-streaming:

=================================
Structured Streaming with MongoDB
=================================

.. default-domain:: mongodb

.. contents:: On this page
   :local:
   :backlinks: none
   :depth: 2
   :class: singlecol

Overview
--------

Spark Structured Streaming is a data stream processing engine you can
use through the Dataset or DataFrame API. The MongoDB Spark Connector
enables you to stream to and from MongoDB using Spark Structured
Streaming.

.. include:: includes/streaming-distinction.rst

To learn more about Structured Streaming, see the
`Spark Programming Guide
<https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html>`__.

.. _write-structured-stream:

Configuring a Write Stream to MongoDB
-------------------------------------

.. tabs-drivers::

   tabs:
     - id: java-sync
       content: |

     - id: python
       content: |

         Specify write stream configuration settings on your streaming
         Dataset or DataFrame using the ``writeStream`` property. You
         must specify the following configuration settings to write
         to MongoDB:

         .. list-table::
            :header-rows: 1
            :stub-columns: 1
            :widths: 10 40

            * - Setting
              - Description

            * - ``writeStream.format()``
              - The format to use for write stream data. Use
                ``mongodb``.

            * - ``writeStream.option()``
              - Use the ``option`` method to specify your MongoDB
                deployment connection string with the
                ``spark.mongodb.connection.uri`` option key.

                You must specify a database and collection, either as
                part of your connection string or with additional
                ``option`` methods using the following keys:

                - ``spark.mongodb.database``
                - ``spark.mongodb.collection``

            * - ``writeStream.outputMode()``
              - The output mode to use. To view a list of all supported
                output modes, see `the pyspark outputMode documentation <https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.DataStreamWriter.outputMode.html#pyspark.sql.streaming.DataStreamWriter.outputMode>`__.

         The following code snippet shows how to use the preceding
         configuration settings to stream data to MongoDB:

         .. code-block:: python
            :copyable: true
            :emphasize-lines: 3-4, 7

            <streaming Dataset/DataFrame> \
                .writeStream \
                .format("mongodb") \
                .option("spark.mongodb.connection.uri", <mongodb-connection-string>) \
                .option("spark.mongodb.database", <database-name>) \
                .option("spark.mongodb.collection", <collection-name>) \
                .outputMode("append")
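
         These settings only configure the stream; nothing is written
         until you call ``start()``. As a rough end-to-end sketch,
         assuming a reachable MongoDB deployment and using Spark's
         built-in ``rate`` test source in place of a real streaming
         DataFrame (the connection string, database, and collection
         names are placeholders):

         .. code-block:: python

            # hypothetical sketch: stream rows from the built-in "rate"
            # source into MongoDB; replace the placeholders with values
            # for your deployment
            query = (spark.readStream
                .format("rate")          # generates timestamp/value rows
                .load()
                .writeStream
                .format("mongodb")
                .option("checkpointLocation", "/tmp/pyspark/")
                .option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
                .option("spark.mongodb.database", "<database-name>")
                .option("spark.mongodb.collection", "<collection-name>")
                .outputMode("append")
                .start())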

         For a complete list of methods, see the
         `pyspark Structured Streaming reference <https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss.html>`__.

     - id: scala
       content: |

.. _read-structured-stream:
.. _continuous-processing:

Configuring a Read Stream from MongoDB
--------------------------------------

Reading a stream from a MongoDB database requires *continuous processing*,
an experimental feature introduced in Spark version 2.3. To learn
more about continuous processing, see the `Spark documentation
<https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing>`__.

.. tabs-drivers::

   tabs:
     - id: java-sync
       content: |

     - id: python
       content: |

         To use continuous processing with the MongoDB Spark Connector,
         add the ``trigger()`` method to the ``writeStream`` property
         of the streaming Dataset or DataFrame that you create from
         your MongoDB read stream. In your ``trigger()``, specify the
         ``continuous`` parameter.

         .. note::

            The connector populates its read stream from your MongoDB
            deployment's change stream. To populate your change stream,
            perform update operations on your database.

            To learn more about change streams, see
            :manual:`Change Streams </changeStreams>` in the MongoDB
            manual.
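
         For example, you can generate change events by writing to the
         source collection with any MongoDB client. The following
         minimal sketch assumes PyMongo is installed; the connection
         string, names, and document fields are placeholders:

         .. code-block:: python

            # hypothetical sketch: perform writes so that the collection's
            # change stream, and therefore the connector's read stream,
            # receives events
            from pymongo import MongoClient

            client = MongoClient("<mongodb-connection-string>")
            collection = client["<database-name>"]["<collection-name>"]

            collection.insert_one({"company_symbol": "MDB", "price": 250.0})
            collection.update_one({"company_symbol": "MDB"},
                                  {"$set": {"price": 251.5}})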

         Specify read stream configuration settings on your local
         SparkSession ``readStream``. You must specify the following
         configuration settings to read from MongoDB:

         .. list-table::
            :header-rows: 1
            :stub-columns: 1
            :widths: 10 40

            * - Setting
              - Description

            * - ``readStream.format()``
              - The format to use for read stream data. Use ``mongodb``.

            * - ``writeStream.trigger()``
              - Enables continuous processing for your read stream. Use
                the ``continuous`` parameter.

         The following code snippet shows how to use the preceding
         configuration settings to stream data from MongoDB:

         .. code-block:: python
            :copyable: true
            :emphasize-lines: 3, 9

            streamingDataFrame = (<local SparkSession>
                .readStream
                .format("mongodb")
                .load()
            )

            query = (streamingDataFrame
                .writeStream
                .trigger(continuous="1 second")
                .format("memory")
                .outputMode("append")
            )

            query.start()

         .. note::

            Spark does not begin streaming until you call the
            ``start()`` method on a streaming query.
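
         If you run the query from a standalone script rather than an
         interactive shell, you may also want to block the driver while
         the stream runs. A minimal sketch, assuming you capture the
         ``StreamingQuery`` object that ``start()`` returns (the
         60-second timeout is an arbitrary choice):

         .. code-block:: python

            # start the query, wait up to 60 seconds for it to process
            # data, then stop the stream
            runningQuery = query.start()
            runningQuery.awaitTermination(60)
            runningQuery.stop()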

         For a complete list of methods, see the
         `pyspark Structured Streaming reference <https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss.html>`__.

     - id: scala
       content: |

Examples
--------

The following examples show Spark Structured Streaming configurations
for streaming between MongoDB and a ``.csv`` file.

Stream to MongoDB from a CSV File
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. tabs-drivers::

   tabs:
     - id: java-sync
       content: |

         .. code-block:: java
            :copyable: true

     - id: python
       content: |

         To create a :ref:`write stream <write-structured-stream>` to
         MongoDB from a ``.csv`` file, first create a `DataStreamReader <https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.DataStreamReader.html>`__
         from the ``.csv`` file, then use that ``DataStreamReader`` to
         create a `DataStreamWriter <https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.DataStreamWriter.html>`__
         to MongoDB. Finally, use the ``start()`` method to begin the
         stream.

         As streaming data is read from the ``.csv`` file, it is added
         to MongoDB in the `outputMode <https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.DataStreamWriter.outputMode.html#pyspark.sql.streaming.DataStreamWriter.outputMode>`__
         you specify.

         .. code-block:: python
            :copyable: true
            :emphasize-lines: 11, 17

            # create a local SparkSession
            spark = SparkSession \
                .builder \
                .appName("writeExample") \
                .master("spark://spark-master:<port>") \
                .config("spark.jars", "<mongodb-spark-connector-{+current-version+}>.jar") \
                .getOrCreate()

            # define a streaming query
            query = (spark
                .readStream
                .format("csv")
                .option("header", "true")
                .schema(<csv-schema>)
                .load(<csv-file-name>)
                # manipulate your streaming data
                .writeStream
                .format("mongodb")
                .option("checkpointLocation", "/tmp/pyspark/")
                .option("forceDeleteTempCheckpointLocation", "true")
                .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
                .option('spark.mongodb.database', <database-name>)
                .option('spark.mongodb.collection', <collection-name>)
                .outputMode("append")
            )

            # run the query
            query.start()

     - id: scala
       content: |

         .. code-block:: scala
            :copyable: true

Stream to a CSV File from MongoDB
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. tabs-drivers::

   tabs:
     - id: java-sync
       content: |

         .. code-block:: java
            :copyable: true

     - id: python
       content: |

         To create a :ref:`read stream <read-structured-stream>` to a
         ``.csv`` file from MongoDB, first create a `DataStreamReader <https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.DataStreamReader.html>`__
         from MongoDB, then use that ``DataStreamReader`` to
         create a `DataStreamWriter <https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.DataStreamWriter.html>`__
         to a new ``.csv`` file. Finally, use the ``start()`` method
         to begin the stream.

         As new data is inserted into MongoDB, MongoDB streams that
         data out to a ``.csv`` file in the `outputMode <https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.DataStreamWriter.outputMode.html#pyspark.sql.streaming.DataStreamWriter.outputMode>`__
         you specify.

         .. code-block:: python
            :copyable: true
            :emphasize-lines: 19, 27, 30

            # create a local SparkSession
            spark = SparkSession \
                .builder \
                .appName("readExample") \
                .master("spark://spark-master:<port>") \
                .config("spark.jars", "<mongodb-spark-connector-{+current-version+}>.jar") \
                .getOrCreate()

            # define the schema of the source collection
            readSchema = (StructType()
                .add('company_symbol', StringType())
                .add('company_name', StringType())
                .add('price', DoubleType())
                .add('tx_time', TimestampType())
            )

            # define a streaming query
            query = (spark
                .readStream
                .format("mongodb")
                .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
                .option('spark.mongodb.database', <database-name>)
                .option('spark.mongodb.collection', <collection-name>)
                .schema(readSchema)
                .load()
                # manipulate your streaming data
                .writeStream
                .format("csv")
                .option("path", "/output/")
                .trigger(continuous="1 second")
                .outputMode("append")
            )

            # run the query
            query.start()

     - id: scala
       content: |

         .. code-block:: scala
            :copyable: true
