Skip to content

Commit e6c1eed

Browse files
author
Jacob Maes
committed
SAMZA-1096: StreamSpec constructors in the ExecutionEnvironments
Author: Jacob Maes <[email protected]> Reviewers: Yi Pan (Data Infrastructure) <[email protected]>,Xinyu Liu <[email protected]>,Navina Ramesh <[email protected]> Closes apache#74 from jmakes/samza-1096
1 parent d104013 commit e6c1eed

File tree

15 files changed

+810
-113
lines changed

15 files changed

+810
-113
lines changed

docs/learn/documentation/versioned/jobs/configuration-table.html

Lines changed: 168 additions & 57 deletions
Large diffs are not rendered by default.

docs/learn/documentation/versioned/jobs/logging.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,11 @@ to log4j.xml and define the system name by specifying the config:
139139
task.log4j.system="<system-name>"
140140
{% endhighlight %}
141141

142+
The default stream name for logger is generated using the following convention, though you can override it using the `StreamName` property in the log4j.xml as shown above.
143+
```java
144+
"__samza_%s_%s_logs" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
145+
```
146+
142147
Configuring the StreamAppender will automatically encode messages using logstash's [Log4J JSON format](https://github.com/logstash/log4j-jsonevent-layout). Samza also supports pluggable serialization for those that prefer non-JSON logging events. This can be configured the same way other stream serializers are defined:
143148

144149
{% highlight jproperties %}

samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.samza.system;
2020

21+
import java.lang.reflect.Constructor;
2122
import org.apache.samza.annotation.InterfaceStability;
2223
import org.apache.samza.config.ConfigException;
2324
import org.apache.samza.operators.StreamGraphBuilder;
@@ -26,6 +27,9 @@
2627

