Skip to content

Commit aa19cb2

Browse files
committed
YARN-6492. Generate queue metrics for each partition. Contributed by Manikandan R
(cherry picked from commit c30c23c) (cherry picked from commit 7a323a4) (cherry picked from commit a80595a6deb3124a3d6d99057e9d5298cd7237d8)
1 parent 0cd4459 commit aa19cb2

File tree

16 files changed

+1739
-235
lines changed

16 files changed

+1739
-235
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,19 @@ public static Resource subtractFromNonNegative(Resource lhs, Resource rhs) {
298298
return lhs;
299299
}
300300

301+
/**
302+
* Subtract {@code rhs} from {@code lhs} and reset any negative values to
303+
* zero. This call will operate on a copy of {@code lhs}, leaving {@code lhs}
304+
* unmodified.
305+
*
306+
* @param lhs {@link Resource} to subtract from
307+
* @param rhs {@link Resource} to subtract
308+
* @return the value of lhs after subtraction
309+
*/
310+
public static Resource subtractNonNegative(Resource lhs, Resource rhs) {
311+
return subtractFromNonNegative(clone(lhs), rhs);
312+
}
313+
301314
public static Resource negate(Resource resource) {
302315
return subtract(NONE, resource);
303316
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import org.apache.hadoop.classification.InterfaceStability.Unstable;
3838
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
3939
import org.apache.hadoop.yarn.api.records.ApplicationId;
40-
import org.apache.hadoop.yarn.api.records.Container;
4140
import org.apache.hadoop.yarn.api.records.ExecutionType;
4241
import org.apache.hadoop.yarn.api.records.Resource;
4342
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -531,7 +530,7 @@ public boolean isPlaceBlacklisted(String resourceName,
531530

532531
public ContainerRequest allocate(NodeType type,
533532
SchedulerNode node, SchedulerRequestKey schedulerKey,
534-
Container containerAllocated) {
533+
RMContainer containerAllocated) {
535534
try {
536535
writeLock.lock();
537536

@@ -690,7 +689,7 @@ public boolean checkAllocation(NodeType type, SchedulerNode node,
690689
}
691690

692691
private void updateMetricsForAllocatedContainer(NodeType type,
693-
SchedulerNode node, Container containerAllocated) {
692+
SchedulerNode node, RMContainer containerAllocated) {
694693
QueueMetrics metrics = queue.getMetrics();
695694
if (pending) {
696695
// once an allocation is done we assume the application is
@@ -703,18 +702,21 @@ private void updateMetricsForAllocatedContainer(NodeType type,
703702
}
704703

705704
public static void updateMetrics(ApplicationId applicationId, NodeType type,
706-
SchedulerNode node, Container containerAllocated, String user,
705+
SchedulerNode node, RMContainer containerAllocated, String user,
707706
Queue queue) {
708707
if (LOG.isDebugEnabled()) {
709708
LOG.debug("allocate: applicationId=" + applicationId + " container="
710-
+ containerAllocated.getId() + " host=" + containerAllocated
711-
.getNodeId().toString() + " user=" + user + " resource="
712-
+ containerAllocated.getResource() + " type="
713-
+ type);
709+
+ containerAllocated.getContainer().getId() + " host="
710+
+ containerAllocated.getNodeId().toString() + " user="
711+
+ user + " resource="
712+
+ containerAllocated.getContainer().getResource() + " type=" + type);
714713
}
715714
if(node != null) {
716715
queue.getMetrics().allocateResources(node.getPartition(), user, 1,
717-
containerAllocated.getResource(), true);
716+
containerAllocated.getContainer().getResource(), false);
717+
queue.getMetrics().decrPendingResources(
718+
containerAllocated.getNodeLabelExpression(), user, 1,
719+
containerAllocated.getContainer().getResource());
718720
}
719721
queue.getMetrics().incrNodeTypeAggregations(user, type);
720722
}
@@ -768,4 +770,8 @@ public boolean precheckNode(SchedulerRequestKey schedulerKey,
768770
this.readLock.unlock();
769771
}
770772
}
773+
774+
public RMContext getRMContext() {
775+
return this.rmContext;
776+
}
771777
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,11 +161,17 @@ private void cancelPreviousRequest(SchedulerNode schedulerNode,
161161
// Decrement the pending using a dummy RR with
162162
// resource = prev update req capability
163163
if (pendingAsk != null && pendingAsk.getCount() > 0) {
164+
Container container = Container.newInstance(UNDEFINED,
165+
schedulerNode.getNodeID(), "host:port",
166+
pendingAsk.getPerAllocationResource(),
167+
schedulerKey.getPriority(), null);
164168
appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode,
165-
schedulerKey, Container.newInstance(UNDEFINED,
166-
schedulerNode.getNodeID(), "host:port",
167-
pendingAsk.getPerAllocationResource(),
168-
schedulerKey.getPriority(), null));
169+
schedulerKey,
170+
new RMContainerImpl(container, schedulerKey,
171+
appSchedulingInfo.getApplicationAttemptId(),
172+
schedulerNode.getNodeID(), appSchedulingInfo.getUser(),
173+
appSchedulingInfo.getRMContext(),
174+
appPlacementAllocator.getPrimaryRequestedNodePartition()));
169175
}
170176
}
171177
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
20+
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.metrics2.MetricsSystem;
23+
import org.apache.hadoop.metrics2.annotation.Metrics;
24+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
25+
26+
@Metrics(context = "yarn")
27+
public class PartitionQueueMetrics extends QueueMetrics {
28+
29+
private String partition;
30+
31+
protected PartitionQueueMetrics(MetricsSystem ms, String queueName,
32+
Queue parent, boolean enableUserMetrics, Configuration conf,
33+
String partition) {
34+
super(ms, queueName, parent, enableUserMetrics, conf);
35+
this.partition = partition;
36+
if (getParentQueue() != null) {
37+
String newQueueName = (getParentQueue() instanceof CSQueue)
38+
? ((CSQueue) getParentQueue()).getQueuePath()
39+
: getParentQueue().getQueueName();
40+
String parentMetricName =
41+
partition + METRIC_NAME_DELIMITER + newQueueName;
42+
setParent(getQueueMetrics().get(parentMetricName));
43+
}
44+
}
45+
46+
/**
47+
* Partition * Queue * User Metrics
48+
*
49+
* Computes Metrics at Partition (Node Label) * Queue * User Level.
50+
*
51+
* Sample JMX O/P Structure:
52+
*
53+
* PartitionQueueMetrics (labelX)
54+
* QueueMetrics (A)
55+
* usermetrics
56+
* QueueMetrics (A1)
57+
* usermetrics
58+
* QueueMetrics (A2)
59+
* usermetrics
60+
* QueueMetrics (B)
61+
* usermetrics
62+
*
63+
* @return QueueMetrics
64+
*/
65+
@Override
66+
public synchronized QueueMetrics getUserMetrics(String userName) {
67+
if (users == null) {
68+
return null;
69+
}
70+
71+
String partitionJMXStr =
72+
(partition.equals(DEFAULT_PARTITION)) ? DEFAULT_PARTITION_JMX_STR
73+
: partition;
74+
75+
QueueMetrics metrics = (PartitionQueueMetrics) users.get(userName);
76+
if (metrics == null) {
77+
metrics = new PartitionQueueMetrics(this.metricsSystem, this.queueName,
78+
null, false, this.conf, this.partition);
79+
users.put(userName, metrics);
80+
metricsSystem.register(
81+
pSourceName(partitionJMXStr).append(qSourceName(queueName))
82+
.append(",user=").append(userName).toString(),
83+
"Metrics for user '" + userName + "' in queue '" + queueName + "'",
84+
((PartitionQueueMetrics) metrics.tag(PARTITION_INFO, partitionJMXStr)
85+
.tag(QUEUE_INFO, queueName)).tag(USER_INFO, userName));
86+
}
87+
return metrics;
88+
}
89+
}

0 commit comments

Comments
 (0)