Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4117,6 +4117,14 @@ public static boolean isAclEnabled(Configuration conf) {
ROUTER_PREFIX + "submit.retry";
public static final int DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY = 3;

/**
 * GetNewApplication and SubmitApplication request retry interval time.
 * This is the sleep between successive retries against different
 * subClusters. Default is 10 milliseconds; note that
 * TimeUnit.MILLISECONDS.toMillis(10) is an identity conversion, kept
 * only to make the unit explicit at the declaration site.
 */
public static final String ROUTER_CLIENTRM_SUBMIT_INTERVAL_TIME =
ROUTER_PREFIX + "submit.interval.time";
public static final long DEFAULT_CLIENTRM_SUBMIT_INTERVAL_TIME =
TimeUnit.MILLISECONDS.toMillis(10);

/**
* The interceptor class used in FederationClientInterceptor should return
* partial ApplicationReports.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5047,4 +5047,13 @@
</description>
</property>

<property>
<name>yarn.router.submit.interval.time</name>
<value>10ms</value>
<description>
The retry interval between submitting requests to different subClusters.
Default is 10ms.
</description>
</property>

</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ public interface FederationActionRetry<T> {

Logger LOG = LoggerFactory.getLogger(FederationActionRetry.class);

T run() throws Exception;
T run(int retry) throws Exception;

default T runWithRetries(int retryCount, long retrySleepTime) throws Exception {
int retry = 0;
while (true) {
try {
return run();
return run(retry);
} catch (Exception e) {
LOG.info("Exception while executing an Federation operation.", e);
if (++retry > retryCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,11 +502,11 @@ public boolean cleanUpFinishApplicationsWithRetries(ApplicationId appId, boolean
throws Exception {

// Generate a request to delete data
DeleteApplicationHomeSubClusterRequest request =
DeleteApplicationHomeSubClusterRequest req =
DeleteApplicationHomeSubClusterRequest.newInstance(appId);

// CleanUp Finish App.
return ((FederationActionRetry<Boolean>) () -> invokeCleanUpFinishApp(appId, isQuery, request))
return ((FederationActionRetry<Boolean>) (retry) -> invokeCleanUpFinishApp(appId, isQuery, req))
.runWithRetries(cleanUpRetryCountNum, cleanUpRetrySleepTime);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,24 @@ public static void logFailure(String user, String operation, String perm,
}
}

/**
 * Create a readable and parsable audit log string for a failed event.
 *
 * @param user User who made the service request.
 * @param operation Operation requested by the user.
 * @param perm Target permissions.
 * @param target The target on which the operation is being performed.
 * @param description Some additional information as to why the operation failed.
 * @param subClusterId SubCluster Id in which operation was performed.
 */
public static void logFailure(String user, String operation, String perm,
    String target, String description, SubClusterId subClusterId) {
  // Guard clause: skip building the audit string entirely when INFO
  // logging is disabled.
  if (!LOG.isInfoEnabled()) {
    return;
  }
  LOG.info(
      createFailureLog(user, operation, perm, target, description, null, subClusterId));
}

/**
* A helper api for creating an audit log for a failure event.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.yarn.server.router;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
Expand All @@ -27,6 +29,9 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -35,6 +40,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.io.IOException;

/**
Expand All @@ -53,6 +60,8 @@ public final class RouterServerUtil {

private static final String EPOCH_PREFIX = "e";

private static Random rand = new Random(System.currentTimeMillis());

/** Disable constructor. */
private RouterServerUtil() {
}
Expand Down Expand Up @@ -446,4 +455,40 @@ public static void validateContainerId(String containerId)
throw new IllegalArgumentException("Invalid ContainerId: " + containerId);
}
}

/**
 * Randomly pick an active SubCluster, skipping any SubCluster that appears
 * on the supplied blacklist.
 *
 * @param activeSubClusters map of currently active subClusters.
 * @param blackList subClusters that must not be chosen; may be null or empty.
 * @return a randomly chosen active SubClusterId.
 * @throws YarnException when there is no Active SubCluster,
 *         an exception will be thrown (No active SubCluster available to submit the request.)
 */
public static SubClusterId getRandomActiveSubCluster(
    Map<SubClusterId, SubClusterInfo> activeSubClusters, List<SubClusterId> blackList)
    throws YarnException {

  // Fail fast when there are no active subClusters at all.
  if (activeSubClusters == null || activeSubClusters.isEmpty()) {
    logAndThrowException(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
  }

  // Collect the candidate ids, then drop every blacklisted one.
  List<SubClusterId> candidates = new ArrayList<>(activeSubClusters.keySet());
  if (blackList != null && !blackList.isEmpty()) {
    candidates.removeAll(blackList);
  }

  // The blacklist may have eliminated every candidate.
  if (candidates.isEmpty()) {
    logAndThrowException(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
  }

  // Uniformly random choice among the remaining subClusters.
  return candidates.get(rand.nextInt(candidates.size()));
}
}
Loading