diff --git a/common/src/main/resources/runtime.properties b/common/src/main/resources/runtime.properties
index ba4c0558d..79faedf0a 100644
--- a/common/src/main/resources/runtime.properties
+++ b/common/src/main/resources/runtime.properties
@@ -23,8 +23,13 @@
*.falcon.replication.workflow.maxmaps=5
*.falcon.replication.workflow.mapbandwidth=100
-*.webservices.default.results.per.page=10
+*.falcon.feed.workflow.yarn.app.mapreduce.am.resource.mb=512
+*.falcon.feed.workflow.yarn.app.mapreduce.am.command-opts=-Xmx400m -XX:MaxMetaspaceSize=64m
+*.falcon.feed.workflow.mapreduce.map.memory.mb=512
+*.falcon.feed.workflow.mapreduce.map.java.opts=-Xmx400m -XX:MaxMetaspaceSize=64m
+*.webservices.default.results.per.page=10
+
# If true, do not run retention past feedCluster validity end time.
# This will retain recent instances beyond feedCluster validity end time.
*.falcon.retention.keep.instances.beyond.validity=true
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index 11d1e1b0b..7b23aad0e 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -392,6 +392,10 @@ permission indicates the permission.
+
+
+
+
@@ -414,17 +418,30 @@ available to user to specify the Hadoop job queue and priority, the same values
"timeout", "parallel" and "order" are other special properties which decides replication instance's timeout value while
waiting for the feed instance, parallel decides the concurrent replication instances that can run at any given time and
order decides the execution order for replication instances like FIFO, LIFO and LAST_ONLY.
-DistCp options can be passed as custom properties, which will be propagated to the DistCp tool. "maxMaps" represents
-the maximum number of maps used during replication. "mapBandwidth" represents the bandwidth in MB/s
-used by each mapper during replication. "overwrite" represents overwrite destination during replication.
-"ignoreErrors" represents ignore failures not causing the job to fail during replication. "skipChecksum" represents
-bypassing checksum verification during replication. "removeDeletedFiles" represents deleting the files existing in the
-destination but not in source during replication. "preserveBlockSize" represents preserving block size during
-replication. "preserveReplicationNumber" represents preserving replication number during replication.
-"preservePermission" represents preserving permission during replication. "preserveUser" represents preserving user during replication.
-"preserveGroup" represents preserving group during replication. "preserveChecksumType" represents preserving checksum type during replication.
-"preserveAcl" represents preserving ACL during replication. "preserveXattr" represents preserving Xattr during replication.
-"preserveTimes" represents preserving access and modification times during replication. "tdeEncryptionEnabled" if TDE is enabled.
+DistCp options can be passed as custom properties, which will be propagated to the DistCp tool. Below are the few
+properties that can be passed as key value properties to propagate into workflow engine.
+"maxMaps" represents the maximum number of maps used during replication.
+"mapBandwidth" represents the bandwidth in MB/s used by each mapper during replication.
+"mapMemory" represents the mapreduce map memory in mb to be specified to the respective replication and retention jobs.
+"mapJavaOpts" represents the mapreduce java opts to be specified to therespective replication and retention jobs.
+"amMemory" represents the application master memory in mb to be specified to the respective replication and retention
+application masters.
+"amJavaOpts" represents the application master java opts to be specified to the respective
+replication and retention application masters.
+"overwrite" represents overwrite destination during replication.
+"ignoreErrors" represents ignore failures not causing the job to fail during replication.
+"skipChecksum" represents bypassing checksum verification during replication.
+"removeDeletedFiles" represents deleting the files existing in the destination but not in source during replication.
+"preserveBlockSize" represents preserving block size during replication.
+"preserveReplicationNumber" represents preserving replication number during replication.
+"preservePermission" represents preserving permission during replication.
+"preserveUser" represents preserving user during replication.
+"preserveGroup" represents preserving group during replication.
+"preserveChecksumType" represents preserving checksum type during replication.
+"preserveAcl" represents preserving ACL during replication.
+"preserveXattr" represents preserving Xattr during replication.
+"preserveTimes" represents preserving access and modification times during replication.
+"tdeEncryptionEnabled" if TDE is enabled.
---+++ Lifecycle
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
index dd0c6d2ad..7cc62371e 100644
--- a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
@@ -29,6 +29,7 @@
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Property;
import org.apache.falcon.entity.v0.feed.RetentionStage;
import org.apache.falcon.lifecycle.engine.oozie.utils.OozieBuilderUtils;
import org.apache.falcon.lifecycle.retention.AgeBasedDelete;
@@ -98,6 +99,10 @@ public static Properties build(Cluster cluster, Path basePath, Feed feed) throws
props.put(OozieBuilderUtils.MR_JOB_PRIORITY, retentionStage.getPriority());
}
+ for (Property retentionStageProperty : retentionStage.getProperties().getProperties()) {
+ props.put(retentionStageProperty.getName(), retentionStageProperty.getValue());
+ }
+
if (EntityUtil.isTableStorageType(cluster, feed)) {
setupHiveCredentials(cluster, buildPath, workflow);
// copy paste todo kludge send source hcat creds for coord dependency check to pass
diff --git a/lifecycle/src/main/resources/action/feed/eviction-action.xml b/lifecycle/src/main/resources/action/feed/eviction-action.xml
index bded1d6bf..df00beb0c 100644
--- a/lifecycle/src/main/resources/action/feed/eviction-action.xml
+++ b/lifecycle/src/main/resources/action/feed/eviction-action.xml
@@ -37,8 +37,28 @@
oozie.launcher.oozie.libpath
${wf:conf("falcon.libpath")}
+
+ oozie.launcher.yarn.app.mapreduce.am.resource.mb
+ ${amMemory}
+
+
+ oozie.launcher.yarn.app.mapreduce.am.command-opts
+ ${amCommandOpts}
+
+
+ oozie.launcher.mapreduce.map.memory.mb
+ ${mapMemory}
+
+
+ oozie.launcher.mapreduce.map.java.opts
+ ${mapJavaOpts}
+
org.apache.falcon.retention.FeedEvictor
+ -Dmapreduce.map.memory.mb={amMemory}
+ -Dmapreduce.map.memory.mb={amCommandOpts}
+ -Dmapreduce.map.memory.mb={mapMemory}
+ -Dmapreduce.map.memory.mb={mapJavaOpts}
-feedBasePath
${feedDataPath}
-falconFeedStorageType
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
index db647aa6f..ff83d73b9 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
@@ -48,6 +48,10 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW
protected static final String REPLICATION_ACTION_NAME = "replication";
private static final String MR_MAX_MAPS = "maxMaps";
private static final String MR_MAP_BANDWIDTH = "mapBandwidth";
+ private static final String MR_AM_MEMORY = "amMemory";
+ private static final String MR_AM_COMMAND_OPTS = "amCommandOpts";
+ private static final String MR_MAP_MEMORY = "mapMemory";
+ private static final String MR_MAP_JAVA_OPTS = "mapJavaOpts";
private static final String REPLICATION_JOB_COUNTER = "job.counter";
private static final String TDE_ENCRYPTION_ENABLED = "tdeEncryptionEnabled";
@@ -96,6 +100,18 @@ protected Properties getWorkflowProperties(Feed feed) throws FalconException {
if (props.getProperty(MR_MAP_BANDWIDTH) == null) { // set default if user has not overridden
props.put(MR_MAP_BANDWIDTH, getDefaultMapBandwidth());
}
+ if (props.getProperty(MR_AM_MEMORY) == null) { // set default app master memory if user has not overridden
+ props.put(MR_AM_MEMORY, getDefaultAmMemory());
+ }
+ if (props.getProperty(MR_AM_COMMAND_OPTS) == null) { // set default app maseter opts if user has not overridden
+ props.put(MR_AM_COMMAND_OPTS, getDefaultAmCommandOpts());
+ }
+ if (props.getProperty(MR_MAP_MEMORY) == null) { // set default map memory if user has not overridden
+ props.put(MR_MAP_MEMORY, getDefaultMapMemory());
+ }
+ if (props.getProperty(MR_MAP_JAVA_OPTS) == null) { // set default map java opts if user has not overridden
+ props.put(MR_MAP_JAVA_OPTS, getDefaultMapJavaOpts());
+ }
return props;
}
@@ -145,6 +161,24 @@ private String getDefaultMapBandwidth() {
return RuntimeProperties.get().getProperty("falcon.replication.workflow.mapbandwidth", "100");
}
+ private String getDefaultAmMemory() {
+ return RuntimeProperties.get().getProperty("falcon.feed.workflow.yarn.app.mapreduce.am.resource.mb", "512");
+ }
+
+ private String getDefaultAmCommandOpts() {
+ return RuntimeProperties.get().getProperty("falcon.feed.workflow.yarn.app.mapreduce.am.command-opts",
+ "-Xmx400m -XX:MaxMetaspaceSize=64m");
+ }
+
+ private String getDefaultMapMemory() {
+ return RuntimeProperties.get().getProperty("falcon.feed.workflow.mapreduce.map.memory.mb", "512");
+ }
+
+ private String getDefaultMapJavaOpts() {
+ return RuntimeProperties.get().getProperty("falcon.feed.workflow.mapreduce.map.java.opts",
+ "-Xmx400m -XX:MaxMetaspaceSize=64m");
+ }
+
private boolean isTDEEnabled() {
String tdeEncryptionEnabled = FeedHelper.getPropertyValue(entity, TDE_ENCRYPTION_ENABLED);
return "true" .equalsIgnoreCase(tdeEncryptionEnabled);
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
index fd51ed06f..8e85c254e 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
@@ -30,6 +30,7 @@
import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.fs.Path;
@@ -43,6 +44,11 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
private static final String EVICTION_ACTION_TEMPLATE = "/action/feed/eviction-action.xml";
private static final String EVICTION_ACTION_NAME = "eviction";
+ private static final String MR_AM_MEMORY = "amMemory";
+ private static final String MR_AM_COMMAND_OPTS = "amCommandOpts";
+ private static final String MR_MAP_MEMORY = "mapMemory";
+ private static final String MR_MAP_JAVA_OPTS = "mapJavaOpts";
+
public FeedRetentionWorkflowBuilder(Feed entity) {
super(entity, LifeCycle.EVICTION);
@@ -63,6 +69,25 @@ public FeedRetentionWorkflowBuilder(Feed entity) {
props.putAll(createDefaultConfiguration(cluster));
props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle()));
+ if (props.getProperty(MR_AM_MEMORY) == null) { // set default app master memory if user has not overridden
+ props.put(MR_AM_MEMORY, RuntimeProperties.get()
+ .getProperty("falcon.feed.workflow.yarn.app.mapreduce.am.resource.mb", "512"));
+ }
+ if (props.getProperty(MR_AM_COMMAND_OPTS) == null) { // set default app maseter opts if user has not overridden
+ props.put(MR_AM_COMMAND_OPTS, RuntimeProperties.get()
+ .getProperty("falcon.feed.workflow.yarn.app.mapreduce.am.command-opts",
+ "-Xmx400m -XX:MaxMetaspaceSize=64m"));
+ }
+ if (props.getProperty(MR_MAP_MEMORY) == null) { // set default memory if user has not overridden
+ props.put(MR_MAP_MEMORY, RuntimeProperties.get().
+ getProperty("falcon.feed.workflow.mapreduce.map.memory.mb", "512"));
+ }
+ if (props.getProperty(MR_MAP_JAVA_OPTS) == null) { // set default map java opts if user has not overridden
+ props.put(MR_MAP_JAVA_OPTS,
+ RuntimeProperties.get().getProperty("falcon.feed.workflow.mapreduce.map.java.opts",
+ "-Xmx400m -XX:MaxMetaspaceSize=64m"));
+ }
+
if (EntityUtil.isTableStorageType(cluster, entity)) {
setupHiveCredentials(cluster, buildPath, workflow);
// todo: kludge send source hcat creds for coord dependency check to pass
diff --git a/oozie/src/main/resources/action/feed/eviction-action.xml b/oozie/src/main/resources/action/feed/eviction-action.xml
index bded1d6bf..df00beb0c 100644
--- a/oozie/src/main/resources/action/feed/eviction-action.xml
+++ b/oozie/src/main/resources/action/feed/eviction-action.xml
@@ -37,8 +37,28 @@
oozie.launcher.oozie.libpath
${wf:conf("falcon.libpath")}
+
+ oozie.launcher.yarn.app.mapreduce.am.resource.mb
+ ${amMemory}
+
+
+ oozie.launcher.yarn.app.mapreduce.am.command-opts
+ ${amCommandOpts}
+
+
+ oozie.launcher.mapreduce.map.memory.mb
+ ${mapMemory}
+
+
+ oozie.launcher.mapreduce.map.java.opts
+ ${mapJavaOpts}
+
org.apache.falcon.retention.FeedEvictor
+ -Dmapreduce.map.memory.mb={amMemory}
+ -Dmapreduce.map.memory.mb={amCommandOpts}
+ -Dmapreduce.map.memory.mb={mapMemory}
+ -Dmapreduce.map.memory.mb={mapJavaOpts}
-feedBasePath
${feedDataPath}
-falconFeedStorageType
diff --git a/oozie/src/main/resources/action/feed/replication-action.xml b/oozie/src/main/resources/action/feed/replication-action.xml
index ff8f4f39d..9f9503e42 100644
--- a/oozie/src/main/resources/action/feed/replication-action.xml
+++ b/oozie/src/main/resources/action/feed/replication-action.xml
@@ -41,11 +41,31 @@
oozie.launcher.oozie.libpath
${wf:conf("falcon.libpath")}
+
+ oozie.launcher.yarn.app.mapreduce.am.resource.mb
+ ${amMemory}
+
+
+ oozie.launcher.yarn.app.mapreduce.am.command-opts
+ ${amCommandOpts}
+
+
+ oozie.launcher.mapreduce.map.memory.mb
+ ${mapMemory}
+
+
+ oozie.launcher.mapreduce.map.java.opts
+ ${mapJavaOpts}
+
org.apache.falcon.replication.FeedReplicator
-Dfalcon.include.path=${sourceRelativePaths}
-Dmapred.job.queue.name=${queueName}
-Dmapred.job.priority=${jobPriority}
+ -Dmapreduce.map.memory.mb={amMemory}
+ -Dmapreduce.map.memory.mb={amCommandOpts}
+ -Dmapreduce.map.memory.mb={mapMemory}
+ -Dmapreduce.map.memory.mb={mapJavaOpts}
-maxMaps
${maxMaps}
-mapBandwidth
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index d753baf26..1a2d09db6 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -201,6 +201,11 @@ public void testRetentionWithLifecycle(String keepInstancesPostValidity, String
Assert.assertEquals(wfProps.get("queueName"), "ageBasedDeleteQueue");
Assert.assertEquals(wfProps.get("limit"), "hours(2)");
Assert.assertEquals(wfProps.get("jobPriority"), "LOW");
+
+ Assert.assertEquals(wfProps.get("amMemory"), "1024");
+ Assert.assertEquals(wfProps.get("amCommandOpts"), "-Xmx800m -XX:MaxMetaspaceSize=128m");
+ Assert.assertEquals(wfProps.get("mapMemory"), "1024");
+ Assert.assertEquals(wfProps.get("mapJavaOpts"), "-Xmx800m -XX:MaxMetaspaceSize=128m");
}
@Test
@@ -391,6 +396,11 @@ public void testReplicationCoordsForFSStorage() throws Exception {
Assert.assertEquals(wfProps.get("maxMaps"), "5");
Assert.assertEquals(wfProps.get("mapBandwidth"), "100");
+ Assert.assertEquals(wfProps.get("amMemory"), "1024");
+ Assert.assertEquals(wfProps.get("amCommandOpts"), "-Xmx800m -XX:MaxMetaspaceSize=128m");
+ Assert.assertEquals(wfProps.get("mapMemory"), "1024");
+ Assert.assertEquals(wfProps.get("mapJavaOpts"), "-Xmx800m -XX:MaxMetaspaceSize=128m");
+
assertLibExtensions(coord, "replication");
WORKFLOWAPP wf = getWorkflowapp(trgMiniDFS.getFileSystem(), coord);
assertWorkflowRetries(wf);
@@ -499,9 +509,9 @@ private void assertReplCoord(COORDINATORAPP coord, Feed aFeed, Cluster aCluster,
JAVA replication = replicationActionNode.getJava();
List args = replication.getArg();
if (args.contains("-counterLogDir")) {
- Assert.assertEquals(args.size(), 17);
+ Assert.assertEquals(args.size(), 21);
} else {
- Assert.assertEquals(args.size(), 15);
+ Assert.assertEquals(args.size(), 19);
}
HashMap props = getCoordProperties(coord);
diff --git a/oozie/src/test/resources/feed/feed.xml b/oozie/src/test/resources/feed/feed.xml
index 6e3126273..10271352d 100644
--- a/oozie/src/test/resources/feed/feed.xml
+++ b/oozie/src/test/resources/feed/feed.xml
@@ -53,5 +53,9 @@
+
+
+
+
diff --git a/oozie/src/test/resources/feed/fs-local-retention-lifecycle-feed.xml b/oozie/src/test/resources/feed/fs-local-retention-lifecycle-feed.xml
index bdf1e59dd..145708a04 100644
--- a/oozie/src/test/resources/feed/fs-local-retention-lifecycle-feed.xml
+++ b/oozie/src/test/resources/feed/fs-local-retention-lifecycle-feed.xml
@@ -38,6 +38,10 @@
HIGH
+
+
+
+
diff --git a/oozie/src/test/resources/feed/fs-replication-feed-counters.xml b/oozie/src/test/resources/feed/fs-replication-feed-counters.xml
index 230e2b096..aa9eaec18 100644
--- a/oozie/src/test/resources/feed/fs-replication-feed-counters.xml
+++ b/oozie/src/test/resources/feed/fs-replication-feed-counters.xml
@@ -55,5 +55,9 @@
+
+
+
+
diff --git a/oozie/src/test/resources/feed/fs-replication-feed.xml b/oozie/src/test/resources/feed/fs-replication-feed.xml
index 0e9065c27..b6a34c6a1 100644
--- a/oozie/src/test/resources/feed/fs-replication-feed.xml
+++ b/oozie/src/test/resources/feed/fs-replication-feed.xml
@@ -64,5 +64,9 @@
+
+
+
+
diff --git a/oozie/src/test/resources/feed/fs-retention-feed.xml b/oozie/src/test/resources/feed/fs-retention-feed.xml
index 7eb85fa3d..6a144ffb2 100644
--- a/oozie/src/test/resources/feed/fs-retention-feed.xml
+++ b/oozie/src/test/resources/feed/fs-retention-feed.xml
@@ -45,6 +45,10 @@
+
+
+
+
diff --git a/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml b/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml
index a230f1581..55aa228c1 100644
--- a/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml
+++ b/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml
@@ -45,6 +45,10 @@
+
+
+
+
diff --git a/webapp/src/test/resources/runtime.properties b/webapp/src/test/resources/runtime.properties
index 7dec191a8..179336c75 100644
--- a/webapp/src/test/resources/runtime.properties
+++ b/webapp/src/test/resources/runtime.properties
@@ -23,6 +23,10 @@
*.falcon.replication.workflow.maxmaps=5
*.falcon.replication.workflow.mapbandwidth=100
+*.falcon.feed.workflow.yarn.app.mapreduce.am.resource.mb=512
+*.falcon.feed.workflow.yarn.app.mapreduce.am.command-opts=-Xmx400m -XX:MaxMetaspaceSize=64m
+*.falcon.feed.workflow.mapreduce.map.memory.mb=512
+*.falcon.feed.workflow.mapreduce.map.java.opts=-Xmx400m -XX:MaxMetaspaceSize=64m
*.webservices.default.results.per.page=10
# If true, do not run retention past feedCluster validity end time.