Skip to content

Commit 42fab78

Browse files
committed
YARN-10467. ContainerIdPBImpl objects can be leaked in RMNodeImpl.completedContainers. Contributed by Haibo Chen
(cherry picked from commit bab5bf9) (cherry picked from commit f95c082) (cherry picked from commit d0104e7)
1 parent 34915b0 commit 42fab78

File tree

2 files changed

+137
-6
lines changed
  • hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src

2 files changed

+137
-6
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
178178
private long launchAMEndTime = 0;
179179
private long scheduledTime = 0;
180180
private long containerAllocatedTime = 0;
181+
private boolean nonWorkPreservingAMContainerFinished = false;
181182

182183
// Set to null initially. Will eventually get set
183184
// if an RMAppAttemptUnregistrationEvent occurs
@@ -854,7 +855,7 @@ public List<ContainerStatus> pullJustFinishedContainers() {
854855

855856
// A new allocate means the AM received the previously sent
856857
// finishedContainers. We can ack this to NM now
857-
sendFinishedContainersToNM();
858+
sendFinishedContainersToNM(finishedContainersSentToAM);
858859

859860
// Mark every containerStatus as being sent to AM though we may return
860861
// only the ones that belong to the current attempt
@@ -1986,12 +1987,13 @@ private void sendFinishedAMContainerToNM(NodeId nodeId,
19861987
}
19871988

19881989
// Ack NM to remove finished containers from context.
1989-
private void sendFinishedContainersToNM() {
1990-
for (NodeId nodeId : finishedContainersSentToAM.keySet()) {
1990+
private void sendFinishedContainersToNM(
1991+
Map<NodeId, List<ContainerStatus>> finishedContainers) {
1992+
for (NodeId nodeId : finishedContainers.keySet()) {
19911993

19921994
// Clear and get current values
19931995
List<ContainerStatus> currentSentContainers =
1994-
finishedContainersSentToAM.put(nodeId, new ArrayList<>());
1996+
finishedContainers.put(nodeId, new ArrayList<>());
19951997
List<ContainerId> containerIdList =
19961998
new ArrayList<>(currentSentContainers.size());
19971999
for (ContainerStatus containerStatus : currentSentContainers) {
@@ -2000,7 +2002,7 @@ private void sendFinishedContainersToNM() {
20002002
eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId,
20012003
containerIdList));
20022004
}
2003-
this.finishedContainersSentToAM.clear();
2005+
finishedContainers.clear();
20042006
}
20052007

20062008
// Add am container to the list so that am container instance will be
@@ -2026,7 +2028,16 @@ private static void amContainerFinished(RMAppAttemptImpl appAttempt,
20262028
appAttempt.finishedContainersSentToAM.putIfAbsent(nodeId,
20272029
new ArrayList<>());
20282030
appAttempt.finishedContainersSentToAM.get(nodeId).add(containerStatus);
2029-
appAttempt.sendFinishedContainersToNM();
2031+
appAttempt.sendFinishedContainersToNM(
2032+
appAttempt.finishedContainersSentToAM);
2033+
// there might be some completed containers that have not been pulled
2034+
// by the AM heartbeat; explicitly send them to the NM for cleanup.
2035+
appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers);
2036+
2037+
// mark the fact that AM container has finished so that future finished
2038+
// containers will be cleaned up without the engagement of the AM
2039+
// (through heartbeat)
2040+
appAttempt.nonWorkPreservingAMContainerFinished = true;
20302041
} else {
20312042
appAttempt.sendFinishedAMContainerToNM(nodeId,
20322043
containerStatus.getContainerId());
@@ -2054,6 +2065,11 @@ private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt,
20542065
.getNodeId(), new ArrayList<>());
20552066
appAttempt.justFinishedContainers.get(containerFinishedEvent
20562067
.getNodeId()).add(containerFinishedEvent.getContainerStatus());
2068+
2069+
if (appAttempt.nonWorkPreservingAMContainerFinished) {
2070+
// AM container has finished, so no more AM heartbeats to do the cleanup.
2071+
appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers);
2072+
}
20572073
}
20582074

