Skip to content
This repository was archived by the owner on Apr 4, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion common/src/main/resources/runtime.properties
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@

*.falcon.replication.workflow.maxmaps=5
*.falcon.replication.workflow.mapbandwidth=100
*.webservices.default.results.per.page=10
*.falcon.feed.workflow.yarn.app.mapreduce.am.resource.mb=512
*.falcon.feed.workflow.yarn.app.mapreduce.am.command-opts=-Xmx400m -XX:MaxMetaspaceSize=64m
*.falcon.feed.workflow.mapreduce.map.memory.mb=512
*.falcon.feed.workflow.mapreduce.map.java.opts=-Xmx400m -XX:MaxMetaspaceSize=64m

*.webservices.default.results.per.page=10

# If true, do not run retention past feedCluster validity end time.
# This will retain recent instances beyond feedCluster validity end time.
*.falcon.retention.keep.instances.beyond.validity=true
Expand Down
39 changes: 28 additions & 11 deletions docs/src/site/twiki/EntitySpecification.twiki
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,10 @@ permission indicates the permission.
<property name="parallel" value="3"/>
<property name="maxMaps" value="8"/>
<property name="mapBandwidth" value="1"/>
<property name="amMemory" value="512"/>
<property name="amCommandOpts" value="-Xmx400m -XX:MaxMetaspaceSize=64m"/>
<property name="mapMemory" value="512"/>
<property name="mapJavaOpts" value="-Xmx400m -XX:MaxMetaspaceSize=64m"/>
<property name="overwrite" value="true"/>
<property name="ignoreErrors" value="false"/>
<property name="skipChecksum" value="false"/>
Expand All @@ -414,17 +418,30 @@ available to user to specify the Hadoop job queue and priority, the same values
"timeout", "parallel" and "order" are other special properties which decides replication instance's timeout value while
waiting for the feed instance, parallel decides the concurrent replication instances that can run at any given time and
order decides the execution order for replication instances like FIFO, LIFO and LAST_ONLY.
DistCp options can be passed as custom properties, which will be propagated to the DistCp tool. "maxMaps" represents
the maximum number of maps used during replication. "mapBandwidth" represents the bandwidth in MB/s
used by each mapper during replication. "overwrite" represents overwrite destination during replication.
"ignoreErrors" represents ignore failures not causing the job to fail during replication. "skipChecksum" represents
bypassing checksum verification during replication. "removeDeletedFiles" represents deleting the files existing in the
destination but not in source during replication. "preserveBlockSize" represents preserving block size during
replication. "preserveReplicationNumber" represents preserving replication number during replication.
"preservePermission" represents preserving permission during replication. "preserveUser" represents preserving user during replication.
"preserveGroup" represents preserving group during replication. "preserveChecksumType" represents preserving checksum type during replication.
"preserveAcl" represents preserving ACL during replication. "preserveXattr" represents preserving Xattr during replication.
"preserveTimes" represents preserving access and modification times during replication. "tdeEncryptionEnabled" if TDE is enabled.
DistCp options can be passed as custom properties, which will be propagated to the DistCp tool. Below are the few
properties that can be passed as key value properties to propagate into workflow engine.
"maxMaps" represents the maximum number of maps used during replication.
"mapBandwidth" represents the bandwidth in MB/s used by each mapper during replication.
"mapMemory" represents the mapreduce map memory in mb to be specified to the respective replication and retention jobs.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: just to keep it consistent either start it from the same line or move every property name to a separate line.
Right now mapBandwith is in one line and rest of setting is in form of paragraph.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack,will make the changes accordingly.

"mapJavaOpts" represents the mapreduce java opts to be specified to therespective replication and retention jobs.
"amMemory" represents the application master memory in mb to be specified to the respective replication and retention
application masters.
"amJavaOpts" represents the application master java opts to be specified to the respective
replication and retention application masters.
"overwrite" represents overwrite destination during replication.
"ignoreErrors" represents ignore failures not causing the job to fail during replication.
"skipChecksum" represents bypassing checksum verification during replication.
"removeDeletedFiles" represents deleting the files existing in the destination but not in source during replication.
"preserveBlockSize" represents preserving block size during replication.
"preserveReplicationNumber" represents preserving replication number during replication.
"preservePermission" represents preserving permission during replication.
"preserveUser" represents preserving user during replication.
"preserveGroup" represents preserving group during replication.
"preserveChecksumType" represents preserving checksum type during replication.
"preserveAcl" represents preserving ACL during replication.
"preserveXattr" represents preserving Xattr during replication.
"preserveTimes" represents preserving access and modification times during replication.
"tdeEncryptionEnabled" if TDE is enabled.


