Skip to content

Commit ba77530

Browse files
authored
YARN-11357. Fix FederationClientInterceptor#submitApplication Can't Update SubClusterId (#5055)
1 parent 562b693 commit ba77530

File tree

4 files changed

+200
-8
lines changed

4 files changed

+200
-8
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ public GetNewApplicationResponse getNewApplication(
314314
// Try calling the getNewApplication method
315315
List<SubClusterId> blacklist = new ArrayList<>();
316316
int activeSubClustersCount = getActiveSubClustersCount();
317-
int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries) + 1;
317+
int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);
318318

319319
try {
320320
GetNewApplicationResponse response =
@@ -470,7 +470,7 @@ public SubmitApplicationResponse submitApplication(
470470
// but if the number of Active SubClusters is less than this number at this time,
471471
// we should provide a high number of retry according to the number of Active SubClusters.
472472
int activeSubClustersCount = getActiveSubClustersCount();
473-
int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries) + 1;
473+
int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);
474474

475475
// Try calling the SubmitApplication method
476476
SubmitApplicationResponse response =
@@ -484,7 +484,7 @@ public SubmitApplicationResponse submitApplication(
484484
return response;
485485
}
486486

487-
} catch (Exception e){
487+
} catch (Exception e) {
488488
routerMetrics.incrAppsFailedSubmitted();
489489
RouterServerUtil.logAndThrowException(e.getMessage(), e);
490490
}
@@ -543,7 +543,7 @@ private SubmitApplicationResponse invokeSubmitApplication(
543543
ApplicationHomeSubCluster appHomeSubCluster =
544544
ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
545545

546-
if (exists || retryCount == 0) {
546+
if (!exists || retryCount == 0) {
547547
addApplicationHomeSubCluster(applicationId, appHomeSubCluster);
548548
} else {
549549
updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster);
@@ -563,8 +563,8 @@ private SubmitApplicationResponse invokeSubmitApplication(
563563
} catch (Exception e) {
564564
RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
565565
TARGET_CLIENT_RM_SERVICE, e.getMessage(), applicationId, subClusterId);
566-
LOG.warn("Unable to submitApplication appId {} try #{} on SubCluster {} error = {}.",
567-
applicationId, subClusterId, e);
566+
LOG.warn("Unable to submitApplication appId {} try #{} on SubCluster {}.",
567+
applicationId, retryCount, subClusterId, e);
568568
if (subClusterId != null) {
569569
blackList.add(subClusterId);
570570
}
@@ -1948,4 +1948,9 @@ private void updateReservationHomeSubCluster(SubClusterId subClusterId,
19481948
}
19491949
}
19501950
}
1951+
1952+
@VisibleForTesting
1953+
public void setNumSubmitRetries(int numSubmitRetries) {
1954+
this.numSubmitRetries = numSubmitRetries;
1955+
}
19511956
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818

1919
package org.apache.hadoop.yarn.server.router.clientrm;
2020

21+
import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_POLICY_MANAGER;
22+
import static org.hamcrest.CoreMatchers.is;
2123
import static org.mockito.Mockito.mock;
2224

2325
import java.io.IOException;
2426
import java.util.ArrayList;
2527
import java.util.Arrays;
28+
import java.util.Collection;
2629
import java.util.List;
2730

2831
import org.apache.hadoop.test.LambdaTestUtils;
@@ -48,7 +51,11 @@
4851
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
4952
import org.apache.hadoop.yarn.util.resource.Resources;
5053
import org.junit.Assert;
54+
import org.junit.Assume;
5155
import org.junit.Test;
56+
import org.junit.runner.RunWith;
57+
import org.junit.runners.Parameterized;
58+
import org.junit.runners.Parameterized.Parameters;
5259
import org.slf4j.Logger;
5360
import org.slf4j.LoggerFactory;
5461

