-
Notifications
You must be signed in to change notification settings - Fork 9.2k
YARN-11180. Refactor some code of getNewApplication, submitApplication etc. #4618
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
bfef1dc
a8150e4
975d9bf
24234ac
15ab9a9
390f1f2
fe41b76
41b7d8a
ea6a4a3
07a9043
730ea24
515f422
b2aa446
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -227,35 +227,29 @@ protected ApplicationClientProtocol getClientRMProxyForSubCluster( | |
| ApplicationClientProtocol clientRMProxy = null; | ||
| try { | ||
| boolean serviceAuthEnabled = getConf().getBoolean( | ||
| CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false); | ||
| CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false); | ||
| UserGroupInformation realUser = user; | ||
| if (serviceAuthEnabled) { | ||
| realUser = UserGroupInformation.createProxyUser( | ||
| user.getShortUserName(), UserGroupInformation.getLoginUser()); | ||
| user.getShortUserName(), UserGroupInformation.getLoginUser()); | ||
| } | ||
| clientRMProxy = FederationProxyProviderUtil.createRMProxy(getConf(), | ||
| ApplicationClientProtocol.class, subClusterId, realUser); | ||
| } catch (Exception e) { | ||
| RouterServerUtil.logAndThrowException( | ||
| "Unable to create the interface to reach the SubCluster " | ||
| + subClusterId, | ||
| e); | ||
| "Unable to create the interface to reach the SubCluster " + subClusterId, e); | ||
| } | ||
|
|
||
| clientRMProxies.put(subClusterId, clientRMProxy); | ||
| return clientRMProxy; | ||
| } | ||
|
|
||
| private SubClusterId getRandomActiveSubCluster( | ||
| Map<SubClusterId, SubClusterInfo> activeSubclusters) | ||
| throws YarnException { | ||
|
|
||
| if (activeSubclusters == null || activeSubclusters.size() < 1) { | ||
| Map<SubClusterId, SubClusterInfo> activeSubClusters) throws YarnException { | ||
| if (activeSubClusters == null || activeSubClusters.size() < 1) { | ||
| RouterServerUtil.logAndThrowException( | ||
| FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null); | ||
| } | ||
| List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet()); | ||
|
|
||
| List<SubClusterId> list = new ArrayList<>(activeSubClusters.keySet()); | ||
| return list.get(rand.nextInt(list.size())); | ||
| } | ||
|
|
||
|
|
@@ -280,47 +274,50 @@ private SubClusterId getRandomActiveSubCluster( | |
| public GetNewApplicationResponse getNewApplication( | ||
| GetNewApplicationRequest request) throws YarnException, IOException { | ||
|
|
||
| long startTime = clock.getTime(); | ||
| if(request == null) { | ||
|
||
| routerMetrics.incrAppsFailedCreated(); | ||
| String errMsg = "Missing getNewApplication request."; | ||
| RouterAuditLogger.logFailure(user.getShortUserName(), | ||
| RouterAuditLogger.AuditConstants.GET_NEW_APP, "UNKNOWN", | ||
| "RouterClientRMService", errMsg); | ||
| RouterServerUtil.logAndThrowException(errMsg, null); | ||
| } | ||
|
|
||
| long startTime = clock.getTime(); | ||
| Map<SubClusterId, SubClusterInfo> subClustersActive = | ||
| federationFacade.getSubClusters(true); | ||
|
|
||
| GetNewApplicationResponse response = null; | ||
|
||
|
|
||
| for (int i = 0; i < numSubmitRetries; ++i) { | ||
| SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive); | ||
| LOG.debug( | ||
| "getNewApplication try #{} on SubCluster {}", i, subClusterId); | ||
| ApplicationClientProtocol clientRMProxy = | ||
| getClientRMProxyForSubCluster(subClusterId); | ||
| GetNewApplicationResponse response = null; | ||
| LOG.info("getNewApplication try #{} on SubCluster {}", i, subClusterId); | ||
| ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); | ||
| response = null; | ||
| try { | ||
| response = clientRMProxy.getNewApplication(request); | ||
| } catch (Exception e) { | ||
| LOG.warn("Unable to create a new ApplicationId in SubCluster " | ||
| + subClusterId.getId(), e); | ||
| LOG.warn("Unable to create a new ApplicationId in SubCluster {}.", subClusterId.getId(), e); | ||
| subClustersActive.remove(subClusterId); | ||
| } | ||
|
|
||
| if (response != null) { | ||
|
|
||
| long stopTime = clock.getTime(); | ||
| routerMetrics.succeededAppsCreated(stopTime - startTime); | ||
| RouterAuditLogger.logSuccess(user.getShortUserName(), | ||
| RouterAuditLogger.AuditConstants.GET_NEW_APP, | ||
| "RouterClientRMService", response.getApplicationId()); | ||
| return response; | ||
| } else { | ||
| // Empty response from the ResourceManager. | ||
| // Blacklist this subcluster for this request. | ||
| subClustersActive.remove(subClusterId); | ||
goiri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| } | ||
|
|
||
| routerMetrics.incrAppsFailedCreated(); | ||
| String errMsg = "Fail to create a new application."; | ||
| String errMsg = "Failed to create a new application."; | ||
| RouterAuditLogger.logFailure(user.getShortUserName(), | ||
| RouterAuditLogger.AuditConstants.GET_NEW_APP, "UNKNOWN", | ||
| "RouterClientRMService", errMsg); | ||
| throw new YarnException(errMsg); | ||
| RouterServerUtil.logAndThrowException(errMsg, null); | ||
| return response; | ||
|
||
| } | ||
|
|
||
| /** | ||
|
|
@@ -392,32 +389,31 @@ public GetNewApplicationResponse getNewApplication( | |
| public SubmitApplicationResponse submitApplication( | ||
| SubmitApplicationRequest request) throws YarnException, IOException { | ||
|
|
||
| long startTime = clock.getTime(); | ||
|
|
||
| if (request == null || request.getApplicationSubmissionContext() == null | ||
| || request.getApplicationSubmissionContext() | ||
| .getApplicationId() == null) { | ||
| || request.getApplicationSubmissionContext().getApplicationId() == null) { | ||
| routerMetrics.incrAppsFailedSubmitted(); | ||
| String errMsg = | ||
| "Missing submitApplication request or applicationSubmissionContext " | ||
| + "information."; | ||
| "Missing submitApplication request or applicationSubmissionContext information."; | ||
| RouterAuditLogger.logFailure(user.getShortUserName(), | ||
| RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN", | ||
| "RouterClientRMService", errMsg); | ||
| throw new YarnException(errMsg); | ||
| RouterServerUtil.logAndThrowException(errMsg, null); | ||
| } | ||
|
|
||
| long startTime = clock.getTime(); | ||
|
|
||
| ApplicationId applicationId = | ||
| request.getApplicationSubmissionContext().getApplicationId(); | ||
|
|
||
| List<SubClusterId> blacklist = new ArrayList<SubClusterId>(); | ||
| List<SubClusterId> blacklist = new ArrayList<>(); | ||
|
|
||
| for (int i = 0; i < numSubmitRetries; ++i) { | ||
|
|
||
| SubClusterId subClusterId = policyFacade.getHomeSubcluster( | ||
| request.getApplicationSubmissionContext(), blacklist); | ||
| LOG.info("submitApplication appId {} try #{} on SubCluster {}.", applicationId, i, | ||
| subClusterId); | ||
|
|
||
| LOG.info("submitApplication appId {} try #{} on SubCluster {}.", | ||
| applicationId, i, subClusterId); | ||
|
|
||
| ApplicationHomeSubCluster appHomeSubCluster = | ||
| ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); | ||
|
|
@@ -430,32 +426,34 @@ public SubmitApplicationResponse submitApplication( | |
| federationFacade.addApplicationHomeSubCluster(appHomeSubCluster); | ||
| } catch (YarnException e) { | ||
| routerMetrics.incrAppsFailedSubmitted(); | ||
| String message = "Unable to insert the ApplicationId " + applicationId | ||
| + " into the FederationStateStore"; | ||
| String message = | ||
| String.format("Unable to insert the ApplicationId %s into the FederationStateStore.", | ||
| applicationId); | ||
| RouterAuditLogger.logFailure(user.getShortUserName(), | ||
| RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN", | ||
| "RouterClientRMService", message, applicationId, subClusterId); | ||
| throw new YarnException(message, e); | ||
| RouterServerUtil.logAndThrowException(message, e); | ||
| } | ||
| } else { | ||
| try { | ||
| // update the mapping of applicationId and the home subClusterId to | ||
| // the new subClusterId we have selected | ||
| federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster); | ||
| } catch (YarnException e) { | ||
| String message = "Unable to update the ApplicationId " + applicationId | ||
| + " into the FederationStateStore"; | ||
| String message = | ||
| String.format("Unable to update the ApplicationId %s into the FederationStateStore.", | ||
| applicationId); | ||
| SubClusterId subClusterIdInStateStore = | ||
| federationFacade.getApplicationHomeSubCluster(applicationId); | ||
| if (subClusterId == subClusterIdInStateStore) { | ||
| LOG.info("Application {} already submitted on SubCluster {}.", applicationId, | ||
| subClusterId); | ||
| LOG.info("Application {} already submitted on SubCluster {}.", | ||
| applicationId, subClusterId); | ||
| } else { | ||
| routerMetrics.incrAppsFailedSubmitted(); | ||
| RouterAuditLogger.logFailure(user.getShortUserName(), | ||
| RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN", | ||
| "RouterClientRMService", message, applicationId, subClusterId); | ||
| throw new YarnException(message, e); | ||
| RouterServerUtil.logAndThrowException(message, e); | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -489,9 +487,8 @@ public SubmitApplicationResponse submitApplication( | |
| } | ||
|
|
||
| routerMetrics.incrAppsFailedSubmitted(); | ||
| String errMsg = "Application " | ||
| + request.getApplicationSubmissionContext().getApplicationName() | ||
| + " with appId " + applicationId + " failed to be submitted."; | ||
| String errMsg = String.format("Application %s with appId %s failed to be submitted.", | ||
| request.getApplicationSubmissionContext().getApplicationName(), applicationId); | ||
| RouterAuditLogger.logFailure(user.getShortUserName(), | ||
| RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN", | ||
| "RouterClientRMService", errMsg, applicationId); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
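isEmpty()? (That is, could the `size() < 1` check on the active sub-clusters map be written with `isEmpty()` instead?)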
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your suggestion is right; I will modify the code.