Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public CapacitySchedulerInfo(CSQueue parent, CapacityScheduler cs) {
.getAutoCreationEligibility(parent);

defaultNodeLabelExpression = parent.getDefaultNodeLabelExpression();
schedulerName = "Capacity Scheduler";
}

public float getCapacity() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public FairSchedulerInfo(FairScheduler fs) {
scheduler = fs;
rootQueue = new FairSchedulerQueueInfo(scheduler.getQueueManager().
getRootQueue(), scheduler);
schedulerName = "Fair Scheduler";
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public FifoSchedulerInfo(final ResourceManager rm) {
this.totalNodeCapacity += ni.getTotalCapability().getMemorySize();
this.numContainers += fs.getNodeReport(ni.getNodeID()).getNumContainers();
}

this.schedulerName = "Fifo Scheduler";
}

public int getNumNodes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
@XmlRootElement(name = "scheduler")
@XmlAccessorType(XmlAccessType.FIELD)
public class SchedulerTypeInfo {
protected SchedulerInfo schedulerInfo;
private SchedulerInfo schedulerInfo;
private String subClusterId;

public SchedulerTypeInfo() {
} // JAXB needs this
Expand All @@ -37,4 +38,12 @@ public SchedulerTypeInfo(final SchedulerInfo scheduler) {
/**
 * Returns the scheduler information wrapped by this object.
 *
 * @return the current value of the {@code schedulerInfo} field
 */
public SchedulerInfo getSchedulerInfo() {
return schedulerInfo;
}

/**
 * Returns the sub-cluster id string stored in this object.
 *
 * @return the current value of the {@code subClusterId} field
 */
public String getSubClusterId() {
return subClusterId;
}

/**
 * Stores the given sub-cluster id string in this object.
 *
 * @param subClusterId the value to assign to the {@code subClusterId} field
 */
public void setSubClusterId(String subClusterId) {
this.subClusterId = subClusterId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public boolean attemptExists(RMAppAttempt attempt) throws IOException {
}
}

@Test(timeout = 60000)
@Test(timeout = 120000)
public void testFSRMStateStore() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
MiniDFSCluster cluster =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ public final class RouterMetrics {
private MutableGaugeInt numRenewDelegationTokenFailedRetrieved;
// Gauge incremented by incrCancelDelegationTokenFailedRetrieved(); the
// description names cancelDelegationToken so it matches the counted operation.
@Metric("# of cancelDelegationToken failed to be retrieved")
private MutableGaugeInt numCancelDelegationTokenFailedRetrieved;
@Metric("# of getSchedulerInfo failed to be retrieved")
private MutableGaugeInt numGetSchedulerInfoFailedRetrieved;
@Metric("# of refreshSuperUserGroupsConfiguration failed to be retrieved")
private MutableGaugeInt numRefreshSuperUserGroupsConfigurationFailedRetrieved;
@Metric("# of refreshUserToGroupsMappings failed to be retrieved")
Expand Down Expand Up @@ -240,6 +242,9 @@ public final class RouterMetrics {
@Metric("Total number of successful Retrieved RefreshUserToGroupsMappings and latency(ms)")
private MutableRate totalSucceededRefreshUserToGroupsMappingsRetrieved;

@Metric("Total number of successful Retrieved GetSchedulerInfo and latency(ms)")
private MutableRate totalSucceededGetSchedulerInfoRetrieved;

/**
* Provide quantile counters for all latencies.
*/
Expand Down Expand Up @@ -290,6 +295,7 @@ public final class RouterMetrics {
private MutableQuantiles getDelegationTokenLatency;
private MutableQuantiles renewDelegationTokenLatency;
private MutableQuantiles cancelDelegationTokenLatency;
private MutableQuantiles getSchedulerInfoRetrievedLatency;
private MutableQuantiles refreshSuperUserGroupsConfLatency;
private MutableQuantiles refreshUserToGroupsMappingsLatency;

Expand Down Expand Up @@ -466,6 +472,9 @@ private RouterMetrics() {
cancelDelegationTokenLatency = registry.newQuantiles("cancelDelegationTokenLatency",
"latency of cancel delegation token timeouts", "ops", "latency", 10);

getSchedulerInfoRetrievedLatency = registry.newQuantiles("getSchedulerInfoRetrievedLatency",
"latency of get scheduler info timeouts", "ops", "latency", 10);

refreshSuperUserGroupsConfLatency = registry.newQuantiles("refreshSuperUserGroupsConfLatency",
"latency of refresh superuser groups configuration timeouts", "ops", "latency", 10);

Expand Down Expand Up @@ -727,6 +736,11 @@ public long getNumSucceededCancelDelegationTokenRetrieved() {
return totalSucceededCancelDelegationTokenRetrieved.lastStat().numSamples();
}

/**
 * Returns the sample count of the last stat recorded by the
 * {@code totalSucceededGetSchedulerInfoRetrieved} rate.
 *
 * @return {@code lastStat().numSamples()} of the successful getSchedulerInfo rate
 */
@VisibleForTesting
public long getNumSucceededGetSchedulerInfoRetrieved() {
return totalSucceededGetSchedulerInfoRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededRefreshSuperUserGroupsConfigurationRetrieved() {
return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().numSamples();
Expand Down Expand Up @@ -967,6 +981,11 @@ public double getLatencySucceededCancelDelegationTokenRetrieved() {
return totalSucceededCancelDelegationTokenRetrieved.lastStat().mean();
}

/**
 * Returns the mean of the last stat recorded by the
 * {@code totalSucceededGetSchedulerInfoRetrieved} rate.
 *
 * @return {@code lastStat().mean()} of the successful getSchedulerInfo rate
 */
@VisibleForTesting
public double getLatencySucceededGetSchedulerInfoRetrieved() {
return totalSucceededGetSchedulerInfoRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededRefreshSuperUserGroupsConfigurationRetrieved() {
return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().mean();
Expand Down Expand Up @@ -1190,6 +1209,10 @@ public int getCancelDelegationTokenFailedRetrieved() {
return numCancelDelegationTokenFailedRetrieved.value();
}

/**
 * Returns the current value of the failed getSchedulerInfo gauge.
 *
 * @return the value of {@code numGetSchedulerInfoFailedRetrieved}
 */
public int getSchedulerInfoFailedRetrieved() {
return numGetSchedulerInfoFailedRetrieved.value();
}

public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
Expand Down Expand Up @@ -1425,6 +1448,11 @@ public void succeededCancelDelegationTokenRetrieved(long duration) {
cancelDelegationTokenLatency.add(duration);
}

/**
 * Records one successful getSchedulerInfo call by adding its duration to both
 * the success rate metric and the latency quantiles.
 *
 * @param duration the elapsed time of the call, as measured by the caller
 */
public void succeededGetSchedulerInfoRetrieved(long duration) {
totalSucceededGetSchedulerInfoRetrieved.add(duration);
getSchedulerInfoRetrievedLatency.add(duration);
}

public void succeededRefreshSuperUserGroupsConfRetrieved(long duration) {
totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.add(duration);
refreshSuperUserGroupsConfLatency.add(duration);
Expand Down Expand Up @@ -1630,4 +1658,8 @@ public void incrRenewDelegationTokenFailedRetrieved() {
/**
 * Increments the {@code numCancelDelegationTokenFailedRetrieved} gauge.
 */
public void incrCancelDelegationTokenFailedRetrieved() {
numCancelDelegationTokenFailedRetrieved.incr();
}

/**
 * Increments the {@code numGetSchedulerInfoFailedRetrieved} gauge.
 */
public void incrGetSchedulerInfoFailedRetrieved() {
numGetSchedulerInfoFailedRetrieved.incr();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
Expand Down Expand Up @@ -122,6 +123,7 @@
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
Expand Down Expand Up @@ -1140,9 +1142,43 @@ public ClusterUserInfo getClusterUserInfo(HttpServletRequest hsr) {
throw new NotImplementedException("Code is not implemented");
}

/**
 * This method retrieves the current scheduler status, and it is reachable by
 * using {@link RMWSConsts#SCHEDULER}.
 *
 * In federation mode the scheduler information of the sub-clusters is not
 * merged into a single view. Instead, every {@link SchedulerTypeInfo} returned
 * by the concurrent invocation is tagged with the id of the sub-cluster it
 * came from and collected into a {@link FederationSchedulerTypeInfo}.
 *
 * @return a {@link FederationSchedulerTypeInfo} holding the tagged scheduler
 *         status of the sub-clusters that answered
 * @throws RuntimeException if looking up the active sub-clusters or invoking
 *         the sub-clusters fails
 */
@Override
public SchedulerTypeInfo getSchedulerInfo() {
  try {
    long startTime = Time.now();
    Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();

    // The remote getSchedulerInfo call takes no arguments.
    Class<?>[] argsClasses = new Class<?>[]{};
    Object[] args = new Object[]{};
    ClientMethod remoteMethod = new ClientMethod("getSchedulerInfo", argsClasses, args);
    Map<SubClusterInfo, SchedulerTypeInfo> subClusterInfoMap =
        invokeConcurrent(subClustersActive.values(), remoteMethod, SchedulerTypeInfo.class);

    // Mark each result with the sub-cluster that produced it.
    FederationSchedulerTypeInfo federationSchedulerTypeInfo = new FederationSchedulerTypeInfo();
    subClusterInfoMap.forEach((subClusterInfo, schedulerTypeInfo) -> {
      SubClusterId subClusterId = subClusterInfo.getSubClusterId();
      schedulerTypeInfo.setSubClusterId(subClusterId.getId());
      federationSchedulerTypeInfo.getList().add(schedulerTypeInfo);
    });

    long stopTime = Time.now();
    routerMetrics.succeededGetSchedulerInfoRetrieved(stopTime - startTime);
    return federationSchedulerTypeInfo;
  } catch (NotFoundException e) {
    routerMetrics.incrGetSchedulerInfoFailedRetrieved();
    RouterServerUtil.logAndThrowRunTimeException("Get all active sub cluster(s) error.", e);
  } catch (YarnException | IOException e) {
    routerMetrics.incrGetSchedulerInfoFailedRetrieved();
    RouterServerUtil.logAndThrowRunTimeException("getSchedulerInfo error.", e);
  }
  // Reached only if a helper call in a catch block above returns normally;
  // the method must still end in a failure since it has no result to return.
  routerMetrics.incrGetSchedulerInfoFailedRetrieved();
  throw new RuntimeException("getSchedulerInfo error.");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp.dao;

import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.ArrayList;
import java.util.List;

/**
 * A {@link SchedulerTypeInfo} that carries a list of further
 * {@link SchedulerTypeInfo} objects. With field-based JAXB access, each list
 * entry is mapped to an element named {@code subCluster}.
 */
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class FederationSchedulerTypeInfo extends SchedulerTypeInfo {
  /** Scheduler information entries; starts out as an empty, mutable list. */
  @XmlElement(name = "subCluster")
  private List<SchedulerTypeInfo> list = new ArrayList<>();

  public FederationSchedulerTypeInfo() {
  } // JAXB needs this

  /**
   * Creates an instance backed by the given list.
   *
   * @param list the entries to hold; the reference is stored as-is, not copied
   */
  public FederationSchedulerTypeInfo(List<SchedulerTypeInfo> list) {
    this.list = list;
  }

  /**
   * Returns the list held by this object.
   *
   * @return the internal list itself, so changes made to it are visible here
   */
  public List<SchedulerTypeInfo> getList() {
    return list;
  }

  /**
   * Replaces the list held by this object.
   *
   * @param list the new entries; the reference is stored as-is, not copied
   */
  public void setList(List<SchedulerTypeInfo> list) {
    this.list = list;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
Expand Down Expand Up @@ -138,6 +140,9 @@
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.PartitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
Expand Down Expand Up @@ -1206,4 +1211,17 @@ public RMQueueAclInfo checkUserAccessToQueue(
return new RMQueueAclInfo(true, user.getUserName(), "");
}
}

/**
 * Produces scheduler information from a ResourceManager obtained through
 * {@code CapacitySchedulerTestUtilities.createResourceManager()}, wrapping the
 * root queue of its CapacityScheduler.
 *
 * @return a SchedulerTypeInfo wrapping a CapacitySchedulerInfo for the root queue
 * @throws RuntimeException wrapping any exception raised while building the result
 */
@Override
public SchedulerTypeInfo getSchedulerInfo() {
  try {
    ResourceManager rm = CapacitySchedulerTestUtilities.createResourceManager();
    CapacityScheduler capacityScheduler = (CapacityScheduler) rm.getResourceScheduler();
    CSQueue rootQueue = capacityScheduler.getRootQueue();
    SchedulerInfo rootInfo = new CapacitySchedulerInfo(rootQueue, capacityScheduler);
    return new SchedulerTypeInfo(rootInfo);
  } catch (Exception cause) {
    throw new RuntimeException(cause);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Set;
import java.util.HashSet;
import java.util.Collections;
import java.util.stream.Collectors;
import java.util.concurrent.TimeUnit;

import javax.servlet.http.HttpServletRequest;
Expand All @@ -37,6 +38,7 @@
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
Expand Down Expand Up @@ -105,12 +107,18 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo;
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.hadoop.yarn.util.Times;
Expand Down Expand Up @@ -1526,6 +1534,64 @@ public void testCheckFederationInterceptorRESTClient() {
Assert.assertEquals(webAppAddress, interceptorREST.getWebAppAddress());
}

/**
 * Checks the result of {@code interceptor.getSchedulerInfo()}: it must be a
 * FederationSchedulerTypeInfo with 4 entries, each carrying a known
 * sub-cluster id and a CapacitySchedulerInfo rooted at "root" with the two
 * queues root.a and root.b.
 */
@Test
public void testGetSchedulerInfo() {
  // In this test case, we expect the return results of 4 sub-clusters.
  SchedulerTypeInfo result = interceptor.getSchedulerInfo();
  Assert.assertNotNull(result);
  Assert.assertTrue(result instanceof FederationSchedulerTypeInfo);

  FederationSchedulerTypeInfo federationInfo = (FederationSchedulerTypeInfo) result;
  Assert.assertNotNull(federationInfo);
  List<SchedulerTypeInfo> perClusterInfos = federationInfo.getList();
  Assert.assertNotNull(perClusterInfos);
  Assert.assertEquals(4, perClusterInfos.size());

  List<String> knownIds = subClusters.stream()
      .map(id -> id.getId())
      .collect(Collectors.toList());

  for (SchedulerTypeInfo perClusterInfo : perClusterInfos) {
    Assert.assertNotNull(perClusterInfo);

    // 1. The returned subClusterId must be one of the registered sub-clusters.
    String returnedId = perClusterInfo.getSubClusterId();
    Assert.assertTrue(knownIds.contains(returnedId));

    // 2. The mock uses CapacityScheduler, so that is the type we expect back.
    SchedulerInfo info = perClusterInfo.getSchedulerInfo();
    Assert.assertNotNull(info);
    Assert.assertTrue(info instanceof CapacitySchedulerInfo);
    CapacitySchedulerInfo csInfo = (CapacitySchedulerInfo) info;
    Assert.assertNotNull(csInfo);

    // 3. The parent queue name should be root.
    Assert.assertEquals("root", csInfo.getQueueName());

    // 4. The scheduler type should be the Capacity Scheduler.
    Assert.assertEquals("Capacity Scheduler", csInfo.getSchedulerType());

    // 5. The queue path should be root.
    Assert.assertEquals("root", csInfo.getQueuePath());

    // 6. The mock RM has 2 test queues, [root.a, root.b].
    List<String> expectedPaths = Lists.newArrayList("root.a", "root.b");
    CapacitySchedulerQueueInfoList queueInfoHolder = csInfo.getQueues();
    Assert.assertNotNull(queueInfoHolder);
    List<CapacitySchedulerQueueInfo> queueInfos = queueInfoHolder.getQueueInfoList();
    Assert.assertEquals(2, queueInfos.size());
    queueInfos.forEach(queueInfo -> {
      Assert.assertNotNull(queueInfo);
      Assert.assertTrue(expectedPaths.contains(queueInfo.getQueuePath()));
    });
  }
}

@Test
public void testPostDelegationTokenErrorHsr() throws Exception {
// Prepare delegationToken data
Expand Down