Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public class BulkActivitiesInfo {

private ArrayList<ActivitiesInfo> activities = new ArrayList<>();

private String subClusterId;

/**
 * Creates an instance with no arguments. The constructor body is empty; it
 * exists because JAXB requires a no-argument constructor.
 */
public BulkActivitiesInfo() {
// JAXB needs this
}
Expand All @@ -49,4 +51,12 @@ public ArrayList<ActivitiesInfo> getActivities() {
/**
 * Appends every element of the given list to the activities held by this
 * object.
 *
 * @param activitiesInfoList the activities to append
 */
public void addAll(List<ActivitiesInfo> activitiesInfoList) {
  this.activities.addAll(activitiesInfoList);
}

/**
 * Returns the subCluster id stored on this object.
 *
 * @return the stored subCluster id; {@code null} if it was never set
 */
public String getSubClusterId() {
  return this.subClusterId;
}

/**
 * Stores the given subCluster id on this object.
 *
 * @param subClusterId the subCluster id to store
 */
public void setSubClusterId(String subClusterId) {
this.subClusterId = subClusterId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ public final class RouterMetrics {
private MutableGaugeInt numRenewDelegationTokenFailedRetrieved;
// Counts failed cancelDelegationToken calls; the description names the
// operation that this gauge actually tracks.
@Metric("# of cancelDelegationToken failed to be retrieved")
private MutableGaugeInt numCancelDelegationTokenFailedRetrieved;
@Metric("# of getActivities failed to be retrieved")
private MutableGaugeInt numGetActivitiesFailedRetrieved;
@Metric("# of getBulkActivities failed to be retrieved")
private MutableGaugeInt numGetBulkActivitiesFailedRetrieved;
@Metric("# of getSchedulerInfo failed to be retrieved")
private MutableGaugeInt numGetSchedulerInfoFailedRetrieved;
@Metric("# of refreshSuperUserGroupsConfiguration failed to be retrieved")
Expand Down Expand Up @@ -237,6 +241,10 @@ public final class RouterMetrics {
private MutableRate totalSucceededRenewDelegationTokenRetrieved;
@Metric("Total number of successful Retrieved CancelDelegationToken and latency(ms)")
private MutableRate totalSucceededCancelDelegationTokenRetrieved;
@Metric("Total number of successful Retrieved GetActivities and latency(ms)")
private MutableRate totalSucceededGetActivitiesRetrieved;
@Metric("Total number of successful Retrieved GetBulkActivities and latency(ms)")
private MutableRate totalSucceededGetBulkActivitiesRetrieved;
@Metric("Total number of successful Retrieved RefreshSuperUserGroupsConfig and latency(ms)")
private MutableRate totalSucceededRefreshSuperUserGroupsConfigurationRetrieved;
@Metric("Total number of successful Retrieved RefreshUserToGroupsMappings and latency(ms)")
Expand Down Expand Up @@ -295,6 +303,8 @@ public final class RouterMetrics {
private MutableQuantiles getDelegationTokenLatency;
private MutableQuantiles renewDelegationTokenLatency;
private MutableQuantiles cancelDelegationTokenLatency;
private MutableQuantiles getActivitiesLatency;
private MutableQuantiles getBulkActivitiesLatency;
private MutableQuantiles getSchedulerInfoRetrievedLatency;
private MutableQuantiles refreshSuperUserGroupsConfLatency;
private MutableQuantiles refreshUserToGroupsMappingsLatency;
Expand Down Expand Up @@ -472,6 +482,12 @@ private RouterMetrics() {
cancelDelegationTokenLatency = registry.newQuantiles("cancelDelegationTokenLatency",
"latency of cancel delegation token timeouts", "ops", "latency", 10);

getActivitiesLatency = registry.newQuantiles("getActivitiesLatency",
"latency of get activities timeouts", "ops", "latency", 10);

getBulkActivitiesLatency = registry.newQuantiles("getBulkActivitiesLatency",
"latency of get bulk activities timeouts", "ops", "latency", 10);

getSchedulerInfoRetrievedLatency = registry.newQuantiles("getSchedulerInfoRetrievedLatency",
"latency of get scheduler info timeouts", "ops", "latency", 10);

Expand Down Expand Up @@ -736,6 +752,16 @@ public long getNumSucceededCancelDelegationTokenRetrieved() {
return totalSucceededCancelDelegationTokenRetrieved.lastStat().numSamples();
}

/**
 * Returns the sample count of the last stat recorded for successful
 * getActivities calls.
 *
 * @return number of samples in the last stat
 */
@VisibleForTesting
public long getNumSucceededGetActivitiesRetrieved() {
  return this.totalSucceededGetActivitiesRetrieved.lastStat().numSamples();
}

/**
 * Returns the sample count of the last stat recorded for successful
 * getBulkActivities calls.
 *
 * @return number of samples in the last stat
 */
@VisibleForTesting
public long getNumSucceededGetBulkActivitiesRetrieved() {
  return this.totalSucceededGetBulkActivitiesRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededGetSchedulerInfoRetrieved() {
return totalSucceededGetSchedulerInfoRetrieved.lastStat().numSamples();
Expand Down Expand Up @@ -981,6 +1007,16 @@ public double getLatencySucceededCancelDelegationTokenRetrieved() {
return totalSucceededCancelDelegationTokenRetrieved.lastStat().mean();
}

/**
 * Returns the mean of the last stat recorded for successful getActivities
 * calls.
 *
 * @return mean of the last stat
 */
@VisibleForTesting
public double getLatencySucceededGetActivitiesRetrieved() {
  return this.totalSucceededGetActivitiesRetrieved.lastStat().mean();
}

/**
 * Returns the mean of the last stat recorded for successful
 * getBulkActivities calls.
 *
 * @return mean of the last stat
 */
@VisibleForTesting
public double getLatencySucceededGetBulkActivitiesRetrieved() {
  return this.totalSucceededGetBulkActivitiesRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededGetSchedulerInfoRetrieved() {
return totalSucceededGetSchedulerInfoRetrieved.lastStat().mean();
Expand Down Expand Up @@ -1209,6 +1245,14 @@ public int getCancelDelegationTokenFailedRetrieved() {
return numCancelDelegationTokenFailedRetrieved.value();
}

/**
 * Returns the current value of the failed-getActivities gauge.
 *
 * @return current gauge value
 */
public int getActivitiesFailedRetrieved() {
  return this.numGetActivitiesFailedRetrieved.value();
}

/**
 * Returns the current value of the failed-getBulkActivities gauge.
 *
 * @return current gauge value
 */
public int getBulkActivitiesFailedRetrieved() {
  return this.numGetBulkActivitiesFailedRetrieved.value();
}

public int getSchedulerInfoFailedRetrieved() {
return numGetSchedulerInfoFailedRetrieved.value();
}
Expand Down Expand Up @@ -1448,6 +1492,16 @@ public void succeededCancelDelegationTokenRetrieved(long duration) {
cancelDelegationTokenLatency.add(duration);
}

/**
 * Records one successful getActivities call by adding its duration to both
 * the success rate metric and the latency quantiles.
 *
 * @param duration the duration to record
 */
public void succeededGetActivitiesLatencyRetrieved(long duration) {
  this.totalSucceededGetActivitiesRetrieved.add(duration);
  this.getActivitiesLatency.add(duration);
}

/**
 * Records one successful getBulkActivities call by adding its duration to
 * both the success rate metric and the latency quantiles.
 *
 * @param duration the duration to record
 */
public void succeededGetBulkActivitiesRetrieved(long duration) {
  this.totalSucceededGetBulkActivitiesRetrieved.add(duration);
  this.getBulkActivitiesLatency.add(duration);
}

public void succeededGetSchedulerInfoRetrieved(long duration) {
totalSucceededGetSchedulerInfoRetrieved.add(duration);
getSchedulerInfoRetrievedLatency.add(duration);
Expand Down Expand Up @@ -1659,6 +1713,14 @@ public void incrCancelDelegationTokenFailedRetrieved() {
numCancelDelegationTokenFailedRetrieved.incr();
}

/**
 * Increments the failed-getActivities gauge.
 */
public void incrGetActivitiesFailedRetrieved() {
  this.numGetActivitiesFailedRetrieved.incr();
}

/**
 * Increments the failed-getBulkActivities gauge.
 */
public void incrGetBulkActivitiesFailedRetrieved() {
  this.numGetBulkActivitiesFailedRetrieved.incr();
}

public void incrGetSchedulerInfoFailedRetrieved() {
numGetSchedulerInfoFailedRetrieved.incr();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.impl.prefetch.Validate;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
Expand Down Expand Up @@ -121,6 +122,7 @@
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationBulkActivitiesInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.SubClusterResult;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo;
Expand Down Expand Up @@ -1187,16 +1189,110 @@ public String dumpSchedulerLogs(String time, HttpServletRequest hsr)
throw new NotImplementedException("Code is not implemented");
}

/**
 * This method retrieves all the activities in a specific node, and it is
 * reachable by using {@link RMWSConsts#SCHEDULER_ACTIVITIES}.
 *
 * <p>The node id is resolved to a subCluster, the request is forwarded to
 * the interceptor of that subCluster, and the success/failure router metrics
 * are updated according to the outcome.
 *
 * @param hsr the servlet request
 * @param nodeId the node we want to retrieve the activities. It is a
 *          QueryParam and is validated as not null and not empty.
 * @param groupBy the groupBy type by which the activities should be
 *          aggregated. It is a QueryParam and is validated as not null and
 *          not empty.
 * @return all the activities in the specific node
 * @throws IllegalArgumentException propagated after the failure metric is
 *           incremented (presumably raised by the parameter validation)
 * @throws NotFoundException propagated after the failure metric is
 *           incremented (presumably raised when no subCluster is found for
 *           the node)
 * @throws RuntimeException if the subCluster returns no activities
 */
@Override
public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId,
    String groupBy) {
  try {
    // Check the parameters to ensure that the parameters are not empty
    Validate.checkNotNullAndNotEmpty(nodeId, "nodeId");
    Validate.checkNotNullAndNotEmpty(groupBy, "groupBy");

    // Query SubClusterInfo according to id,
    // if the nodeId cannot get SubClusterInfo, an exception will be thrown directly.
    SubClusterInfo subClusterInfo = getNodeSubcluster(nodeId);

    // Call the corresponding subCluster to get ActivitiesInfo.
    long startTime = clock.getTime();
    DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
        subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
    final HttpServletRequest hsrCopy = clone(hsr);
    ActivitiesInfo activitiesInfo = interceptor.getActivities(hsrCopy, nodeId, groupBy);
    if (activitiesInfo != null) {
      long stopTime = clock.getTime();
      routerMetrics.succeededGetActivitiesLatencyRetrieved(stopTime - startTime);
      return activitiesInfo;
    }
  } catch (IllegalArgumentException e) {
    routerMetrics.incrGetActivitiesFailedRetrieved();
    throw e;
  } catch (NotFoundException e) {
    routerMetrics.incrGetActivitiesFailedRetrieved();
    throw e;
  }

  // Reached only when the subCluster answered with a null result.
  routerMetrics.incrGetActivitiesFailedRetrieved();
  throw new RuntimeException("getActivities Failed.");
}

/**
 * This method retrieves the last n activities inside scheduler, and it is
 * reachable by using {@link RMWSConsts#SCHEDULER_BULK_ACTIVITIES}.
 *
 * <p>The request is sent concurrently to every active subCluster. Each
 * returned {@link BulkActivitiesInfo} is tagged with the id of the
 * subCluster it came from and collected into a
 * {@link FederationBulkActivitiesInfo}.
 *
 * @param hsr the servlet request
 * @param groupBy the groupBy type by which the activities should be
 *          aggregated. It is a QueryParam and is validated as not null and
 *          not empty.
 * @param activitiesCount number of activities; validated as not negative
 * @return last n activities, grouped per subCluster
 * @throws InterruptedException declared by the overridden method
 * @throws IllegalArgumentException propagated after the failure metric is
 *           incremented (presumably raised by the parameter validation)
 * @throws RuntimeException if the active subClusters cannot be obtained or a
 *           subCluster call fails
 */
@Override
public BulkActivitiesInfo getBulkActivities(HttpServletRequest hsr,
    String groupBy, int activitiesCount) throws InterruptedException {
  try {
    // Step1. Check the parameters to ensure that the parameters are not empty
    Validate.checkNotNullAndNotEmpty(groupBy, "groupBy");
    Validate.checkNotNegative(activitiesCount, "activitiesCount");

    // Start the clock before contacting the subClusters so that the recorded
    // latency covers the remote calls and not only the local merge below.
    long startTime = clock.getTime();

    // Step2. Call the interface of subCluster concurrently and get the returned result.
    Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
    final HttpServletRequest hsrCopy = clone(hsr);
    Class[] argsClasses = new Class[]{HttpServletRequest.class, String.class, int.class};
    Object[] args = new Object[]{hsrCopy, groupBy, activitiesCount};
    ClientMethod remoteMethod = new ClientMethod("getBulkActivities", argsClasses, args);
    Map<SubClusterInfo, BulkActivitiesInfo> appStatisticsMap = invokeConcurrent(
        subClustersActive.values(), remoteMethod, BulkActivitiesInfo.class);

    // Step3. Generate Federation objects and set subCluster information.
    FederationBulkActivitiesInfo fedBulkActivitiesInfo = new FederationBulkActivitiesInfo();
    appStatisticsMap.forEach((subClusterInfo, bulkActivitiesInfo) -> {
      SubClusterId subClusterId = subClusterInfo.getSubClusterId();
      bulkActivitiesInfo.setSubClusterId(subClusterId.getId());
      fedBulkActivitiesInfo.getList().add(bulkActivitiesInfo);
    });
    long stopTime = clock.getTime();
    routerMetrics.succeededGetBulkActivitiesRetrieved(stopTime - startTime);
    return fedBulkActivitiesInfo;
  } catch (IllegalArgumentException e) {
    routerMetrics.incrGetBulkActivitiesFailedRetrieved();
    throw e;
  } catch (NotFoundException e) {
    routerMetrics.incrGetBulkActivitiesFailedRetrieved();
    RouterServerUtil.logAndThrowRunTimeException("get all active sub cluster(s) error.", e);
  } catch (IOException e) {
    routerMetrics.incrGetBulkActivitiesFailedRetrieved();
    RouterServerUtil.logAndThrowRunTimeException(e,
        "getBulkActivities by groupBy = %s, activitiesCount = %s with io error.",
        groupBy, String.valueOf(activitiesCount));
  } catch (YarnException e) {
    routerMetrics.incrGetBulkActivitiesFailedRetrieved();
    RouterServerUtil.logAndThrowRunTimeException(e,
        "getBulkActivities by groupBy = %s, activitiesCount = %s with yarn error.",
        groupBy, String.valueOf(activitiesCount));
  }

  // Fallback for any catch branch above that returns instead of throwing.
  routerMetrics.incrGetBulkActivitiesFailedRetrieved();
  throw new RuntimeException("getBulkActivities Failed.");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp.dao;

import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.ArrayList;

/**
 * A {@link BulkActivitiesInfo} that additionally carries a list of
 * {@link BulkActivitiesInfo} objects. The list is bound by JAXB field access
 * and each entry is written as a {@code subCluster} element.
 */
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class FederationBulkActivitiesInfo extends BulkActivitiesInfo {

  /** Entries serialized as {@code subCluster} elements; starts out empty. */
  @XmlElement(name = "subCluster")
  private ArrayList<BulkActivitiesInfo> list = new ArrayList<>();

  /**
   * Creates an instance whose list is empty. JAXB needs this no-argument
   * constructor.
   */
  public FederationBulkActivitiesInfo() {
  }

  /**
   * Creates an instance backed by the given list. The list is stored as is,
   * not copied.
   *
   * @param list the entries to hold
   */
  public FederationBulkActivitiesInfo(ArrayList<BulkActivitiesInfo> list) {
    this.list = list;
  }

  /**
   * Returns the backing list itself, so changes made by the caller are
   * visible in this object.
   *
   * @return the held entries
   */
  public ArrayList<BulkActivitiesInfo> getList() {
    return this.list;
  }

  /**
   * Replaces the backing list with the given one, stored as is.
   *
   * @param list the entries to hold
   */
  public void setList(ArrayList<BulkActivitiesInfo> list) {
    this.list = list;
  }
}
Loading