Commit 1147167

atoomula authored and jagadish-v0 committed
Updated doc for Kinesis Connector
Author: Aditya Toomula <[email protected]>
Reviewers: Jagadish <[email protected]>
Closes apache#667 from atoomula/kinesisdocs
1 parent 1e0c81b commit 1147167

2 files changed: +63 -43 lines changed

docs/learn/documentation/versioned/aws/kinesis.md

Lines changed: 62 additions & 42 deletions
@@ -1,6 +1,6 @@
 ---
 layout: page
-title: Connecting to Kinesis
+title: Kinesis Connector
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one or more
@@ -19,86 +19,106 @@ title: Connecting to Kinesis
 limitations under the License.
 -->

-You can configure your Samza jobs to process data from [AWS Kinesis](https://aws.amazon.com/kinesis/data-streams), Amazon's data streaming service. A `Kinesis data stream` is similar to a Kafka topic and can have multiple partitions. Each message consumed from a Kinesis data stream is an instance of [Record](http://docs.aws.amazon.com/goto/WebAPI/kinesis-2013-12-02/Record).
+## Overview

-### Consuming from Kinesis:
+The Samza Kinesis connector provides access to [Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams),
+Amazon’s data streaming service. A Kinesis Data Stream is similar to a Kafka topic and can have multiple partitions.
+Each message consumed from a Kinesis Data Stream is an instance of [Record](http://docs.aws.amazon.com/goto/WebAPI/kinesis-2013-12-02/Record).
+Samza’s [KinesisSystemConsumer](https://github.com/apache/samza/blob/master/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java)
+wraps the Record into a [KinesisIncomingMessageEnvelope](https://github.com/apache/samza/blob/master/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java).

-Samza's [KinesisSystemConsumer](https://github.com/apache/samza/blob/master/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java) wraps the Record into a [KinesisIncomingMessageEnvelope](https://github.com/apache/samza/blob/master/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java). The key of the message is set to partition key of the Record. The message is obtained from the Record body.
+## Consuming from Kinesis

-To configure Samza to consume from Kinesis streams:
+### Basic Configuration
+
+You can configure your Samza jobs to process data from Kinesis Streams. To configure Samza job to consume from Kinesis
+streams, please add the below configuration:

 {% highlight jproperties %}
-# define a kinesis system factory with your identifier. eg: kinesis-system
+// define a kinesis system factory with your identifier. eg: kinesis-system
 systems.kinesis-system.samza.factory=org.apache.samza.system.eventhub.KinesisSystemFactory

-# kinesis system consumer works with only AllSspToSingleTaskGrouperFactory
+// kinesis system consumer works with only AllSspToSingleTaskGrouperFactory
 job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory

-# define your streams
+// define your streams
 task.inputs=kinesis-system.input0

-# define required properties for your streams
+// define required properties for your streams
 systems.kinesis-system.streams.input0.aws.region=YOUR-STREAM-REGION
 systems.kinesis-system.streams.input0.aws.accessKey=YOUR-ACCESS_KEY
 sensitive.systems.kinesis-system.streams.input0.aws.secretKey=YOUR-SECRET-KEY
 {% endhighlight %}

-The tuple required to access the Kinesis data stream must be provided, namely the fields `YOUR-STREAM-REGION`, `YOUR-ACCESS-KEY`, `YOUR-SECRET-KEY`.
+The tuple required to access the Kinesis data stream must be provided, namely the following fields:<br>
+**YOUR-STREAM-REGION**, **YOUR-ACCESS-KEY**, **YOUR-SECRET-KEY**.
+

-#### Advanced Configuration:
+### Advanced Configuration

-##### AWS Client Configs:
+#### AWS Client configs
+You can configure any [AWS client config](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html)
+with the prefix **systems.system-name.aws.clientConfig.***

-You can configure any [AWS client config](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html) with the prefix `system.system-name.aws.clientConfig.*`
 {% highlight jproperties %}
-system.system-name.aws.clientConfig.CONFIG-NAME=CONFIG-VALUE
+systems.system-name.aws.clientConfig.CONFIG-PARAM=CONFIG-VALUE
 {% endhighlight %}

-As an example, to set a proxy host and proxy port for the AWS Client:
+As an example, to set a *proxy host* and *proxy port* for the AWS Client:
+
 {% highlight jproperties %}
 systems.system-name.aws.clientConfig.ProxyHost=my-proxy-host.com
 systems.system-name.aws.clientConfig.ProxyPort=my-proxy-port
 {% endhighlight %}

-##### KCL Configs:
+#### Kinesis Client Library Configs
+Samza Kinesis Connector uses [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html#kinesis-record-processor-overview-kcl)
+(KCL) to access the Kinesis data streams. You can set any [Kinesis Client Lib Configuration](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java)
+for a stream by configuring it under **systems.system-name.streams.stream-name.aws.kcl.***

-Similarly, you can set any [Kinesis Client Library config](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java) for a stream by configuring it under `systems.system-name.streams.stream-name.aws.kcl.*`
 {% highlight jproperties %}
-systems.system-name.streams.stream-name.aws.kcl.CONFIG-NAME=CONFIG-VALUE
+systems.system-name.streams.stream-name.aws.kcl.CONFIG-PARAM=CONFIG-VALUE
 {% endhighlight %}

-As an example, to reset the checkpoint and set the starting position for a stream:
+Obtain the config param from the public functions in [Kinesis Client Lib Configuration](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java)
+by removing the *"with"* prefix. For example: config param corresponding to **withTableName()** is **TableName**.
+
+### Resetting Offsets
+
+The source of truth for checkpointing while using Kinesis Connector is not the Samza checkpoint topic but Kinesis itself.
+The Kinesis Client Library (KCL) [uses DynamoDB](https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-ddb.html)
+to store it’s checkpoints. By default, Kinesis Connector reads from the latest offset in the stream.
+
+To reset the checkpoints and consume from earliest/latest offset of a Kinesis data stream, please change the KCL TableName
+and set the appropriate starting position for the stream as shown below.
+
 {% highlight jproperties %}
+// change the TableName to a unique name to reset checkpoint.
 systems.kinesis-system.streams.input0.aws.kcl.TableName=my-app-table-name
-# set the starting position to either TRIM_HORIZON (oldest) or LATEST (latest)
+// set the starting position to either TRIM_HORIZON (oldest) or LATEST (latest)
 systems.kinesis-system.streams.input0.aws.kcl.InitialPositionInStream=my-start-position
 {% endhighlight %}

-#### Limitations
+To manipulate checkpoints to start from a particular position in the Kinesis stream, in lieu of Samza CheckpointTool,
+please login to the AWS Console and change the offsets in the DynamoDB Table with the table name that you have specified
+in the config above. By default, the table name has the following format:
+"\<job name\>-\<job id\>-\<kinesis stream\>".

-The following limitations apply for Samza jobs consuming from Kinesis streams using the Samza consumer:
-* Stateful processing (eg: windows or joins) is not supported on Kinesis streams. However, you can accomplish this by chaining two Samza jobs where the first job reads from Kinesis and sends to Kafka while the second job processes the data from Kafka.
-* Kinesis streams cannot be configured as [bootstrap](https://samza.apache.org/learn/documentation/latest/container/streams.html) or [broadcast](https://samza.apache.org/learn/documentation/latest/container/samza-container.html) streams.
-* Kinesis streams must be used with the [AllSspToSingleTaskGrouperFactory](https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java). No other grouper is supported.
-* A Samza job that consumes from Kinesis cannot consume from any other input source. However, you can send your results to any destination (eg: Kafka, EventHubs), and have another Samza job consume them.
+### Known Limitations

-### Producing to Kinesis:
+The following limitations apply to Samza jobs consuming from Kinesis streams using the Samza consumer:

-The KinesisSystemProducer for Samza is not yet implemented.
-
-### How to configure Samza job to consume from Kinesis data stream ?
-
-This tutorial uses [hello samza](../../../startup/hello-samza/{{site.version}}/) to illustrate running a Samza job on Yarn that consumes from Kinesis. We will use the [KinesisHelloSamza](https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java) example.
-
-#### Update properties file
+- Stateful processing (eg: windows or joins) is not supported on Kinesis streams. However, you can accomplish this by
+chaining two Samza jobs where the first job reads from Kinesis and sends to Kafka while the second job processes the
+data from Kafka.
+- Kinesis streams cannot be configured as [bootstrap](https://samza.apache.org/learn/documentation/latest/container/streams.html)
+or [broadcast](https://samza.apache.org/learn/documentation/latest/container/samza-container.html) streams.
+- Kinesis streams must be used ONLY with the [AllSspToSingleTaskGrouperFactory](https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java)
+as the Kinesis consumer does the partition management by itself. No other grouper is supported.
+- A Samza job that consumes from Kinesis cannot consume from any other input source. However, you can send your results
+to any destination (eg: Kafka, EventHubs), and have another Samza job consume them.

-Update the following properties in the kinesis-hello-samza.properties file:
+## Producing to Kinesis

-{% highlight jproperties %}
-task.inputs=kinesis.<kinesis-stream>
-systems.kinesis.streams.<kinesis-stream>.aws.region=<kinesis-stream-region>
-systems.kinesis.streams.<kinesis-stream>.aws.accessKey=<your-access-key>
-sensitive.systems.kinesis.streams.<kinesis-stream>.aws.region=<your-secret-key>
-{% endhighlight %}
+The KinesisSystemProducer for Samza is not yet implemented.

-Now, you are ready to run your Samza application on Yarn as described [here](../../../startup/hello-samza/{{site.version}}/). Check the log file for messages read from your Kinesis stream.
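
For readers applying this change, the basic configuration and the "Resetting Offsets" steps added to kinesis.md can be combined into one properties file. The fragment below is a minimal sketch and not part of the commit. The table name `my-app-table-name-v2` is a hypothetical value, and the factory package `org.apache.samza.system.kinesis` is an assumption inferred from the package path in the KinesisSystemConsumer link (the unchanged context line in the diff shows `eventhub` instead). Comments use `#`, the standard Java properties comment marker.

```properties
# Kinesis system with the identifier used in the documented example.
# Assumption: the factory lives in the "kinesis" package; verify against samza-aws.
systems.kinesis-system.samza.factory=org.apache.samza.system.kinesis.KinesisSystemFactory

# The only grouper the Kinesis consumer supports, per the Known Limitations list.
job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory

# A job consuming from Kinesis can have no other input source.
task.inputs=kinesis-system.input0

# Required per-stream access properties (placeholders, as in the documentation).
systems.kinesis-system.streams.input0.aws.region=YOUR-STREAM-REGION
systems.kinesis-system.streams.input0.aws.accessKey=YOUR-ACCESS-KEY
sensitive.systems.kinesis-system.streams.input0.aws.secretKey=YOUR-SECRET-KEY

# Hypothetical new table name: pointing KCL at a fresh DynamoDB table discards the stored checkpoints.
systems.kinesis-system.streams.input0.aws.kcl.TableName=my-app-table-name-v2

# Start from the oldest available record instead of the default latest offset.
systems.kinesis-system.streams.input0.aws.kcl.InitialPositionInStream=TRIM_HORIZON
```

Both `kcl.*` keys follow the rule stated in the added text: take the `with...` method name from KinesisClientLibConfiguration and drop the "with" prefix.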

docs/learn/documentation/versioned/index.html

Lines changed: 1 addition & 1 deletion
@@ -53,7 +53,7 @@ <h4>Connectors</h4>
 <li><a href="jobs/configuration.html">Apache Kafka</a></li>
 <li><a href="jobs/packaging.html">Apache Hadoop</a></li>
 <li><a href="jobs/yarn-jobs.html">Azure EventHubs</a></li>
-<li><a href="jobs/logging.html">AWS Kinesis</a></li>
+<li><a href="aws/kinesis.html">AWS Kinesis</a></li>
 </ul>

 <h4>Operations</h4>
