Skip to content

Commit d0104e7

Browse files
committed
YARN-10467. ContainerIdPBImpl objects can be leaked in RMNodeImpl.completedContainers. Contributed by Haibo Chen
(cherry picked from commit bab5bf9) (cherry picked from commit f95c082)
1 parent dedf0d4 commit d0104e7

File tree

2 files changed

+137
-6
lines changed
  • hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src

2 files changed

+137
-6
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
179179
private long launchAMEndTime = 0;
180180
private long scheduledTime = 0;
181181
private long containerAllocatedTime = 0;
182+
private boolean nonWorkPreservingAMContainerFinished = false;
182183

183184
// Set to null initially. Will eventually get set
184185
// if an RMAppAttemptUnregistrationEvent occurs
@@ -855,7 +856,7 @@ public List<ContainerStatus> pullJustFinishedContainers() {
855856

856857
// A new allocate means the AM received the previously sent
857858
// finishedContainers. We can ack this to NM now
858-
sendFinishedContainersToNM();
859+
sendFinishedContainersToNM(finishedContainersSentToAM);
859860

860861
// Mark every containerStatus as being sent to AM though we may return
861862
// only the ones that belong to the current attempt
@@ -1987,12 +1988,13 @@ private void sendFinishedAMContainerToNM(NodeId nodeId,
19871988
}
19881989

19891990
// Ack NM to remove finished containers from context.
1990-
private void sendFinishedContainersToNM() {
1991-
for (NodeId nodeId : finishedContainersSentToAM.keySet()) {
1991+
private void sendFinishedContainersToNM(
1992+
Map<NodeId, List<ContainerStatus>> finishedContainers) {
1993+
for (NodeId nodeId : finishedContainers.keySet()) {
19921994

19931995
// Clear and get current values
19941996
List<ContainerStatus> currentSentContainers =
1995-
finishedContainersSentToAM.put(nodeId, new ArrayList<>());
1997+
finishedContainers.put(nodeId, new ArrayList<>());
19961998
List<ContainerId> containerIdList =
19971999
new ArrayList<>(currentSentContainers.size());
19982000
for (ContainerStatus containerStatus : currentSentContainers) {
@@ -2001,7 +2003,7 @@ private void sendFinishedContainersToNM() {
20012003
eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId,
20022004
containerIdList));
20032005
}
2004-
this.finishedContainersSentToAM.clear();
2006+
finishedContainers.clear();
20052007
}
20062008

20072009
// Add am container to the list so that am container instance will be
@@ -2027,7 +2029,16 @@ private static void amContainerFinished(RMAppAttemptImpl appAttempt,
20272029
appAttempt.finishedContainersSentToAM.putIfAbsent(nodeId,
20282030
new ArrayList<>());
20292031
appAttempt.finishedContainersSentToAM.get(nodeId).add(containerStatus);
2030-
appAttempt.sendFinishedContainersToNM();
2032+
appAttempt.sendFinishedContainersToNM(
2033+
appAttempt.finishedContainersSentToAM);
2034+
// there might be some completed containers that have not been pulled
2035+
// by the AM heartbeat, explicitly add them for cleanup.
2036+
appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers);
2037+
2038+
// mark the fact that AM container has finished so that future finished
2039+
// containers will be cleaned up without the engagement of AM containers
2040+
// (through heartbeat)
2041+
appAttempt.nonWorkPreservingAMContainerFinished = true;
20312042
} else {
20322043
appAttempt.sendFinishedAMContainerToNM(nodeId,
20332044
containerStatus.getContainerId());
@@ -2055,6 +2066,11 @@ private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt,
20552066
.getNodeId(), new ArrayList<>());
20562067
appAttempt.justFinishedContainers.get(containerFinishedEvent
20572068
.getNodeId()).add(containerFinishedEvent.getContainerStatus());
2069+
2070+
if (appAttempt.nonWorkPreservingAMContainerFinished) {
2071+
// AM container has finished, so no more AM heartbeats to do the cleanup.
2072+
appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers);
2073+
}
20582074
}
20592075

