Skip to content

Commit 052b979

Browse files
committed
YARN-10475: Scale RM-NM heartbeat interval based on node utilization. Contributed by Jim Brennan (Jim_Brennan).
(cherry picked from commit 31154fd)
1 parent 42fab78 commit 052b979

File tree

14 files changed

+418
-13
lines changed

14 files changed

+418
-13
lines changed

hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -228,6 +228,13 @@ public RMContext getRMContext() {
228228
public Resource getPhysicalResource() {
229229
return null;
230230
}
231+
232+
@Override
233+
public long calculateHeartBeatInterval(
234+
long defaultInterval, long minInterval, long maxInterval,
235+
float speedupFactor, float slowdownFactor) {
236+
return defaultInterval;
237+
}
231238
}
232239

233240
public static RMNode newNodeInfo(String rackName, String hostName,

hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -216,4 +216,11 @@ public RMContext getRMContext() {
216216
public Resource getPhysicalResource() {
217217
return null;
218218
}
219+
220+
@Override
221+
public long calculateHeartBeatInterval(
222+
long defaultInterval, long minInterval, long maxInterval,
223+
float speedupFactor, float slowdownFactor) {
224+
return defaultInterval;
225+
}
219226
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

Lines changed: 24 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -667,6 +667,30 @@ public static boolean isAclEnabled(Configuration conf) {
667667
RM_PREFIX + "nodemanagers.heartbeat-interval-ms";
668668
public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS = 1000;
669669

670+
/** Enable heartbeat interval scaling based on CPU utilization. */
671+
public static final String RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE =
672+
RM_PREFIX + "nodemanagers.heartbeat-interval-scaling-enable";
673+
public static final boolean
674+
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE = false;
675+
676+
public static final String RM_NM_HEARTBEAT_INTERVAL_MIN_MS =
677+
RM_PREFIX + "nodemanagers.heartbeat-interval-min-ms";
678+
public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MIN_MS = 1000;
679+
680+
public static final String RM_NM_HEARTBEAT_INTERVAL_MAX_MS =
681+
RM_PREFIX + "nodemanagers.heartbeat-interval-max-ms";
682+
public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MAX_MS = 1000;
683+
684+
public static final String RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR =
685+
RM_PREFIX + "nodemanagers.heartbeat-interval-speedup-factor";
686+
public static final float
687+
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR = 1.0f;
688+
689+
public static final String RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR =
690+
RM_PREFIX + "nodemanagers.heartbeat-interval-slowdown-factor";
691+
public static final float
692+
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR = 1.0f;
693+
670694
/** Number of worker threads that write the history data. */
671695
public static final String RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE =
672696
RM_PREFIX + "history-writer.multi-threaded-dispatcher.pool-size";

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

Lines changed: 50 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -834,6 +834,56 @@
834834
<value>1000</value>
835835
</property>
836836

837+
<property>
838+
<description>Enables heart-beat interval scaling. The NodeManager
839+
heart-beat interval will scale based on the difference between the CPU
840+
utilization on the node and the cluster-wide average CPU utilization.
841+
</description>
842+
<name>
843+
yarn.resourcemanager.nodemanagers.heartbeat-interval-scaling-enable
844+
</name>
845+
<value>false</value>
846+
</property>
847+
848+
<property>
849+
<description>If heart-beat interval scaling is enabled, this is the
850+
minimum heart-beat interval in milliseconds.
851+
</description>
852+
<name>yarn.resourcemanager.nodemanagers.heartbeat-interval-min-ms</name>
853+
<value>1000</value>
854+
</property>
855+
856+
<property>
857+
<description>If heart-beat interval scaling is enabled, this is the
858+
maximum heart-beat interval in milliseconds.</description>
859+
<name>yarn.resourcemanager.nodemanagers.heartbeat-interval-max-ms</name>
860+
<value>1000</value>
861+
</property>
862+
863+
<property>
864+
<description>If heart-beat interval scaling is enabled, this controls
865+
the degree of adjustment when speeding up heartbeat intervals.
866+
At 1.0, 20% less than average CPU utilization will result in a 20%
867+
decrease in heartbeat interval.
868+
</description>
869+
<name>
870+
yarn.resourcemanager.nodemanagers.heartbeat-interval-speedup-factor
871+
</name>
872+
<value>1.0</value>
873+
</property>
874+
875+
<property>
876+
<description>If heart-beat interval scaling is enabled, this controls
877+
the degree of adjustment when slowing down heartbeat intervals.
878+
At 1.0, 20% greater than average CPU utilization will result in a 20%
879+
increase in heartbeat interval.
880+
</description>
881+
<name>
882+
yarn.resourcemanager.nodemanagers.heartbeat-interval-slowdown-factor
883+
</name>
884+
<value>1.0</value>
885+
</property>
886+
837887
<property>
838888
<description>The minimum allowed version of a connecting nodemanager. The valid values are
839889
NONE (no version checking), EqualToRM (the nodemanager's version is equal to

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -722,6 +722,14 @@ public RefreshNodesResourcesResponse refreshNodesResources(
722722
// refresh dynamic resource in ResourceTrackerService
723723
this.rm.getRMContext().getResourceTrackerService().
724724
updateDynamicResourceConfiguration(newConf);
725+
726+
// Update our heartbeat configuration as well
727+
Configuration ysconf =
728+
getConfiguration(new Configuration(false),
729+
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
730+
this.rm.getRMContext().getResourceTrackerService()
731+
.updateHeartBeatConfiguration(ysconf);
732+
725733
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
726734
"AdminService");
727735
return response;

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java

Lines changed: 26 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@
3232
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
3333
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
3434
import org.apache.hadoop.metrics2.lib.MutableRate;
35+
import org.apache.hadoop.yarn.api.records.Resource;
3536
import com.google.common.annotations.VisibleForTesting;
3637

3738
@InterfaceAudience.Private
@@ -53,6 +54,8 @@ public class ClusterMetrics {
5354
private MutableRate aMContainerAllocationDelay;
5455
@Metric("Memory Utilization") MutableGaugeLong utilizedMB;
5556
@Metric("Vcore Utilization") MutableGaugeLong utilizedVirtualCores;
57+
@Metric("Memory Capability") MutableGaugeLong capabilityMB;
58+
@Metric("Vcore Capability") MutableGaugeLong capabilityVirtualCores;
5659

5760
private static final MetricsInfo RECORD_INFO = info("ClusterMetrics",
5861
"Metrics for the Yarn Cluster");
@@ -83,7 +86,7 @@ private static void registerMetrics() {
8386
}
8487

8588
@VisibleForTesting
86-
synchronized static void destroy() {
89+
public synchronized static void destroy() {
8790
isInitialized.set(false);
8891
INSTANCE = null;
8992
}
@@ -195,6 +198,28 @@ public void addAMRegisterDelay(long delay) {
195198
aMRegisterDelay.add(delay);
196199
}
197200

201+
public long getCapabilityMB() {
202+
return capabilityMB.value();
203+
}
204+
205+
public long getCapabilityVirtualCores() {
206+
return capabilityVirtualCores.value();
207+
}
208+
209+
public void incrCapability(Resource res) {
210+
if (res != null) {
211+
capabilityMB.incr(res.getMemorySize());
212+
capabilityVirtualCores.incr(res.getVirtualCores());
213+
}
214+
}
215+
216+
public void decrCapability(Resource res) {
217+
if (res != null) {
218+
capabilityMB.decr(res.getMemorySize());
219+
capabilityVirtualCores.decr(res.getVirtualCores());
220+
}
221+
}
222+
198223
public void addAMContainerAllocationDelay(long delay) {
199224
aMContainerAllocationDelay.add(delay);
200225
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

Lines changed: 94 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -53,7 +53,6 @@
5353
import org.apache.hadoop.yarn.api.records.Resource;
5454
import org.apache.hadoop.yarn.conf.YarnConfiguration;
5555
import org.apache.hadoop.yarn.exceptions.YarnException;
56-
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
5756
import org.apache.hadoop.yarn.factories.RecordFactory;
5857
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
5958
import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -110,6 +109,13 @@ public class ResourceTrackerService extends AbstractService implements
110109
private final WriteLock writeLock;
111110

112111
private long nextHeartBeatInterval;
112+
private boolean heartBeatIntervalScalingEnable;
113+
private long heartBeatIntervalMin;
114+
private long heartBeatIntervalMax;
115+
private float heartBeatIntervalSpeedupFactor;
116+
private float heartBeatIntervalSlowdownFactor;
117+
118+
113119
private Server server;
114120
private InetSocketAddress resourceTrackerAddress;
115121
private String minimumNodeManagerVersion;
@@ -152,14 +158,6 @@ protected void serviceInit(Configuration conf) throws Exception {
152158
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
153159

154160
RackResolver.init(conf);
155-
nextHeartBeatInterval =
156-
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
157-
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
158-
if (nextHeartBeatInterval <= 0) {
159-
throw new YarnRuntimeException("Invalid Configuration. "
160-
+ YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS
161-
+ " should be larger than 0.");
162-
}
163161

164162
minAllocMb = conf.getInt(
165163
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
@@ -180,7 +178,7 @@ protected void serviceInit(Configuration conf) throws Exception {
180178
isDelegatedCentralizedNodeLabelsConf =
181179
YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(conf);
182180
}
183-
181+
updateHeartBeatConfiguration(conf);
184182
loadDynamicResourceConfiguration(conf);
185183
decommissioningWatcher.init(conf);
186184
super.serviceInit(conf);
@@ -225,6 +223,84 @@ public void updateDynamicResourceConfiguration(
225223
}
226224
}
227225

226+
/**
227+
* Update the heartbeat configuration from the given configuration.
228+
* @param conf Yarn Configuration
229+
*/
230+
public void updateHeartBeatConfiguration(Configuration conf) {
231+
this.writeLock.lock();
232+
try {
233+
nextHeartBeatInterval =
234+
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
235+
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
236+
heartBeatIntervalScalingEnable =
237+
conf.getBoolean(
238+
YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE,
239+
YarnConfiguration.
240+
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE);
241+
heartBeatIntervalMin =
242+
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MIN_MS,
243+
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MIN_MS);
244+
heartBeatIntervalMax =
245+
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MAX_MS,
246+
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MAX_MS);
247+
heartBeatIntervalSpeedupFactor =
248+
conf.getFloat(
249+
YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR,
250+
YarnConfiguration.
251+
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR);
252+
heartBeatIntervalSlowdownFactor =
253+
conf.getFloat(
254+
YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR,
255+
YarnConfiguration.
256+
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR);
257+
258+
if (nextHeartBeatInterval <= 0) {
259+
LOG.warn("HeartBeat interval: " + nextHeartBeatInterval
260+
+ " must be greater than 0, using default.");
261+
nextHeartBeatInterval =
262+
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS;
263+
}
264+
265+
if (heartBeatIntervalScalingEnable) {
266+
if (heartBeatIntervalMin <= 0
267+
|| heartBeatIntervalMin > heartBeatIntervalMax
268+
|| nextHeartBeatInterval < heartBeatIntervalMin
269+
|| nextHeartBeatInterval > heartBeatIntervalMax) {
270+
LOG.warn("Invalid NM Heartbeat Configuration. "
271+
+ "Required: 0 < minimum <= interval <= maximum. Got: 0 < "
272+
+ heartBeatIntervalMin + " <= "
273+
+ nextHeartBeatInterval + " <= "
274+
+ heartBeatIntervalMax
275+
+ " Setting min and max to configured interval.");
276+
heartBeatIntervalMin = nextHeartBeatInterval;
277+
heartBeatIntervalMax = nextHeartBeatInterval;
278+
}
279+
if (heartBeatIntervalSpeedupFactor < 0
280+
|| heartBeatIntervalSlowdownFactor < 0) {
281+
LOG.warn(
282+
"Heartbeat scaling factors must be >= 0 "
283+
+ " SpeedupFactor:" + heartBeatIntervalSpeedupFactor
284+
+ " SlowdownFactor:" + heartBeatIntervalSlowdownFactor
285+
+ ". Using Defaults");
286+
heartBeatIntervalSlowdownFactor =
287+
YarnConfiguration.
288+
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR;
289+
heartBeatIntervalSpeedupFactor =
290+
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR;
291+
}
292+
LOG.info("Heartbeat Scaling Configuration: "
293+
+ " defaultInterval:" + nextHeartBeatInterval
294+
+ " minimumInterval:" + heartBeatIntervalMin
295+
+ " maximumInterval:" + heartBeatIntervalMax
296+
+ " speedupFactor:" + heartBeatIntervalSpeedupFactor
297+
+ " slowdownFactor:" + heartBeatIntervalSlowdownFactor);
298+
}
299+
} finally {
300+
this.writeLock.unlock();
301+
}
302+
}
303+
228304
@Override
229305
protected void serviceStart() throws Exception {
230306
super.serviceStart();
@@ -588,10 +664,17 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
588664
}
589665

590666
// Heartbeat response
667+
long newInterval = nextHeartBeatInterval;
668+
if (heartBeatIntervalScalingEnable) {
669+
newInterval = rmNode.calculateHeartBeatInterval(
670+
nextHeartBeatInterval, heartBeatIntervalMin,
671+
heartBeatIntervalMax, heartBeatIntervalSpeedupFactor,
672+
heartBeatIntervalSlowdownFactor);
673+
}
591674
NodeHeartbeatResponse nodeHeartBeatResponse =
592675
YarnServerBuilderUtils.newNodeHeartbeatResponse(
593676
getNextResponseId(lastNodeHeartbeatResponse.getResponseId()),
594-
NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval);
677+
NodeAction.NORMAL, null, null, null, null, newInterval);
595678
rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse);
596679

597680
populateKeys(request, nodeHeartBeatResponse);

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -195,4 +195,8 @@ public interface RMNode {
195195
* @return the RM context associated with this RM node.
196196
*/
197197
RMContext getRMContext();
198+
199+
long calculateHeartBeatInterval(long defaultInterval,
200+
long minInterval, long maxInterval, float speedupFactor,
201+
float slowdownFactor);
198202
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

Lines changed: 42 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -690,6 +690,48 @@ public void resetLastNodeHeartBeatResponse() {
690690
}
691691
}
692692

693+
@Override
694+
public long calculateHeartBeatInterval(long defaultInterval, long minInterval,
695+
long maxInterval, float speedupFactor, float slowdownFactor) {
696+
697+
long newInterval = defaultInterval;
698+
699+
ClusterMetrics metrics = ClusterMetrics.getMetrics();
700+
float clusterUtil = metrics.getUtilizedVirtualCores()
701+
/ Math.max(1.0f, metrics.getCapabilityVirtualCores());
702+
703+
if (this.nodeUtilization != null && this.getPhysicalResource() != null) {
704+
// getCPU() returns utilization normalized to 1 cpu. getVirtualCores() on
705+
// a physicalResource returns number of physical cores. So,
706+
// nodeUtil will be CPU utilization of entire node.
707+
float nodeUtil = this.nodeUtilization.getCPU()
708+
/ Math.max(1.0f, this.getPhysicalResource().getVirtualCores());
709+
710+
// Sanitize: clamp both utilizations to the range [0.0, 1.0].
711+
nodeUtil = Math.min(1.0f, Math.max(0.0f, nodeUtil));
712+
clusterUtil = Math.min(1.0f, Math.max(0.0f, clusterUtil));
713+
714+
if (nodeUtil > clusterUtil) {
715+
// Slow down - 20% more CPU utilization means slow down by 20% * factor
716+
newInterval = (long) (defaultInterval
717+
* (1.0f + (nodeUtil - clusterUtil) * slowdownFactor));
718+
} else {
719+
// Speed up - 20% less CPU utilization means speed up by 20% * factor
720+
newInterval = (long) (defaultInterval
721+
* (1.0f - (clusterUtil - nodeUtil) * speedupFactor));
722+
}
723+
newInterval =
724+
Math.min(maxInterval, Math.max(minInterval, newInterval));
725+
726+
if (LOG.isDebugEnabled()) {
727+
LOG.debug("Setting heartbeatinterval to: " + newInterval
728+
+ " node:" + this.nodeId + " nodeUtil: " + nodeUtil
729+
+ " clusterUtil: " + clusterUtil);
730+
}
731+
}
732+
return newInterval;
733+
}
734+
693735
public void handle(RMNodeEvent event) {
694736
LOG.debug("Processing " + event.getNodeId() + " of type " + event.getType());
695737
try {

0 commit comments

Comments
 (0)