
Commit d3fa53a

HADOOP-13742. Expose NumOpenConnectionsPerUser as a metric. Brahma Reddy Battula.
(cherry picked from commit bd37355)
1 parent 3e5a85f commit d3fa53a

File tree: 4 files changed, +98 −4 lines

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Lines changed: 59 additions & 2 deletions
@@ -121,6 +121,7 @@
 import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.Message;
+import org.codehaus.jackson.map.ObjectMapper;
 
 /** An abstract IPC service. IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value. A service runs on
@@ -2082,6 +2083,9 @@ private void processConnectionContext(RpcWritable.Buffer buffer)
       authorizeConnection();
       // don't set until after authz because connection isn't established
       connectionContextRead = true;
+      if (user != null) {
+        connectionManager.incrUserConnections(user.getShortUserName());
+      }
     }
 
     /**
@@ -2935,7 +2939,20 @@ public int getPort() {
   public int getNumOpenConnections() {
     return connectionManager.size();
   }
-
+
+  /**
+   * Get the NumOpenConnections/User.
+   */
+  public String getNumOpenConnectionsPerUser() {
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      return mapper
+          .writeValueAsString(connectionManager.getUserToConnectionsMap());
+    } catch (IOException ignored) {
+    }
+    return null;
+  }
+
   /**
    * The number of rpc calls in the queue.
    * @return The number of rpc calls in the queue.
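
The metric value is simply the per-user connection map serialized to JSON by Jackson 1.x. A minimal standalone sketch of the shape it produces (the demo class and user names below are invented for illustration, not part of the patch):

import java.util.HashMap;
import java.util.Map;

import org.codehaus.jackson.map.ObjectMapper;

// Hypothetical demo class: shows the JSON string that
// Server#getNumOpenConnectionsPerUser() would return for a given map.
public class PerUserConnectionsDemo {
  public static void main(String[] args) throws Exception {
    Map<String, Integer> userToConnections = new HashMap<>();
    userToConnections.put("alice", 2); // example users, not from the patch
    userToConnections.put("bob", 1);
    // Same Jackson call the new getter makes on the ConnectionManager map.
    String json = new ObjectMapper().writeValueAsString(userToConnections);
    System.out.println(json); // e.g. {"alice":2,"bob":1}
  }
}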
@@ -3055,6 +3072,9 @@ private static int channelIO(ReadableByteChannel readCh,
   private class ConnectionManager {
     final private AtomicInteger count = new AtomicInteger();
     final private Set<Connection> connections;
+    /* Map to maintain the statistics per User */
+    final private Map<String, Integer> userToConnectionsMap;
+    final private Object userToConnectionsMapLock = new Object();
 
     final private Timer idleScanTimer;
     final private int idleScanThreshold;
@@ -3086,6 +3106,7 @@ private class ConnectionManager {
       this.connections = Collections.newSetFromMap(
           new ConcurrentHashMap<Connection,Boolean>(
               maxQueueSize, 0.75f, readThreads+2));
+      this.userToConnectionsMap = new ConcurrentHashMap<>();
     }
 
     private boolean add(Connection connection) {
@@ -3103,7 +3124,39 @@ private boolean remove(Connection connection) {
       }
       return removed;
     }
-
+
+    void incrUserConnections(String user) {
+      synchronized (userToConnectionsMapLock) {
+        Integer count = userToConnectionsMap.get(user);
+        if (count == null) {
+          count = 1;
+        } else {
+          count++;
+        }
+        userToConnectionsMap.put(user, count);
+      }
+    }
+
+    void decrUserConnections(String user) {
+      synchronized (userToConnectionsMapLock) {
+        Integer count = userToConnectionsMap.get(user);
+        if (count == null) {
+          return;
+        } else {
+          count--;
+        }
+        if (count == 0) {
+          userToConnectionsMap.remove(user);
+        } else {
+          userToConnectionsMap.put(user, count);
+        }
+      }
+    }
+
+    Map<String, Integer> getUserToConnectionsMap() {
+      return userToConnectionsMap;
+    }
+
     int size() {
       return count.get();
     }
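
Even though userToConnectionsMap is a ConcurrentHashMap, the get/modify/put (and remove-on-zero) sequence is a compound action, so the patch serializes it with userToConnectionsMapLock. For comparison only, and not what the patch does (presumably to stay compatible with Java 7 on branch-2), the same counting could be expressed with Java 8's atomic map operations:

import java.util.concurrent.ConcurrentHashMap;

// Alternative sketch (assumes Java 8); the patch itself uses an explicit
// lock around get/put, which also works on Java 7.
class UserConnectionCounter {
  private final ConcurrentHashMap<String, Integer> userToConnectionsMap =
      new ConcurrentHashMap<>();

  void incrUserConnections(String user) {
    // Atomically add 1, creating the entry if absent.
    userToConnectionsMap.merge(user, 1, Integer::sum);
  }

  void decrUserConnections(String user) {
    // Atomically decrement; returning null removes the entry at zero.
    userToConnectionsMap.computeIfPresent(user,
        (u, count) -> count == 1 ? null : count - 1);
  }
}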
@@ -3142,6 +3195,10 @@ boolean close(Connection connection) {
         // only close if actually removed to avoid double-closing due
         // to possible races
         connection.close();
+        // Remove authorized users only
+        if (connection.user != null && connection.connectionContextRead) {
+          decrUserConnections(connection.user.getShortUserName());
+        }
       }
       return exists;
     }

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java

Lines changed: 5 additions & 0 deletions
@@ -104,6 +104,11 @@ public static RpcMetrics create(Server server, Configuration conf) {
     return server.getNumOpenConnections();
   }
 
+  @Metric("Number of open connections per user")
+  public String numOpenConnectionsPerUser() {
+    return server.getNumOpenConnectionsPerUser();
+  }
+
   @Metric("Length of the call queue") public int callQueueLength() {
     return server.getCallQueueLen();
   }
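
Because the getter returns a String, metrics2 records NumOpenConnectionsPerUser as a tag on the RPC metrics record (which is why the new MetricsAsserts#getStringMetric helper further down captures the value passed to tag(...)), and it should also be visible over JMX and the /jmx servlet. A rough sketch of reading it via standard JMX, under the assumptions that the record is exposed as the usual RpcActivityForPort<port> MBean and that tags surface as "tag."-prefixed attributes; the service name and port are placeholders:

import java.lang.management.ManagementFactory;

import javax.management.MBeanServer;
import javax.management.ObjectName;

// Assumption-laden sketch, run inside the server JVM: the MBean name follows
// Hadoop's usual "RpcActivityForPort<port>" pattern and the string metric
// appears as a "tag."-prefixed attribute. Adjust service name and port.
public class ReadPerUserConnections {
  public static void main(String[] args) throws Exception {
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    ObjectName rpcActivity =
        new ObjectName("Hadoop:service=NameNode,name=RpcActivityForPort8020");
    Object perUser =
        mbs.getAttribute(rpcActivity, "tag.NumOpenConnectionsPerUser");
    System.out.println(perUser); // JSON string such as {"hdfs":2}
  }
}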

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

Lines changed: 27 additions & 2 deletions
@@ -64,6 +64,7 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
+import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -1027,7 +1028,7 @@ public UserGroupInformation getRemoteUser() {
 
   @Test
   public void testRpcMetrics() throws Exception {
-    Server server;
+    final Server server;
     TestRpcService proxy = null;
 
     final int interval = 1;
@@ -1037,14 +1038,29 @@ public void testRpcMetrics() throws Exception {
         RPC_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
 
     server = setupTestServer(conf, 5);
-
+    String testUser = "testUser";
+    UserGroupInformation anotherUser =
+        UserGroupInformation.createRemoteUser(testUser);
+    TestRpcService proxy2 =
+        anotherUser.doAs(new PrivilegedAction<TestRpcService>() {
+          public TestRpcService run() {
+            try {
+              return RPC.getProxy(TestRpcService.class, 0,
+                  server.getListenerAddress(), conf);
+            } catch (IOException e) {
+              e.printStackTrace();
+            }
+            return null;
+          }
+        });
     try {
       proxy = getClient(addr, conf);
 
       for (int i = 0; i < 1000; i++) {
         proxy.ping(null, newEmptyRequest());
 
         proxy.echo(null, newEchoRequest("" + i));
+        proxy2.echo(null, newEchoRequest("" + i));
       }
       MetricsRecordBuilder rpcMetrics =
           getMetrics(server.getRpcMetrics().name());
@@ -1056,7 +1072,16 @@
           rpcMetrics);
       MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s",
           rpcMetrics);
+      String actualUserVsCon = MetricsAsserts
+          .getStringMetric("NumOpenConnectionsPerUser", rpcMetrics);
+      String proxyUser =
+          UserGroupInformation.getCurrentUser().getShortUserName();
+      assertTrue(actualUserVsCon.contains("\"" + proxyUser + "\":1"));
+      assertTrue(actualUserVsCon.contains("\"" + testUser + "\":1"));
     } finally {
+      if (proxy2 != null) {
+        RPC.stopProxy(proxy2);
+      }
       stop(server, proxy);
     }
   }

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java

Lines changed: 7 additions & 0 deletions
@@ -236,6 +236,13 @@ public static long getLongCounter(String name, MetricsRecordBuilder rb) {
     return captor.getValue();
   }
 
+  public static String getStringMetric(String name, MetricsRecordBuilder rb) {
+    ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+    verify(rb, atLeast(0)).tag(eqName(info(name, "")), captor.capture());
+    checkCaptured(captor, name);
+    return captor.getValue();
+  }
+
   /**
    * Assert a float gauge metric as expected
    * @param name of the metric
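
getStringMetric works by capturing the value that the metrics source passed to MetricsRecordBuilder.tag(...). A self-contained sketch (not part of the patch; the demo class and values are invented) that feeds a mocked builder one tag by hand and reads it back with the new helper:

import static org.mockito.Mockito.mock;

import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.Interns;
import org.apache.hadoop.test.MetricsAsserts;

// Hypothetical demo: string metrics travel through tag(...), so recording a
// tag on a Mockito mock is enough for getStringMetric to find the value.
public class GetStringMetricDemo {
  public static void main(String[] args) {
    MetricsRecordBuilder rb = mock(MetricsRecordBuilder.class);
    rb.tag(Interns.info("NumOpenConnectionsPerUser", "per-user connections"),
        "{\"alice\":1}");
    String value =
        MetricsAsserts.getStringMetric("NumOpenConnectionsPerUser", rb);
    System.out.println(value); // {"alice":1}
  }
}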
