
Commit c5348bf — SAMZA-2041: add hdfs and kinesis descriptor

Author: Hai Lu <[email protected]>
Reviewers: Xinyu Liu <[email protected]>
Closes apache#857 from lhaiesp/master
1 parent: 85830be

File tree: 11 files changed, +883 −70 lines

docs/learn/documentation/versioned/connectors/hdfs.md
(23 additions & 25 deletions)
#### Defining streams

In Samza's high-level API, you can use `HdfsSystemDescriptor` to create an HDFS system. The stream name should be set to the name of the directory on HDFS.

{% highlight java %}
HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream");
HdfsInputDescriptor hid = hsd.getInputDescriptor("/data/clickstream/2016/09/11");
{% endhighlight %}

The above example defines a stream called `hdfs-clickstream` that reads data from the `/data/clickstream/2016/09/11` directory.

#### Whitelists & Blacklists

If you only want to consume from files that match a certain pattern, you can configure a whitelist. Likewise, you can also blacklist consuming from certain files. When both are specified, the _whitelist_ selects the files to be consumed and the _blacklist_ is then applied to its results.

{% highlight java %}
HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream")
    .withConsumerWhiteList(".*avro")
    .withConsumerBlackList("somefile.avro");
{% endhighlight %}
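The whitelist and blacklist values are patterns matched against file names (e.g. `.*avro`). As a minimal, JDK-only sketch of the "whitelist first, then blacklist on its results" semantics described above — independent of Samza, with hypothetical file names:

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class PatternFilterDemo {
    // Apply the whitelist first, then remove blacklist matches from its results.
    static List<String> select(List<String> files, String white, String black) {
        Pattern whitelist = Pattern.compile(white);
        Pattern blacklist = Pattern.compile(black);
        return files.stream()
            .filter(f -> whitelist.matcher(f).matches())
            .filter(f -> !blacklist.matcher(f).matches())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> files = List.of("part-0.avro", "somefile.avro", "README.txt");
        System.out.println(select(files, ".*avro", "somefile.avro"));
        // prints [part-0.avro]
    }
}
```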

Samza allows writing your output results to HDFS in Avro format. You can either use Avro's GenericRecords or have Samza automatically infer the schema for your object using reflection.

{% highlight java %}
HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream")
    .withWriterClassName(AvroDataFileHdfsWriter.class.getName());
{% endhighlight %}

If your output is non-Avro, use `TextSequenceFileHdfsWriter`.

{% highlight java %}
HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream")
    .withWriterClassName(TextSequenceFileHdfsWriter.class.getName());
{% endhighlight %}
#### Output directory structure

Samza allows you to control the base HDFS directory for your output. You can also organize the output into sub-directories depending on the time your application ran, by configuring a date formatter.

{% highlight java %}
HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream")
    .withOutputBaseDir("/user/me/analytics/clickstream_data")
    .withDatePathFormat("yyyy_MM_dd");
{% endhighlight %}
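The date-path format above follows the usual Java date-pattern letters. A JDK-only sketch of the sub-directory such a pattern would yield (the base directory and date are the doc's example values; the path-joining here is illustrative, not Samza's bucketer implementation):

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class DatePathDemo {
    // Build the dated sub-directory a "yyyy_MM_dd"-style bucketer would produce.
    static String datedPath(String baseDir, String pattern, LocalDate date) {
        return baseDir + "/" + date.format(DateTimeFormatter.ofPattern(pattern));
    }

    public static void main(String[] args) {
        System.out.println(
            datedPath("/user/me/analytics/clickstream_data", "yyyy_MM_dd", LocalDate.of(2016, 9, 11)));
        // prints /user/me/analytics/clickstream_data/2016_09_11
    }
}
```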
You can configure the maximum size of each file or the maximum number of records per file. Once either limit has been reached, Samza will create a new file.

{% highlight java %}
HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream")
    .withWriteBatchSizeBytes(134217728)
    .withWriteBatchSizeRecords(10000);
{% endhighlight %}
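The rollover rule ("new file once either limit is reached") can be illustrated with a hypothetical counter in plain Java; the fixed 100-byte record size and the counting logic are illustrative assumptions, not Samza's actual writer implementation:

```java
public class BatchRolloverSketch {
    // Count output files if a new file starts whenever either limit would be exceeded.
    static int countFiles(int totalRecords, long recordBytes, long maxBytes, long maxRecords) {
        int files = 1;
        long bytes = 0, records = 0;
        for (int i = 0; i < totalRecords; i++) {
            if (bytes + recordBytes > maxBytes || records + 1 > maxRecords) {
                files++;            // roll over to a new file
                bytes = 0;
                records = 0;
            }
            bytes += recordBytes;
            records++;
        }
        return files;
    }

    public static void main(String[] args) {
        // 25,000 records of 100 bytes each, with a 10,000-record cap per file
        System.out.println(countFiles(25000, 100, 134217728, 10000));
        // prints 3
    }
}
```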

### Security

docs/learn/documentation/versioned/connectors/kinesis.md
(68 additions & 34 deletions)
#### Basic Configuration

Here is the required configuration for consuming messages from Kinesis, using `KinesisSystemDescriptor` and `KinesisInputDescriptor`.

{% highlight java %}
KinesisSystemDescriptor ksd = new KinesisSystemDescriptor("kinesis");

KinesisInputDescriptor<KV<String, byte[]>> kid =
    ksd.getInputDescriptor("STREAM-NAME", new NoOpSerde<byte[]>())
        .withRegion("STREAM-REGION")
        .withAccessKey("YOUR-ACCESS_KEY")
        .withSecretKey("YOUR-SECRET-KEY");
{% endhighlight %}

#### Coordination
Each Kinesis stream in a given AWS [region](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html) can be accessed by providing an [access key](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys). An access key consists of two parts: an access key ID (for example, `AKIAIOSFODNN7EXAMPLE`) and a secret access key (for example, `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY`), which you can use to send programmatic requests to AWS.

{% highlight java %}
KinesisInputDescriptor<KV<String, byte[]>> kid =
    ksd.getInputDescriptor("STREAM-NAME", new NoOpSerde<byte[]>())
        .withRegion("STREAM-REGION")
        .withAccessKey("YOUR-ACCESS_KEY")
        .withSecretKey("YOUR-SECRET-KEY");
{% endhighlight %}

### Advanced Configuration

#### Kinesis Client Library Configs

Samza Kinesis Connector uses the [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html#kinesis-record-processor-overview-kcl) (KCL) to access the Kinesis data streams. You can set any [KCL configuration](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java) for a stream by configuring it through `KinesisInputDescriptor`.

{% highlight java %}
KinesisInputDescriptor<KV<String, byte[]>> kid = ...

Map<String, String> kclConfig = new HashMap<>();
kclConfig.put("CONFIG-PARAM", "CONFIG-VALUE");

kid.withKCLConfig(kclConfig);
{% endhighlight %}
As an example, the configuration below is equivalent to invoking `kclClient#withTableName(myTable)` on the KCL instance.

{% highlight java %}
KinesisInputDescriptor<KV<String, byte[]>> kid = ...

Map<String, String> kclConfig = new HashMap<>();
kclConfig.put("TableName", "myTable");

kid.withKCLConfig(kclConfig);
{% endhighlight %}

#### AWS Client configs

Samza allows you to specify any [AWS client configs](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html) to connect to your Kinesis instance. You can set them through `KinesisSystemDescriptor`.

{% highlight java %}
Map<String, String> awsConfig = new HashMap<>();
awsConfig.put("CONFIG-PARAM", "CONFIG-VALUE");

KinesisSystemDescriptor sd = new KinesisSystemDescriptor(systemName)
    .withAWSConfig(awsConfig);
{% endhighlight %}

Through `KinesisSystemDescriptor` you can also set the *proxy host* and *proxy port* to be used by the Kinesis client:

{% highlight java %}
KinesisSystemDescriptor sd = new KinesisSystemDescriptor(systemName)
    .withProxyHost("YOUR-PROXY-HOST")
    .withProxyPort(YOUR-PROXY-PORT);
{% endhighlight %}

### Resetting Offsets

These checkpoints are stored and managed by the KCL library internally. You can reset them as follows:

{% highlight jproperties %}
// change the TableName to a unique name to reset checkpoints.
systems.kinesis-system.streams.STREAM-NAME.aws.kcl.TableName=my-app-table-name
{% endhighlight %}

Or through `KinesisInputDescriptor`:

{% highlight java %}
KinesisInputDescriptor<KV<String, byte[]>> kid = ...

Map<String, String> kclConfig = new HashMap<>();
kclConfig.put("TableName", "my-new-app-table-name");

kid.withKCLConfig(kclConfig);
{% endhighlight %}
When you reset checkpoints, you can configure your job to start consuming from either the earliest or latest offset in the stream.

{% highlight jproperties %}
// set the starting position to either TRIM_HORIZON (oldest) or LATEST (latest)
systems.kinesis-system.streams.STREAM-NAME.aws.kcl.InitialPositionInStream=LATEST
{% endhighlight %}

Or through `KinesisInputDescriptor`:

{% highlight java %}
KinesisInputDescriptor<KV<String, byte[]>> kid = ...

Map<String, String> kclConfig = new HashMap<>();
kclConfig.put("InitialPositionInStream", "LATEST");

kid.withKCLConfig(kclConfig);
{% endhighlight %}
Alternately, if you want to start from a particular offset in the Kinesis stream, you can login to the [AWS console](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html) and edit the offsets in your DynamoDB Table.

samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
(11 additions & 11 deletions)

The config-key constants are changed from `private` to `public` so that the new descriptors can reference them:

{% highlight java %}
public class KinesisConfig extends MapConfig {
  private static final Logger LOG = LoggerFactory.getLogger(KinesisConfig.class.getName());

  public static final String CONFIG_SYSTEM_REGION = "systems.%s.aws.region";
  public static final String CONFIG_STREAM_REGION = "systems.%s.streams.%s.aws.region";

  public static final String CONFIG_STREAM_ACCESS_KEY = "systems.%s.streams.%s.aws.accessKey";
  public static final String CONFIG_STREAM_SECRET_KEY = "sensitive.systems.%s.streams.%s.aws.secretKey";

  public static final String CONFIG_AWS_CLIENT_CONFIG = "systems.%s.aws.clientConfig.";
  public static final String CONFIG_PROXY_HOST = CONFIG_AWS_CLIENT_CONFIG + "ProxyHost";
  public static final String DEFAULT_CONFIG_PROXY_HOST = "";
  public static final String CONFIG_PROXY_PORT = CONFIG_AWS_CLIENT_CONFIG + "ProxyPort";
  public static final int DEFAULT_CONFIG_PROXY_PORT = 0;

  public static final String CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.aws.kcl.";
  public static final String CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.streams.%s.aws.kcl.";

  public KinesisConfig(Config config) {
    super(config);
  }
  // remainder of the class unchanged
}
{% endhighlight %}
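These constants are `String.format` patterns where `%s` stands for the system name (and, for stream-scoped keys, the stream ID). A JDK-only sketch of how they expand into concrete config keys (the patterns are copied from `KinesisConfig` above; the system/stream names are hypothetical):

```java
public class ConfigKeyDemo {
    // Format patterns copied from KinesisConfig.
    static final String CONFIG_STREAM_REGION = "systems.%s.streams.%s.aws.region";
    static final String CONFIG_AWS_CLIENT_CONFIG = "systems.%s.aws.clientConfig.";
    static final String CONFIG_PROXY_HOST = CONFIG_AWS_CLIENT_CONFIG + "ProxyHost";

    static String streamRegionKey(String system, String stream) {
        return String.format(CONFIG_STREAM_REGION, system, stream);
    }

    static String proxyHostKey(String system) {
        return String.format(CONFIG_PROXY_HOST, system);
    }

    public static void main(String[] args) {
        System.out.println(streamRegionKey("kinesis", "input0"));
        // prints systems.kinesis.streams.input0.aws.region
        System.out.println(proxyHostKey("kinesis"));
        // prints systems.kinesis.aws.clientConfig.ProxyHost
    }
}
```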
samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java
(new file, 123 additions; path inferred from the package declaration)

{% highlight java %}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.system.kinesis.descriptors;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.system.descriptors.InputDescriptor;
import org.apache.samza.system.descriptors.SystemDescriptor;
import org.apache.samza.system.kinesis.KinesisConfig;


/**
 * A {@link KinesisInputDescriptor} can be used for specifying Samza and Kinesis specific properties of Kinesis
 * input streams.
 * <p>
 * Use {@link KinesisSystemDescriptor#getInputDescriptor} to obtain an instance of this descriptor.
 * <p>
 * Stream properties provided in configuration override corresponding properties specified using a descriptor.
 *
 * @param <StreamMessageType> type of messages in this stream
 */
public class KinesisInputDescriptor<StreamMessageType>
    extends InputDescriptor<StreamMessageType, KinesisInputDescriptor<StreamMessageType>> {
  private Optional<String> accessKey = Optional.empty();
  private Optional<String> secretKey = Optional.empty();
  private Optional<String> region = Optional.empty();
  private Map<String, String> kclConfig = Collections.emptyMap();

  /**
   * Constructs an {@link InputDescriptor} instance.
   *
   * @param streamId id of the stream
   * @param valueSerde serde for the values in the messages in the stream
   * @param systemDescriptor system descriptor this stream descriptor was obtained from
   */
  <T> KinesisInputDescriptor(String streamId, Serde<T> valueSerde, SystemDescriptor systemDescriptor) {
    super(streamId, KVSerde.of(new NoOpSerde<>(), valueSerde), systemDescriptor, null);
  }

  /**
   * Kinesis region for the system stream.
   * @param region Kinesis region
   * @return this input descriptor
   */
  public KinesisInputDescriptor<StreamMessageType> withRegion(String region) {
    this.region = Optional.of(StringUtils.stripToNull(region));
    return this;
  }

  /**
   * Kinesis access key name for the system stream.
   * @param accessKey Kinesis access key name
   * @return this input descriptor
   */
  public KinesisInputDescriptor<StreamMessageType> withAccessKey(String accessKey) {
    this.accessKey = Optional.of(StringUtils.stripToNull(accessKey));
    return this;
  }

  /**
   * Kinesis secret key name for the system stream.
   * @param secretKey Kinesis secret key
   * @return this input descriptor
   */
  public KinesisInputDescriptor<StreamMessageType> withSecretKey(String secretKey) {
    this.secretKey = Optional.of(StringUtils.stripToNull(secretKey));
    return this;
  }

  /**
   * KCL (Kinesis Client Library) config for the system stream. This is not required by default.
   * @param kclConfig a map of specified KCL configs
   * @return this input descriptor
   */
  public KinesisInputDescriptor<StreamMessageType> withKCLConfig(Map<String, String> kclConfig) {
    this.kclConfig = kclConfig;
    return this;
  }

  @Override
  public Map<String, String> toConfig() {
    Map<String, String> config = new HashMap<>(super.toConfig());

    String systemName = getSystemName();
    String streamId = getStreamId();
    String clientConfigPrefix =
        String.format(KinesisConfig.CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG, systemName, streamId);

    this.region.ifPresent(
        val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_REGION, systemName, streamId), val));
    this.accessKey.ifPresent(
        val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_ACCESS_KEY, systemName, streamId), val));
    this.secretKey.ifPresent(
        val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_SECRET_KEY, systemName, streamId), val));
    this.kclConfig.forEach((k, v) -> config.put(clientConfigPrefix + k, v));

    return config;
  }
}
{% endhighlight %}
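The heart of the descriptor is `toConfig()`, which maps the fluent setters back onto the flat config keys. A self-contained sketch of that mapping using only the JDK (the key patterns mirror `KinesisConfig`; the system/stream names and values are hypothetical, and `super.toConfig()` is omitted):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ToConfigSketch {
    // Mimic the key mapping in KinesisInputDescriptor#toConfig.
    static Map<String, String> toConfig(String systemName, String streamId,
                                        Optional<String> region, Map<String, String> kclConfig) {
        Map<String, String> config = new HashMap<>();
        String kclPrefix = String.format("systems.%s.streams.%s.aws.kcl.", systemName, streamId);
        // Only set keys for properties the user actually provided.
        region.ifPresent(v ->
            config.put(String.format("systems.%s.streams.%s.aws.region", systemName, streamId), v));
        // Each KCL entry is flattened under the per-stream KCL prefix.
        kclConfig.forEach((k, v) -> config.put(kclPrefix + k, v));
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> config = toConfig("kinesis", "STREAM-NAME",
            Optional.of("us-west-2"), Map.of("TableName", "myTable"));
        System.out.println(config.get("systems.kinesis.streams.STREAM-NAME.aws.region"));
        // prints us-west-2
        System.out.println(config.get("systems.kinesis.streams.STREAM-NAME.aws.kcl.TableName"));
        // prints myTable
    }
}
```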
