Skip to content

Commit 3101299

Browse files
author
Adam Antal
committed
YARN-10393. MR job live lock caused by completed state container leak in heartbeat between node manager and RM. Contributed by zhenzhao wang and Jim Brennan
(cherry picked from commit a1f7e76)
1 parent 105fd92 commit 3101299

File tree

2 files changed

+25
-19
lines changed

2 files changed

+25
-19
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -645,7 +645,7 @@ public void addCompletedContainer(ContainerId containerId) {
645645
@VisibleForTesting
646646
@Private
647647
public void removeOrTrackCompletedContainersFromContext(
648-
List<ContainerId> containerIds) throws IOException {
648+
List<ContainerId> containerIds) {
649649
Set<ContainerId> removedContainers = new HashSet<ContainerId>();
650650

651651
pendingContainersToRemove.addAll(containerIds);
@@ -662,13 +662,13 @@ public void removeOrTrackCompletedContainersFromContext(
662662
removedContainers.add(containerId);
663663
iter.remove();
664664
}
665+
pendingCompletedContainers.remove(containerId);
665666
}
666667

667668
if (!removedContainers.isEmpty()) {
668669
LOG.info("Removed completed containers from NM context: "
669670
+ removedContainers);
670671
}
671-
pendingCompletedContainers.clear();
672672
}
673673

674674
private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
@@ -1037,6 +1037,7 @@ private class StatusUpdaterRunnable implements Runnable {
10371037
@SuppressWarnings("unchecked")
10381038
public void run() {
10391039
int lastHeartbeatID = 0;
1040+
boolean missedHearbeat = false;
10401041
while (!isStopped) {
10411042
// Send heartbeat
10421043
try {
@@ -1083,6 +1084,20 @@ public void run() {
10831084
removeOrTrackCompletedContainersFromContext(response
10841085
.getContainersToBeRemovedFromNM());
10851086

1087+
// If the last heartbeat was missed, it is possible that the
1088+
// RM saw this one as a duplicate and did not process it.
1089+
// If so, we can fail to notify the RM of these completed containers
1090+
// on the next heartbeat if we clear pendingCompletedContainers.
1091+
// If it wasn't a duplicate, the only impact is we might notify
1092+
// the RM twice, which it can handle.
1093+
if (!missedHearbeat) {
1094+
pendingCompletedContainers.clear();
1095+
} else {
1096+
LOG.info("skipped clearing pending completed containers due to " +
1097+
"missed heartbeat");
1098+
missedHearbeat = false;
1099+
}
1100+
10861101
logAggregationReportForAppsTempList.clear();
10871102
lastHeartbeatID = response.getResponseId();
10881103
List<ContainerId> containersToCleanup = response
@@ -1158,6 +1173,7 @@ public void run() {
11581173
// TODO Better error handling. Thread can die with the rest of the
11591174
// NM still running.
11601175
LOG.error("Caught exception in status-updater", e);
1176+
missedHearbeat = true;
11611177
} finally {
11621178
synchronized (heartbeatMonitor) {
11631179
nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -758,15 +758,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
758758
} else if (heartBeatID == 2 || heartBeatID == 3) {
759759
List<ContainerStatus> statuses =
760760
request.getNodeStatus().getContainersStatuses();
761-
if (heartBeatID == 2) {
762-
// NM should send completed containers again, since the last
763-
// heartbeat is lost.
764-
Assert.assertEquals(4, statuses.size());
765-
} else {
766-
// NM should not send completed containers again, since the last
767-
// heartbeat is successful.
768-
Assert.assertEquals(2, statuses.size());
769-
}
761+
// NM should send completed containers on heartbeat 2,
762+
// since heartbeat 1 was lost. It will send them again on
763+
// heartbeat 3, because it does not clear them if the previous
764+
// heartbeat was lost in case the RM treated it as a duplicate.
765+
Assert.assertEquals(4, statuses.size());
770766
Assert.assertEquals(4, context.getContainers().size());
771767

772768
boolean container2Exist = false, container3Exist = false,
@@ -797,14 +793,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
797793
container5Exist = true;
798794
}
799795
}
800-
if (heartBeatID == 2) {
801-
Assert.assertTrue(container2Exist && container3Exist
802-
&& container4Exist && container5Exist);
803-
} else {
804-
// NM do not send completed containers again
805-
Assert.assertTrue(container2Exist && !container3Exist
806-
&& container4Exist && !container5Exist);
807-
}
796+
Assert.assertTrue(container2Exist && container3Exist
797+
&& container4Exist && container5Exist);
808798

809799
if (heartBeatID == 3) {
810800
finishedContainersPulledByAM.add(containerStatus3.getContainerId());

0 commit comments

Comments
 (0)