Skip to content

Commit cf1b371

Browse files
YARN-10965. Centralize queue resource calculation based on CapacityVectors. Contributed by Andras Gyori
1 parent 815cde9 commit cf1b371

32 files changed

+2948
-230
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -818,6 +818,28 @@ public static Resource createResourceWithSameValue(long value) {
818818
return res;
819819
}
820820

821+
public static Resource multiplyFloor(Resource resource, double multiplier) {
822+
Resource newResource = Resource.newInstance(0, 0);
823+
824+
for (ResourceInformation resourceInformation : resource.getResources()) {
825+
newResource.setResourceValue(resourceInformation.getName(),
826+
(long) Math.floor(resourceInformation.getValue() * multiplier));
827+
}
828+
829+
return newResource;
830+
}
831+
832+
public static Resource multiplyRound(Resource resource, double multiplier) {
833+
Resource newResource = Resource.newInstance(0, 0);
834+
835+
for (ResourceInformation resourceInformation : resource.getResources()) {
836+
newResource.setResourceValue(resourceInformation.getName(),
837+
Math.round(resourceInformation.getValue() * multiplier));
838+
}
839+
840+
return newResource;
841+
}
842+
821843
@InterfaceAudience.Private
822844
@InterfaceStability.Unstable
823845
public static Resource createResourceFromString(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
20+
21+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
22+
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
23+
24+
import java.util.Map;
25+
26+
import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
27+
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueUpdateWarning.QueueUpdateWarningType.BRANCH_DOWNSCALED;
28+
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ResourceCalculationDriver.MB_UNIT;
29+
30+
public class AbsoluteResourceCapacityCalculator extends AbstractQueueCapacityCalculator {
31+
32+
@Override
33+
public void calculateResourcePrerequisites(ResourceCalculationDriver resourceCalculationDriver) {
34+
setNormalizedResourceRatio(resourceCalculationDriver);
35+
}
36+
37+
@Override
38+
public double calculateMinimumResource(
39+
ResourceCalculationDriver resourceCalculationDriver, CalculationContext context,
40+
String label) {
41+
String resourceName = context.getResourceName();
42+
double normalizedRatio = resourceCalculationDriver.getNormalizedResourceRatios().getOrDefault(
43+
label, ResourceVector.of(1)).getValue(resourceName);
44+
double remainingResourceRatio = resourceCalculationDriver.getRemainingRatioOfResource(
45+
label, resourceName);
46+
47+
return normalizedRatio * remainingResourceRatio * context.getCurrentMinimumCapacityEntry(
48+
label).getResourceValue();
49+
}
50+
51+
@Override
52+
public double calculateMaximumResource(
53+
ResourceCalculationDriver resourceCalculationDriver, CalculationContext context,
54+
String label) {
55+
return context.getCurrentMaximumCapacityEntry(label).getResourceValue();
56+
}
57+
58+
@Override
59+
public void updateCapacitiesAfterCalculation(
60+
ResourceCalculationDriver resourceCalculationDriver, CSQueue queue, String label) {
61+
CapacitySchedulerQueueCapacityHandler.setQueueCapacities(
62+
resourceCalculationDriver.getUpdateContext().getUpdatedClusterResource(label), queue, label);
63+
}
64+
65+
@Override
66+
public ResourceUnitCapacityType getCapacityType() {
67+
return ResourceUnitCapacityType.ABSOLUTE;
68+
}
69+
70+
/**
71+
* Calculates the normalized resource ratio of a parent queue, under which children are defined
72+
* with absolute capacity type. If the effective resource of the parent is less, than the
73+
* aggregated configured absolute resource of its children, the resource ratio will be less,
74+
* than 1.
75+
*
76+
* @param calculationDriver the driver, which contains the parent queue that will form the base
77+
* of the normalization calculation
78+
*/
79+
public static void setNormalizedResourceRatio(ResourceCalculationDriver calculationDriver) {
80+
CSQueue queue = calculationDriver.getQueue();
81+
82+
for (String label : queue.getConfiguredNodeLabels()) {
83+
// ManagedParents assign zero capacity to queues in case of overutilization, downscaling is
84+
// turned off for their children
85+
if (queue instanceof ManagedParentQueue) {
86+
return;
87+
}
88+
89+
for (String resourceName : queue.getConfiguredCapacityVector(label).getResourceNames()) {
90+
long childrenConfiguredResource = 0;
91+
long effectiveMinResource = queue.getQueueResourceQuotas().getEffectiveMinResource(
92+
label).getResourceValue(resourceName);
93+
94+
// Total configured min resources of direct children of the queue
95+
for (CSQueue childQueue : queue.getChildQueues()) {
96+
if (!childQueue.getConfiguredNodeLabels().contains(label)) {
97+
continue;
98+
}
99+
QueueCapacityVector capacityVector = childQueue.getConfiguredCapacityVector(label);
100+
if (capacityVector.isResourceOfType(resourceName, ResourceUnitCapacityType.ABSOLUTE)) {
101+
childrenConfiguredResource += capacityVector.getResource(resourceName)
102+
.getResourceValue();
103+
}
104+
}
105+
// If no children is using ABSOLUTE capacity type, normalization is not needed
106+
if (childrenConfiguredResource == 0) {
107+
continue;
108+
}
109+
// Factor to scale down effective resource: When cluster has sufficient
110+
// resources, effective_min_resources will be same as configured
111+
// min_resources.
112+
float numeratorForMinRatio = childrenConfiguredResource;
113+
if (effectiveMinResource < childrenConfiguredResource) {
114+
numeratorForMinRatio = queue.getQueueResourceQuotas().getEffectiveMinResource(label)
115+
.getResourceValue(resourceName);
116+
calculationDriver.getUpdateContext().addUpdateWarning(BRANCH_DOWNSCALED.ofQueue(
117+
queue.getQueuePath()));
118+
}
119+
120+
String unit = resourceName.equals(MEMORY_URI) ? MB_UNIT : "";
121+
long convertedValue = UnitsConversionUtil.convert(unit, calculationDriver.getUpdateContext()
122+
.getUpdatedClusterResource(label).getResourceInformation(resourceName).getUnits(),
123+
childrenConfiguredResource);
124+
125+
if (convertedValue != 0) {
126+
Map<String, ResourceVector> normalizedResourceRatios =
127+
calculationDriver.getNormalizedResourceRatios();
128+
normalizedResourceRatios.putIfAbsent(label, ResourceVector.newInstance());
129+
normalizedResourceRatios.get(label).setValue(resourceName, numeratorForMinRatio /
130+
convertedValue);
131+
}
132+
}
133+
}
134+
}
135+
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ public enum CapacityConfigType {
115115
CapacityConfigType.NONE;
116116

117117
protected Map<String, QueueCapacityVector> configuredCapacityVectors;
118+
protected Map<String, QueueCapacityVector> configuredMaxCapacityVectors;
118119

119120
private final RecordFactory recordFactory =
120121
RecordFactoryProvider.getRecordFactory(null);
@@ -379,7 +380,10 @@ protected void setupQueueConfigs(Resource clusterResource) throws
379380
this.configuredCapacityVectors = configuration
380381
.parseConfiguredResourceVector(queuePath.getFullPath(),
381382
this.queueNodeLabelsSettings.getConfiguredNodeLabels());
382-
383+
this.configuredMaxCapacityVectors = configuration
384+
.parseConfiguredMaximumCapacityVector(queuePath.getFullPath(),
385+
this.queueNodeLabelsSettings.getConfiguredNodeLabels(),
386+
QueueCapacityVector.newInstance());
383387
// Update metrics
384388
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
385389
this, labelManager, null);
@@ -533,7 +537,8 @@ private void validateMinResourceIsNotGreaterThanMaxResource(Resource minResource
533537
private void validateAbsoluteVsPercentageCapacityConfig(
534538
CapacityConfigType localType) {
535539
if (!queuePath.isRoot()
536-
&& !this.capacityConfigType.equals(localType)) {
540+
&& !this.capacityConfigType.equals(localType) &&
541+
queueContext.getConfiguration().isLegacyQueueMode()) {
537542
throw new IllegalArgumentException("Queue '" + getQueuePath()
538543
+ "' should use either percentage based capacity"
539544
+ " configuration or absolute resource.");
@@ -572,11 +577,25 @@ public Resource getEffectiveMaxCapacityDown(String label, Resource factor) {
572577
}
573578

574579
@Override
575-
public QueueCapacityVector getConfiguredCapacityVector(
576-
String label) {
580+
public QueueCapacityVector getConfiguredCapacityVector(String label) {
577581
return configuredCapacityVectors.get(label);
578582
}
579583

584+
@Override
585+
public QueueCapacityVector getConfiguredMaxCapacityVector(String label) {
586+
return configuredMaxCapacityVectors.get(label);
587+
}
588+
589+
@Override
590+
public void setConfiguredMinCapacityVector(String label, QueueCapacityVector minCapacityVector) {
591+
configuredCapacityVectors.put(label, minCapacityVector);
592+
}
593+
594+
@Override
595+
public void setConfiguredMaxCapacityVector(String label, QueueCapacityVector maxCapacityVector) {
596+
configuredMaxCapacityVectors.put(label, maxCapacityVector);
597+
}
598+
580599
protected QueueInfo getQueueInfo() {
581600
// Deliberately doesn't use lock here, because this method will be invoked
582601
// from schedulerApplicationAttempt, to avoid deadlock, sacrifice
@@ -691,6 +710,11 @@ public ReentrantReadWriteLock.ReadLock getReadLock() {
691710
return readLock;
692711
}
693712

713+
@Override
714+
public ReentrantReadWriteLock.WriteLock getWriteLock() {
715+
return writeLock;
716+
}
717+
694718
private Resource getCurrentLimitResource(String nodePartition,
695719
Resource clusterResource, ResourceLimits currentResourceLimits,
696720
SchedulingMode schedulingMode) {
@@ -827,6 +851,11 @@ boolean canAssignToThisQueue(Resource clusterResource,
827851

828852
}
829853

854+
@Override
855+
public Set<String> getConfiguredNodeLabels() {
856+
return queueNodeLabelsSettings.getConfiguredNodeLabels();
857+
}
858+
830859
private static String ensurePartition(String partition) {
831860
return Optional.ofNullable(partition).orElse(NO_LABEL);
832861
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@
8888

8989
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.getACLsForFlexibleAutoCreatedLeafQueue;
9090

91+
import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
92+
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType.PERCENTAGE;
93+
9194
public class AbstractLeafQueue extends AbstractCSQueue {
9295
private static final Logger LOG =
9396
LoggerFactory.getLogger(AbstractLeafQueue.class);
@@ -164,7 +167,7 @@ public AbstractLeafQueue(CapacitySchedulerQueueContext queueContext,
164167
resourceCalculator);
165168

166169
// One time initialization is enough since it is static ordering policy
167-
this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
170+
this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps<>();
168171
}
169172

170173
@SuppressWarnings("checkstyle:nowhitespaceafter")
@@ -1936,6 +1939,49 @@ private void updateCurrentResourceLimits(
19361939
currentResourceLimits.getLimit()));
19371940
}
19381941

1942+
@Override
1943+
public void refreshAfterResourceCalculation(Resource clusterResource,
1944+
ResourceLimits resourceLimits) {
1945+
lastClusterResource = clusterResource;
1946+
// Update maximum applications for the queue and for users
1947+
updateMaximumApplications();
1948+
1949+
updateCurrentResourceLimits(resourceLimits, clusterResource);
1950+
1951+
// Update headroom info based on new cluster resource value
1952+
// absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity
1953+
// during allocation
1954+
setQueueResourceLimitsInfo(clusterResource);
1955+
1956+
// Update user consumedRatios
1957+
recalculateQueueUsageRatio(clusterResource, null);
1958+
1959+
// Update metrics
1960+
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
1961+
this, labelManager, null);
1962+
// Update configured capacity/max-capacity for default partition only
1963+
CSQueueUtils.updateConfiguredCapacityMetrics(resourceCalculator,
1964+
labelManager.getResourceByLabel(null, clusterResource),
1965+
NO_LABEL, this);
1966+
1967+
// queue metrics are updated, more resource may be available
1968+
// activate the pending applications if possible
1969+
activateApplications();
1970+
1971+
// In case of any resource change, invalidate recalculateULCount to clear
1972+
// the computed user-limit.
1973+
usersManager.userLimitNeedsRecompute();
1974+
1975+
// Update application properties
1976+
for (FiCaSchedulerApp application : orderingPolicy
1977+
.getSchedulableEntities()) {
1978+
computeUserLimitAndSetHeadroom(application, clusterResource,
1979+
NO_LABEL,
1980+
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
1981+
1982+
}
1983+
}
1984+
19391985
@Override
19401986
public void updateClusterResource(Resource clusterResource,
19411987
ResourceLimits currentResourceLimits) {
@@ -2225,10 +2271,12 @@ public Map<String, TreeSet<RMContainer>> getIgnoreExclusivityRMContainers() {
22252271
}
22262272

22272273
public void setCapacity(float capacity) {
2274+
configuredCapacityVectors.put(NO_LABEL, QueueCapacityVector.of(capacity * 100, PERCENTAGE));
22282275
queueCapacities.setCapacity(capacity);
22292276
}
22302277

22312278
public void setCapacity(String nodeLabel, float capacity) {
2279+
configuredCapacityVectors.put(nodeLabel, QueueCapacityVector.of(capacity * 100, PERCENTAGE));
22322280
queueCapacities.setCapacity(nodeLabel, capacity);
22332281
}
22342282

0 commit comments

Comments
 (0)