Skip to content

Commit 3f767a6

Browse files
authored
YARN-8900. [Follow Up] Fix FederationInterceptorREST#invokeConcurrent Inaccurate Order of Subclusters. (#5260)
1 parent 72b7601 commit 3f767a6

File tree

3 files changed

+118
-14
lines changed

3 files changed

+118
-14
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545

4646
import org.apache.commons.lang3.NotImplementedException;
4747
import org.apache.commons.lang3.StringUtils;
48-
import org.apache.commons.lang3.tuple.Pair;
4948
import org.apache.hadoop.conf.Configuration;
5049
import org.apache.hadoop.io.Text;
5150
import org.apache.hadoop.security.UserGroupInformation;
@@ -123,6 +122,7 @@
123122
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
124123
import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey;
125124
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
125+
import org.apache.hadoop.yarn.server.router.webapp.dao.SubClusterResult;
126126
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo;
127127
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
128128
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
@@ -2532,7 +2532,7 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c
25322532
// If there is a sub-cluster access error,
25332533
// we should choose whether to throw exception information according to user configuration.
25342534
// Send the requests in parallel.
2535-
CompletionService<Pair<R, Exception>> compSvc = new ExecutorCompletionService<>(threadpool);
2535+
CompletionService<SubClusterResult<R>> compSvc = new ExecutorCompletionService<>(threadpool);
25362536

25372537
// This part of the code should be able to expose the accessed Exception information.
25382538
// We use Pair to store related information. The left value of the Pair is the response,
@@ -2548,36 +2548,41 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c
25482548
getMethod(request.getMethodName(), request.getTypes());
25492549
Object retObj = method.invoke(interceptor, request.getParams());
25502550
R ret = clazz.cast(retObj);
2551-
return Pair.of(ret, null);
2551+
return new SubClusterResult<>(info, ret, null);
25522552
} catch (Exception e) {
25532553
LOG.error("SubCluster {} failed to call {} method.",
25542554
info.getSubClusterId(), request.getMethodName(), e);
2555-
return Pair.of(null, e);
2555+
return new SubClusterResult<>(info, null, e);
25562556
}
25572557
});
25582558
}
25592559

2560-
clusterIds.stream().forEach(clusterId -> {
2560+
for (int i = 0; i < clusterIds.size(); i++) {
2561+
SubClusterInfo subClusterInfo = null;
25612562
try {
2562-
Future<Pair<R, Exception>> future = compSvc.take();
2563-
Pair<R, Exception> pair = future.get();
2564-
R response = pair.getKey();
2563+
Future<SubClusterResult<R>> future = compSvc.take();
2564+
SubClusterResult<R> result = future.get();
2565+
subClusterInfo = result.getSubClusterInfo();
2566+
2567+
R response = result.getResponse();
25652568
if (response != null) {
2566-
results.put(clusterId, response);
2569+
results.put(subClusterInfo, response);
25672570
}
2568-
Exception exception = pair.getValue();
2571+
2572+
Exception exception = result.getException();
2573+
25692574
// If allowPartialResult=false, it means that if an exception occurs in a subCluster,
25702575
// an exception will be thrown directly.
25712576
if (!allowPartialResult && exception != null) {
25722577
throw exception;
25732578
}
25742579
} catch (Throwable e) {
2575-
String msg = String.format("SubCluster %s failed to %s report.",
2576-
clusterId.getSubClusterId(), request.getMethodName());
2577-
LOG.error(msg, e);
2580+
String subClusterId = subClusterInfo != null ?
2581+
subClusterInfo.getSubClusterId().getId() : "UNKNOWN";
2582+
LOG.error("SubCluster {} failed to {} report.", subClusterId, request.getMethodName(), e);
25782583
throw new YarnRuntimeException(e.getCause().getMessage(), e);
25792584
}
2580-
});
2585+
}
25812586