2728
/**
2829
* Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph}
30+
*
31+
* Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
32+
* to support the {@link ExecutionEnvironment#fromConfig(Config)} static constructor.
2933
*/
3034
@InterfaceStability.Unstable
3135
public interface ExecutionEnvironment {
@@ -46,13 +50,17 @@ static ExecutionEnvironment getLocalEnvironment(Config config) {
4650
/**
4751
* Static method to load the non-standalone environment.
4852
*
53+
* Requires the implementation class to define a constructor with a single {@link Config} as the argument.
54+
*
4955
* @param config configuration passed in to initialize the Samza processes
5056
* @return the configure-driven {@link ExecutionEnvironment} to run the user-defined stream applications
5157
*/
5258
static ExecutionEnvironment fromConfig(Config config) {
5359
try {
54-
if (ExecutionEnvironment.class.isAssignableFrom(Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS)))) {
55-
return (ExecutionEnvironment) Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS)).newInstance();
60+
Class<?> environmentClass = Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS));
61+
if (ExecutionEnvironment.class.isAssignableFrom(environmentClass)) {
62+
Constructor<?> constructor = environmentClass.getConstructor(Config.class); // *sigh*
63+
return (ExecutionEnvironment) constructor.newInstance(config);
5664
}
5765
} catch (Exception e) {
5866
throw new ConfigException(String.format("Problem in loading ExecutionEnvironment class %s", config.get(ENVIRONMENT_CONFIG)), e);
@@ -70,4 +78,24 @@ static ExecutionEnvironment fromConfig(Config config) {
7078
*/
7179
void run(StreamGraphBuilder graphBuilder, Config config);
7280

81+
/**
82+
* Constructs a {@link StreamSpec} from the configuration for the specified streamId.
83+
*
84+
* The stream configurations are read from the following properties in the config:
85+
* {@code streams.{$streamId}.*}
86+
* <br>
87+
* All properties matching this pattern are assumed to be system-specific with two exceptions. The following two
88+
* properties are Samza properties which are used to bind the stream to a system and a physical resource on that system.
89+
*
90+
* <ul>
91+
* <li>samza.system - The name of the System on which this stream will be used. If this property isn't defined
92+
* the stream will be associated with the System defined in {@code job.default.system}</li>
93+
* <li>samza.physical.name - The system-specific name for this stream. It could be a file URN, topic name, or other identifer.
94+
* If this property isn't defined the physical.name will be set to the streamId</li>
95+
* </ul>
96+
*
97+
* @param streamId The logical identifier for the stream in Samza.
98+
* @return The {@link StreamSpec} instance.
99+
*/
100+
StreamSpec streamFromConfig(String streamId);
73101
}

samza-api/src/main/java/org/apache/samza/system/StreamSpec.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -137,13 +137,8 @@ public StreamSpec(String id, String physicalName, String systemName, Map<String,
137137
* @param config A map of properties for the stream. These may be System-specfic.
138138
*/
139139
public StreamSpec(String id, String physicalName, String systemName, int partitionCount, Map<String, String> config) {
140-
if (id == null) {
141-
throw new NullPointerException("Parameter 'id' must not be null");
142-
}
143-
144-
if (systemName == null) {
145-
throw new NullPointerException("Parameter 'systemName' must not be null");
146-
}
140+
validateLogicalIdentifier("id", id);
141+
validateLogicalIdentifier("systemName", systemName);
147142

148143
if (partitionCount < 1) {
149144
throw new IllegalArgumentException("Parameter 'partitionCount' must be greater than 0");
@@ -200,4 +195,10 @@ public String get(String propertyName) {
200195
public String getOrDefault(String propertyName, String defaultValue) {
201196
return config.getOrDefault(propertyName, defaultValue);
202197
}
198+
199+
private void validateLogicalIdentifier(String identifierName, String identifierValue) {
200+
if (!identifierValue.matches("[A-Za-z0-9_-]+")) {
201+
throw new IllegalArgumentException(String.format("Identifier '%s' is '%s'. It must match the expression [A-Za-z0-9_-]+", identifierName, identifierValue));
202+
}
203+
}
203204
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.samza.system;
20+
21+
import java.util.Map;
22+
import org.apache.samza.config.Config;
23+
import org.apache.samza.config.StreamConfig;
24+
25+
26+
public abstract class AbstractExecutionEnvironment implements ExecutionEnvironment {
27+
28+
private final Config config;
29+
30+
public AbstractExecutionEnvironment(Config config) {
31+
if (config == null) {
32+
throw new NullPointerException("Parameter 'config' cannot be null.");
33+
}
34+
35+
this.config = config;
36+
}
37+
38+
@Override
39+
public StreamSpec streamFromConfig(String streamId) {
40+
StreamConfig streamConfig = new StreamConfig(config);
41+
String physicalName = streamConfig.getPhysicalName(streamId, streamId);
42+
43+
return streamFromConfig(streamId, physicalName);
44+
}
45+
46+
/**
47+
* Constructs a {@link StreamSpec} from the configuration for the specified streamId.
48+
*
49+
* The stream configurations are read from the following properties in the config:
50+
* {@code streams.{$streamId}.*}
51+
* <br>
52+
* All properties matching this pattern are assumed to be system-specific with one exception. The following
53+
* property is a Samza property which is used to bind the stream to a system.
54+
*
55+
* <ul>
56+
* <li>samza.system - The name of the System on which this stream will be used. If this property isn't defined
57+
* the stream will be associated with the System defined in {@code job.default.system}</li>
58+
* </ul>
59+
*
60+
* @param streamId The logical identifier for the stream in Samza.
61+
* @param physicalName The system-specific name for this stream. It could be a file URN, topic name, or other identifer.
62+
* @return The {@link StreamSpec} instance.
63+
*/
64+
/*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName) {
65+
StreamConfig streamConfig = new StreamConfig(config);
66+
String system = streamConfig.getSystem(streamId);
67+
68+
return streamFromConfig(streamId, physicalName, system);
69+
}
70+
71+
/**
72+
* Constructs a {@link StreamSpec} from the configuration for the specified streamId.
73+
*
74+
* The stream configurations are read from the following properties in the config:
75+
* {@code streams.{$streamId}.*}
76+
*
77+
* @param streamId The logical identifier for the stream in Samza.
78+
* @param physicalName The system-specific name for this stream. It could be a file URN, topic name, or other identifer.
79+
* @param system The name of the System on which this stream will be used.
80+
* @return The {@link StreamSpec} instance.
81+
*/
82+
/*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName, String system) {
83+
StreamConfig streamConfig = new StreamConfig(config);
84+
Map<String, String> properties = streamConfig.getStreamProperties(streamId);
85+
86+
return new StreamSpec(streamId, physicalName, system, properties);
87+
}
88+
}

samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@
2424
/**
2525
* This class implements the {@link ExecutionEnvironment} that runs the applications in YARN environment
2626
*/
27-
public class RemoteExecutionEnvironment implements ExecutionEnvironment {
27+
public class RemoteExecutionEnvironment extends AbstractExecutionEnvironment {
28+
29+
public RemoteExecutionEnvironment(Config config) {
30+
super(config);
31+
}
2832

2933
@Override public void run(StreamGraphBuilder app, Config config) {
3034
// TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph}

samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@
2828
/**
2929
* This class implements the {@link ExecutionEnvironment} that runs the applications in standalone environment
3030
*/
31-
public class StandaloneExecutionEnvironment implements ExecutionEnvironment {
31+
public class StandaloneExecutionEnvironment extends AbstractExecutionEnvironment {
32+
33+
public StandaloneExecutionEnvironment(Config config) {
34+
super(config);
35+
}
3236

3337
// TODO: may want to move this to a common base class for all {@link ExecutionEnvironment}
3438
StreamGraph createGraph(StreamGraphBuilder app, Config config) {

samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ object JobConfig {
4343
val SAMZA_FWK_PATH = "samza.fwk.path"
4444
val SAMZA_FWK_VERSION = "samza.fwk.version"
4545
val JOB_COORDINATOR_SYSTEM = "job.coordinator.system"
46+
val JOB_DEFAULT_SYSTEM = "job.default.system"
4647
val JOB_CONTAINER_COUNT = "job.container.count"
4748
val jOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size"
4849
val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode"
@@ -104,6 +105,8 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
104105
def getCoordinatorSystemName = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse(
105106
throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution."))
106107

108+
def getDefaultSystem = getOption(JobConfig.JOB_DEFAULT_SYSTEM)
109+
107110
def getContainerCount = {
108111
getOption(JobConfig.JOB_CONTAINER_COUNT) match {
109112
case Some(count) => count.toInt

0 commit comments

Comments
 (0)