@@ -77,7 +77,6 @@ public class ConnectionPool {
7777 private static final Logger LOG =
7878 LoggerFactory .getLogger (ConnectionPool .class );
7979
80-
8180 /** Configuration settings for the connection pool. */
8281 private final Configuration conf ;
8382
@@ -94,6 +93,8 @@ public class ConnectionPool {
9493 private volatile List <ConnectionContext > connections = new ArrayList <>();
9594 /** Connection index for round-robin. */
9695 private final AtomicInteger clientIndex = new AtomicInteger (0 );
96+ /** Underlying socket index. **/
97+ private final AtomicInteger socketIndex = new AtomicInteger (0 );
9798
9899 /** Min number of connections per user. */
99100 private final int minSize ;
@@ -105,6 +106,12 @@ public class ConnectionPool {
105106 /** The last time a connection was active. */
106107 private volatile long lastActiveTime = 0 ;
107108
109+ /** Enable using multiple physical socket or not. **/
110+ private final boolean enableMultiSocket ;
111+
112+ /** Max Concurrency of each connection. */
113+ private final int maxConcurrencyPerConn ;
114+
108115 /** Map for the protocols and their protobuf implementations. */
109116 private final static Map <Class <?>, ProtoImpl > PROTO_MAP = new HashMap <>();
110117 static {
@@ -149,9 +156,15 @@ protected ConnectionPool(Configuration config, String address,
149156 this .minSize = minPoolSize ;
150157 this .maxSize = maxPoolSize ;
151158 this .minActiveRatio = minActiveRatio ;
159+ this .enableMultiSocket = conf .getBoolean (
160+ RBFConfigKeys .DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_KEY ,
161+ RBFConfigKeys .DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_DEFAULT );
162+ this .maxConcurrencyPerConn = conf .getInt (
163+ RBFConfigKeys .DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_KEY ,
164+ RBFConfigKeys .DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_DEFAULT );
152165
153166 // Add minimum connections to the pool
154- for (int i = 0 ; i < this .minSize ; i ++) {
167+ for (int i = 0 ; i < this .minSize ; i ++) {
155168 ConnectionContext newConnection = newConnection ();
156169 this .connections .add (newConnection );
157170 }
@@ -210,24 +223,22 @@ public AtomicInteger getClientIndex() {
210223 * @return Connection context.
211224 */
212225 protected ConnectionContext getConnection () {
213-
214226 this .lastActiveTime = Time .now ();
215-
216- // Get a connection from the pool following round-robin
217- ConnectionContext conn = null ;
218227 List <ConnectionContext > tmpConnections = this .connections ;
219- int size = tmpConnections .size ();
220- // Inc and mask off sign bit, lookup index should be non-negative int
221- int threadIndex = this .clientIndex .getAndIncrement () & 0x7FFFFFFF ;
222- for (int i =0 ; i <size ; i ++) {
223- int index = (threadIndex + i ) % size ;
224- conn = tmpConnections .get (index );
225- if (conn != null && conn .isUsable ()) {
226- return conn ;
228+ for (ConnectionContext tmpConnection : tmpConnections ) {
229+ if (tmpConnection != null && tmpConnection .isUsable ()) {
230+ return tmpConnection ;
227231 }
228232 }
229233
230- // We return a connection even if it's active
234+ ConnectionContext conn = null ;
235+ // We return a connection even if it's busy
236+ int size = tmpConnections .size ();
237+ if (size > 0 ) {
238+ // Get a connection from the pool following round-robin
239+ int threadIndex = this .clientIndex .getAndIncrement () & 0x7FFFFFFF ;
240+ conn = tmpConnections .get (threadIndex % size );
241+ }
231242 return conn ;
232243 }
233244
@@ -256,19 +267,18 @@ public synchronized List<ConnectionContext> removeConnections(int num) {
256267 int targetCount = Math .min (num , this .connections .size () - this .minSize );
257268 // Remove and close targetCount of connections
258269 List <ConnectionContext > tmpConnections = new ArrayList <>();
259- for (int i = 0 ; i < this .connections .size (); i ++) {
260- ConnectionContext conn = this .connections .get (i );
270+ for (ConnectionContext conn : this .connections ) {
261271 // Only pick idle connections to close
262- if (removed .size () < targetCount && conn .isUsable ()) {
272+ if (removed .size () < targetCount && conn .isIdle ()) {
263273 removed .add (conn );
264274 } else {
265275 tmpConnections .add (conn );
266276 }
267277 }
268278 this .connections = tmpConnections ;
269279 }
270- LOG .debug ("Expected to remove {} connection " +
271- "and actually removed {} connections" , num , removed .size ());
280+ LOG .debug ("Expected to remove {} connection and actually removed {} connections" ,
281+ num , removed .size ());
272282 return removed ;
273283 }
274284
@@ -303,7 +313,6 @@ protected int getNumConnections() {
303313 */
304314 protected int getNumActiveConnections () {
305315 int ret = 0 ;
306-
307316 List <ConnectionContext > tmpConnections = this .connections ;
308317 for (ConnectionContext conn : tmpConnections ) {
309318 if (conn .isActive ()) {
@@ -320,10 +329,9 @@ protected int getNumActiveConnections() {
320329 */
321330 protected int getNumIdleConnections () {
322331 int ret = 0 ;
323-
324332 List <ConnectionContext > tmpConnections = this .connections ;
325333 for (ConnectionContext conn : tmpConnections ) {
326- if (conn .isUsable ()) {
334+ if (conn .isIdle ()) {
327335 ret ++;
328336 }
329337 }
@@ -393,28 +401,34 @@ public String getJSON() {
393401 * @throws IOException If it cannot get a new connection.
394402 */
395403 public ConnectionContext newConnection () throws IOException {
396- return newConnection (
397- this .conf , this .namenodeAddress , this .ugi , this .protocol );
404+ return newConnection (this .conf , this .namenodeAddress ,
405+ this .ugi , this .protocol , this .enableMultiSocket ,
406+ this .socketIndex .incrementAndGet (),
407+ this .maxConcurrencyPerConn );
398408 }
399409
400410 /**
401411 * Creates a proxy wrapper for a client NN connection. Each proxy contains
402412 * context for a single user/security context. To maximize throughput it is
403413 * recommended to use multiple connection per user+server, allowing multiple
404414 * writes and reads to be dispatched in parallel.
405- * @param <T>
415+ * @param <T> Input type T.
406416 *
407417 * @param conf Configuration for the connection.
408418 * @param nnAddress Address of server supporting the ClientProtocol.
409419 * @param ugi User context.
410420 * @param proto Interface of the protocol.
421+ * @param enableMultiSocket Enable multiple socket or not.
422+ * @param maxConcurrencyPerConn The maximum number of requests that
423+ * this connection can handle concurrently.
411424 * @return proto for the target ClientProtocol that contains the user's
412425 * security context.
413426 * @throws IOException If it cannot be created.
414427 */
415428 protected static <T > ConnectionContext newConnection (Configuration conf ,
416- String nnAddress , UserGroupInformation ugi , Class <T > proto )
417- throws IOException {
429+ String nnAddress , UserGroupInformation ugi , Class <T > proto ,
430+ boolean enableMultiSocket , int socketIndex ,
431+ int maxConcurrencyPerConn ) throws IOException {
418432 if (!PROTO_MAP .containsKey (proto )) {
419433 String msg = "Unsupported protocol for connection to NameNode: "
420434 + ((proto != null ) ? proto .getName () : "null" );
@@ -437,23 +451,31 @@ protected static <T> ConnectionContext newConnection(Configuration conf,
437451 }
438452 InetSocketAddress socket = NetUtils .createSocketAddr (nnAddress );
439453 final long version = RPC .getProtocolVersion (classes .protoPb );
440- Object proxy = RPC .getProtocolProxy (classes .protoPb , version , socket , ugi ,
441- conf , factory , RPC .getRpcTimeout (conf ), defaultPolicy , null ).getProxy ();
454+ Object proxy ;
455+ if (enableMultiSocket ) {
456+ FederationConnectionId connectionId = new FederationConnectionId (
457+ socket , classes .protoPb , ugi , RPC .getRpcTimeout (conf ),
458+ defaultPolicy , conf , socketIndex );
459+ proxy = RPC .getProtocolProxy (classes .protoPb , version , connectionId ,
460+ conf , factory ).getProxy ();
461+ } else {
462+ proxy = RPC .getProtocolProxy (classes .protoPb , version , socket , ugi ,
463+ conf , factory , RPC .getRpcTimeout (conf ), defaultPolicy , null ).getProxy ();
464+ }
465+
442466 T client = newProtoClient (proto , classes , proxy );
443467 Text dtService = SecurityUtil .buildTokenService (socket );
444468
445- ProxyAndInfo <T > clientProxy =
446- new ProxyAndInfo <T >(client , dtService , socket );
447- ConnectionContext connection = new ConnectionContext (clientProxy );
448- return connection ;
469+ ProxyAndInfo <T > clientProxy = new ProxyAndInfo <T >(client , dtService , socket );
470+ return new ConnectionContext (clientProxy , maxConcurrencyPerConn );
449471 }
450472
451473 private static <T > T newProtoClient (Class <T > proto , ProtoImpl classes ,
452474 Object proxy ) {
453475 try {
454476 Constructor <?> constructor =
455477 classes .protoClientPb .getConstructor (classes .protoPb );
456- Object o = constructor .newInstance (new Object [] { proxy } );
478+ Object o = constructor .newInstance (proxy );
457479 if (proto .isAssignableFrom (o .getClass ())) {
458480 @ SuppressWarnings ("unchecked" )
459481 T client = (T ) o ;
0 commit comments