Skip to content

Commit fbbc76e

Browse files
ZanderXuzhanghaobo
authored andcommitted
HDFS-16671. RBF: RouterRpcFairnessPolicyController supports configurable permit acquire timeout (apache#4597)
1 parent a42b69b commit fbbc76e

File tree

5 files changed

+66
-27
lines changed

5 files changed

+66
-27
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
import org.slf4j.Logger;
3030
import org.slf4j.LoggerFactory;
3131

32+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT;
33+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT;
34+
3235
/**
3336
* Base fairness policy that implements @RouterRpcFairnessPolicyController.
3437
* Internally a map of nameservice to Semaphore is used to control permits.
@@ -42,15 +45,26 @@ public class AbstractRouterRpcFairnessPolicyController
4245
/** Hash table to hold semaphore for each configured name service. */
4346
private Map<String, Semaphore> permits;
4447

48+
private long acquireTimeoutMs = DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT;
49+
4550
public void init(Configuration conf) {
4651
this.permits = new HashMap<>();
52+
long timeoutMs = conf.getTimeDuration(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT,
53+
DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
54+
if (timeoutMs >= 0) {
55+
acquireTimeoutMs = timeoutMs;
56+
} else {
57+
LOG.warn("Invalid value {} configured for {} should be greater than or equal to 0. " +
58+
"Using default value of : {}ms instead.", timeoutMs,
59+
DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT, DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT);
60+
}
4761
}
4862

4963
@Override
5064
public boolean acquirePermit(String nsId) {
5165
try {
5266
LOG.debug("Taking lock for nameservice {}", nsId);
53-
return this.permits.get(nsId).tryAcquire(1, TimeUnit.SECONDS);
67+
return this.permits.get(nsId).tryAcquire(acquireTimeoutMs, TimeUnit.MILLISECONDS);
5468
} catch (InterruptedException e) {
5569
LOG.debug("Cannot get a permit for nameservice {}", nsId);
5670
}
@@ -82,15 +96,13 @@ protected int getAvailablePermits(String nsId) {
8296
@Override
8397
public String getAvailableHandlerOnPerNs() {
8498
JSONObject json = new JSONObject();
85-
for (Map.Entry<String, Semaphore> entry : permits.entrySet()) {
99+
permits.forEach((k, v) -> {
86100
try {
87-
String nsId = entry.getKey();
88-
int availableHandler = entry.getValue().availablePermits();
89-
json.put(nsId, availableHandler);
101+
json.put(k, v.availablePermits());
90102
} catch (JSONException e) {
91-
LOG.warn("Cannot put {} into JSONObject", entry.getKey(), e);
103+
LOG.warn("Cannot put {} into JSONObject", k, e);
92104
}
93-
}
105+
});
94106
return json.toString();
95107
}
96108
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,10 @@ public StaticRouterRpcFairnessPolicyController(Configuration conf) {
5050
init(conf);
5151
}
5252

53-
public void init(Configuration conf)
54-
throws IllegalArgumentException {
53+
public void init(Configuration conf) throws IllegalArgumentException {
5554
super.init(conf);
5655
// Total handlers configured to process all incoming Rpc.
57-
int handlerCount = conf.getInt(
58-
DFS_ROUTER_HANDLER_COUNT_KEY,
59-
DFS_ROUTER_HANDLER_COUNT_DEFAULT);
56+
int handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT);
6057

6158
LOG.info("Handlers available for fairness assignment {} ", handlerCount);
6259

@@ -71,8 +68,7 @@ public void init(Configuration conf)
7168
allConfiguredNS.add(CONCURRENT_NS);
7269
validateHandlersCount(conf, handlerCount, allConfiguredNS);
7370
for (String nsId : allConfiguredNS) {
74-
int dedicatedHandlers =
75-
conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
71+
int dedicatedHandlers = conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
7672
LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
7773
if (dedicatedHandlers > 0) {
7874
handlerCount -= dedicatedHandlers;
@@ -86,7 +82,7 @@ public void init(Configuration conf)
8682
// Assign remaining handlers equally to remaining name services and
8783
// general pool if applicable.
8884
if (!unassignedNS.isEmpty()) {
89-
LOG.info("Unassigned ns {}", unassignedNS.toString());
85+
LOG.info("Unassigned ns {}", unassignedNS);
9086
int handlersPerNS = handlerCount / unassignedNS.size();
9187
LOG.info("Handlers available per ns {}", handlersPerNS);
9288
for (String nsId : unassignedNS) {
@@ -101,24 +97,20 @@ public void init(Configuration conf)
10197
int existingPermits = getAvailablePermits(CONCURRENT_NS);
10298
if (leftOverHandlers > 0) {
10399
LOG.info("Assigned extra {} handlers to commons pool", leftOverHandlers);
104-
insertNameServiceWithPermits(CONCURRENT_NS,
105-
existingPermits + leftOverHandlers);
100+
insertNameServiceWithPermits(CONCURRENT_NS, existingPermits + leftOverHandlers);
106101
}
107-
LOG.info("Final permit allocation for concurrent ns: {}",
108-
getAvailablePermits(CONCURRENT_NS));
102+
LOG.info("Final permit allocation for concurrent ns: {}", getAvailablePermits(CONCURRENT_NS));
109103
}
110104

111105
private static void logAssignment(String nsId, int count) {
112-
LOG.info("Assigned {} handlers to nsId {} ",
113-
count, nsId);
106+
LOG.info("Assigned {} handlers to nsId {} ", count, nsId);
114107
}
115108

116-
private void validateHandlersCount(Configuration conf, int handlerCount,
117-
Set<String> allConfiguredNS) {
109+
private void validateHandlersCount(Configuration conf,
110+
int handlerCount, Set<String> allConfiguredNS) {
118111
int totalDedicatedHandlers = 0;
119112
for (String nsId : allConfiguredNS) {
120-
int dedicatedHandlers =
121-
conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
113+
int dedicatedHandlers = conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
122114
if (dedicatedHandlers > 0) {
123115
// Total handlers should not be less than sum of dedicated handlers.
124116
totalDedicatedHandlers += dedicatedHandlers;
@@ -128,8 +120,7 @@ private void validateHandlersCount(Configuration conf, int handlerCount,
128120
}
129121
}
130122
if (totalDedicatedHandlers > handlerCount) {
131-
String msg = String.format(ERROR_MSG, handlerCount,
132-
totalDedicatedHandlers);
123+
String msg = String.format(ERROR_MSG, handlerCount, totalDedicatedHandlers);
133124
LOG.error(msg);
134125
throw new IllegalArgumentException(msg);
135126
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,4 +337,8 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
337337
NoRouterRpcFairnessPolicyController.class;
338338
public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX =
339339
FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count.";
340+
public static final String DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT =
341+
FEDERATION_ROUTER_FAIRNESS_PREFIX + "acquire.timeout";
342+
public static final long DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT =
343+
TimeUnit.SECONDS.toMillis(1);
340344
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -670,4 +670,12 @@
670670
concurrent calls.
671671
</description>
672672
</property>
673+
674+
<property>
675+
<name>dfs.federation.router.fairness.acquire.timeout</name>
676+
<value>1s</value>
677+
<description>
678+
The maximum time to wait for a permit.
679+
</description>
680+
</property>
673681
</configuration>

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,14 @@
2323
import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
2424
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
2525
import org.apache.hadoop.test.GenericTestUtils;
26+
import org.apache.hadoop.util.Time;
2627
import org.junit.Test;
2728
import org.slf4j.LoggerFactory;
2829

30+
import java.util.concurrent.TimeUnit;
31+
2932
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
33+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT;
3034
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
3135
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
3236
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
@@ -83,6 +87,26 @@ public void testHandlerAllocationPreconfigured() {
8387
assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
8488
}
8589

90+
@Test
91+
public void testAcquireTimeout() {
92+
Configuration conf = createConf(40);
93+
conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 30);
94+
conf.setTimeDuration(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT, 100, TimeUnit.MILLISECONDS);
95+
RouterRpcFairnessPolicyController routerRpcFairnessPolicyController =
96+
FederationUtil.newFairnessPolicyController(conf);
97+
98+
// ns1 should have 30 permits allocated
99+
for (int i = 0; i < 30; i++) {
100+
assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1"));
101+
}
102+
long acquireBeginTimeMs = Time.monotonicNow();
103+
assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1"));
104+
long acquireTimeMs = Time.monotonicNow() - acquireBeginTimeMs;
105+
106+
// There are some other operations, so acquireTimeMs >= 100ms.
107+
assertTrue(acquireTimeMs >= 100);
108+
}
109+
86110
@Test
87111
public void testAllocationErrorWithZeroHandlers() {
88112
Configuration conf = createConf(0);

0 commit comments

Comments
 (0)