Commit 1e0c81b

shanthoosh authored and jagadish-v0 committed
SAMZA-1920: Standalone user guide.
Author: Shanthoosh Venkataraman <[email protected]> Reviewers: Jagadish<[email protected]> Closes apache#670 from shanthoosh/getting_started_with_standalone
1 parent 2c4b6d5 commit 1e0c81b

3 files changed

+218
-1
lines changed

docs/learn/documentation/versioned/index.html

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ <h4>Deployment</h4>
4343
<li><a href="api/overview.html">Deployment overview</a></li>
4444
<li><a href="deployment/deployment-model.html">Deployment model</a></li>
4545
<li><a href="api/overview.html">Run on YARN</a></li>
46-
<li><a href="api/overview.html">Run as an embedded library</a></li>
46+
<li><a href="standalone/standalone.html">Run as an embedded library</a></li>
4747
</ul>
4848

4949
<h4>Connectors</h4>
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
---
2+
layout: page
3+
title: Run as embedded library.
4+
---
5+
<!--
6+
Licensed to the Apache Software Foundation (ASF) under one or more
7+
contributor license agreements. See the NOTICE file distributed with
8+
this work for additional information regarding copyright ownership.
9+
The ASF licenses this file to You under the Apache License, Version 2.0
10+
(the "License"); you may not use this file except in compliance with
11+
the License. You may obtain a copy of the License at
12+
13+
http://www.apache.org/licenses/LICENSE-2.0
14+
15+
Unless required by applicable law or agreed to in writing, software
16+
distributed under the License is distributed on an "AS IS" BASIS,
17+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
See the License for the specific language governing permissions and
19+
limitations under the License.
20+
-->
21+
22+
- [Introduction](#introduction)
23+
- [User guide](#user-guide)
24+
- [Setup dependencies](#setup-dependencies)
25+
- [Configuration](#configuration)
26+
- [Code sample](#code-sample)
27+
- [Quick start guide](#quick-start-guide)
28+
- [Setup zookeeper](#setup-zookeeper)
29+
- [Setup kafka](#setup-kafka)
30+
- [Build binaries](#build-binaries)
31+
- [Deploy binaries](#deploy-binaries)
32+
- [Inspect results](#inspect-results)
33+
- [Coordinator internals](#coordinator-internals)
34+
35+
#
36+
37+
# Introduction
38+
39+
With Samza 0.13.0, the deployment model of Samza jobs has been simplified and decoupled from YARN. The _standalone_ model provides the stream processing capabilities of Samza packaged as a library with pluggable coordination. This library model offers an easier integration path and promotes a flexible deployment model for an application. Using the standalone mode, you can use Samza processors directly in your application and deploy Samza applications to self-managed clusters.
40+
41+
A standalone application typically consists of multiple _stream processors_. A _stream processor_ encapsulates a user-defined processing function and is responsible for processing a subset of the input topic partitions. Each stream processor of a standalone application is uniquely identified by a _processorId_.
42+
43+
Samza provides a pluggable job _coordinator_ layer to perform leader election and assign work to the stream processors. Standalone supports ZooKeeper coordination out of the box and uses it for distributed coordination between the stream processors of a standalone application. A processor becomes part of a standalone application by setting its app.name (e.g. app.name=group\_1) and joining the group.
44+
45+
In Samza standalone, the input topic partitions are distributed among the available processors dynamically at runtime. In each standalone application, one stream processor is initially chosen as the leader to mediate the assignment of input topic partitions to the stream processors. If the number of available processors changes (for example, if a processor is shut down or added), the leader regenerates the partition assignments and redistributes them to all the processors.
46+
47+
When the processor group changes, the act of reassigning input topic partitions to the remaining live processors in the group is known as rebalancing the group. If the leader processor of a standalone application fails, another stream processor of the application is chosen as the leader.
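As an illustration of rebalancing, here is a minimal sketch that spreads partitions over the live processors in round-robin order. The class `RebalanceSketch`, the method `assign`, and the round-robin policy itself are assumptions made for this example; they are not Samza's API, and the actual assignment strategy used by the leader may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Samza: shows how a leader could
// regenerate partition assignments when the processor group changes.
public class RebalanceSketch {

  // Assigns partitions 0..partitionCount-1 to the processors in round-robin order.
  // Processor IDs are sorted first so that the result does not depend on input order.
  public static Map<String, List<Integer>> assign(List<String> processorIds, int partitionCount) {
    List<String> sorted = new ArrayList<>(processorIds);
    Collections.sort(sorted);

    Map<String, List<Integer>> assignment = new TreeMap<>();
    for (String id : sorted) {
      assignment.put(id, new ArrayList<>());
    }
    if (sorted.isEmpty()) {
      return assignment; // no live processors, nothing to assign
    }
    for (int partition = 0; partition < partitionCount; partition++) {
      String owner = sorted.get(partition % sorted.size());
      assignment.get(owner).add(partition);
    }
    return assignment;
  }

  public static void main(String[] args) {
    List<String> group = new ArrayList<>();
    Collections.addAll(group, "processor-1", "processor-2", "processor-3");
    System.out.println("Initial assignment: " + assign(group, 3));

    // processor-2 leaves the group, so the assignment is regenerated.
    group.remove("processor-2");
    System.out.println("After rebalance:    " + assign(group, 3));
  }
}
```

With three processors and three partitions each processor owns one partition; once a processor leaves, its partition is handed to one of the remaining processors.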
48+
49+
## User guide
50+
51+
Samza standalone is designed to give you more control over the deployment of the application, so it is your responsibility to configure and deploy the processors. With ZooKeeper coordination, you have to configure the URL of a ZooKeeper instance.
52+
53+
A stream processor is identified by a unique processorId, which is generated by the pluggable ProcessorIdGenerator abstraction. The processorId of the stream processor is used with the coordination service. Samza supports a UUID-based ProcessorIdGenerator out of the box.
54+
55+
The diagram below shows an input topic with three partitions and a standalone application with three processors consuming messages from it.
56+
57+
<img src="/img/versioned/learn/documentation/standalone/standalone-application.jpg" alt="Standalone application" height="550px" width="700px" align="middle">
58+
59+
When a group is first initialized, each stream processor typically starts processing messages from either the earliest or latest offset of the input topic partition. The messages in each partition are sequentially delivered to the user defined processing function. As the stream processor makes progress, it commits the offsets of the messages it has successfully processed. For example, in the figure above, the stream processor position is at offset 7 and its last committed offset is at offset 3.
60+
61+
When an input partition is reassigned to another processor in the group, the initial position is set to the last committed offset. If processor-1 in the example above suddenly crashed, the live processor taking over the partition would begin consumption from offset 3. In that case, it would have to reprocess the messages between the committed offset 3 and the crashed processor's position of 7.
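The arithmetic behind this example can be sketched as follows. The class `OffsetResumeSketch` and its methods are hypothetical names used only for illustration, and the sketch assumes that a processor's position is the offset of the next message it would read.

```java
// Hypothetical helper, not part of Samza: models where a processor resumes
// after taking over a partition, and how many messages it processes again.
public class OffsetResumeSketch {

  // A processor taking over a partition starts from the last committed offset.
  public static long resumeOffset(long lastCommittedOffset) {
    return lastCommittedOffset;
  }

  // Messages in [lastCommittedOffset, crashedPosition) were already processed
  // by the crashed processor but never committed, so they are processed again.
  // Assumption: crashedPosition is the next offset the crashed processor would have read.
  public static long reprocessedCount(long lastCommittedOffset, long crashedPosition) {
    return Math.max(0L, crashedPosition - lastCommittedOffset);
  }

  public static void main(String[] args) {
    long committed = 3L; // last committed offset from the figure
    long position = 7L;  // position of the crashed processor from the figure
    System.out.println("Resume at offset: " + resumeOffset(committed));
    System.out.println("Messages reprocessed: " + reprocessedCount(committed, position));
  }
}
```

Committing more frequently narrows the gap between the position and the committed offset, which reduces the number of messages processed twice after a failure.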
62+
63+
### Setup dependencies
64+
65+
Add the following Samza standalone Maven dependencies to your project.
66+
67+
```xml
68+
<dependency>
69+
<groupId>org.apache.samza</groupId>
70+
<artifactId>samza-kafka_2.11</artifactId>
71+
<version>1.0</version>
72+
</dependency>
73+
<dependency>
74+
<groupId>org.apache.samza</groupId>
75+
<artifactId>samza-core_2.11</artifactId>
76+
<version>1.0</version>
77+
</dependency>
78+
<dependency>
79+
<groupId>org.apache.samza</groupId>
80+
<artifactId>samza-api</artifactId>
81+
<version>1.0</version>
82+
</dependency>
83+
```
84+
85+
### Configuration
86+
87+
A Samza standalone application requires you to define the following mandatory configurations:
88+
89+
```bash
90+
job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
91+
# For a local ZooKeeper, use localhost:2181
job.coordinator.zk.connect=your_zk_connection
92+
task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
93+
```
94+
95+
You have to configure the stream processor with the Kafka brokers, as shown in the following sample (which assumes that the broker is running on localhost):
96+
97+
```bash
98+
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
99+
systems.kafka.samza.msg.serde=json
100+
systems.kafka.consumer.zookeeper.connect=localhost:2181
101+
systems.kafka.producer.bootstrap.servers=localhost:9092
102+
```
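Since a missing mandatory setting is easy to overlook, a small pre-flight check can help. The following is a sketch using only `java.util.Properties`; the class `StandaloneConfigCheck` and its methods are hypothetical and are not part of Samza. It only checks that the three mandatory keys listed above are present and non-empty.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Samza: verifies that the mandatory
// standalone settings are present before the application is launched.
public class StandaloneConfigCheck {

  // The three mandatory keys named in this guide.
  public static final List<String> MANDATORY_KEYS = Arrays.asList(
      "job.coordinator.factory",
      "job.coordinator.zk.connect",
      "task.name.grouper.factory");

  // Parses properties-file text into a Properties object.
  public static Properties parse(String text) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(text));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return props;
  }

  // Returns the mandatory keys that are absent or blank.
  public static List<String> missingKeys(Properties props) {
    List<String> missing = new ArrayList<>();
    for (String key : MANDATORY_KEYS) {
      String value = props.getProperty(key);
      if (value == null || value.trim().isEmpty()) {
        missing.add(key);
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    Properties props = parse(
        "job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory\n"
            + "job.coordinator.zk.connect=localhost:2181\n");
    System.out.println("Missing mandatory settings: " + missingKeys(props));
  }
}
```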
103+
104+
### Code sample
105+
106+
Here's a sample standalone application with app.name set to sample-test. Running this class launches a stream processor.
107+
108+
```java
109+
public class PageViewEventExample implements StreamApplication {
110+
111+
public static void main(String[] args) {
112+
CommandLine cmdLine = new CommandLine();
113+
OptionSet options = cmdLine.parser().parse(args);
114+
Config config = cmdLine.loadConfig(options);
115+
116+
ApplicationRunner runner = ApplicationRunners.getApplicationRunner(ApplicationClassUtils.fromConfig(config), config);
117+
runner.run();
118+
runner.waitForFinish();
119+
}
120+
121+
@Override
122+
public void describe(StreamAppDescriptor appDesc) {
123+
MessageStream<PageViewEvent> pageViewEvents = null;
124+
pageViewEvents = appDesc.getInputStream("inputStream", new JsonSerdeV2<>(PageViewEvent.class));
125+
OutputStream<PageViewEvent> pageViewEventPerMemberStream =
126+
appDesc.getOutputStream("outputStream", new JsonSerdeV2<>(PageViewEvent.class));
127+
pageViewEvents.sendTo(pageViewEventPerMemberStream);
128+
}
129+
}
130+
```
131+
132+
## Quick start guide
133+
134+
The [Hello-samza](https://github.com/apache/samza-hello-samza/) project contains sample Samza standalone applications. The following step-by-step guide shows how to install, build, and run a standalone application using a local ZooKeeper cluster for coordination. Check out the hello-samza project by running the following commands:
135+
136+
```bash
137+
git clone https://git.apache.org/samza-hello-samza.git hello-samza
138+
cd hello-samza
139+
```
140+
141+
### Setup Zookeeper
142+
143+
Run the following commands to install and start a local ZooKeeper cluster.
144+
145+
```bash
146+
./bin/grid install zookeeper
147+
./bin/grid start zookeeper
148+
```
149+
150+
### Setup Kafka
151+
152+
Run the following commands to install and start a local Kafka cluster.
153+
154+
```bash
155+
./bin/grid install kafka
156+
./bin/grid start kafka
157+
```
158+
159+
### Build binaries
160+
161+
Before you can run the standalone job, you need to build a package for it using the following commands.
162+
163+
```bash
164+
mvn clean package
165+
mkdir -p deploy/samza
166+
tar -xvf ./target/hello-samza-0.15.0-SNAPSHOT-dist.tar.gz -C deploy/samza
167+
```
168+
169+
### Deploy binaries
170+
171+
To run the sample standalone application [WikipediaZkLocalApplication](https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java), run the following commands:
172+
173+
```bash
174+
./bin/deploy.sh
175+
./deploy/samza/bin/run-class.sh samza.examples.wikipedia.application.WikipediaZkLocalApplication --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-application-local-runner.properties
176+
```
177+
178+
### Inspect results
179+
180+
The standalone application reads messages from the wikipedia-edits topic and, every ten seconds, calculates counts for all edits that were made during that window. It writes these counts to the local wikipedia-stats Kafka topic. To inspect the events in the output topic, run the following command.
181+
182+
```bash
183+
./deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wikipedia-stats
184+
```
185+
186+
Events produced to the output topic by the standalone application launched above have the following form:
187+
188+
```
189+
{"is-talk":2,"bytes-added":5276,"edits":13,"unique-titles":13}
190+
{"is-bot-edit":1,"is-talk":3,"bytes-added":4211,"edits":30,"unique-titles":30,"is-unpatrolled":1,"is-new":2,"is-minor":7}
191+
```
192+
193+
# Coordinator internals
194+
195+
A Samza application consists of multiple stream processors. A processor becomes part of a standalone application by setting its app.name (e.g. app.name=group\_1) and joining the group. In Samza standalone, the input topic partitions are distributed among the available processors dynamically at runtime. If the number of available processors changes (for example, if some processors are shut down or added), the partition assignments are regenerated and redistributed to all the processors. One processor is elected as the leader; it generates the partition assignments and distributes them to the other processors in the group.
196+
197+
To mediate the partition assignments between processors, Samza standalone relies on a coordination service. The main responsibilities of the coordination service are the following:
198+
199+
**Leader election** - Elects a single processor to generate the partition assignments and distribute them to the other processors in the group.
200+
201+
**Distributed barrier** - Coordination primitive used by the processors to reach consensus (agree) on a partition assignment.
202+
203+
By default, embedded Samza uses ZooKeeper to coordinate between the processors of an application and to store the partition assignment state. The coordination sequence for a standalone application is listed below:
204+
205+
1. Each processor (participant) registers with the coordination service (e.g. ZooKeeper) using its participant ID.
206+
207+
2. One of the participants will be elected as the leader.
208+
209+
3. The leader will monitor the list of all the active participants.
210+
211+
4. Whenever the list of participants in a group changes, the leader generates a new partition assignment for the current participants and persists it to a common storage.
212+
213+
5. Participants are notified that the new partition assignment is available. Notification is done through the coordination service (e.g. ZooKeeper).
214+
215+
6. The participants will stop processing, pick up the new partition assignment, and then resume processing.
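The sequence above can be modelled in a single process to make the roles concrete. This is only a sketch: the class `CoordinationSketch` and its methods are hypothetical, the "participant with the lowest registration sequence number becomes leader" rule is an assumption borrowed from a common ZooKeeper election recipe, and the round-robin assignment is a stand-in for whatever strategy the leader actually applies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the coordination sequence, not Samza code.
public class CoordinationSketch {
  // Registration sequence number -> participant ID (step 1).
  private final TreeMap<Long, String> participants = new TreeMap<>();
  private final int partitionCount;
  private long nextSequence = 0L;
  // Stands in for the "common storage" holding the latest assignment (step 4).
  private Map<String, List<Integer>> storedAssignment = new TreeMap<>();

  public CoordinationSketch(int partitionCount) {
    this.partitionCount = partitionCount;
  }

  // Step 1: a participant registers and receives the next sequence number.
  public void register(String participantId) {
    participants.put(nextSequence++, participantId);
    onMembershipChange();
  }

  // A participant shuts down or crashes and leaves the group.
  public void leave(String participantId) {
    participants.values().remove(participantId);
    onMembershipChange();
  }

  // Step 2 (assumed rule): the live participant with the lowest sequence number leads.
  public String leader() {
    return participants.isEmpty() ? null : participants.firstEntry().getValue();
  }

  // Steps 3 and 4: on every membership change a new assignment is generated and stored.
  private void onMembershipChange() {
    List<String> live = new ArrayList<>(participants.values());
    Map<String, List<Integer>> next = new TreeMap<>();
    for (String id : live) {
      next.put(id, new ArrayList<>());
    }
    for (int p = 0; p < partitionCount && !live.isEmpty(); p++) {
      next.get(live.get(p % live.size())).add(p);
    }
    storedAssignment = next;
  }

  // Steps 5 and 6: participants read the latest assignment before resuming.
  public Map<String, List<Integer>> assignment() {
    return storedAssignment;
  }

  public static void main(String[] args) {
    CoordinationSketch group = new CoordinationSketch(3);
    group.register("p1");
    group.register("p2");
    group.register("p3");
    System.out.println("Leader: " + group.leader() + ", assignment: " + group.assignment());
    group.leave("p1"); // the leader fails, so another participant takes over
    System.out.println("Leader: " + group.leader() + ", assignment: " + group.assignment());
  }
}
```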
216+
217+
To ensure that no partition is processed by two different processors at the same time, processing is paused and all the processors synchronize on a distributed barrier. Once all the processors are paused, the new partition assignments are applied, after which processing resumes.
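The pause, apply, resume ordering can be demonstrated with a local analogy. The sketch below uses `java.util.concurrent.CyclicBarrier` with threads standing in for processors; it is not the distributed barrier that Samza builds on ZooKeeper, and the class `BarrierSketch` and its event names are made up for this example.

```java
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;

// Local, single-JVM analogy of the distributed barrier; not Samza code.
public class BarrierSketch {

  // Runs one rebalance round and returns the events in the order they happened.
  public static List<String> rebalance(int processorCount) {
    List<String> events = new CopyOnWriteArrayList<>();

    // The barrier action runs exactly once, after every processor has paused
    // and before any of them is released.
    CyclicBarrier barrier =
        new CyclicBarrier(processorCount, () -> events.add("apply-assignment"));

    Thread[] processors = new Thread[processorCount];
    for (int i = 0; i < processorCount; i++) {
      final int id = i;
      processors[i] = new Thread(() -> {
        events.add("pause-" + id);
        try {
          barrier.await(); // wait until all processors have paused
        } catch (InterruptedException | BrokenBarrierException e) {
          Thread.currentThread().interrupt();
          return;
        }
        events.add("resume-" + id);
      });
      processors[i].start();
    }

    try {
      for (Thread processor : processors) {
        processor.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for processors", e);
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(rebalance(3));
  }
}
```

The order of the individual pause and resume events varies between runs, but the assignment is always applied after the last pause and before the first resume.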