25822587
return results;
25832588
}
@@ -2648,4 +2653,16 @@ public Map<SubClusterId, DefaultRequestInterceptorREST> getInterceptors() {
26482653
public void setAllowPartialResult(boolean allowPartialResult) {
26492654
this.allowPartialResult = allowPartialResult;
26502655
}
2656+
2657+
@VisibleForTesting
2658+
public Map<SubClusterInfo, NodesInfo> invokeConcurrentGetNodeLabel()
2659+
throws IOException, YarnException {
2660+
Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
2661+
Class[] argsClasses = new Class[]{String.class};
2662+
Object[] args = new Object[]{null};
2663+
ClientMethod remoteMethod = new ClientMethod("getNodes", argsClasses, args);
2664+
Map<SubClusterInfo, NodesInfo> nodesMap =
2665+
invokeConcurrent(subClustersActive.values(), remoteMethod, NodesInfo.class);
2666+
return nodesMap;
2667+
}
26512668
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.yarn.server.router.webapp.dao;
19+
20+
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
21+
22+
public class SubClusterResult<R> {
23+
private SubClusterInfo subClusterInfo;
24+
private R response;
25+
private Exception exception;
26+
27+
public SubClusterResult() {
28+
}
29+
30+
public SubClusterResult(SubClusterInfo subCluster, R res, Exception ex) {
31+
this.subClusterInfo = subCluster;
32+
this.response = res;
33+
this.exception = ex;
34+
}
35+
36+
public SubClusterInfo getSubClusterInfo() {
37+
return subClusterInfo;
38+
}
39+
40+
public void setSubClusterInfo(SubClusterInfo subClusterInfo) {
41+
this.subClusterInfo = subClusterInfo;
42+
}
43+
44+
public Exception getException() {
45+
return exception;
46+
}
47+
48+
public void setException(Exception exception) {
49+
this.exception = exception;
50+
}
51+
52+
public R getResponse() {
53+
return response;
54+
}
55+
56+
public void setResponse(R response) {
57+
this.response = response;
58+
}
59+
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1534,6 +1534,34 @@ public void testCheckFederationInterceptorRESTClient() {
15341534
Assert.assertEquals(webAppAddress, interceptorREST.getWebAppAddress());
15351535
}
15361536

1537+
@Test
1538+
public void testInvokeConcurrent() throws IOException, YarnException {
1539+
1540+
// We design such a test case, we call the interceptor's getNodes interface,
1541+
// this interface will generate the following test data
1542+
// subCluster0 Node 0
1543+
// subCluster1 Node 1
1544+
// subCluster2 Node 2
1545+
// subCluster3 Node 3
1546+
// We use the returned data to verify whether the subClusterId
1547+
// of the multi-thread call can match the node data
1548+
Map<SubClusterInfo, NodesInfo> subClusterInfoNodesInfoMap =
1549+
interceptor.invokeConcurrentGetNodeLabel();
1550+
Assert.assertNotNull(subClusterInfoNodesInfoMap);
1551+
Assert.assertEquals(4, subClusterInfoNodesInfoMap.size());
1552+
1553+
subClusterInfoNodesInfoMap.forEach((subClusterInfo, nodesInfo) -> {
1554+
String subClusterId = subClusterInfo.getSubClusterId().getId();
1555+
List<NodeInfo> nodeInfos = nodesInfo.getNodes();
1556+
Assert.assertNotNull(nodeInfos);
1557+
Assert.assertEquals(1, nodeInfos.size());
1558+
1559+
String expectNodeId = "Node " + subClusterId;
1560+
String nodeId = nodeInfos.get(0).getNodeId();
1561+
Assert.assertEquals(expectNodeId, nodeId);
1562+
});
1563+
}
1564+
15371565
@Test
15381566
public void testGetSchedulerInfo() {
15391567
// In this test case, we will get the return results of 4 sub-clusters.

0 commit comments

Comments
 (0)