Skip to content

Commit 4e3d318

Browse files
author
chaosju
committed
YARN-11110. An implementation for using CGroups to control the number of the process in container
1 parent d5cba5c commit 4e3d318

File tree

7 files changed

+333
-2
lines changed

7 files changed

+333
-2
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2018,6 +2018,18 @@ public static boolean isAclEnabled(Configuration conf) {
20182018
public static final long DEFAULT_NM_CONTAINER_LOG_TOTAL_SIZE_LIMIT_BYTES =
20192019
10000000000L;
20202020

2021+
/** Enable switch for container process number monitoring. */
2022+
public static final String NM_CONTAINER_PROCESS_MONITOR_ENABLED =
2023+
NM_PREFIX + "container-process-monitor.enable";
2024+
public static final boolean
2025+
DEFAULT_NM_CONTAINER_PROCESS_MONITOR_ENABLED= false;
2026+
2027+
/** The max process number limit for a single container. */
2028+
public static final String NM_CONTAINER_PROCESS_MAX_LIMIT_NUM =
2029+
NM_PREFIX + "container-process-monitor.max-limit-num";
2030+
public static final int DEFAULT_NM_CONTAINER_PROCESS_NUM_MAX_LIMIT =
2031+
10000;
2032+
20212033
/** Enable/disable container metrics. */
20222034
@Private
20232035
public static final String NM_CONTAINER_METRICS_ENABLE =

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1813,6 +1813,19 @@
18131813
<value>1000000000</value>
18141814
</property>
18151815

1816+
<property>
1817+
<description>Flag to enable the container process monitor which enforces
1818+
container process number limits.</description>
1819+
<name>yarn.nodemanager.container-process-monitor.enable</name>
1820+
<value>false</value>
1821+
</property>
1822+
1823+
<property>
1824+
<description>The max process number limit for a single container.</description>
1825+
<name>yarn.nodemanager.container-process-monitor.max-limit-num</name>
1826+
<value>10000</value>
1827+
</property>
1828+
18161829
<property>
18171830
<description>The disk space limit, in bytes, for all of a container's
18181831
logs</description>

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ enum CGroupController {
4646
CPUACCT("cpuacct"),
4747
CPUSET("cpuset"),
4848
FREEZER("freezer"),
49-
DEVICES("devices");
49+
DEVICES("devices"),
50+
PIDS("pids");
5051

5152
private final String name;
5253

@@ -84,7 +85,7 @@ public static Set<String> getValidCGroups() {
8485
String CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES = "memsw.usage_in_bytes";
8586
String CGROUP_NO_LIMIT = "-1";
8687
String UNDER_OOM = "under_oom 1";
87-
88+
String CGROUP_PIDS_MAX = "max";
8889

8990
String CGROUP_CPU_PERIOD_US = "cfs_period_us";
9091
String CGROUP_CPU_QUOTA_US = "cfs_quota_us";
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
20+
21+
import org.apache.commons.logging.Log;
22+
import org.apache.commons.logging.LogFactory;
23+
import org.apache.hadoop.classification.InterfaceAudience;
24+
import org.apache.hadoop.classification.InterfaceStability;
25+
import org.apache.hadoop.conf.Configuration;
26+
import org.apache.hadoop.yarn.api.records.ContainerId;
27+
import org.apache.hadoop.yarn.conf.YarnConfiguration;
28+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
29+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
30+
31+
import java.util.ArrayList;
32+
import java.util.List;
33+
34+
/**
35+
* An implementation for using CGroups to control the number of the process in container.
36+
*
37+
* The process number controller is used to allow a cgroup hierarchy to stop any
38+
* new tasks from being fork()'d or clone()'d after a certain limit is reached.
39+
* @see <a href="https://www.kernel.org/doc/Documentation/cgroup-v1/pids.txt">PIDS</a>
40+
*/
41+
42+
@InterfaceStability.Unstable
43+
@InterfaceAudience.Private
44+
public class CGroupsPidsResourceHandlerImpl implements PidsResourceHandler {
45+
46+
static final Log LOG = LogFactory.getLog(CGroupsPidsResourceHandlerImpl.class);
47+
48+
private CGroupsHandler cGroupsHandler;
49+
private static final CGroupsHandler.CGroupController PIDS = CGroupsHandler.CGroupController.PIDS;
50+
private int processMaxCount;
51+
52+
CGroupsPidsResourceHandlerImpl(CGroupsHandler cGroupsHandler) {
53+
this.cGroupsHandler = cGroupsHandler;
54+
}
55+
56+
@Override
57+
public List<PrivilegedOperation> bootstrap(Configuration conf)
58+
throws ResourceHandlerException {
59+
this.cGroupsHandler.initializeCGroupController(PIDS);
60+
processMaxCount =
61+
conf.getInt(YarnConfiguration.NM_CONTAINER_PROCESS_MAX_LIMIT_NUM,
62+
YarnConfiguration.DEFAULT_NM_CONTAINER_PROCESS_NUM_MAX_LIMIT);
63+
if (processMaxCount < 0){
64+
throw new ResourceHandlerException(
65+
"Illegal value '" + processMaxCount + "' "
66+
+ YarnConfiguration.
67+
NM_CONTAINER_PROCESS_MAX_LIMIT_NUM
68+
+ ". Value must be positive number.");
69+
}
70+
LOG.info("Maximum number of processes is " + processMaxCount);
71+
72+
return null;
73+
}
74+
75+
@Override
76+
public List<PrivilegedOperation> preStart(Container container)
77+
throws ResourceHandlerException {
78+
79+
String cgroupId = container.getContainerId().toString();
80+
cGroupsHandler.createCGroup(PIDS, cgroupId);
81+
try {
82+
cGroupsHandler.updateCGroupParam(PIDS, cgroupId,
83+
CGroupsHandler.CGROUP_PIDS_MAX, String.valueOf(processMaxCount));
84+
} catch (ResourceHandlerException re) {
85+
cGroupsHandler.deleteCGroup(PIDS, cgroupId);
86+
LOG.error("Could not update cgroup for container", re);
87+
throw re;
88+
}
89+
List<PrivilegedOperation> ret = new ArrayList<>();
90+
ret.add(new PrivilegedOperation(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
91+
PrivilegedOperation.CGROUP_ARG_PREFIX + cGroupsHandler
92+
.getPathForCGroupTasks(PIDS, cgroupId)));
93+
return ret;
94+
}
95+
96+
@Override
97+
public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
98+
throws ResourceHandlerException {
99+
return null;
100+
}
101+
102+
@Override
103+
public List<PrivilegedOperation> updateContainer(Container container)
104+
throws ResourceHandlerException {
105+
return null;
106+
}
107+
108+
@Override
109+
public List<PrivilegedOperation> postComplete(ContainerId containerId)
110+
throws ResourceHandlerException {
111+
cGroupsHandler.deleteCGroup(PIDS, containerId.toString());
112+
return null;
113+
}
114+
115+
@Override
116+
public List<PrivilegedOperation> teardown()
117+
throws ResourceHandlerException {
118+
return null;
119+
}
120+
121+
public int getProcessMaxCount() {
122+
return processMaxCount;
123+
}
124+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
21+
22+
import org.apache.hadoop.classification.InterfaceAudience;
23+
import org.apache.hadoop.classification.InterfaceStability;
24+
25+
/**
26+
* Resource handler for pid resources.
27+
*/
28+
@InterfaceAudience.Private
29+
@InterfaceStability.Unstable
30+
public interface PidsResourceHandler extends ResourceHandler {
31+
32+
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,15 @@ private static ResourceHandler getNumaResourceHandler(Configuration conf,
278278
return null;
279279
}
280280

281+
private static ResourceHandler
282+
getPidsResourceHandler(Configuration conf) throws ResourceHandlerException{
283+
if (conf.getBoolean(YarnConfiguration.NM_CONTAINER_PROCESS_MONITOR_ENABLED,
284+
YarnConfiguration.DEFAULT_NM_CONTAINER_PROCESS_MONITOR_ENABLED)){
285+
return new CGroupsPidsResourceHandlerImpl(getCGroupsHandler());
286+
}
287+
return null;
288+
}
289+
281290
private static void addHandlerIfNotNull(List<ResourceHandler> handlerList,
282291
ResourceHandler handler) {
283292
if (handler != null) {
@@ -299,6 +308,7 @@ private static void initializeConfiguredResourceHandlerChain(
299308
addHandlerIfNotNull(handlerList,
300309
initCGroupsCpuResourceHandler(conf));
301310
addHandlerIfNotNull(handlerList, getNumaResourceHandler(conf, nmContext));
311+
addHandlerIfNotNull(handlerList, getPidsResourceHandler(conf));
302312
addHandlersFromConfiguredResourcePlugins(handlerList, conf, nmContext);
303313
resourceHandlerChain = new ResourceHandlerChain(handlerList);
304314
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
19+
20+
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.yarn.api.records.ContainerId;
22+
import org.apache.hadoop.yarn.conf.YarnConfiguration;
23+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
24+
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
25+
import org.junit.Assert;
26+
import org.junit.Before;
27+
import org.junit.Test;
28+
29+
import java.io.IOException;
30+
import java.util.List;
31+
32+
import static org.mockito.ArgumentMatchers.any;
33+
import static org.mockito.Mockito.mock;
34+
import static org.mockito.Mockito.when;
35+
import static org.mockito.Mockito.verify;
36+
import static org.mockito.Mockito.times;
37+
38+
/**
39+
* Test class for CGroupsPidsResourceHandlerImpl.
40+
*
41+
*/
42+
public class TestPidsResourceHandlerImpl {
43+
44+
private CGroupsPidsResourceHandlerImpl pidsResourceHandler;
45+
private CGroupsHandler mockCGroupsHandler;
46+
47+
@Before
48+
public void setUp() throws IOException, ResourceHandlerException {
49+
mockCGroupsHandler = mock(CGroupsHandler.class);
50+
when(mockCGroupsHandler.getPathForCGroup(any(), any())).thenReturn(".");
51+
pidsResourceHandler =
52+
new CGroupsPidsResourceHandlerImpl(mockCGroupsHandler);
53+
}
54+
55+
@Test
56+
public void testBootstrap() throws Exception {
57+
Configuration conf = new YarnConfiguration();
58+
List<PrivilegedOperation> ret =
59+
pidsResourceHandler.bootstrap(conf);
60+
verify(mockCGroupsHandler, times(1))
61+
.initializeCGroupController(CGroupsHandler.CGroupController.PIDS);
62+
Assert.assertNull(ret);
63+
Assert.assertEquals("Default process number incorrect", 10000,
64+
pidsResourceHandler.getProcessMaxCount());
65+
}
66+
67+
@Test
68+
public void testProcessNumbers() throws Exception {
69+
Configuration conf = new YarnConfiguration();
70+
conf.setInt(YarnConfiguration.NM_CONTAINER_PROCESS_MAX_LIMIT_NUM, -1);
71+
try {
72+
pidsResourceHandler.bootstrap(conf);
73+
Assert.fail("Negative values for process number should not be allowed.");
74+
} catch (ResourceHandlerException re) {
75+
// do nothing
76+
}
77+
78+
conf.setInt(YarnConfiguration.NM_CONTAINER_PROCESS_MAX_LIMIT_NUM, 1000);
79+
pidsResourceHandler.bootstrap(conf);
80+
Assert.assertEquals("process number value incorrect", 1000,
81+
pidsResourceHandler.getProcessMaxCount());
82+
}
83+
84+
@Test
85+
public void testPreStart() throws Exception {
86+
Configuration conf = new Configuration();
87+
conf.setInt(YarnConfiguration.NM_CONTAINER_PROCESS_MAX_LIMIT_NUM, 1024);
88+
pidsResourceHandler.bootstrap(conf);
89+
String id = "container_01_01";
90+
String path = "test-path/" + id;
91+
ContainerId mockContainerId = mock(ContainerId.class);
92+
when(mockContainerId.toString()).thenReturn(id);
93+
Container mockContainer = mock(Container.class);
94+
when(mockContainer.getContainerId()).thenReturn(mockContainerId);
95+
when(mockCGroupsHandler
96+
.getPathForCGroupTasks(CGroupsHandler.CGroupController.PIDS, id))
97+
.thenReturn(path);
98+
int maxProcess = 1024;
99+
List<PrivilegedOperation> ret =
100+
pidsResourceHandler.preStart(mockContainer);
101+
verify(mockCGroupsHandler, times(1))
102+
.createCGroup(CGroupsHandler.CGroupController.PIDS, id);
103+
verify(mockCGroupsHandler, times(1))
104+
.updateCGroupParam(CGroupsHandler.CGroupController.PIDS, id,
105+
CGroupsHandler.CGROUP_PIDS_MAX, String.valueOf(maxProcess));
106+
Assert.assertNotNull(ret);
107+
Assert.assertEquals(1, ret.size());
108+
PrivilegedOperation op = ret.get(0);
109+
Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
110+
op.getOperationType());
111+
List<String> args = op.getArguments();
112+
Assert.assertEquals(1, args.size());
113+
Assert.assertEquals(PrivilegedOperation.CGROUP_ARG_PREFIX + path,
114+
args.get(0));
115+
}
116+
117+
@Test
118+
public void testReacquireContainer() throws Exception {
119+
ContainerId containerIdMock = mock(ContainerId.class);
120+
Assert.assertNull(
121+
pidsResourceHandler.reacquireContainer(containerIdMock));
122+
}
123+
124+
@Test
125+
public void testPostComplete() throws Exception {
126+
String id = "container_01_01";
127+
ContainerId mockContainerId = mock(ContainerId.class);
128+
when(mockContainerId.toString()).thenReturn(id);
129+
Assert
130+
.assertNull(pidsResourceHandler.postComplete(mockContainerId));
131+
verify(mockCGroupsHandler, times(1))
132+
.deleteCGroup(CGroupsHandler.CGroupController.PIDS, id);
133+
}
134+
135+
@Test
136+
public void testTeardown() throws Exception {
137+
Assert.assertNull(pidsResourceHandler.teardown());
138+
}
139+
}

0 commit comments

Comments
 (0)