Skip to content

Commit aa1a5dd

Browse files
authored
YARN-10829. Support getApplications API in FederationClientInterceptor (#3135)
YARN-10829. Support getApplications API in FederationClientInterceptor (#3135)
1 parent 3a52bfc commit aa1a5dd

File tree

7 files changed

+438
-5
lines changed

7 files changed

+438
-5
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetApplicationsResponse.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.hadoop.yarn.api.protocolrecords;
2020

21+
import java.util.ArrayList;
22+
import java.util.Collection;
2123
import java.util.List;
2224

2325
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -52,6 +54,16 @@ public static GetApplicationsResponse newInstance(
5254
return response;
5355
}
5456

57+
@Private
58+
@Unstable
59+
public static GetApplicationsResponse newInstance(
60+
Collection<ApplicationReport> applications) {
61+
GetApplicationsResponse response =
62+
Records.newRecord(GetApplicationsResponse.class);
63+
response.setApplicationList(new ArrayList<>(applications));
64+
return response;
65+
}
66+
5567
/**
5668
* Get <code>ApplicationReport</code> for applications.
5769
* @return <code>ApplicationReport</code> for applications

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3991,6 +3991,15 @@ public static boolean isAclEnabled(Configuration conf) {
39913991
ROUTER_PREFIX + "submit.retry";
39923992
public static final int DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY = 3;
39933993

3994+
/**
3995+
* Whether FederationClientInterceptor should include partial
3996+
* ApplicationReports (UAM reports with no matching AM report) in its results.
3997+
*/
3998+
public static final String ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED =
3999+
ROUTER_PREFIX + "partial-result.enabled";
4000+
public static final boolean DEFAULT_ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED =
4001+
false;
4002+
39944003
public static final String ROUTER_WEBAPP_PREFIX = ROUTER_PREFIX + "webapp.";
39954004

39964005
public static final String ROUTER_USER_CLIENT_THREADS_SIZE =

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,8 @@ public void initializeMemberVariables() {
184184

185185
configurationPrefixToSkipCompare
186186
.add(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY);
187+
configurationPrefixToSkipCompare
188+
.add(YarnConfiguration.ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED);
187189
configurationPrefixToSkipCompare
188190
.add(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED);
189191
configurationPrefixToSkipCompare

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.IOException;
2323
import java.lang.reflect.Method;
2424
import java.util.ArrayList;
25+
import java.util.Collection;
2526
import java.util.List;
2627
import java.util.Map;
2728
import java.util.Random;
@@ -161,6 +162,7 @@ public class FederationClientInterceptor
161162
private RouterMetrics routerMetrics;
162163
private ThreadPoolExecutor executorService;
163164
private final Clock clock = new MonotonicClock();
165+
private boolean returnPartialReport;
164166

165167
@Override
166168
public void init(String userName) {
@@ -196,6 +198,10 @@ public void init(String userName) {
196198
clientRMProxies =
197199
new ConcurrentHashMap<SubClusterId, ApplicationClientProtocol>();
198200
routerMetrics = RouterMetrics.getMetrics();
201+
202+
returnPartialReport = conf.getBoolean(
203+
YarnConfiguration.ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED,
204+
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED);
199205
}
200206

201207
@Override
@@ -599,10 +605,44 @@ public GetApplicationReportResponse getApplicationReport(
599605
return response;
600606
}
601607

608+
/**
609+
* The Yarn Router will forward the request to all the Yarn RMs in parallel,
610+
* after that it will group all the ApplicationReports by the ApplicationId.
611+
*
612+
* Possible failure:
613+
*
614+
* Client: identical behavior as {@code ClientRMService}.
615+
*
616+
* Router: the Client will timeout and resubmit the request.
617+
*
618+
* ResourceManager: the Router calls each Yarn RM in parallel. In case a
619+
* Yarn RM fails, a single call will timeout. However the Router will
620+
* merge the ApplicationReports it got, and provides a partial list to
621+
* the client.
622+
*
623+
* State Store: the Router will timeout and it will retry depending on the
624+
* FederationFacade settings - if the failure happened before the select
625+
* operation.
626+
*/
602627
@Override
603628
public GetApplicationsResponse getApplications(GetApplicationsRequest request)
604629
throws YarnException, IOException {
605-
throw new NotImplementedException("Code is not implemented");
630+
if (request == null) {
631+
RouterServerUtil.logAndThrowException(
632+
"Missing getApplications request.",
633+
null);
634+
}
635+
Map<SubClusterId, SubClusterInfo> subclusters =
636+
federationFacade.getSubClusters(true);
637+
ClientMethod remoteMethod = new ClientMethod("getApplications",
638+
new Class[] {GetApplicationsRequest.class}, new Object[] {request});
639+
Map<SubClusterId, GetApplicationsResponse> applications =
640+
invokeConcurrent(subclusters.keySet(), remoteMethod,
641+
GetApplicationsResponse.class);
642+
643+
// Merge the Application Reports
644+
return RouterYarnClientUtils.mergeApplications(applications.values(),
645+
returnPartialReport);
606646
}
607647

608648
@Override
@@ -676,6 +716,12 @@ public Object call() throws Exception {
676716
return results;
677717
}
678718

719+
<R> Map<SubClusterId, R> invokeConcurrent(Collection<SubClusterId> clusterIds,
720+
ClientMethod request, Class<R> clazz) throws YarnException, IOException {
721+
ArrayList<SubClusterId> clusterIdList = new ArrayList<>(clusterIds);
722+
return invokeConcurrent(clusterIdList, request, clazz);
723+
}
724+
679725
@Override
680726
public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
681727
throws YarnException, IOException {

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,25 @@
1818
package org.apache.hadoop.yarn.server.router.clientrm;
1919

2020
import java.util.Collection;
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
24+
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
2125
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
26+
import org.apache.hadoop.yarn.api.records.ApplicationId;
27+
import org.apache.hadoop.yarn.api.records.ApplicationReport;
28+
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
2229
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
30+
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
31+
import org.apache.hadoop.yarn.util.resource.Resources;
2332

2433
/**
2534
* Util class for Router Yarn client API calls.
2635
*/
2736
public final class RouterYarnClientUtils {
2837

38+
private final static String PARTIAL_REPORT = "Partial Report ";
39+
2940
private RouterYarnClientUtils() {
3041

3142
}
@@ -52,4 +63,130 @@ public static GetClusterMetricsResponse merge(
5263
}
5364
return GetClusterMetricsResponse.newInstance(tmp);
5465
}
66+
67+
/**
68+
* Merges a list of ApplicationReports grouping by ApplicationId.
69+
* Our current policy is to merge the application reports from the reachable
70+
* SubClusters.
71+
* @param responses a list of ApplicationResponse to merge
72+
* @param returnPartialResult if the merge ApplicationReports should contain
73+
* partial result or not
74+
* @return the merged ApplicationsResponse
75+
*/
76+
public static GetApplicationsResponse mergeApplications(
77+
Collection<GetApplicationsResponse> responses,
78+
boolean returnPartialResult){
79+
Map<ApplicationId, ApplicationReport> federationAM = new HashMap<>();
80+
Map<ApplicationId, ApplicationReport> federationUAMSum = new HashMap<>();
81+
82+
for (GetApplicationsResponse appResponse : responses){
83+
for (ApplicationReport appReport : appResponse.getApplicationList()){
84+
ApplicationId appId = appReport.getApplicationId();
85+
// Check if this ApplicationReport is an AM
86+
if (!appReport.isUnmanagedApp()) {
87+
// Insert in the list of AM
88+
federationAM.put(appId, appReport);
89+
// Check if there are any UAM found before
90+
if (federationUAMSum.containsKey(appId)) {
91+
// Merge the current AM with the found UAM
92+
mergeAMWithUAM(appReport, federationUAMSum.get(appId));
93+
// Remove the sum of the UAMs
94+
federationUAMSum.remove(appId);
95+
}
96+
// This ApplicationReport is an UAM
97+
} else if (federationAM.containsKey(appId)) {
98+
// Merge the current UAM with its own AM
99+
mergeAMWithUAM(federationAM.get(appId), appReport);
100+
} else if (federationUAMSum.containsKey(appId)) {
101+
// Merge the current UAM with its own UAM and update the list of UAM
102+
ApplicationReport mergedUAMReport =
103+
mergeUAMWithUAM(federationUAMSum.get(appId), appReport);
104+
federationUAMSum.put(appId, mergedUAMReport);
105+
} else {
106+
// Insert in the list of UAM
107+
federationUAMSum.put(appId, appReport);
108+
}
109+
}
110+
}
111+
// Check the remaining UAMs are depending or not from federation
112+
for (ApplicationReport appReport : federationUAMSum.values()) {
113+
if (mergeUamToReport(appReport.getName(), returnPartialResult)) {
114+
federationAM.put(appReport.getApplicationId(), appReport);
115+
}
116+
}
117+
118+
return GetApplicationsResponse.newInstance(federationAM.values());
119+
}
120+
121+
private static ApplicationReport mergeUAMWithUAM(ApplicationReport uam1,
122+
ApplicationReport uam2){
123+
uam1.setName(PARTIAL_REPORT + uam1.getApplicationId());
124+
mergeAMWithUAM(uam1, uam2);
125+
return uam1;
126+
}
127+
128+
private static void mergeAMWithUAM(ApplicationReport am,
129+
ApplicationReport uam){
130+
ApplicationResourceUsageReport amResourceReport =
131+
am.getApplicationResourceUsageReport();
132+
133+
ApplicationResourceUsageReport uamResourceReport =
134+
uam.getApplicationResourceUsageReport();
135+
136+
amResourceReport.setNumUsedContainers(
137+
amResourceReport.getNumUsedContainers() +
138+
uamResourceReport.getNumUsedContainers());
139+
140+
amResourceReport.setNumReservedContainers(
141+
amResourceReport.getNumReservedContainers() +
142+
uamResourceReport.getNumReservedContainers());
143+
144+
amResourceReport.setUsedResources(Resources.add(
145+
amResourceReport.getUsedResources(),
146+
uamResourceReport.getUsedResources()));
147+
148+
amResourceReport.setReservedResources(Resources.add(
149+
amResourceReport.getReservedResources(),
150+
uamResourceReport.getReservedResources()));
151+
152+
amResourceReport.setNeededResources(Resources.add(
153+
amResourceReport.getNeededResources(),
154+
uamResourceReport.getNeededResources()));
155+
156+
amResourceReport.setMemorySeconds(
157+
amResourceReport.getMemorySeconds() +
158+
uamResourceReport.getMemorySeconds());
159+
160+
amResourceReport.setVcoreSeconds(
161+
amResourceReport.getVcoreSeconds() +
162+
uamResourceReport.getVcoreSeconds());
163+
164+
amResourceReport.setQueueUsagePercentage(
165+
amResourceReport.getQueueUsagePercentage() +
166+
uamResourceReport.getQueueUsagePercentage());
167+
168+
amResourceReport.setClusterUsagePercentage(
169+
amResourceReport.getClusterUsagePercentage() +
170+
uamResourceReport.getClusterUsagePercentage());
171+
172+
am.setApplicationResourceUsageReport(amResourceReport);
173+
}
174+
175+
/**
176+
* Returns whether or not to add an unmanaged application to the report.
177+
* @param appName Application Name
178+
* @param returnPartialResult if the merge ApplicationReports should contain
179+
* partial result or not
180+
*/
181+
private static boolean mergeUamToReport(String appName,
182+
boolean returnPartialResult){
183+
if (returnPartialResult) {
184+
return true;
185+
}
186+
if (appName == null) {
187+
return false;
188+
}
189+
return !(appName.startsWith(UnmanagedApplicationManager.APP_NAME) ||
190+
appName.startsWith(PARTIAL_REPORT));
191+
}
55192
}

0 commit comments

Comments
 (0)