Skip to content

Commit d2b8d6b

Browse files
authored
HDFS-16302. RBF: RouterRpcFairnessPolicyController record requests accepted by each nameservice (#3621)
1 parent e5cee76 commit d2b8d6b

File tree

4 files changed

+110
-49
lines changed

4 files changed

+110
-49
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,4 +132,10 @@ public interface FederationRPCMBean {
132132
* @return Number of operations rejected due to lack of permits of each namespace.
133133
*/
134134
String getProxyOpPermitRejectedPerNs();
135+
136+
/**
137+
* Get the number of operations accepted of each namespace.
138+
* @return Number of operations accepted of each namespace.
139+
*/
140+
String getProxyOpPermitAcceptedPerNs();
135141
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,4 +297,9 @@ public long getProxyOpPermitRejected() {
297297
public String getProxyOpPermitRejectedPerNs() {
298298
return rpcServer.getRPCClient().getRejectedPermitsPerNsJSON();
299299
}
300+
301+
@Override
302+
public String getProxyOpPermitAcceptedPerNs() {
303+
return rpcServer.getRPCClient().getAcceptedPermitsPerNsJSON();
304+
}
300305
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ public class RouterRpcClient {
136136
/** Fairness manager to control handlers assigned per NS. */
137137
private RouterRpcFairnessPolicyController routerRpcFairnessPolicyController;
138138
private Map<String, LongAdder> rejectedPermitsPerNs = new ConcurrentHashMap<>();
139+
private Map<String, LongAdder> acceptedPermitsPerNs = new ConcurrentHashMap<>();
139140

140141
/**
141142
* Create a router RPC client to manage remote procedure calls to NNs.
@@ -330,6 +331,14 @@ public String getRejectedPermitsPerNsJSON() {
330331
return JSON.toString(rejectedPermitsPerNs);
331332
}
332333

334+
/**
335+
* JSON representation of the accepted permits for each nameservice.
336+
*
337+
* @return String representation of the accepted permits for each nameservice.
338+
*/
339+
public String getAcceptedPermitsPerNsJSON() {
340+
return JSON.toString(acceptedPermitsPerNs);
341+
}
333342
/**
334343
* Get ClientProtocol proxy client for a NameNode. Each combination of user +
335344
* NN must use a unique proxy client. Previously created clients are cached
@@ -1548,20 +1557,22 @@ private String getNameserviceForBlockPoolId(final String bpId)
15481557
private void acquirePermit(
15491558
final String nsId, final UserGroupInformation ugi, final RemoteMethod m)
15501559
throws IOException {
1551-
if (routerRpcFairnessPolicyController != null
1552-
&& !routerRpcFairnessPolicyController.acquirePermit(nsId)) {
1553-
// Throw StandByException,
1554-
// Clients could fail over and try another router.
1555-
if (rpcMonitor != null) {
1556-
rpcMonitor.getRPCMetrics().incrProxyOpPermitRejected();
1560+
if (routerRpcFairnessPolicyController != null) {
1561+
if (!routerRpcFairnessPolicyController.acquirePermit(nsId)) {
1562+
// Throw StandByException,
1563+
// Clients could fail over and try another router.
1564+
if (rpcMonitor != null) {
1565+
rpcMonitor.getRPCMetrics().incrProxyOpPermitRejected();
1566+
}
1567+
incrRejectedPermitForNs(nsId);
1568+
LOG.debug("Permit denied for ugi: {} for method: {}",
1569+
ugi, m.getMethodName());
1570+
String msg =
1571+
"Router " + router.getRouterId() +
1572+
" is overloaded for NS: " + nsId;
1573+
throw new StandbyException(msg);
15571574
}
1558-
incrRejectedPermitForNs(nsId);
1559-
LOG.debug("Permit denied for ugi: {} for method: {}",
1560-
ugi, m.getMethodName());
1561-
String msg =
1562-
"Router " + router.getRouterId() +
1563-
" is overloaded for NS: " + nsId;
1564-
throw new StandbyException(msg);
1575+
incrAcceptedPermitForNs(nsId);
15651576
}
15661577
}
15671578

@@ -1596,4 +1607,13 @@ public Long getRejectedPermitForNs(String ns) {
15961607
return rejectedPermitsPerNs.containsKey(ns) ?
15971608
rejectedPermitsPerNs.get(ns).longValue() : 0L;
15981609
}
1610+
1611+
private void incrAcceptedPermitForNs(String ns) {
1612+
acceptedPermitsPerNs.computeIfAbsent(ns, k -> new LongAdder()).increment();
1613+
}
1614+
1615+
public Long getAcceptedPermitForNs(String ns) {
1616+
return acceptedPermitsPerNs.containsKey(ns) ?
1617+
acceptedPermitsPerNs.get(ns).longValue() : 0L;
1618+
}
15991619
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java

Lines changed: 66 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@
4141
import org.slf4j.LoggerFactory;
4242

4343
/**
44-
* Test the Router handlers fairness control rejects
45-
* requests when the handlers are overloaded.
44+
* Test the Router handlers fairness control rejects and accepts requests.
4645
*/
4746
public class TestRouterHandlersFairness {
4847

@@ -126,6 +125,12 @@ private void startLoadTest(final boolean isConcurrent, final boolean fairness)
126125
throws Exception {
127126

128127
RouterContext routerContext = cluster.getRandomRouter();
128+
URI address = routerContext.getFileSystemURI();
129+
Configuration conf = new HdfsConfiguration();
130+
final int numOps = 10;
131+
AtomicInteger overloadException = new AtomicInteger();
132+
133+
// Test when handlers are overloaded
129134
if (fairness) {
130135
if (isConcurrent) {
131136
LOG.info("Taking fanout lock first");
@@ -142,42 +147,11 @@ private void startLoadTest(final boolean isConcurrent, final boolean fairness)
142147
}
143148
}
144149
}
145-
URI address = routerContext.getFileSystemURI();
146-
Configuration conf = new HdfsConfiguration();
147-
final int numOps = 10;
148-
final AtomicInteger overloadException = new AtomicInteger();
149150
int originalRejectedPermits = getTotalRejectedPermits(routerContext);
150151

151-
for (int i = 0; i < numOps; i++) {
152-
DFSClient routerClient = null;
153-
try {
154-
routerClient = new DFSClient(address, conf);
155-
String clientName = routerClient.getClientName();
156-
ClientProtocol routerProto = routerClient.getNamenode();
157-
if (isConcurrent) {
158-
invokeConcurrent(routerProto, clientName);
159-
} else {
160-
invokeSequential(routerProto);
161-
}
162-
} catch (RemoteException re) {
163-
IOException ioe = re.unwrapRemoteException();
164-
assertTrue("Wrong exception: " + ioe,
165-
ioe instanceof StandbyException);
166-
assertExceptionContains("is overloaded for NS", ioe);
167-
overloadException.incrementAndGet();
168-
} catch (Throwable e) {
169-
throw e;
170-
} finally {
171-
if (routerClient != null) {
172-
try {
173-
routerClient.close();
174-
} catch (IOException e) {
175-
LOG.error("Cannot close the client");
176-
}
177-
}
178-
}
179-
overloadException.get();
180-
}
152+
// |- All calls should fail since permits not released
153+
innerCalls(address, numOps, isConcurrent, conf, overloadException);
154+
181155
int latestRejectedPermits = getTotalRejectedPermits(routerContext);
182156
assertEquals(latestRejectedPermits - originalRejectedPermits,
183157
overloadException.get());
@@ -201,6 +175,17 @@ private void startLoadTest(final boolean isConcurrent, final boolean fairness)
201175
assertEquals("Number of failed RPCs without fairness configured",
202176
0, overloadException.get());
203177
}
178+
179+
// Test when handlers are not overloaded
180+
int originalAcceptedPermits = getTotalAcceptedPermits(routerContext);
181+
overloadException = new AtomicInteger();
182+
183+
// |- All calls should succeed since permits not acquired
184+
innerCalls(address, numOps, isConcurrent, conf, overloadException);
185+
186+
int latestAcceptedPermits = getTotalAcceptedPermits(routerContext);
187+
assertEquals(latestAcceptedPermits - originalAcceptedPermits, numOps);
188+
assertEquals(overloadException.get(), 0);
204189
}
205190

206191
private void invokeSequential(ClientProtocol routerProto) throws IOException {
@@ -222,4 +207,49 @@ private int getTotalRejectedPermits(RouterContext routerContext) {
222207
.getRejectedPermitForNs(RouterRpcFairnessConstants.CONCURRENT_NS);
223208
return totalRejectedPermits;
224209
}
210+
211+
private int getTotalAcceptedPermits(RouterContext routerContext) {
212+
int totalAcceptedPermits = 0;
213+
for (String ns : cluster.getNameservices()) {
214+
totalAcceptedPermits += routerContext.getRouterRpcClient()
215+
.getAcceptedPermitForNs(ns);
216+
}
217+
totalAcceptedPermits += routerContext.getRouterRpcClient()
218+
.getAcceptedPermitForNs(RouterRpcFairnessConstants.CONCURRENT_NS);
219+
return totalAcceptedPermits;
220+
}
221+
222+
private void innerCalls(URI address, int numOps, boolean isConcurrent,
223+
Configuration conf, AtomicInteger overloadException) throws IOException {
224+
for (int i = 0; i < numOps; i++) {
225+
DFSClient routerClient = null;
226+
try {
227+
routerClient = new DFSClient(address, conf);
228+
String clientName = routerClient.getClientName();
229+
ClientProtocol routerProto = routerClient.getNamenode();
230+
if (isConcurrent) {
231+
invokeConcurrent(routerProto, clientName);
232+
} else {
233+
invokeSequential(routerProto);
234+
}
235+
} catch (RemoteException re) {
236+
IOException ioe = re.unwrapRemoteException();
237+
assertTrue("Wrong exception: " + ioe,
238+
ioe instanceof StandbyException);
239+
assertExceptionContains("is overloaded for NS", ioe);
240+
overloadException.incrementAndGet();
241+
} catch (Throwable e) {
242+
throw e;
243+
} finally {
244+
if (routerClient != null) {
245+
try {
246+
routerClient.close();
247+
} catch (IOException e) {
248+
LOG.error("Cannot close the client");
249+
}
250+
}
251+
}
252+
overloadException.get();
253+
}
254+
}
225255
}

0 commit comments

Comments
 (0)