Skip to content

Commit 130c474

Browse files
committed
SAMZA-2068: Separating container launch logic into util class
The container launch logic needs to be invoked for beam-runner to run beam containers. This is a small refactoring of LocalContainerRunner.java. Author: xiliu <[email protected]> Reviewers: Prateek M <[email protected]> Closes apache#881 from xinyuiscool/SAMZA-2068
1 parent 6149298 commit 130c474

File tree

2 files changed

+180
-124
lines changed

2 files changed

+180
-124
lines changed
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.samza.runtime;
21+
22+
import org.apache.samza.SamzaException;
23+
import org.apache.samza.application.descriptors.ApplicationDescriptor;
24+
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
25+
import org.apache.samza.config.Config;
26+
import org.apache.samza.config.ShellCommandConfig;
27+
import org.apache.samza.container.ContainerHeartbeatClient;
28+
import org.apache.samza.container.ContainerHeartbeatMonitor;
29+
import org.apache.samza.container.LocalityManager;
30+
import org.apache.samza.container.SamzaContainer;
31+
import org.apache.samza.container.SamzaContainer$;
32+
import org.apache.samza.container.SamzaContainerListener;
33+
import org.apache.samza.context.ExternalContext;
34+
import org.apache.samza.context.JobContextImpl;
35+
import org.apache.samza.job.model.JobModel;
36+
import org.apache.samza.metrics.MetricsRegistryMap;
37+
import org.apache.samza.metrics.MetricsReporter;
38+
import org.apache.samza.task.TaskFactory;
39+
import org.apache.samza.task.TaskFactoryUtil;
40+
import org.apache.samza.util.ScalaJavaUtil;
41+
import org.slf4j.Logger;
42+
import org.slf4j.LoggerFactory;
43+
import scala.Option;
44+
45+
import java.util.HashMap;
46+
import java.util.Map;
47+
import java.util.Optional;
48+
49+
public class ContainerLaunchUtil {
50+
private static final Logger log = LoggerFactory.getLogger(ContainerLaunchUtil.class);
51+
52+
private static volatile Throwable containerRunnerException = null;
53+
54+
/**
55+
* This method launches a Samza container in a managed cluster, e.g. Yarn.
56+
*
57+
* NOTE: this util method is also invoked by Beam SamzaRunner.
58+
* Any change here needs to take Beam into account.
59+
*/
60+
public static void run(
61+
ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
62+
String containerId,
63+
JobModel jobModel) {
64+
65+
Config config = jobModel.getConfig();
66+
run(appDesc, containerId, jobModel, config, buildExternalContext(config));
67+
68+
System.exit(0);
69+
}
70+
71+
private static void run(
72+
ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
73+
String containerId,
74+
JobModel jobModel,
75+
Config config,
76+
Optional<ExternalContext> externalContextOptional) {
77+
TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
78+
LocalityManager localityManager = new LocalityManager(config, new MetricsRegistryMap());
79+
SamzaContainer container = SamzaContainer$.MODULE$.apply(
80+
containerId,
81+
jobModel,
82+
ScalaJavaUtil.toScalaMap(loadMetricsReporters(appDesc, containerId, config)),
83+
taskFactory,
84+
JobContextImpl.fromConfigWithDefaults(config),
85+
Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
86+
Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)),
87+
Option.apply(externalContextOptional.orElse(null)), localityManager);
88+
89+
ProcessorLifecycleListener listener = appDesc.getProcessorLifecycleListenerFactory()
90+
.createInstance(new ProcessorContext() { }, config);
91+
92+
container.setContainerListener(
93+
new SamzaContainerListener() {
94+
@Override
95+
public void beforeStart() {
96+
log.info("Before starting the container.");
97+
listener.beforeStart();
98+
}
99+
100+
@Override
101+
public void afterStart() {
102+
log.info("Container Started");
103+
listener.afterStart();
104+
}
105+
106+
@Override
107+
public void afterStop() {
108+
log.info("Container Stopped");
109+
listener.afterStop();
110+
}
111+
112+
@Override
113+
public void afterFailure(Throwable t) {
114+
log.info("Container Failed");
115+
containerRunnerException = t;
116+
listener.afterFailure(t);
117+
}
118+
});
119+
120+
ContainerHeartbeatMonitor heartbeatMonitor = createContainerHeartbeatMonitor(container);
121+
if (heartbeatMonitor != null) {
122+
heartbeatMonitor.start();
123+
}
124+
125+
container.run();
126+
if (heartbeatMonitor != null) {
127+
heartbeatMonitor.stop();
128+
}
129+
130+
if (containerRunnerException != null) {
131+
log.error("Container stopped with Exception. Exiting process now.", containerRunnerException);
132+
System.exit(1);
133+
}
134+
}
135+
136+
private static Optional<ExternalContext> buildExternalContext(Config config) {
137+
/*
138+
* By default, use an empty ExternalContext here. In a custom fork of Samza, this can be implemented to pass
139+
* a non-empty ExternalContext to SamzaContainer. Only config should be used to build the external context. In the
140+
* future, components like the application descriptor may not be available to LocalContainerRunner.
141+
*/
142+
return Optional.empty();
143+
}
144+
145+
// TODO: this is going away when SAMZA-1168 is done and the initialization of metrics reporters are done via
146+
// LocalApplicationRunner#createStreamProcessor()
147+
private static Map<String, MetricsReporter> loadMetricsReporters(
148+
ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, String containerId, Config config) {
149+
Map<String, MetricsReporter> reporters = new HashMap<>();
150+
appDesc.getMetricsReporterFactories().forEach((name, factory) ->
151+
reporters.put(name, factory.getMetricsReporter(name, containerId, config)));
152+
return reporters;
153+
}
154+
155+
/**
156+
* Creates a new container heartbeat monitor if possible.
157+
* @param container the container to monitor
158+
* @return a new {@link ContainerHeartbeatMonitor} instance, or null if could not create one
159+
*/
160+
private static ContainerHeartbeatMonitor createContainerHeartbeatMonitor(SamzaContainer container) {
161+
String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
162+
String executionEnvContainerId = System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID());
163+
if (executionEnvContainerId != null) {
164+
log.info("Got execution environment container id: {}", executionEnvContainerId);
165+
return new ContainerHeartbeatMonitor(() -> {
166+
try {
167+
container.shutdown();
168+
containerRunnerException = new SamzaException("Container shutdown due to expired heartbeat");
169+
} catch (Exception e) {
170+
log.error("Heartbeat monitor failed to shutdown the container gracefully. Exiting process.", e);
171+
System.exit(1);
172+
}
173+
}, new ContainerHeartbeatClient(coordinatorUrl, executionEnvContainerId));
174+
} else {
175+
log.warn("Execution environment container id not set. Container heartbeat monitor will not be created");
176+
return null;
177+
}
178+
}
179+
}

samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java

Lines changed: 1 addition & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -27,36 +27,19 @@
2727
import org.apache.samza.config.Config;
2828
import org.apache.samza.config.JobConfig;
2929
import org.apache.samza.config.ShellCommandConfig;
30-
import org.apache.samza.container.ContainerHeartbeatClient;
31-
import org.apache.samza.container.ContainerHeartbeatMonitor;
32-
import org.apache.samza.container.LocalityManager;
3330
import org.apache.samza.container.SamzaContainer;
34-
import org.apache.samza.container.SamzaContainer$;
35-
import org.apache.samza.container.SamzaContainerListener;
36-
import org.apache.samza.context.ExternalContext;
37-
import org.apache.samza.context.JobContextImpl;
3831
import org.apache.samza.job.model.JobModel;
39-
import org.apache.samza.metrics.MetricsRegistryMap;
40-
import org.apache.samza.metrics.MetricsReporter;
41-
import org.apache.samza.task.TaskFactory;
42-
import org.apache.samza.task.TaskFactoryUtil;
4332
import org.apache.samza.util.SamzaUncaughtExceptionHandler;
44-
import org.apache.samza.util.ScalaJavaUtil;
4533
import org.slf4j.Logger;
4634
import org.slf4j.LoggerFactory;
4735
import org.slf4j.MDC;
48-
import scala.Option;
49-
import java.util.HashMap;
50-
import java.util.Map;
51-
import java.util.Optional;
5236
import java.util.Random;
5337

