@@ -314,7 +314,7 @@ public GetNewApplicationResponse getNewApplication(
// Try calling the getNewApplication method
List<SubClusterId> blacklist = new ArrayList<>();
int activeSubClustersCount = getActiveSubClustersCount();
int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries) + 1;
int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);

try {
GetNewApplicationResponse response =
@@ -470,7 +470,7 @@ public SubmitApplicationResponse submitApplication(
// but if the number of Active SubClusters is less than this number at this time,
// we should provide a high number of retry according to the number of Active SubClusters.
int activeSubClustersCount = getActiveSubClustersCount();
int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries) + 1;
int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);

// Try calling the SubmitApplication method
SubmitApplicationResponse response =
@@ -484,7 +484,7 @@ public SubmitApplicationResponse submitApplication(
return response;
}

} catch (Exception e){
} catch (Exception e) {
routerMetrics.incrAppsFailedSubmitted();
RouterServerUtil.logAndThrowException(e.getMessage(), e);
}
@@ -543,7 +543,7 @@ private SubmitApplicationResponse invokeSubmitApplication(
ApplicationHomeSubCluster appHomeSubCluster =
ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);

if (exists || retryCount == 0) {
if (!exists || retryCount == 0) {
addApplicationHomeSubCluster(applicationId, appHomeSubCluster);
} else {
updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster);
@@ -563,8 +563,8 @@ private SubmitApplicationResponse invokeSubmitApplication(
} catch (Exception e) {
RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
TARGET_CLIENT_RM_SERVICE, e.getMessage(), applicationId, subClusterId);
LOG.warn("Unable to submitApplication appId {} try #{} on SubCluster {} error = {}.",
applicationId, subClusterId, e);
LOG.warn("Unable to submitApplication appId {} try #{} on SubCluster {}.",
applicationId, retryCount, subClusterId, e);
if (subClusterId != null) {
blackList.add(subClusterId);
}
@@ -1948,4 +1948,9 @@ private void updateReservationHomeSubCluster(SubClusterId subClusterId,
}
}
}

@VisibleForTesting
public void setNumSubmitRetries(int numSubmitRetries) {
this.numSubmitRetries = numSubmitRetries;
}
}
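
The change above drops the trailing "+ 1" so that the number of submit retries never exceeds the number of active SubClusters. A minimal, self-contained sketch of that cap follows; the class name and the concrete values are illustrative, not the Router's actual retry loop:

public class RetryCapSketch {
  public static void main(String[] args) {
    int numSubmitRetries = 10;      // configured retry count
    int activeSubClustersCount = 3; // SubClusters currently reachable

    // Cap the retries at the number of active SubClusters: each failed attempt
    // blacklists one SubCluster, so extra retries would have no new target.
    int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);

    System.out.println(actualRetryNums); // 3
    // The previous "+ 1" would have allowed one retry more than there are
    // SubClusters left to try after blacklisting.
  }
}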
@@ -18,11 +18,14 @@

package org.apache.hadoop.yarn.server.router.clientrm;

import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_POLICY_MANAGER;
import static org.hamcrest.CoreMatchers.is;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.test.LambdaTestUtils;
@@ -48,7 +51,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -64,14 +71,22 @@
* It tests the case with SubClusters down and the Router logic of retries. We
* have 1 good SubCluster and 2 bad ones for all the tests.
*/
@RunWith(Parameterized.class)
public class TestFederationClientInterceptorRetry
extends BaseRouterClientRMTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestFederationClientInterceptorRetry.class);

@Parameters
public static Collection<String[]> getParameters() {
return Arrays.asList(new String[][] {{UniformBroadcastPolicyManager.class.getName()},
{TestSequentialBroadcastPolicyManager.class.getName()}});
}

private TestableFederationClientInterceptor interceptor;
private MemoryFederationStateStore stateStore;
private FederationStateStoreTestUtil stateStoreUtil;
private String routerPolicyManagerName;

private String user = "test-user";

@@ -84,6 +99,10 @@ public class TestFederationClientInterceptorRetry

private static List<SubClusterId> scs = new ArrayList<>();

public TestFederationClientInterceptorRetry(String policyManagerName) {
this.routerPolicyManagerName = policyManagerName;
}

@Override
public void setUp() throws IOException {
super.setUpConfig();
@@ -150,8 +169,7 @@ protected YarnConfiguration createConfiguration() {
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+ "," + TestableFederationClientInterceptor.class.getName());

conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
UniformBroadcastPolicyManager.class.getName());
conf.set(FEDERATION_POLICY_MANAGER, this.routerPolicyManagerName);

// Disable StateStoreFacade cache
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
@@ -283,4 +301,56 @@ public void testSubmitApplicationOneBadOneGood()
SubClusterId respSubClusterId = responseHomeSubCluster.getHomeSubCluster();
Assert.assertEquals(good, respSubClusterId);
}

@Test
public void testSubmitApplicationTwoBadOneGood() throws Exception {

LOG.info("Test submitApplication with two bad, one good SC.");

// This test requires the TestSequentialBroadcastPolicyManager policy manager
Assume.assumeThat(routerPolicyManagerName,
is(TestSequentialBroadcastPolicyManager.class.getName()));

setupCluster(Arrays.asList(bad1, bad2, good));
final ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);

// Use the TestSequentialRouterPolicy strategy, which sorts SubClusterIds in
// descending order. With good=0, bad1=1, bad2=2 the selection order is
// 2, 1, 0 [bad2, bad1, good].
// With retryNum set to 1, the 1st attempt uses bad2 and the 2nd uses bad1,
// so bad1 is the SubCluster recorded in the StateStore.
interceptor.setNumSubmitRetries(1);
final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
LambdaTestUtils.intercept(YarnException.class, "RM is stopped",
() -> interceptor.submitApplication(request));

// The StateStore should record bad1 as the application's home SubCluster
checkSubmitSubCluster(appId, bad1);

// Set the retryNum to 2:
// the 1st attempt uses bad2, the 2nd bad1 and the 3rd good
interceptor.setNumSubmitRetries(2);
SubmitApplicationResponse submitAppResponse = interceptor.submitApplication(request);
Assert.assertNotNull(submitAppResponse);

// The StateStore should record good as the application's home SubCluster
checkSubmitSubCluster(appId, good);
}

private void checkSubmitSubCluster(ApplicationId appId, SubClusterId expectSubCluster)
throws YarnException {
GetApplicationHomeSubClusterRequest getAppRequest =
GetApplicationHomeSubClusterRequest.newInstance(appId);
GetApplicationHomeSubClusterResponse getAppResponse =
stateStore.getApplicationHomeSubCluster(getAppRequest);
Assert.assertNotNull(getAppResponse);
ApplicationHomeSubCluster responseHomeSubCluster =
getAppResponse.getApplicationHomeSubCluster();
Assert.assertNotNull(responseHomeSubCluster);
SubClusterId respSubClusterId = responseHomeSubCluster.getHomeSubCluster();
Assert.assertEquals(expectSubCluster, respSubClusterId);
}
}
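
The retry sequence that testSubmitApplicationTwoBadOneGood relies on can be summarised with a small, self-contained simulation. The class and the run() helper below are illustrative, not Router code; the selection order and outcomes mirror the comments in the test above:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SubmitRetryScenarioSketch {

  public static void main(String[] args) {
    // TestSequentialRouterPolicy selects SubClusters in descending id order:
    // bad2 (id 2), then bad1 (id 1), then good (id 0).
    List<String> selectionOrder = Arrays.asList("bad2", "bad1", "good");
    Set<String> badSubClusters = new HashSet<>(Arrays.asList("bad1", "bad2"));

    // numSubmitRetries = 1: attempts hit bad2 then bad1; bad1 stays in the StateStore.
    System.out.println(run(selectionOrder, badSubClusters, 1));
    // numSubmitRetries = 2: the 3rd attempt reaches good and the submission succeeds.
    System.out.println(run(selectionOrder, badSubClusters, 2));
  }

  // Returns the SubCluster recorded as the application's home after all attempts.
  static String run(List<String> order, Set<String> bad, int numSubmitRetries) {
    String home = null;
    // retryCount 0 is the first attempt, followed by numSubmitRetries retries.
    for (int retryCount = 0; retryCount <= numSubmitRetries; retryCount++) {
      home = order.get(retryCount); // added on attempt 0, updated afterwards
      if (!bad.contains(home)) {
        return home + " (submitted)";
      }
    }
    return home + " (submit failed)";
  }
}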
@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.router.clientrm;

import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.manager.AbstractPolicyManager;

/**
* This PolicyManager is used for testing and wires in the
* {@link TestSequentialRouterPolicy} policy.
*
* When testing the FederationClientInterceptor retry logic, we want SubClusters
* to be returned in a deterministic order rather than randomly.
* See {@link TestSequentialRouterPolicy} for a description of that order.
*/
public class TestSequentialBroadcastPolicyManager extends AbstractPolicyManager {
public TestSequentialBroadcastPolicyManager() {
// this structurally hard-codes two compatible policies for Router and
// AMRMProxy.
routerFederationPolicy = TestSequentialRouterPolicy.class;
amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
}
}
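
For reference, the parameterized test above activates this manager purely through configuration. A short sketch of that wiring follows; the configuration key is the one used in the test, the surrounding class is illustrative:

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class PolicyManagerWiringSketch {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();
    // Selecting this policy manager makes the Router choose SubClusters in the
    // deterministic, descending-id order provided by TestSequentialRouterPolicy.
    conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
        TestSequentialBroadcastPolicyManager.class.getName());
  }
}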
@@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.router.clientrm;

import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.AbstractRouterPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* This is a test-only strategy whose purpose is to return subClusters
* in descending order of subClusterId.
*
* It is used to verify the retry behaviour of FederationClientInterceptor.
* The conditions of use are as follows:
* 1. subClusterIds must be integers.
* 2. The larger the subClusterId, the earlier that subCluster is selected.
*
* For example, with 4 subClusters (2 good, 2 bad) configured as
* good1 = [0], good2 = [1], bad1 = [2], bad2 = [3],
* this strategy returns them in the order [3, 2, 1, 0],
* so the selection order is bad2, bad1, good2, good1 and the bad subClusters
* are tried first.
*/
public class TestSequentialRouterPolicy extends AbstractRouterPolicy {

@Override
public void reinitialize(FederationPolicyInitializationContext policyContext)
throws FederationPolicyInitializationException {
FederationPolicyInitializationContextValidator.validate(policyContext,
this.getClass().getCanonicalName());
setPolicyContext(policyContext);
}

@Override
protected SubClusterId chooseSubCluster(String queue,
Map<SubClusterId, SubClusterInfo> preSelectSubClusters) throws YarnException {
/**
* This strategy is only suitable for testing; it returns subClusters sequentially.
* In the retry test there are 3 subClusters: 1 good and 2 bad.
* The id of the good subCluster is 0, and the ids of the bad subClusters are 1 and 2.
* Returning them in descending order (2, 1, 0) means the bad subClusters
* are selected first.
*/
List<SubClusterId> subClusterIds = new ArrayList<>(preSelectSubClusters.keySet());
if (subClusterIds.size() > 1) {
subClusterIds.sort((o1, o2) -> Integer.parseInt(o2.getId()) - Integer.parseInt(o1.getId()));
}
if (CollectionUtils.isNotEmpty(subClusterIds)) {
return subClusterIds.get(0);
}
return null;
}
}
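
A short, standalone illustration of the descending sort this policy applies; plain strings stand in for SubClusterId, purely for illustration:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DescendingIdSortSketch {
  public static void main(String[] args) {
    List<String> ids = new ArrayList<>(Arrays.asList("0", "2", "1"));
    // Same comparator as chooseSubCluster(): numerically descending ids.
    ids.sort((o1, o2) -> Integer.parseInt(o2) - Integer.parseInt(o1));
    System.out.println(ids); // [2, 1, 0] -> the highest id (a "bad" SubCluster in the test) is chosen first
  }
}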