20592075
private static final class ContainerFinishedAtFinalStateTransition

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,8 @@ private Container allocateApplicationAttempt() {
649649
RMContainer rmContainer = mock(RMContainerImpl.class);
650650
when(scheduler.getRMContainer(container.getId())).
651651
thenReturn(rmContainer);
652+
when(container.getNodeId()).thenReturn(
653+
BuilderUtils.newNodeId("localhost", 0));
652654

653655
applicationAttempt.handle(
654656
new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
@@ -1536,6 +1538,119 @@ public void testFinishedContainer() {
15361538
.handle(Mockito.any(RMNodeEvent.class));
15371539
}
15381540

1541+
/**
1542+
* Check that a completed container not yet pulled by the AM heartbeat
1543+
* is ACKed to the NM for cleanup when the AM container exits.
1544+
*/
1545+
@Test
1546+
public void testFinishedContainerNotBeingPulledByAMHeartbeat() {
1547+
Container amContainer = allocateApplicationAttempt();
1548+
launchApplicationAttempt(amContainer);
1549+
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
1550+
1551+
application.handle(new RMAppRunningOnNodeEvent(application
1552+
.getApplicationId(), amContainer.getNodeId()));
1553+
1554+
// Complete a non-AM container
1555+
ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt
1556+
.getAppAttemptId(), 2);
1557+
Container container1 = mock(Container.class);
1558+
ContainerStatus containerStatus1 = mock(ContainerStatus.class);
1559+
when(container1.getId()).thenReturn(
1560+
containerId1);
1561+
when(containerStatus1.getContainerId()).thenReturn(containerId1);
1562+
when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
1563+
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
1564+
applicationAttempt.getAppAttemptId(), containerStatus1,
1565+
container1.getNodeId()));
1566+
1567+
// Verify justFinishedContainers
1568+
ArgumentCaptor<RMNodeFinishedContainersPulledByAMEvent> captor =
1569+
ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class);
1570+
Assert.assertEquals(1, applicationAttempt.getJustFinishedContainers()
1571+
.size());
1572+
Assert.assertEquals(container1.getId(), applicationAttempt
1573+
.getJustFinishedContainers().get(0).getContainerId());
1574+
Assert.assertTrue(
1575+
getFinishedContainersSentToAM(applicationAttempt).isEmpty());
1576+
1577+
// finish AM container to emulate AM exit event
1578+
containerStatus1 = mock(ContainerStatus.class);
1579+
ContainerId amContainerId = amContainer.getId();
1580+
when(containerStatus1.getContainerId()).thenReturn(amContainerId);
1581+
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
1582+
applicationAttempt.getAppAttemptId(), containerStatus1,
1583+
amContainer.getNodeId()));
1584+
1585+
Mockito.verify(rmnodeEventHandler, times(2)).handle(captor.capture());
1586+
List<RMNodeFinishedContainersPulledByAMEvent> containerPulledEvents =
1587+
captor.getAllValues();
1588+
// Verify AM container is acked to NM via the RMNodeEvent immediately
1589+
Assert.assertEquals(amContainer.getId(),
1590+
containerPulledEvents.get(0).getContainers().get(0));
1591+
// Verify the non-AM container is acked to NM via the RMNodeEvent
1592+
Assert.assertEquals(container1.getId(),
1593+
containerPulledEvents.get(1).getContainers().get(0));
1594+
Assert.assertTrue("No container shall be added to justFinishedContainers" +
1595+
" as soon as AM container exits",
1596+
applicationAttempt.getJustFinishedContainers().isEmpty());
1597+
Assert.assertTrue(
1598+
getFinishedContainersSentToAM(applicationAttempt).isEmpty());
1599+
}
1600+
1601+
/**
1602+
* Check a completed container is ACKed to NM for cleanup after the AM
1603+
* container has exited.
1604+
*/
1605+
@Test
1606+
public void testFinishedContainerAfterAMExit() {
1607+
Container amContainer = allocateApplicationAttempt();
1608+
launchApplicationAttempt(amContainer);
1609+
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
1610+
1611+
// finish AM container to emulate AM exit event
1612+
ContainerStatus containerStatus1 = mock(ContainerStatus.class);
1613+
ContainerId amContainerId = amContainer.getId();
1614+
when(containerStatus1.getContainerId()).thenReturn(amContainerId);
1615+
application.handle(new RMAppRunningOnNodeEvent(application
1616+
.getApplicationId(),
1617+
amContainer.getNodeId()));
1618+
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
1619+
applicationAttempt.getAppAttemptId(), containerStatus1,
1620+
amContainer.getNodeId()));
1621+
1622+
// Verify AM container is acked to NM via the RMNodeEvent immediately
1623+
ArgumentCaptor<RMNodeFinishedContainersPulledByAMEvent> captor =
1624+
ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class);
1625+
Mockito.verify(rmnodeEventHandler).handle(captor.capture());
1626+
Assert.assertEquals(amContainer.getId(),
1627+
captor.getValue().getContainers().get(0));
1628+
1629+
// Complete a non-AM container
1630+
ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt
1631+
.getAppAttemptId(), 2);
1632+
Container container1 = mock(Container.class);
1633+
containerStatus1 = mock(ContainerStatus.class);
1634+
when(container1.getId()).thenReturn(containerId1);
1635+
when(containerStatus1.getContainerId()).thenReturn(containerId1);
1636+
when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
1637+
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
1638+
applicationAttempt.getAppAttemptId(), containerStatus1,
1639+
container1.getNodeId()));
1640+
1641+
// Verify container is acked to NM via the RMNodeEvent immediately
1642+
captor = ArgumentCaptor.forClass(
1643+
RMNodeFinishedContainersPulledByAMEvent.class);
1644+
Mockito.verify(rmnodeEventHandler, times(2)).handle(captor.capture());
1645+
Assert.assertEquals(container1.getId(),
1646+
captor.getAllValues().get(1).getContainers().get(0));
1647+
Assert.assertTrue("No container shall be added to justFinishedContainers" +
1648+
" after AM container exited",
1649+
applicationAttempt.getJustFinishedContainers().isEmpty());
1650+
Assert.assertTrue(
1651+
getFinishedContainersSentToAM(applicationAttempt).isEmpty());
1652+
}
1653+
15391654
private static List<ContainerStatus> getFinishedContainersSentToAM(
15401655
RMAppAttempt applicationAttempt) {
15411656
List<ContainerStatus> containers = new ArrayList<ContainerStatus>();

0 commit comments

Comments
 (0)