@@ -64,14 +71,22 @@
6471
* It tests the case with SubClusters down and the Router logic of retries. We
6572
* have 1 good SubCluster and 2 bad ones for all the tests.
6673
*/
74+
@RunWith(Parameterized.class)
6775
public class TestFederationClientInterceptorRetry
6876
extends BaseRouterClientRMTest {
6977
private static final Logger LOG =
7078
LoggerFactory.getLogger(TestFederationClientInterceptorRetry.class);
7179

80+
@Parameters
81+
public static Collection<String[]> getParameters() {
82+
return Arrays.asList(new String[][] {{UniformBroadcastPolicyManager.class.getName()},
83+
{TestSequentialBroadcastPolicyManager.class.getName()}});
84+
}
85+
7286
private TestableFederationClientInterceptor interceptor;
7387
private MemoryFederationStateStore stateStore;
7488
private FederationStateStoreTestUtil stateStoreUtil;
89+
private String routerPolicyManagerName;
7590

7691
private String user = "test-user";
7792

@@ -84,6 +99,10 @@ public class TestFederationClientInterceptorRetry
8499

85100
private static List<SubClusterId> scs = new ArrayList<>();
86101

102+
public TestFederationClientInterceptorRetry(String policyManagerName) {
103+
this.routerPolicyManagerName = policyManagerName;
104+
}
105+
87106
@Override
88107
public void setUp() throws IOException {
89108
super.setUpConfig();
@@ -150,8 +169,7 @@ protected YarnConfiguration createConfiguration() {
150169
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
151170
+ "," + TestableFederationClientInterceptor.class.getName());
152171

153-
conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
154-
UniformBroadcastPolicyManager.class.getName());
172+
conf.set(FEDERATION_POLICY_MANAGER, this.routerPolicyManagerName);
155173

156174
// Disable StateStoreFacade cache
157175
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
@@ -283,4 +301,56 @@ public void testSubmitApplicationOneBadOneGood()
283301
SubClusterId respSubClusterId = responseHomeSubCluster.getHomeSubCluster();
284302
Assert.assertEquals(good, respSubClusterId);
285303
}
304+
305+
@Test
306+
public void testSubmitApplicationTwoBadOneGood() throws Exception {
307+
308+
LOG.info("Test submitApplication with two bad, one good SC.");
309+
310+
// This test must require the TestSequentialRouterPolicy policy
311+
Assume.assumeThat(routerPolicyManagerName,
312+
is(TestSequentialBroadcastPolicyManager.class.getName()));
313+
314+
setupCluster(Arrays.asList(bad1, bad2, good));
315+
final ApplicationId appId =
316+
ApplicationId.newInstance(System.currentTimeMillis(), 1);
317+
318+
// Use the TestSequentialRouterPolicy strategy,
319+
// which will sort the SubClusterId because good=0, bad1=1, bad2=2
320+
// We will get 2, 1, 0 [bad2, bad1, good]
321+
// Set the retryNum to 1
322+
// 1st time will use bad2, 2nd time will use bad1
323+
// bad1 is updated to stateStore
324+
interceptor.setNumSubmitRetries(1);
325+
final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
326+
LambdaTestUtils.intercept(YarnException.class, "RM is stopped",
327+
() -> interceptor.submitApplication(request));
328+
329+
// We will get bad1
330+
checkSubmitSubCluster(appId, bad1);
331+
332+
// Set the retryNum to 2
333+
// 1st time will use bad2, 2nd time will use bad1, 3rd good
334+
interceptor.setNumSubmitRetries(2);
335+
SubmitApplicationResponse submitAppResponse = interceptor.submitApplication(request);
336+
Assert.assertNotNull(submitAppResponse);
337+
338+
// We will get good
339+
checkSubmitSubCluster(appId, good);
340+
}
341+
342+
private void checkSubmitSubCluster(ApplicationId appId, SubClusterId expectSubCluster)
343+
throws YarnException {
344+
GetApplicationHomeSubClusterRequest getAppRequest =
345+
GetApplicationHomeSubClusterRequest.newInstance(appId);
346+
GetApplicationHomeSubClusterResponse getAppResponse =
347+
stateStore.getApplicationHomeSubCluster(getAppRequest);
348+
Assert.assertNotNull(getAppResponse);
349+
Assert.assertNotNull(getAppResponse);
350+
ApplicationHomeSubCluster responseHomeSubCluster =
351+
getAppResponse.getApplicationHomeSubCluster();
352+
Assert.assertNotNull(responseHomeSubCluster);
353+
SubClusterId respSubClusterId = responseHomeSubCluster.getHomeSubCluster();
354+
Assert.assertEquals(expectSubCluster, respSubClusterId);
355+
}
286356
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.server.router.clientrm;
20+
21+
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
22+
import org.apache.hadoop.yarn.server.federation.policies.manager.AbstractPolicyManager;
23+
24+
/**
25+
* This PolicyManager is used for testing and will contain the
26+
* {@link TestSequentialRouterPolicy} policy.
27+
*
28+
* When we test FederationClientInterceptor Retry,
29+
* we hope that SubCluster can return in a certain order, not randomly.
30+
* We can view the policy description by linking to TestSequentialRouterPolicy.
31+
*/
32+
public class TestSequentialBroadcastPolicyManager extends AbstractPolicyManager {
33+
public TestSequentialBroadcastPolicyManager() {
34+
// this structurally hard-codes two compatible policies for Router and
35+
// AMRMProxy.
36+
routerFederationPolicy = TestSequentialRouterPolicy.class;
37+
amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.server.router.clientrm;
20+
21+
import org.apache.commons.collections.CollectionUtils;
22+
import org.apache.hadoop.yarn.exceptions.YarnException;
23+
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
24+
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
25+
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
26+
import org.apache.hadoop.yarn.server.federation.policies.router.AbstractRouterPolicy;
27+
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
28+
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
29+
30+
import java.util.ArrayList;
31+
import java.util.List;
32+
import java.util.Map;
33+
34+
/**
35+
* This is a test strategy,
36+
* the purpose of this strategy is to return subClusters in descending order of subClusterId.
37+
*
38+
* This strategy is to verify the situation of Retry during the use of FederationClientInterceptor.
39+
* The conditions of use are as follows:
40+
* 1.We require subClusterId to be an integer.
41+
* 2.The larger the subCluster, the sooner the representative is selected.
42+
*
43+
* We have 4 subClusters, 2 normal subClusters, 2 bad subClusters.
44+
* We expect to select badSubClusters first and then goodSubClusters during testing.
45+
* We can set the subCluster like this, good1 = [0], good2 = [1], bad1 = [2], bad2 = [3].
46+
* This strategy will return [3, 2, 1, 0],
47+
* The selection order of subCluster is bad2, bad1, good2, good1.
48+
*/
49+
public class TestSequentialRouterPolicy extends AbstractRouterPolicy {
50+
51+
@Override
52+
public void reinitialize(FederationPolicyInitializationContext policyContext)
53+
throws FederationPolicyInitializationException {
54+
FederationPolicyInitializationContextValidator.validate(policyContext,
55+
this.getClass().getCanonicalName());
56+
setPolicyContext(policyContext);
57+
}
58+
59+
@Override
60+
protected SubClusterId chooseSubCluster(String queue,
61+
Map<SubClusterId, SubClusterInfo> preSelectSubClusters) throws YarnException {
62+
/**
63+
* This strategy is only suitable for testing. We need to obtain subClusters sequentially.
64+
* We have 3 subClusters, 1 goodSubCluster and 2 badSubClusters.
65+
* The sc-id of goodSubCluster is 0, and the sc-id of badSubCluster is 1 and 2.
66+
* We hope Return in reverse order, that is, return 2, 1, 0
67+
* Return to badCluster first.
68+
*/
69+
List<SubClusterId> subClusterIds = new ArrayList<>(preSelectSubClusters.keySet());
70+
if (subClusterIds.size() > 1) {
71+
subClusterIds.sort((o1, o2) -> Integer.parseInt(o2.getId()) - Integer.parseInt(o1.getId()));
72+
}
73+
if(CollectionUtils.isNotEmpty(subClusterIds)){
74+
return subClusterIds.get(0);
75+
}
76+
return null;
77+
}
78+
}

0 commit comments

Comments
 (0)