|
47 | 47 | import org.apache.commons.lang3.tuple.Pair; |
48 | 48 | import org.apache.hadoop.conf.Configuration; |
49 | 49 | import org.apache.hadoop.security.authorize.AuthorizationException; |
| 50 | +import org.apache.hadoop.thirdparty.com.google.common.collect.Maps; |
50 | 51 | import org.apache.hadoop.util.ReflectionUtils; |
51 | 52 | import org.apache.hadoop.util.Sets; |
52 | 53 | import org.apache.hadoop.util.Time; |
|
67 | 68 | import org.apache.hadoop.yarn.server.federation.retry.FederationActionRetry; |
68 | 69 | import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; |
69 | 70 | import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; |
| 71 | +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; |
70 | 72 | import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; |
71 | 73 | import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo; |
| 74 | +import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts; |
72 | 75 | import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil; |
73 | 76 | import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo; |
74 | 77 | import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; |
|
106 | 109 | import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod; |
107 | 110 | import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey; |
108 | 111 | import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo; |
| 112 | +import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo; |
109 | 113 | import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; |
110 | 114 | import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; |
111 | 115 | import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; |
@@ -1120,9 +1124,72 @@ public ClusterUserInfo getClusterUserInfo(HttpServletRequest hsr) { |
1120 | 1124 | throw new NotImplementedException("Code is not implemented"); |
1121 | 1125 | } |
1122 | 1126 |
|
| 1127 | + /** |
| 1128 | + * This method retrieves the current scheduler status, and it is reachable by |
| 1129 | + * using {@link RMWSConsts#SCHEDULER}. |
| 1130 | + * |
| 1131 | + * For the federation mode, the SchedulerType information of the cluster |
| 1132 | + * cannot be integrated and displayed, and the specific cluster information needs to be marked. |
| 1133 | + * |
| 1134 | + * @return the current scheduler status |
| 1135 | + */ |
1123 | 1136 | @Override |
1124 | 1137 | public SchedulerTypeInfo getSchedulerInfo() { |
1125 | | - throw new NotImplementedException("Code is not implemented"); |
| 1138 | + |
| 1139 | + try { |
| 1140 | + LOG.info("xxxx>>>>getSchedulerInfo"); |
| 1141 | + long startTime = Time.now(); |
| 1142 | + |
| 1143 | + // Initialize subcluster information |
| 1144 | + String scAmRMAddress = "5.6.7.8:5"; |
| 1145 | + String scClientRMAddress = "5.6.7.8:6"; |
| 1146 | + String scRmAdminAddress = "5.6.7.8:7"; |
| 1147 | + String scWebAppAddress = "127.0.0.1:8080"; |
| 1148 | + |
| 1149 | + // Initialize subcluster sc1 |
| 1150 | + SubClusterInfo sc1 = |
| 1151 | + SubClusterInfo.newInstance(SubClusterId.newInstance("SC-1"), |
| 1152 | + scAmRMAddress, scClientRMAddress, scRmAdminAddress, scWebAppAddress, |
| 1153 | + SubClusterState.SC_RUNNING, Time.now(), ""); |
| 1154 | + sc1.setLastHeartBeat(Time.now()); |
| 1155 | + |
| 1156 | + // Initialize subcluster sc2 |
| 1157 | + SubClusterInfo sc2 = |
| 1158 | + SubClusterInfo.newInstance(SubClusterId.newInstance("SC-2"), |
| 1159 | + scAmRMAddress, scClientRMAddress, scRmAdminAddress, scWebAppAddress, |
| 1160 | + SubClusterState.SC_RUNNING, Time.now(), ""); |
| 1161 | + sc2.setLastHeartBeat(Time.now()); |
| 1162 | + |
| 1163 | + |
| 1164 | + // Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters(); |
| 1165 | + Map<SubClusterId, SubClusterInfo> subClustersActive = Maps.newHashMap(); |
| 1166 | + subClustersActive.put(SubClusterId.newInstance("SC-1"), sc1); |
| 1167 | + subClustersActive.put(SubClusterId.newInstance("SC-2"), sc2); |
| 1168 | + |
| 1169 | + LOG.info("xxxx>>>>subClustersActive.size={}", subClustersActive.size()); |
| 1170 | + Class[] argsClasses = new Class[]{}; |
| 1171 | + Object[] args = new Object[]{}; |
| 1172 | + ClientMethod remoteMethod = new ClientMethod("getSchedulerInfo", argsClasses, args); |
| 1173 | + Map<SubClusterInfo, SchedulerTypeInfo> subClusterInfoMap = |
| 1174 | + invokeConcurrent(subClustersActive.values(), remoteMethod, SchedulerTypeInfo.class); |
| 1175 | + FederationSchedulerTypeInfo federationSchedulerTypeInfo = new FederationSchedulerTypeInfo(); |
| 1176 | + subClusterInfoMap.forEach((subClusterInfo, schedulerTypeInfo) -> { |
| 1177 | + SubClusterId subClusterId = subClusterInfo.getSubClusterId(); |
| 1178 | + schedulerTypeInfo.setSubClusterId(subClusterId.getId()); |
| 1179 | + federationSchedulerTypeInfo.getList().add(schedulerTypeInfo); |
| 1180 | + }); |
| 1181 | + long stopTime = Time.now(); |
| 1182 | + return federationSchedulerTypeInfo; |
| 1183 | + } catch (NotFoundException e) { |
| 1184 | + // routerMetrics.incrCheckUserAccessToQueueFailedRetrieved(); |
| 1185 | + RouterServerUtil.logAndThrowRunTimeException("Get all active sub cluster(s) error.", e); |
| 1186 | + } catch (YarnException | IOException e) { |
| 1187 | + // routerMetrics.incrCheckUserAccessToQueueFailedRetrieved(); |
| 1188 | + RouterServerUtil.logAndThrowRunTimeException("getSchedulerInfo error.", e); |
| 1189 | + } |
| 1190 | + |
| 1191 | + // routerMetrics.incrCheckUserAccessToQueueFailedRetrieved(); |
| 1192 | + throw new RuntimeException("getSchedulerInfo error."); |
1126 | 1193 | } |
1127 | 1194 |
|
1128 | 1195 | @Override |
|
0 commit comments