Skip to content

Commit c695cf0

Browse files
minni31HarshitGupta11
authored andcommitted
YARN-11034. Add enhanced headroom in AllocateResponse (apache#3766)
1 parent 18dfa7f commit c695cf0

File tree

8 files changed

+309
-2
lines changed

8 files changed

+309
-2
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.hadoop.yarn.api.records.AMCommand;
3333
import org.apache.hadoop.yarn.api.records.Container;
3434
import org.apache.hadoop.yarn.api.records.ContainerStatus;
35+
import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
3536
import org.apache.hadoop.yarn.api.records.NMToken;
3637
import org.apache.hadoop.yarn.api.records.NodeReport;
3738
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
@@ -147,6 +148,23 @@ public static AllocateResponse newInstance(int responseId,
147148
.collectorInfo(collectorInfo).build();
148149
}
149150

151+
@Private
152+
@Unstable
153+
public static AllocateResponse newInstance(int responseId,
154+
List<ContainerStatus> completedContainers,
155+
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
156+
Resource availResources, AMCommand command, int numClusterNodes,
157+
PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
158+
List<UpdatedContainer> updatedContainers, CollectorInfo collectorInfo,
159+
EnhancedHeadroom enhancedHeadroom) {
160+
AllocateResponse response =
161+
newInstance(responseId, completedContainers, allocatedContainers,
162+
updatedNodes, availResources, command, numClusterNodes, preempt,
163+
nmTokens, amRMToken, updatedContainers, collectorInfo);
164+
response.setEnhancedHeadroom(enhancedHeadroom);
165+
return response;
166+
}
167+
150168
/**
151169
* If the <code>ResourceManager</code> needs the
152170
* <code>ApplicationMaster</code> to take some action then it will send an
@@ -439,6 +457,14 @@ public static AllocateResponseBuilder newBuilder() {
439457
return new AllocateResponseBuilder();
440458
}
441459

460+
@Public
461+
@Unstable
462+
public abstract EnhancedHeadroom getEnhancedHeadroom();
463+
464+
@Private
465+
@Unstable
466+
public abstract void setEnhancedHeadroom(EnhancedHeadroom enhancedHeadroom);
467+
442468
/**
443469
* Class to construct instances of {@link AllocateResponse} with specific
444470
* options.
@@ -666,6 +692,18 @@ public AllocateResponseBuilder containersFromPreviousAttempt(
666692
return this;
667693
}
668694

695+
@Public
696+
@Unstable
697+
public EnhancedHeadroom getEnhancedHeadroom() {
698+
return allocateResponse.getEnhancedHeadroom();
699+
}
700+
701+
@Private
702+
@Unstable
703+
public void setEnhancedHeadroom(EnhancedHeadroom enhancedHeadroom){
704+
allocateResponse.setEnhancedHeadroom(enhancedHeadroom);
705+
}
706+
669707
/**
670708
* Return generated {@link AllocateResponse} object.
671709
* @return {@link AllocateResponse}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.api.records;
20+
21+
import org.apache.hadoop.yarn.util.Records;
22+
23+
/**
24+
* Enhanced head room in AllocateResponse.
25+
* This provides a channel for RMs to return load information for AMRMProxy
26+
* decision making when rerouting resource requests.
27+
*
28+
* Contains total pending container count and active cores for a cluster.
29+
*/
30+
public abstract class EnhancedHeadroom {
31+
public static EnhancedHeadroom newInstance(int totalPendingCount,
32+
int totalActiveCores) {
33+
EnhancedHeadroom enhancedHeadroom =
34+
Records.newRecord(EnhancedHeadroom.class);
35+
enhancedHeadroom.setTotalPendingCount(totalPendingCount);
36+
enhancedHeadroom.setTotalActiveCores(totalActiveCores);
37+
return enhancedHeadroom;
38+
}
39+
40+
/**
41+
* Set total pending container count.
42+
* @param totalPendingCount the pending container count
43+
*/
44+
public abstract void setTotalPendingCount(int totalPendingCount);
45+
46+
/**
47+
* Get total pending container count.
48+
* @return the pending container count
49+
*/
50+
public abstract int getTotalPendingCount();
51+
52+
/**
53+
* Set total active cores for the cluster.
54+
* @param totalActiveCores the total active cores for the cluster
55+
*/
56+
public abstract void setTotalActiveCores(int totalActiveCores);
57+
58+
/**
59+
* Get total active cores for the cluster.
60+
* @return totalActiveCores the total active cores for the cluster
61+
*/
62+
public abstract int getTotalActiveCores();
63+
64+
@Override
65+
public String toString() {
66+
StringBuilder sb = new StringBuilder();
67+
sb.append("<pendingCount:").append(this.getTotalPendingCount());
68+
sb.append(", activeCores:").append(this.getTotalActiveCores());
69+
sb.append(">");
70+
return sb.toString();
71+
}
72+
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ message UpdatedContainerProto {
106106
required ContainerProto container = 2;
107107
}
108108

109+
message EnhancedHeadroomProto {
110+
optional int32 total_pending_count = 1;
111+
optional int32 total_active_cores = 2;
112+
}
113+
109114
message AllocateResponseProto {
110115
optional AMCommandProto a_m_command = 1;
111116
optional int32 response_id = 2;
@@ -123,6 +128,7 @@ message AllocateResponseProto {
123128
repeated UpdatedContainerProto updated_containers = 16;
124129
repeated ContainerProto containers_from_previous_attempts = 17;
125130
repeated RejectedSchedulingRequestProto rejected_scheduling_requests = 18;
131+
optional EnhancedHeadroomProto enhanced_headroom = 19;
126132
}
127133

128134
enum SchedulerResourceTypes {

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.hadoop.yarn.api.records.AMCommand;
3232
import org.apache.hadoop.yarn.api.records.Container;
3333
import org.apache.hadoop.yarn.api.records.ContainerStatus;
34+
import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
3435
import org.apache.hadoop.yarn.api.records.NMToken;
3536
import org.apache.hadoop.yarn.api.records.NodeReport;
3637
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
@@ -43,6 +44,7 @@
4344
import org.apache.hadoop.yarn.api.records.impl.pb.CollectorInfoPBImpl;
4445
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
4546
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
47+
import org.apache.hadoop.yarn.api.records.impl.pb.EnhancedHeadroomPBImpl;
4648
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
4749
import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
4850
import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl;
@@ -89,6 +91,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
8991
private Token amrmToken = null;
9092
private Priority appPriority = null;
9193
private CollectorInfo collectorInfo = null;
94+
private EnhancedHeadroom enhancedHeadroom = null;
9295

9396
public AllocateResponsePBImpl() {
9497
builder = AllocateResponseProto.newBuilder();
@@ -190,6 +193,9 @@ private synchronized void mergeLocalToBuilder() {
190193
getContainerProtoIterable(this.containersFromPreviousAttempts);
191194
builder.addAllContainersFromPreviousAttempts(iterable);
192195
}
196+
if (this.enhancedHeadroom != null) {
197+
builder.setEnhancedHeadroom(convertToProtoFormat(this.enhancedHeadroom));
198+
}
193199
}
194200

195201
private synchronized void mergeLocalToProto() {
@@ -422,6 +428,28 @@ public synchronized void setAMRMToken(Token amRMToken) {
422428
this.amrmToken = amRMToken;
423429
}
424430

431+
@Override
432+
public synchronized EnhancedHeadroom getEnhancedHeadroom() {
433+
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
434+
if (enhancedHeadroom != null) {
435+
return enhancedHeadroom;
436+
}
437+
if (!p.hasEnhancedHeadroom()) {
438+
return null;
439+
}
440+
this.enhancedHeadroom = convertFromProtoFormat(p.getEnhancedHeadroom());
441+
return enhancedHeadroom;
442+
}
443+
444+
@Override
445+
public synchronized void setEnhancedHeadroom(
446+
EnhancedHeadroom enhancedHeadroom) {
447+
maybeInitBuilder();
448+
if (enhancedHeadroom == null) {
449+
builder.clearEnhancedHeadroom();
450+
}
451+
this.enhancedHeadroom = enhancedHeadroom;
452+
}
425453

426454
@Override
427455
public synchronized CollectorInfo getCollectorInfo() {
@@ -933,4 +961,14 @@ private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
933961
private PriorityProto convertToProtoFormat(Priority t) {
934962
return ((PriorityPBImpl)t).getProto();
935963
}
964+
965+
private EnhancedHeadroomPBImpl convertFromProtoFormat(
966+
YarnServiceProtos.EnhancedHeadroomProto p) {
967+
return new EnhancedHeadroomPBImpl(p);
968+
}
969+
970+
private YarnServiceProtos.EnhancedHeadroomProto convertToProtoFormat(
971+
EnhancedHeadroom t) {
972+
return ((EnhancedHeadroomPBImpl) t).getProto();
973+
}
936974
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.api.records.impl.pb;
20+
21+
import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
22+
import org.apache.hadoop.yarn.proto.YarnServiceProtos.EnhancedHeadroomProto;
23+
import org.apache.hadoop.yarn.proto.YarnServiceProtos.EnhancedHeadroomProtoOrBuilder;
24+
25+
import org.apache.hadoop.thirdparty.protobuf.TextFormat;
26+
27+
public class EnhancedHeadroomPBImpl extends EnhancedHeadroom {
28+
29+
private EnhancedHeadroomProto proto =
30+
EnhancedHeadroomProto.getDefaultInstance();
31+
private EnhancedHeadroomProto.Builder builder = null;
32+
private boolean viaProto = false;
33+
34+
public EnhancedHeadroomPBImpl() {
35+
builder = EnhancedHeadroomProto.newBuilder();
36+
}
37+
38+
public EnhancedHeadroomPBImpl(EnhancedHeadroomProto proto) {
39+
this.proto = proto;
40+
viaProto = true;
41+
}
42+
43+
public EnhancedHeadroomProto getProto() {
44+
mergeLocalToProto();
45+
proto = viaProto ? proto : builder.build();
46+
viaProto = true;
47+
return proto;
48+
}
49+
50+
@Override
51+
public int hashCode() {
52+
return getProto().hashCode();
53+
}
54+
55+
@Override
56+
public boolean equals(Object other) {
57+
if (other == null) {
58+
return false;
59+
}
60+
if (other.getClass().isAssignableFrom(this.getClass())) {
61+
return this.getProto().equals(this.getClass().cast(other).getProto());
62+
}
63+
return false;
64+
}
65+
66+
@Override
67+
public String toString() {
68+
return TextFormat.shortDebugString(getProto());
69+
}
70+
71+
private void mergeLocalToBuilder() {
72+
// No local content yet
73+
}
74+
75+
private void mergeLocalToProto() {
76+
if (viaProto) {
77+
maybeInitBuilder();
78+
}
79+
mergeLocalToBuilder();
80+
proto = builder.build();
81+
viaProto = true;
82+
}
83+
84+
private void maybeInitBuilder() {
85+
if (viaProto || builder == null) {
86+
builder = EnhancedHeadroomProto.newBuilder(proto);
87+
}
88+
viaProto = false;
89+
}
90+
91+
@Override
92+
public void setTotalPendingCount(int totalPendingCount) {
93+
maybeInitBuilder();
94+
if (totalPendingCount == 0) {
95+
builder.clearTotalPendingCount();
96+
return;
97+
}
98+
builder.setTotalPendingCount(totalPendingCount);
99+
}
100+
101+
@Override
102+
public int getTotalPendingCount() {
103+
EnhancedHeadroomProtoOrBuilder p = viaProto ? proto : builder;
104+
return (p.hasTotalPendingCount()) ? p.getTotalPendingCount() : 0;
105+
}
106+
107+
@Override
108+
public void setTotalActiveCores(int totalActiveCores) {
109+
maybeInitBuilder();
110+
if (totalActiveCores == 0) {
111+
builder.clearTotalActiveCores();
112+
return;
113+
}
114+
builder.setTotalActiveCores(totalActiveCores);
115+
}
116+
117+
@Override
118+
public int getTotalActiveCores() {
119+
EnhancedHeadroomProtoOrBuilder p = viaProto ? proto : builder;
120+
return (p.hasTotalActiveCores()) ? p.getTotalActiveCores() : 0;
121+
}
122+
123+
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@
127127
import org.apache.hadoop.yarn.api.records.ContainerReport;
128128
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
129129
import org.apache.hadoop.yarn.api.records.ContainerStatus;
130+
import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
130131
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
131132
import org.apache.hadoop.yarn.api.records.LocalResource;
132133
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
@@ -184,6 +185,7 @@
184185
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerReportPBImpl;
185186
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerRetryContextPBImpl;
186187
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
188+
import org.apache.hadoop.yarn.api.records.impl.pb.EnhancedHeadroomPBImpl;
187189
import org.apache.hadoop.yarn.api.records.impl.pb.ExecutionTypeRequestPBImpl;
188190
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
189191
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
@@ -430,6 +432,7 @@ public static void setup() throws Exception {
430432
generateByNewInstance(UpdatedContainer.class);
431433
generateByNewInstance(ContainerUpdateRequest.class);
432434
generateByNewInstance(ContainerUpdateResponse.class);
435+
generateByNewInstance(EnhancedHeadroom.class);
433436
// genByNewInstance does not apply to QueueInfo, cause
434437
// it is recursive(has sub queues)
435438
typeValueCache.put(QueueInfo.class, QueueInfo.
@@ -1331,4 +1334,10 @@ public void testGetNodesToAttributesResponsePBImpl() throws Exception {
13311334
validatePBImplRecord(GetNodesToAttributesResponsePBImpl.class,
13321335
YarnServiceProtos.GetNodesToAttributesResponseProto.class);
13331336
}
1337+
1338+
@Test
1339+
public void testGetEnhancedHeadroomPBImpl() throws Exception {
1340+
validatePBImplRecord(EnhancedHeadroomPBImpl.class,
1341+
YarnServiceProtos.EnhancedHeadroomProto.class);
1342+
}
13341343
}

0 commit comments

Comments
 (0)