Skip to content

Commit 3ae78e4

Browse files
author
Adam Antal
committed
YARN-10393. MR job live lock caused by completed state container leak in heartbeat between node manager and RM. Contributed by zhenzhao wang and Jim Brennan
(cherry picked from commit a1f7e76)
1 parent d80dfad commit 3ae78e4

File tree

2 files changed

+25
-19
lines changed

2 files changed

+25
-19
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,7 @@ public void addCompletedContainer(ContainerId containerId) {
703703
@VisibleForTesting
704704
@Private
705705
public void removeOrTrackCompletedContainersFromContext(
706-
List<ContainerId> containerIds) throws IOException {
706+
List<ContainerId> containerIds) {
707707
Set<ContainerId> removedContainers = new HashSet<ContainerId>();
708708

709709
pendingContainersToRemove.addAll(containerIds);
@@ -720,13 +720,13 @@ public void removeOrTrackCompletedContainersFromContext(
720720
removedContainers.add(containerId);
721721
iter.remove();
722722
}
723+
pendingCompletedContainers.remove(containerId);
723724
}
724725

725726
if (!removedContainers.isEmpty()) {
726727
LOG.info("Removed completed containers from NM context: "
727728
+ removedContainers);
728729
}
729-
pendingCompletedContainers.clear();
730730
}
731731

732732
private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
@@ -1301,6 +1301,7 @@ private class StatusUpdaterRunnable implements Runnable {
13011301
@SuppressWarnings("unchecked")
13021302
public void run() {
13031303
int lastHeartbeatID = 0;
1304+
boolean missedHearbeat = false;
13041305
while (!isStopped) {
13051306
// Send heartbeat
13061307
try {
@@ -1354,6 +1355,20 @@ public void run() {
13541355
removeOrTrackCompletedContainersFromContext(response
13551356
.getContainersToBeRemovedFromNM());
13561357

1358+
// If the last heartbeat was missed, it is possible that the
1359+
// RM saw this one as a duplicate and did not process it.
1360+
// If so, we can fail to notify the RM of these completed containers
1361+
// on the next heartbeat if we clear pendingCompletedContainers.
1362+
// If it wasn't a duplicate, the only impact is we might notify
1363+
// the RM twice, which it can handle.
1364+
if (!missedHearbeat) {
1365+
pendingCompletedContainers.clear();
1366+
} else {
1367+
LOG.info("skipped clearing pending completed containers due to " +
1368+
"missed heartbeat");
1369+
missedHearbeat = false;
1370+
}
1371+
13571372
logAggregationReportForAppsTempList.clear();
13581373
lastHeartbeatID = response.getResponseId();
13591374
List<ContainerId> containersToCleanup = response
@@ -1433,6 +1448,7 @@ public void run() {
14331448
// TODO Better error handling. Thread can die with the rest of the
14341449
// NM still running.
14351450
LOG.error("Caught exception in status-updater", e);
1451+
missedHearbeat = true;
14361452
} finally {
14371453
synchronized (heartbeatMonitor) {
14381454
nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -721,15 +721,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
721721
} else if (heartBeatID == 2 || heartBeatID == 3) {
722722
List<ContainerStatus> statuses =
723723
request.getNodeStatus().getContainersStatuses();
724-
if (heartBeatID == 2) {
725-
// NM should send completed containers again, since the last
726-
// heartbeat is lost.
727-
Assert.assertEquals(4, statuses.size());
728-
} else {
729-
// NM should not send completed containers again, since the last
730-
// heartbeat is successful.
731-
Assert.assertEquals(2, statuses.size());
732-
}
724+
// NM should send completed containers on heartbeat 2,
725+
// since heartbeat 1 was lost. It will send them again on
726+
// heartbeat 3, because it does not clear them if the previous
727+
// heartbeat was lost, in case the RM treated it as a duplicate.
728+
Assert.assertEquals(4, statuses.size());
733729
Assert.assertEquals(4, context.getContainers().size());
734730

735731
boolean container2Exist = false, container3Exist = false,
@@ -760,14 +756,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
760756
container5Exist = true;
761757
}
762758
}
763-
if (heartBeatID == 2) {
764-
Assert.assertTrue(container2Exist && container3Exist
765-
&& container4Exist && container5Exist);
766-
} else {
767-
// NM do not send completed containers again
768-
Assert.assertTrue(container2Exist && !container3Exist
769-
&& container4Exist && !container5Exist);
770-
}
759+
Assert.assertTrue(container2Exist && container3Exist
760+
&& container4Exist && container5Exist);
771761

772762
if (heartBeatID == 3) {
773763
finishedContainersPulledByAM.add(containerStatus3.getContainerId());

0 commit comments

Comments
 (0)