20602076
private static final class ContainerFinishedAtFinalStateTransition

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,8 @@ private Container allocateApplicationAttempt() {
649649
RMContainer rmContainer = mock(RMContainerImpl.class);
650650
when(scheduler.getRMContainer(container.getId())).
651651
thenReturn(rmContainer);
652+
when(container.getNodeId()).thenReturn(
653+
BuilderUtils.newNodeId("localhost", 0));
652654

653655
applicationAttempt.handle(
654656
new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
@@ -1536,6 +1538,119 @@ public void testFinishedContainer() {
15361538
.handle(Mockito.any(RMNodeEvent.class));
15371539
}
15381540

1541+
/**
1542+
* Check a completed container that is not yet pulled by AM heartbeat,
1543+
* is ACKed to NM for cleanup when the AM container exits.
1544+
*/
1545+
@Test
1546+
public void testFinishedContainerNotBeingPulledByAMHeartbeat() {
1547+
Container amContainer = allocateApplicationAttempt();
1548+
launchApplicationAttempt(amContainer);
1549+
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
1550+
1551+
application.handle(new RMAppRunningOnNodeEvent(application
1552+
.getApplicationId(), amContainer.getNodeId()));
1553+
1554+
// Complete a non-AM container
1555+
ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt
1556+
.getAppAttemptId(), 2);
1557+
Container container1 = mock(Container.class);
1558+
ContainerStatus containerStatus1 = mock(ContainerStatus.class);
1559+
when(container1.getId()).thenReturn(
1560+
containerId1);
1561+
when(containerStatus1.getContainerId()).thenReturn(containerId1);
1562+
when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
1563+
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
1564+
applicationAttempt.getAppAttemptId(), containerStatus1,
1565+
container1.getNodeId()));
1566+
1567+
// Verify justFinishedContainers
1568+
ArgumentCaptor<RMNodeFinishedContainersPulledByAMEvent> captor =
1569+
ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class);
1570+
Assert.assertEquals(1, applicationAttempt.getJustFinishedContainers()
1571+
.size());
1572+
Assert.assertEquals(container1.getId(), applicationAttempt
1573+
.getJustFinishedContainers().get(0).getContainerId());
1574+
Assert.assertTrue(
1575+
getFinishedContainersSentToAM(applicationAttempt).isEmpty());
1576+
1577+
// finish AM container to emulate AM exit event
1578+
containerStatus1 = mock(ContainerStatus.class);
1579+
ContainerId amContainerId = amContainer.getId();
1580+
when(containerStatus1.getContainerId()).thenReturn(amContainerId);
1581+
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
1582+
applicationAttempt.getAppAttemptId(), containerStatus1,
1583+
amContainer.getNodeId()));
1584+
1585+
Mockito.verify(rmnodeEventHandler, times(2)).handle(captor.capture());
1586+
List<RMNodeFinishedContainersPulledByAMEvent> containerPulledEvents =
1587+
captor.getAllValues();
1588+
// Verify AM container is acked to NM via the RMNodeEvent immediately
1589+
Assert.assertEquals(amContainer.getId(),
1590+
containerPulledEvents.get(0).getContainers().get(0));
1591+
// Verify the non-AM container is acked to NM via the RMNodeEvent
1592+
Assert.assertEquals(container1.getId(),
1593+
containerPulledEvents.get(1).getContainers().get(0));
1594+
Assert.assertTrue("No container shall be added to justFinishedContainers" +
1595+
" as soon as AM container exits",
1596+
applicationAttempt.getJustFinishedContainers().isEmpty());
1597+
Assert.assertTrue(
1598+
getFinishedContainersSentToAM(applicationAttempt).isEmpty());
1599+
}
1600+
1601+
/**
1602+
* Check a completed container is ACKed to NM for cleanup after the AM
1603+
* container has exited.
1604+
*/
1605+
@Test
1606+
public void testFinishedContainerAfterAMExit() {
1607+
Container amContainer = allocateApplicationAttempt();
1608+
launchApplicationAttempt(amContainer);
1609+
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
1610+
1611+
// finish AM container to emulate AM exit event
1612+
ContainerStatus containerStatus1 = mock(ContainerStatus.class);
1613+
ContainerId amContainerId = amContainer.getId();
1614+
when(containerStatus1.getContainerId()).thenReturn(amContainerId);
1615+
application.handle(new RMAppRunningOnNodeEvent(application
1616+
.getApplicationId(),
1617+
amContainer.getNodeId()));
1618+
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
1619+
applicationAttempt.getAppAttemptId(), containerStatus1,
1620+
amContainer.getNodeId()));
1621+
1622+
// Verify AM container is acked to NM via the RMNodeEvent immediately
1623+
ArgumentCaptor<RMNodeFinishedContainersPulledByAMEvent> captor =
1624+
ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class);
1625+
Mockito.verify(rmnodeEventHandler).handle(captor.capture());
1626+
Assert.assertEquals(amContainer.getId(),
1627+
captor.getValue().getContainers().get(0));
1628+
1629+
// Complete a non-AM container
1630+
ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt
1631+
.getAppAttemptId(), 2);
1632+
Container container1 = mock(Container.class);
1633+
containerStatus1 = mock(ContainerStatus.class);
1634+
when(container1.getId()).thenReturn(containerId1);
1635+
when(containerStatus1.getContainerId()).thenReturn(containerId1);
1636+
when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
1637+
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
1638+
applicationAttempt.getAppAttemptId(), containerStatus1,
1639+
container1.getNodeId()));
1640+
1641+
// Verify container is acked to NM via the RMNodeEvent immediately
1642+
captor = ArgumentCaptor.forClass(
1643+
RMNodeFinishedContainersPulledByAMEvent.class);
1644+
Mockito.verify(rmnodeEventHandler, times(2)).handle(captor.capture());
1645+
Assert.assertEquals(container1.getId(),
1646+
captor.getAllValues().get(1).getContainers().get(0));
1647+
Assert.assertTrue("No container shall be added to justFinishedContainers" +
1648+
" after AM container exited",
1649+
applicationAttempt.getJustFinishedContainers().isEmpty());
1650+
Assert.assertTrue(
1651+
getFinishedContainersSentToAM(applicationAttempt).isEmpty());
1652+
}
1653+
15391654
private static List<ContainerStatus> getFinishedContainersSentToAM(
15401655
RMAppAttempt applicationAttempt) {
15411656
List<ContainerStatus> containers = new ArrayList<ContainerStatus>();

0 commit comments

Comments
 (0)