Skip to content

Commit 468135a

Browse files
authored
YARN-11218. [Federation] Add getActivities, getBulkActivities REST APIs for Router. (#5284)
1 parent cf1b371 commit 468135a

File tree

7 files changed

+434
-2
lines changed

7 files changed

+434
-2
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/BulkActivitiesInfo.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ public class BulkActivitiesInfo {
3434

3535
private ArrayList<ActivitiesInfo> activities = new ArrayList<>();
3636

37+
private String subClusterId;
38+
3739
/**
 * No-argument constructor; JAXB needs this.
 */
public BulkActivitiesInfo() {
}
@@ -49,4 +51,12 @@ public ArrayList<ActivitiesInfo> getActivities() {
4951
public void addAll(List<ActivitiesInfo> activitiesInfoList) {
5052
activities.addAll(activitiesInfoList);
5153
}
54+
55+
public String getSubClusterId() {
56+
return subClusterId;
57+
}
58+
59+
public void setSubClusterId(String subClusterId) {
60+
this.subClusterId = subClusterId;
61+
}
5262
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,10 @@ public final class RouterMetrics {
135135
private MutableGaugeInt numRenewDelegationTokenFailedRetrieved;
136136
@Metric("# of renewDelegationToken failed to be retrieved")
137137
private MutableGaugeInt numCancelDelegationTokenFailedRetrieved;
138+
@Metric("# of getActivities failed to be retrieved")
139+
private MutableGaugeInt numGetActivitiesFailedRetrieved;
140+
@Metric("# of getBulkActivities failed to be retrieved")
141+
private MutableGaugeInt numGetBulkActivitiesFailedRetrieved;
138142
@Metric("# of getSchedulerInfo failed to be retrieved")
139143
private MutableGaugeInt numGetSchedulerInfoFailedRetrieved;
140144
@Metric("# of refreshSuperUserGroupsConfiguration failed to be retrieved")
@@ -237,6 +241,10 @@ public final class RouterMetrics {
237241
private MutableRate totalSucceededRenewDelegationTokenRetrieved;
238242
@Metric("Total number of successful Retrieved CancelDelegationToken and latency(ms)")
239243
private MutableRate totalSucceededCancelDelegationTokenRetrieved;
244+
@Metric("Total number of successful Retrieved GetActivities and latency(ms)")
245+
private MutableRate totalSucceededGetActivitiesRetrieved;
246+
@Metric("Total number of successful Retrieved GetBulkActivities and latency(ms)")
247+
private MutableRate totalSucceededGetBulkActivitiesRetrieved;
240248
@Metric("Total number of successful Retrieved RefreshSuperUserGroupsConfig and latency(ms)")
241249
private MutableRate totalSucceededRefreshSuperUserGroupsConfigurationRetrieved;
242250
@Metric("Total number of successful Retrieved RefreshUserToGroupsMappings and latency(ms)")
@@ -295,6 +303,8 @@ public final class RouterMetrics {
295303
private MutableQuantiles getDelegationTokenLatency;
296304
private MutableQuantiles renewDelegationTokenLatency;
297305
private MutableQuantiles cancelDelegationTokenLatency;
306+
private MutableQuantiles getActivitiesLatency;
307+
private MutableQuantiles getBulkActivitiesLatency;
298308
private MutableQuantiles getSchedulerInfoRetrievedLatency;
299309
private MutableQuantiles refreshSuperUserGroupsConfLatency;
300310
private MutableQuantiles refreshUserToGroupsMappingsLatency;
@@ -472,6 +482,12 @@ private RouterMetrics() {
472482
cancelDelegationTokenLatency = registry.newQuantiles("cancelDelegationTokenLatency",
473483
"latency of cancel delegation token timeouts", "ops", "latency", 10);
474484

485+
getActivitiesLatency = registry.newQuantiles("getActivitiesLatency",
486+
"latency of get activities timeouts", "ops", "latency", 10);
487+
488+
getBulkActivitiesLatency = registry.newQuantiles("getBulkActivitiesLatency",
489+
"latency of get bulk activities timeouts", "ops", "latency", 10);
490+
475491
getSchedulerInfoRetrievedLatency = registry.newQuantiles("getSchedulerInfoRetrievedLatency",
476492
"latency of get scheduler info timeouts", "ops", "latency", 10);
477493

@@ -736,6 +752,16 @@ public long getNumSucceededCancelDelegationTokenRetrieved() {
736752
return totalSucceededCancelDelegationTokenRetrieved.lastStat().numSamples();
737753
}
738754

755+
@VisibleForTesting
756+
public long getNumSucceededGetActivitiesRetrieved() {
757+
return totalSucceededGetActivitiesRetrieved.lastStat().numSamples();
758+
}
759+
760+
@VisibleForTesting
761+
public long getNumSucceededGetBulkActivitiesRetrieved() {
762+
return totalSucceededGetBulkActivitiesRetrieved.lastStat().numSamples();
763+
}
764+
739765
@VisibleForTesting
740766
public long getNumSucceededGetSchedulerInfoRetrieved() {
741767
return totalSucceededGetSchedulerInfoRetrieved.lastStat().numSamples();
@@ -981,6 +1007,16 @@ public double getLatencySucceededCancelDelegationTokenRetrieved() {
9811007
return totalSucceededCancelDelegationTokenRetrieved.lastStat().mean();
9821008
}
9831009

1010+
@VisibleForTesting
1011+
public double getLatencySucceededGetActivitiesRetrieved() {
1012+
return totalSucceededGetActivitiesRetrieved.lastStat().mean();
1013+
}
1014+
1015+
@VisibleForTesting
1016+
public double getLatencySucceededGetBulkActivitiesRetrieved() {
1017+
return totalSucceededGetBulkActivitiesRetrieved.lastStat().mean();
1018+
}
1019+
9841020
@VisibleForTesting
9851021
public double getLatencySucceededGetSchedulerInfoRetrieved() {
9861022
return totalSucceededGetSchedulerInfoRetrieved.lastStat().mean();
@@ -1209,6 +1245,14 @@ public int getCancelDelegationTokenFailedRetrieved() {
12091245
return numCancelDelegationTokenFailedRetrieved.value();
12101246
}
12111247

1248+
public int getActivitiesFailedRetrieved() {
1249+
return numGetActivitiesFailedRetrieved.value();
1250+
}
1251+
1252+
public int getBulkActivitiesFailedRetrieved(){
1253+
return numGetBulkActivitiesFailedRetrieved.value();
1254+
}
1255+
12121256
public int getSchedulerInfoFailedRetrieved() {
12131257
return numGetSchedulerInfoFailedRetrieved.value();
12141258
}
@@ -1448,6 +1492,16 @@ public void succeededCancelDelegationTokenRetrieved(long duration) {
14481492
cancelDelegationTokenLatency.add(duration);
14491493
}
14501494

1495+
public void succeededGetActivitiesLatencyRetrieved(long duration) {
1496+
totalSucceededGetActivitiesRetrieved.add(duration);
1497+
getActivitiesLatency.add(duration);
1498+
}
1499+
1500+
public void succeededGetBulkActivitiesRetrieved(long duration) {
1501+
totalSucceededGetBulkActivitiesRetrieved.add(duration);
1502+
getBulkActivitiesLatency.add(duration);
1503+
}
1504+
14511505
public void succeededGetSchedulerInfoRetrieved(long duration) {
14521506
totalSucceededGetSchedulerInfoRetrieved.add(duration);
14531507
getSchedulerInfoRetrievedLatency.add(duration);
@@ -1659,6 +1713,14 @@ public void incrCancelDelegationTokenFailedRetrieved() {
16591713
numCancelDelegationTokenFailedRetrieved.incr();
16601714
}
16611715

1716+
public void incrGetActivitiesFailedRetrieved() {
1717+
numGetActivitiesFailedRetrieved.incr();
1718+
}
1719+
1720+
public void incrGetBulkActivitiesFailedRetrieved() {
1721+
numGetBulkActivitiesFailedRetrieved.incr();
1722+
}
1723+
16621724
public void incrGetSchedulerInfoFailedRetrieved() {
16631725
numGetSchedulerInfoFailedRetrieved.incr();
16641726
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java

Lines changed: 98 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.commons.lang3.NotImplementedException;
4747
import org.apache.commons.lang3.StringUtils;
4848
import org.apache.hadoop.conf.Configuration;
49+
import org.apache.hadoop.fs.impl.prefetch.Validate;
4950
import org.apache.hadoop.io.Text;
5051
import org.apache.hadoop.security.UserGroupInformation;
5152
import org.apache.hadoop.security.authorize.AuthorizationException;
@@ -121,6 +122,7 @@
121122
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
122123
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
123124
import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey;
125+
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationBulkActivitiesInfo;
124126
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
125127
import org.apache.hadoop.yarn.server.router.webapp.dao.SubClusterResult;
126128
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo;
@@ -1187,16 +1189,110 @@ public String dumpSchedulerLogs(String time, HttpServletRequest hsr)
11871189
throw new NotImplementedException("Code is not implemented");
11881190
}
11891191

1192+
/**
1193+
* This method retrieve all the activities in a specific node, and it is
1194+
* reachable by using {@link RMWSConsts#SCHEDULER_ACTIVITIES}.
1195+
*
1196+
* @param hsr the servlet request
1197+
* @param nodeId the node we want to retrieve the activities. It is a
1198+
* QueryParam.
1199+
* @param groupBy the groupBy type by which the activities should be
1200+
* aggregated. It is a QueryParam.
1201+
* @return all the activities in the specific node
1202+
*/
11901203
@Override
11911204
public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId,
11921205
String groupBy) {
1193-
throw new NotImplementedException("Code is not implemented");
1206+
try {
1207+
// Check the parameters to ensure that the parameters are not empty
1208+
Validate.checkNotNullAndNotEmpty(nodeId, "nodeId");
1209+
Validate.checkNotNullAndNotEmpty(groupBy, "groupBy");
1210+
1211+
// Query SubClusterInfo according to id,
1212+
// if the nodeId cannot get SubClusterInfo, an exception will be thrown directly.
1213+
SubClusterInfo subClusterInfo = getNodeSubcluster(nodeId);
1214+
1215+
// Call the corresponding subCluster to get ActivitiesInfo.
1216+
long startTime = clock.getTime();
1217+
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
1218+
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
1219+
final HttpServletRequest hsrCopy = clone(hsr);
1220+
ActivitiesInfo activitiesInfo = interceptor.getActivities(hsrCopy, nodeId, groupBy);
1221+
if (activitiesInfo != null) {
1222+
long stopTime = clock.getTime();
1223+
routerMetrics.succeededGetActivitiesLatencyRetrieved(stopTime - startTime);
1224+
return activitiesInfo;
1225+
}
1226+
} catch (IllegalArgumentException e) {
1227+
routerMetrics.incrGetActivitiesFailedRetrieved();
1228+
throw e;
1229+
} catch (NotFoundException e) {
1230+
routerMetrics.incrGetActivitiesFailedRetrieved();
1231+
throw e;
1232+
}
1233+
1234+
routerMetrics.incrGetActivitiesFailedRetrieved();
1235+
throw new RuntimeException("getActivities Failed.");
11941236
}
11951237

1238+
/**
1239+
* This method retrieve the last n activities inside scheduler, and it is
1240+
* reachable by using {@link RMWSConsts#SCHEDULER_BULK_ACTIVITIES}.
1241+
*
1242+
* @param hsr the servlet request
1243+
* @param groupBy the groupBy type by which the activities should be
1244+
* aggregated. It is a QueryParam.
1245+
* @param activitiesCount number of activities
1246+
* @return last n activities
1247+
*/
11961248
@Override
11971249
public BulkActivitiesInfo getBulkActivities(HttpServletRequest hsr,
11981250
String groupBy, int activitiesCount) throws InterruptedException {
1199-
throw new NotImplementedException("Code is not implemented");
1251+
try {
1252+
// Step1. Check the parameters to ensure that the parameters are not empty
1253+
Validate.checkNotNullAndNotEmpty(groupBy, "groupBy");
1254+
Validate.checkNotNegative(activitiesCount, "activitiesCount");
1255+
1256+
// Step2. Call the interface of subCluster concurrently and get the returned result.
1257+
Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
1258+
final HttpServletRequest hsrCopy = clone(hsr);
1259+
Class[] argsClasses = new Class[]{HttpServletRequest.class, String.class, int.class};
1260+
Object[] args = new Object[]{hsrCopy, groupBy, activitiesCount};
1261+
ClientMethod remoteMethod = new ClientMethod("getBulkActivities", argsClasses, args);
1262+
Map<SubClusterInfo, BulkActivitiesInfo> appStatisticsMap = invokeConcurrent(
1263+
subClustersActive.values(), remoteMethod, BulkActivitiesInfo.class);
1264+
1265+
// Step3. Generate Federation objects and set subCluster information.
1266+
long startTime = clock.getTime();
1267+
FederationBulkActivitiesInfo fedBulkActivitiesInfo = new FederationBulkActivitiesInfo();
1268+
appStatisticsMap.forEach((subClusterInfo, bulkActivitiesInfo) -> {
1269+
SubClusterId subClusterId = subClusterInfo.getSubClusterId();
1270+
bulkActivitiesInfo.setSubClusterId(subClusterId.getId());
1271+
fedBulkActivitiesInfo.getList().add(bulkActivitiesInfo);
1272+
});
1273+
long stopTime = clock.getTime();
1274+
routerMetrics.succeededGetBulkActivitiesRetrieved(stopTime - startTime);
1275+
return fedBulkActivitiesInfo;
1276+
} catch (IllegalArgumentException e) {
1277+
routerMetrics.incrGetBulkActivitiesFailedRetrieved();
1278+
throw e;
1279+
} catch (NotFoundException e) {
1280+
routerMetrics.incrGetBulkActivitiesFailedRetrieved();
1281+
RouterServerUtil.logAndThrowRunTimeException("get all active sub cluster(s) error.", e);
1282+
} catch (IOException e) {
1283+
routerMetrics.incrGetBulkActivitiesFailedRetrieved();
1284+
RouterServerUtil.logAndThrowRunTimeException(e,
1285+
"getBulkActivities by groupBy = %s, activitiesCount = %s with io error.",
1286+
groupBy, String.valueOf(activitiesCount));
1287+
} catch (YarnException e) {
1288+
routerMetrics.incrGetBulkActivitiesFailedRetrieved();
1289+
RouterServerUtil.logAndThrowRunTimeException(e,
1290+
"getBulkActivities by groupBy = %s, activitiesCount = %s with yarn error.",
1291+
groupBy, String.valueOf(activitiesCount));
1292+
}
1293+
1294+
routerMetrics.incrGetBulkActivitiesFailedRetrieved();
1295+
throw new RuntimeException("getBulkActivities Failed.");
12001296
}
12011297

12021298
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.yarn.server.router.webapp.dao;
19+
20+
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
21+
22+
import javax.xml.bind.annotation.XmlAccessType;
23+
import javax.xml.bind.annotation.XmlAccessorType;
24+
import javax.xml.bind.annotation.XmlElement;
25+
import javax.xml.bind.annotation.XmlRootElement;
26+
import java.util.ArrayList;
27+
28+
@XmlRootElement
29+
@XmlAccessorType(XmlAccessType.FIELD)
30+
public class FederationBulkActivitiesInfo extends BulkActivitiesInfo {
31+
32+
@XmlElement(name = "subCluster")
33+
private ArrayList<BulkActivitiesInfo> list = new ArrayList<>();
34+
35+
public FederationBulkActivitiesInfo() {
36+
} // JAXB needs this
37+
38+
public FederationBulkActivitiesInfo(ArrayList<BulkActivitiesInfo> list) {
39+
this.list = list;
40+
}
41+
42+
public ArrayList<BulkActivitiesInfo> getList() {
43+
return list;
44+
}
45+
46+
public void setList(ArrayList<BulkActivitiesInfo> list) {
47+
this.list = list;
48+
}
49+
}

0 commit comments

Comments
 (0)