---+++ Lifecycle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Property;
import org.apache.falcon.entity.v0.feed.RetentionStage;
import org.apache.falcon.lifecycle.engine.oozie.utils.OozieBuilderUtils;
import org.apache.falcon.lifecycle.retention.AgeBasedDelete;
Expand Down Expand Up @@ -98,6 +99,10 @@ public static Properties build(Cluster cluster, Path basePath, Feed feed) throws
props.put(OozieBuilderUtils.MR_JOB_PRIORITY, retentionStage.getPriority());
}

for (Property retentionStageProperty : retentionStage.getProperties().getProperties()) {
props.put(retentionStageProperty.getName(), retentionStageProperty.getValue());
}

if (EntityUtil.isTableStorageType(cluster, feed)) {
setupHiveCredentials(cluster, buildPath, workflow);
// copy paste todo kludge send source hcat creds for coord dependency check to pass
Expand Down
20 changes: 20 additions & 0 deletions lifecycle/src/main/resources/action/feed/eviction-action.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,28 @@
<name>oozie.launcher.oozie.libpath</name>
<value>${wf:conf("falcon.libpath")}</value>
</property>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are setting the launcher memory for others, better be consistent and set it here too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. I will put launcher memory here too.

<property>
<name>oozie.launcher.yarn.app.mapreduce.am.resource.mb</name>
<value>${amMemory}</value>
</property>
<property>
<name>oozie.launcher.yarn.app.mapreduce.am.command-opts</name>
<value>${amCommandOpts}</value>
</property>
<property>
<name>oozie.launcher.mapreduce.map.memory.mb</name>
<value>${mapMemory}</value>
</property>
<property>
<name>oozie.launcher.mapreduce.map.java.opts</name>
<value>${mapJavaOpts}</value>
</property>
</configuration>
<main-class>org.apache.falcon.retention.FeedEvictor</main-class>
<arg>-Dmapreduce.map.memory.mb={amMemory}</arg>
<arg>-Dmapreduce.map.memory.mb={amCommandOpts}</arg>
<arg>-Dmapreduce.map.memory.mb={mapMemory}</arg>
<arg>-Dmapreduce.map.memory.mb={mapJavaOpts}</arg>
<arg>-feedBasePath</arg>
<arg>${feedDataPath}</arg>
<arg>-falconFeedStorageType</arg>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW
protected static final String REPLICATION_ACTION_NAME = "replication";
private static final String MR_MAX_MAPS = "maxMaps";
private static final String MR_MAP_BANDWIDTH = "mapBandwidth";
private static final String MR_AM_MEMORY = "amMemory";
private static final String MR_AM_COMMAND_OPTS = "amCommandOpts";
private static final String MR_MAP_MEMORY = "mapMemory";
private static final String MR_MAP_JAVA_OPTS = "mapJavaOpts";
private static final String REPLICATION_JOB_COUNTER = "job.counter";
private static final String TDE_ENCRYPTION_ENABLED = "tdeEncryptionEnabled";

Expand Down Expand Up @@ -96,6 +100,18 @@ protected Properties getWorkflowProperties(Feed feed) throws FalconException {
if (props.getProperty(MR_MAP_BANDWIDTH) == null) { // set default if user has not overridden
props.put(MR_MAP_BANDWIDTH, getDefaultMapBandwidth());
}
if (props.getProperty(MR_AM_MEMORY) == null) { // set default app master memory if user has not overridden
props.put(MR_AM_MEMORY, getDefaultAmMemory());
}
if (props.getProperty(MR_AM_COMMAND_OPTS) == null) { // set default app maseter opts if user has not overridden
props.put(MR_AM_COMMAND_OPTS, getDefaultAmCommandOpts());
}
if (props.getProperty(MR_MAP_MEMORY) == null) { // set default map memory if user has not overridden
props.put(MR_MAP_MEMORY, getDefaultMapMemory());
}
if (props.getProperty(MR_MAP_JAVA_OPTS) == null) { // set default map java opts if user has not overridden
props.put(MR_MAP_JAVA_OPTS, getDefaultMapJavaOpts());
}

return props;
}
Expand Down Expand Up @@ -145,6 +161,24 @@ private String getDefaultMapBandwidth() {
return RuntimeProperties.get().getProperty("falcon.replication.workflow.mapbandwidth", "100");
}

private String getDefaultAmMemory() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to move these method to OozieOrchestrationWorkflowBuilder and use it from there in FeedRetentionWorkflowBuilder and FeedReplicationWorkflowBuilder rather than doing same thing twice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These settings are not meant for other builders like ProcessWorfklowBuilder, might give wrong conscience of being available for other builders. Thats why it has been put into these two builders specifically. In case if we want to achieve the same for process builder too then we can move it into OrchestrationWorkflowBuilder.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still its better to move it to OrchestrationWorkflowBuilder to avoid repetition of code.We can override the method in ProcessWorfklowBuilder and throw an exception method not supported if donot want to use them there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its just not ProcessWorkflowBuilder , there are data import, export and hive builders.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed please remove the method and use it inline in both the places.

return RuntimeProperties.get().getProperty("falcon.feed.workflow.yarn.app.mapreduce.am.resource.mb", "512");
}

