Skip to content

Commit 0aefe28

Browse files
author
Yufei Gu
committed
YARN-9298. Implement FS placement rules using PlacementRule interface. Contributed by Wilfred Spiegelenburg.
1 parent 4b7313e commit 0aefe28

File tree

13 files changed

+1290
-15
lines changed

13 files changed

+1290
-15
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.server.resourcemanager.placement;
20+
21+
import com.google.common.annotations.VisibleForTesting;
22+
import org.apache.hadoop.classification.InterfaceAudience;
23+
import org.apache.hadoop.classification.InterfaceStability;
24+
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
25+
import org.apache.hadoop.yarn.conf.YarnConfiguration;
26+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
27+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
import org.w3c.dom.Element;
31+
32+
import java.io.IOException;
33+
34+
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.assureRoot;
35+
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.isValidQueueName;
36+
37+
/**
38+
* Places apps in the specified default queue. If no default queue is
39+
* specified the app is placed in root.default queue.
40+
*/
41+
@InterfaceAudience.Private
42+
@InterfaceStability.Unstable
43+
public class DefaultPlacementRule extends FSPlacementRule {
44+
private static final Logger LOG =
45+
LoggerFactory.getLogger(DefaultPlacementRule.class);
46+
47+
@VisibleForTesting
48+
public String defaultQueueName;
49+
50+
/**
51+
* Set the rule config from the xml config.
52+
* @param conf An xml element from the {@link FairScheduler#conf}
53+
*/
54+
@Override
55+
public void setConfig(Element conf) {
56+
// Get the flag from the config (defaults to true if not set)
57+
createQueue = getCreateFlag(conf);
58+
// No config can be set when no policy is defined and we use defaults
59+
if (conf != null) {
60+
defaultQueueName = conf.getAttribute("queue");
61+
// A queue read from the config could be illegal check it: fall back to
62+
// the config default if it is the case
63+
// However we cannot clean the name as a nested name is allowed.
64+
if (!isValidQueueName(defaultQueueName)) {
65+
LOG.error("Default rule configured with an illegal queue name: '{}'",
66+
defaultQueueName);
67+
defaultQueueName = null;
68+
}
69+
}
70+
// The queue name does not have to be set and we really use "default"
71+
if (defaultQueueName == null || defaultQueueName.isEmpty()) {
72+
defaultQueueName = assureRoot(YarnConfiguration.DEFAULT_QUEUE_NAME);
73+
} else {
74+
defaultQueueName = assureRoot(defaultQueueName);
75+
}
76+
LOG.debug("Default rule instantiated with queue name: {}, " +
77+
"and create flag: {}", defaultQueueName, createQueue);
78+
}
79+
80+
/**
81+
* Set the rule config just setting the create flag.
82+
* @param create flag to allow queue creation for this rule
83+
*/
84+
@Override
85+
public void setConfig(Boolean create) {
86+
createQueue = create;
87+
// No config so fall back to the real default.
88+
defaultQueueName = assureRoot(YarnConfiguration.DEFAULT_QUEUE_NAME);
89+
LOG.debug("Default rule instantiated with default queue name: {}, " +
90+
"and create flag: {}", defaultQueueName, createQueue);
91+
}
92+
93+
@Override
94+
public boolean initialize(ResourceScheduler scheduler) throws IOException {
95+
super.initialize(scheduler);
96+
if (getParentRule() != null) {
97+
throw new IOException(
98+
"Parent rule must not be configured for Default rule.");
99+
}
100+
return true;
101+
}
102+
103+
@Override
104+
public ApplicationPlacementContext getPlacementForApp(
105+
ApplicationSubmissionContext asc, String user) {
106+
107+
// If we can create the queue in the rule or the queue exists return it
108+
if (createQueue || configuredQueue(defaultQueueName)) {
109+
return new ApplicationPlacementContext(defaultQueueName);
110+
}
111+
return null;
112+
}
113+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.server.resourcemanager.placement;
20+
21+
import com.google.common.annotations.VisibleForTesting;
22+
import org.apache.hadoop.classification.InterfaceAudience;
23+
import org.apache.hadoop.classification.InterfaceStability;
24+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
25+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
26+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
27+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueueManager;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
import org.w3c.dom.Element;
31+
32+
import java.io.IOException;
33+
34+
/**
35+
* Abstract base for all {@link FairScheduler} Placement Rules.
36+
*/
37+
@InterfaceAudience.Private
38+
@InterfaceStability.Unstable
39+
public abstract class FSPlacementRule extends PlacementRule {
40+
private static final Logger LOG =
41+
LoggerFactory.getLogger(FSPlacementRule.class);
42+
43+
// Flag to show if the rule can create a queue
44+
@VisibleForTesting
45+
protected boolean createQueue = true;
46+
private QueueManager queueManager;
47+
private PlacementRule parentRule;
48+
49+
/**
50+
* Get the {@link QueueManager} loaded from the scheduler when the rule is
51+
* initialised. All rules are initialised before the can be called to place
52+
* an application.
53+
* @return The queue manager from the scheduler, this can never be
54+
* <code>null</code> for an initialised rule.
55+
*/
56+
QueueManager getQueueManager() {
57+
return queueManager;
58+
}
59+
60+
/**
61+
* Set a rule to generate the parent queue dynamically.
62+
* @param parent A PlacementRule
63+
*/
64+
void setParentRule(PlacementRule parent) {
65+
this.parentRule = parent;
66+
}
67+
68+
/**
69+
* Get the rule that is set to generate the parent queue dynamically.
70+
* @return The rule set or <code>null</code> if not set.
71+
*/
72+
PlacementRule getParentRule() {
73+
return parentRule;
74+
}
75+
76+
/**
77+
* Set the config based on the type of object passed in.
78+
* @param initArg the config to be set
79+
*/
80+
@Override
81+
public void setConfig(Object initArg) {
82+
if (null == initArg) {
83+
LOG.debug("Null object passed in: no config set");
84+
return;
85+
}
86+
if (initArg instanceof Element) {
87+
LOG.debug("Setting config from XML");
88+
setConfig((Element) initArg);
89+
} else if (initArg instanceof Boolean) {
90+
LOG.debug("Setting config from Boolean");
91+
setConfig((Boolean) initArg);
92+
} else {
93+
LOG.info("Unknown object type passed in as config for rule {}: {}",
94+
getName(), initArg.getClass());
95+
}
96+
}
97+
98+
/**
99+
* Set the rule config from the xml config.
100+
* @param conf An xml element from the {@link FairScheduler#conf}
101+
*/
102+
protected void setConfig(Element conf) {
103+
// Get the flag from the config (defaults to true if not set)
104+
createQueue = getCreateFlag(conf);
105+
}
106+
107+
/**
108+
* Set the rule config just setting the create flag.
109+
* @param create flag to allow queue creation for this rule
110+
*/
111+
protected void setConfig(Boolean create) {
112+
createQueue = create;
113+
}
114+
115+
/**
116+
* Standard initialisation for {@link FairScheduler} rules, shared by all
117+
* rules. Each rule that extends this abstract and overrides this method must
118+
* call <code>super.initialize()</code> to run this basic initialisation.
119+
* @param scheduler the scheduler using the rule
120+
* @return <code>true</code> in all cases
121+
* @throws IOException for any errors
122+
*/
123+
@Override
124+
public boolean initialize(ResourceScheduler scheduler) throws IOException {
125+
if (!(scheduler instanceof FairScheduler)) {
126+
throw new IOException(getName() +
127+
" rule can only be configured for the FairScheduler");
128+
}
129+
if (getParentRule() != null &&
130+
getParentRule().getName().equals(getName())) {
131+
throw new IOException("Parent rule may not be the same type as the " +
132+
"child rule: " + getName());
133+
}
134+
135+
FairScheduler fs = (FairScheduler) scheduler;
136+
queueManager = fs.getQueueManager();
137+
138+
return true;
139+
}
140+
141+
/**
142+
* Check if the queue exists and is part of the configuration i.e. not
143+
* a {@link FSQueue#isDynamic()} queue.
144+
* @param queueName name of the queue to check
145+
* @return <code>true</code> if the queue exists and is a "configured" queue
146+
*/
147+
boolean configuredQueue(String queueName) {
148+
FSQueue queue = queueManager.getQueue(queueName);
149+
return (queue != null && !queue.isDynamic());
150+
}
151+
152+
/**
153+
* Get the create flag from the xml configuration element.
154+
* @param conf The FS configuration element for the queue
155+
* @return <code>false</code> only if the flag is set in the configuration to
156+
* a text that is not case ignored "true", <code>true</code> in all other
157+
* cases
158+
*/
159+
boolean getCreateFlag(Element conf) {
160+
if (conf != null) {
161+
String create = conf.getAttribute("create");
162+
return Boolean.parseBoolean(create);
163+
}
164+
return true;
165+
}
166+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.server.resourcemanager.placement;
20+
21+
import org.apache.hadoop.classification.InterfaceAudience;
22+
import org.apache.hadoop.classification.InterfaceStability;
23+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerUtilities;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
/**
28+
* Utility methods used by Fair scheduler placement rules.
29+
* {@link
30+
* org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler}
31+
*/
32+
@InterfaceAudience.Private
33+
@InterfaceStability.Unstable
34+
public final class FairQueuePlacementUtils {
35+
private static final Logger LOG =
36+
LoggerFactory.getLogger(FairQueuePlacementUtils.class);
37+
38+
// Constants for name clean up and hierarchy checks
39+
protected static final String DOT = ".";
40+
protected static final String DOT_REPLACEMENT = "_dot_";
41+
protected static final String ROOT_QUEUE = "root";
42+
43+
private FairQueuePlacementUtils() {
44+
}
45+
46+
/**
47+
* Replace the periods in the username or group name with "_dot_" and
48+
* remove trailing and leading whitespace.
49+
*
50+
* @param name The name to clean
51+
* @return The name with {@link #DOT} replaced with {@link #DOT_REPLACEMENT}
52+
*/
53+
protected static String cleanName(String name) {
54+
name = FairSchedulerUtilities.trimQueueName(name);
55+
if (name.contains(DOT)) {
56+
String converted = name.replaceAll("\\.", DOT_REPLACEMENT);
57+
LOG.warn("Name {} is converted to {} when it is used as a queue name.",
58+
name, converted);
59+
return converted;
60+
} else {
61+
return name;
62+
}
63+
}
64+
65+
/**
66+
* Assure root prefix for a queue name.
67+
*
68+
* @param queueName The queue name to check for the root prefix
69+
* @return The root prefixed queue name
70+
*/
71+
protected static String assureRoot(String queueName) {
72+
if (queueName != null && !queueName.isEmpty()) {
73+
if (!queueName.startsWith(ROOT_QUEUE + DOT) &&
74+
!queueName.equals(ROOT_QUEUE)) {
75+
queueName = ROOT_QUEUE + DOT + queueName;
76+
}
77+
} else {
78+
LOG.warn("AssureRoot: queueName is empty or null.");
79+
}
80+
return queueName;
81+
}
82+
83+
/**
84+
* Validate the queue name: it may not start or end with a {@link #DOT}.
85+
*
86+
* @param queueName The queue name to validate
87+
* @return <code>false</code> if the queue name starts or ends with a
88+
* {@link #DOT}, <code>true</code>
89+
*/
90+
protected static boolean isValidQueueName(String queueName) {
91+
if (queueName != null) {
92+
if (queueName.equals(FairSchedulerUtilities.trimQueueName(queueName)) &&
93+
!queueName.startsWith(DOT) &&
94+
!queueName.endsWith(DOT)) {
95+
return true;
96+
}
97+
}
98+
return false;
99+
}
100+
}

0 commit comments

Comments
 (0)