5438
/**
5539
* Launches and manages the lifecycle for {@link SamzaContainer}s in YARN.
5640
*/
5741
public class LocalContainerRunner {
5842
private static final Logger log = LoggerFactory.getLogger(LocalContainerRunner.class);
59-
private static volatile Throwable containerRunnerException = null;
6043

6144
public static void main(String[] args) throws Exception {
6245
Thread.setDefaultUncaughtExceptionHandler(
@@ -88,113 +71,7 @@ public static void main(String[] args) throws Exception {
8871

8972
ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc =
9073
ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config);
91-
run(appDesc, containerId, jobModel, config, buildExternalContext(config));
9274

93-
System.exit(0);
94-
}
95-
96-
private static void run(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, String containerId,
97-
JobModel jobModel, Config config, Optional<ExternalContext> externalContextOptional) {
98-
TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
99-
LocalityManager localityManager = new LocalityManager(config, new MetricsRegistryMap());
100-
SamzaContainer container = SamzaContainer$.MODULE$.apply(
101-
containerId,
102-
jobModel,
103-
ScalaJavaUtil.toScalaMap(loadMetricsReporters(appDesc, containerId, config)),
104-
taskFactory,
105-
JobContextImpl.fromConfigWithDefaults(config),
106-
Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
107-
Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)),
108-
Option.apply(externalContextOptional.orElse(null)), localityManager);
109-
110-
ProcessorLifecycleListener listener = appDesc.getProcessorLifecycleListenerFactory()
111-
.createInstance(new ProcessorContext() { }, config);
112-
113-
container.setContainerListener(
114-
new SamzaContainerListener() {
115-
@Override
116-
public void beforeStart() {
117-
log.info("Before starting the container.");
118-
listener.beforeStart();
119-
}
120-
121-
@Override
122-
public void afterStart() {
123-
log.info("Container Started");
124-
listener.afterStart();
125-
}
126-
127-
@Override
128-
public void afterStop() {
129-
log.info("Container Stopped");
130-
listener.afterStop();
131-
}
132-
133-
@Override
134-
public void afterFailure(Throwable t) {
135-
log.info("Container Failed");
136-
containerRunnerException = t;
137-
listener.afterFailure(t);
138-
}
139-
});
140-
141-
ContainerHeartbeatMonitor heartbeatMonitor = createContainerHeartbeatMonitor(container);
142-
if (heartbeatMonitor != null) {
143-
heartbeatMonitor.start();
144-
}
145-
146-
container.run();
147-
if (heartbeatMonitor != null) {
148-
heartbeatMonitor.stop();
149-
}
150-
151-
if (containerRunnerException != null) {
152-
log.error("Container stopped with Exception. Exiting process now.", containerRunnerException);
153-
System.exit(1);
154-
}
155-
}
156-
157-
private static Optional<ExternalContext> buildExternalContext(Config config) {
158-
/*
159-
* By default, use an empty ExternalContext here. In a custom fork of Samza, this can be implemented to pass
160-
* a non-empty ExternalContext to SamzaContainer. Only config should be used to build the external context. In the
161-
* future, components like the application descriptor may not be available to LocalContainerRunner.
162-
*/
163-
return Optional.empty();
164-
}
165-
166-
// TODO: this is going away when SAMZA-1168 is done and the initialization of metrics reporters are done via
167-
// LocalApplicationRunner#createStreamProcessor()
168-
private static Map<String, MetricsReporter> loadMetricsReporters(
169-
ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, String containerId, Config config) {
170-
Map<String, MetricsReporter> reporters = new HashMap<>();
171-
appDesc.getMetricsReporterFactories().forEach((name, factory) ->
172-
reporters.put(name, factory.getMetricsReporter(name, containerId, config)));
173-
return reporters;
174-
}
175-
176-
/**
177-
* Creates a new container heartbeat monitor if possible.
178-
* @param container the container to monitor
179-
* @return a new {@link ContainerHeartbeatMonitor} instance, or null if could not create one
180-
*/
181-
private static ContainerHeartbeatMonitor createContainerHeartbeatMonitor(SamzaContainer container) {
182-
String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
183-
String executionEnvContainerId = System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID());
184-
if (executionEnvContainerId != null) {
185-
log.info("Got execution environment container id: {}", executionEnvContainerId);
186-
return new ContainerHeartbeatMonitor(() -> {
187-
try {
188-
container.shutdown();
189-
containerRunnerException = new SamzaException("Container shutdown due to expired heartbeat");
190-
} catch (Exception e) {
191-
log.error("Heartbeat monitor failed to shutdown the container gracefully. Exiting process.", e);
192-
System.exit(1);
193-
}
194-
}, new ContainerHeartbeatClient(coordinatorUrl, executionEnvContainerId));
195-
} else {
196-
log.warn("Execution environment container id not set. Container heartbeat monitor will not be created");
197-
return null;
198-
}
75+
ContainerLaunchUtil.run(appDesc, containerId, jobModel);
19976
}
20077
}

0 commit comments

Comments
 (0)