private String getDefaultAmCommandOpts() {
return RuntimeProperties.get().getProperty("falcon.feed.workflow.yarn.app.mapreduce.am.command-opts",
"-Xmx400m -XX:MaxMetaspaceSize=64m");
}

private String getDefaultMapMemory() {
return RuntimeProperties.get().getProperty("falcon.feed.workflow.mapreduce.map.memory.mb", "512");
}

private String getDefaultMapJavaOpts() {
return RuntimeProperties.get().getProperty("falcon.feed.workflow.mapreduce.map.java.opts",
"-Xmx400m -XX:MaxMetaspaceSize=64m");
}

private boolean isTDEEnabled() {
String tdeEncryptionEnabled = FeedHelper.getPropertyValue(entity, TDE_ENCRYPTION_ENABLED);
return "true" .equalsIgnoreCase(tdeEncryptionEnabled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.fs.Path;
Expand All @@ -43,6 +44,11 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
private static final String EVICTION_ACTION_TEMPLATE = "/action/feed/eviction-action.xml";

private static final String EVICTION_ACTION_NAME = "eviction";
private static final String MR_AM_MEMORY = "amMemory";
private static final String MR_AM_COMMAND_OPTS = "amCommandOpts";
private static final String MR_MAP_MEMORY = "mapMemory";
private static final String MR_MAP_JAVA_OPTS = "mapJavaOpts";


public FeedRetentionWorkflowBuilder(Feed entity) {
super(entity, LifeCycle.EVICTION);
Expand All @@ -63,6 +69,25 @@ public FeedRetentionWorkflowBuilder(Feed entity) {
props.putAll(createDefaultConfiguration(cluster));
props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle()));

if (props.getProperty(MR_AM_MEMORY) == null) { // set default app master memory if user has not overridden
props.put(MR_AM_MEMORY, RuntimeProperties.get()
.getProperty("falcon.feed.workflow.yarn.app.mapreduce.am.resource.mb", "512"));
}
if (props.getProperty(MR_AM_COMMAND_OPTS) == null) { // set default app maseter opts if user has not overridden
props.put(MR_AM_COMMAND_OPTS, RuntimeProperties.get()
.getProperty("falcon.feed.workflow.yarn.app.mapreduce.am.command-opts",
"-Xmx400m -XX:MaxMetaspaceSize=64m"));
}
if (props.getProperty(MR_MAP_MEMORY) == null) { // set default memory if user has not overridden
props.put(MR_MAP_MEMORY, RuntimeProperties.get().
getProperty("falcon.feed.workflow.mapreduce.map.memory.mb", "512"));
}
if (props.getProperty(MR_MAP_JAVA_OPTS) == null) { // set default map java opts if user has not overridden
props.put(MR_MAP_JAVA_OPTS,
RuntimeProperties.get().getProperty("falcon.feed.workflow.mapreduce.map.java.opts",
"-Xmx400m -XX:MaxMetaspaceSize=64m"));
}

if (EntityUtil.isTableStorageType(cluster, entity)) {
setupHiveCredentials(cluster, buildPath, workflow);
// todo: kludge send source hcat creds for coord dependency check to pass
Expand Down
20 changes: 20 additions & 0 deletions oozie/src/main/resources/action/feed/eviction-action.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,28 @@
<name>oozie.launcher.oozie.libpath</name>
<value>${wf:conf("falcon.libpath")}</value>
</property>
<property>
<name>oozie.launcher.yarn.app.mapreduce.am.resource.mb</name>
<value>${amMemory}</value>
</property>
<property>
<name>oozie.launcher.yarn.app.mapreduce.am.command-opts</name>
<value>${amCommandOpts}</value>
</property>
<property>
<name>oozie.launcher.mapreduce.map.memory.mb</name>
<value>${mapMemory}</value>
</property>
<property>
<name>oozie.launcher.mapreduce.map.java.opts</name>
<value>${mapJavaOpts}</value>
</property>
</configuration>
<main-class>org.apache.falcon.retention.FeedEvictor</main-class>
<arg>-Dmapreduce.map.memory.mb={amMemory}</arg>
<arg>-Dmapreduce.map.memory.mb={amCommandOpts}</arg>
<arg>-Dmapreduce.map.memory.mb={mapMemory}</arg>
<arg>-Dmapreduce.map.memory.mb={mapJavaOpts}</arg>
<arg>-feedBasePath</arg>
<arg>${feedDataPath}</arg>
<arg>-falconFeedStorageType</arg>
Expand Down
20 changes: 20 additions & 0 deletions oozie/src/main/resources/action/feed/replication-action.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,31 @@
<name>oozie.launcher.oozie.libpath</name>
<value>${wf:conf("falcon.libpath")}</value>
</property>
<property>
<name>oozie.launcher.yarn.app.mapreduce.am.resource.mb</name>
<value>${amMemory}</value>
</property>
<property>
<name>oozie.launcher.yarn.app.mapreduce.am.command-opts</name>
<value>${amCommandOpts}</value>
</property>
<property>
<name>oozie.launcher.mapreduce.map.memory.mb</name>
<value>${mapMemory}</value>
</property>
<property>
<name>oozie.launcher.mapreduce.map.java.opts</name>
<value>${mapJavaOpts}</value>
</property>
</configuration>
<main-class>org.apache.falcon.replication.FeedReplicator</main-class>
<arg>-Dfalcon.include.path=${sourceRelativePaths}</arg>
<arg>-Dmapred.job.queue.name=${queueName}</arg>
<arg>-Dmapred.job.priority=${jobPriority}</arg>
<arg>-Dmapreduce.map.memory.mb={amMemory}</arg>
<arg>-Dmapreduce.map.memory.mb={amCommandOpts}</arg>
<arg>-Dmapreduce.map.memory.mb={mapMemory}</arg>
<arg>-Dmapreduce.map.memory.mb={mapJavaOpts}</arg>
<arg>-maxMaps</arg>
<arg>${maxMaps}</arg>
<arg>-mapBandwidth</arg>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ public void testRetentionWithLifecycle(String keepInstancesPostValidity, String
Assert.assertEquals(wfProps.get("queueName"), "ageBasedDeleteQueue");
Assert.assertEquals(wfProps.get("limit"), "hours(2)");
Assert.assertEquals(wfProps.get("jobPriority"), "LOW");

Assert.assertEquals(wfProps.get("amMemory"), "1024");
Assert.assertEquals(wfProps.get("amCommandOpts"), "-Xmx800m -XX:MaxMetaspaceSize=128m");
Assert.assertEquals(wfProps.get("mapMemory"), "1024");
Assert.assertEquals(wfProps.get("mapJavaOpts"), "-Xmx800m -XX:MaxMetaspaceSize=128m");
}

@Test
Expand Down Expand Up @@ -391,6 +396,11 @@ public void testReplicationCoordsForFSStorage() throws Exception {
Assert.assertEquals(wfProps.get("maxMaps"), "5");
Assert.assertEquals(wfProps.get("mapBandwidth"), "100");

Assert.assertEquals(wfProps.get("amMemory"), "1024");
Assert.assertEquals(wfProps.get("amCommandOpts"), "-Xmx800m -XX:MaxMetaspaceSize=128m");
Assert.assertEquals(wfProps.get("mapMemory"), "1024");
Assert.assertEquals(wfProps.get("mapJavaOpts"), "-Xmx800m -XX:MaxMetaspaceSize=128m");

assertLibExtensions(coord, "replication");
WORKFLOWAPP wf = getWorkflowapp(trgMiniDFS.getFileSystem(), coord);
assertWorkflowRetries(wf);
Expand Down Expand Up @@ -499,9 +509,9 @@ private void assertReplCoord(COORDINATORAPP coord, Feed aFeed, Cluster aCluster,
JAVA replication = replicationActionNode.getJava();
List<String> args = replication.getArg();
if (args.contains("-counterLogDir")) {
Assert.assertEquals(args.size(), 17);
Assert.assertEquals(args.size(), 21);
} else {
Assert.assertEquals(args.size(), 15);
Assert.assertEquals(args.size(), 19);
}

HashMap<String, String> props = getCoordProperties(coord);
Expand Down
4 changes: 4 additions & 0 deletions oozie/src/test/resources/feed/feed.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,9 @@
<property name="order" value="FIFO" />
<property name="parallel" value="2" />
<property name="field4" value="value2"/>
<property name="amMemory" value="1024"></property>
<property name="amCommandOpts" value="-Xmx800m -XX:MaxMetaspaceSize=128m"></property>
<property name="mapMemory" value="1024"></property>
<property name="mapJavaOpts" value="-Xmx800m -XX:MaxMetaspaceSize=128m"></property>
</properties>
</feed>
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
<priority>HIGH</priority>
<properties>
<property name="retention.policy.agebaseddelete.limit" value="hours(4)"></property>
<property name="amMemory" value="1024"></property>
<property name="amCommandOpts" value="-Xmx800m -XX:MaxMetaspaceSize=128m"></property>
<property name="mapMemory" value="1024"></property>
<property name="mapJavaOpts" value="-Xmx800m -XX:MaxMetaspaceSize=128m"></property>
</properties>
</retention-stage>
</lifecycle>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,9 @@
<property name="maxMaps" value="33" />
<property name="mapBandwidth" value="2" />
<property name="job.counter" value="true" />
<property name="amMemory" value="1024"></property>
<property name="amCommandOpts" value="-Xmx800m -XX:MaxMetaspaceSize=128m"></property>
<property name="mapMemory" value="1024"></property>
<property name="mapJavaOpts" value="-Xmx800m -XX:MaxMetaspaceSize=128m"></property>
</properties>
</feed>
4 changes: 4 additions & 0 deletions oozie/src/test/resources/feed/fs-replication-feed.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,9 @@
<properties>
<property name="maxMaps" value="33" />
<property name="mapBandwidth" value="2" />
<property name="amMemory" value="1024"></property>
<property name="amCommandOpts" value="-Xmx800m -XX:MaxMetaspaceSize=128m"></property>
<property name="mapMemory" value="1024"></property>
<property name="mapJavaOpts" value="-Xmx800m -XX:MaxMetaspaceSize=128m"></property>
</properties>
</feed>
4 changes: 4 additions & 0 deletions oozie/src/test/resources/feed/fs-retention-feed.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
<properties>
<property name="maxMaps" value="33" />
<property name="mapBandwidth" value="2" />
<property name="amMemory" value="1024"></property>
<property name="amCommandOpts" value="-Xmx800m -XX:MaxMetaspaceSize=128m"></property>
<property name="mapMemory" value="1024"></property>
<property name="mapJavaOpts" value="-Xmx800m -XX:MaxMetaspaceSize=128m"></property>
</properties>

</feed>
Loading