1818
1919package org .apache .hadoop .ipc ;
2020
21+ import org .apache .commons .lang3 .tuple .Pair ;
2122import org .apache .hadoop .security .AccessControlException ;
2223import org .apache .hadoop .classification .VisibleForTesting ;
2324import org .apache .hadoop .util .Preconditions ;
24- import org .apache .hadoop .thirdparty .com .google .common .util .concurrent .ThreadFactoryBuilder ;
2525import org .apache .hadoop .classification .InterfaceAudience ;
2626import org .apache .hadoop .classification .InterfaceAudience .Public ;
2727import org .apache .hadoop .classification .InterfaceStability ;
@@ -166,73 +166,6 @@ public static Object getExternalHandler() {
166166 private final int maxAsyncCalls ;
167167 private final AtomicInteger asyncCallCounter = new AtomicInteger (0 );
168168
169- /**
170- * Executor on which IPC calls' parameters are sent.
171- * Deferring the sending of parameters to a separate
172- * thread isolates them from thread interruptions in the
173- * calling code.
174- */
175- private final ExecutorService sendParamsExecutor ;
176- private final static ClientExecutorServiceFactory clientExcecutorFactory =
177- new ClientExecutorServiceFactory ();
178-
179- private static class ClientExecutorServiceFactory {
180- private int executorRefCount = 0 ;
181- private ExecutorService clientExecutor = null ;
182-
183- /**
184- * Get Executor on which IPC calls' parameters are sent.
185- * If the internal reference counter is zero, this method
186- * creates the instance of Executor. If not, this method
187- * just returns the reference of clientExecutor.
188- *
189- * @return An ExecutorService instance
190- */
191- synchronized ExecutorService refAndGetInstance () {
192- if (executorRefCount == 0 ) {
193- clientExecutor = Executors .newCachedThreadPool (
194- new ThreadFactoryBuilder ()
195- .setDaemon (true )
196- .setNameFormat ("IPC Parameter Sending Thread #%d" )
197- .build ());
198- }
199- executorRefCount ++;
200-
201- return clientExecutor ;
202- }
203-
204- /**
205- * Cleanup Executor on which IPC calls' parameters are sent.
206- * If reference counter is zero, this method discards the
207- * instance of the Executor. If not, this method
208- * just decrements the internal reference counter.
209- *
210- * @return An ExecutorService instance if it exists.
211- * Null is returned if not.
212- */
213- synchronized ExecutorService unrefAndCleanup () {
214- executorRefCount --;
215- assert (executorRefCount >= 0 );
216-
217- if (executorRefCount == 0 ) {
218- clientExecutor .shutdown ();
219- try {
220- if (!clientExecutor .awaitTermination (1 , TimeUnit .MINUTES )) {
221- clientExecutor .shutdownNow ();
222- }
223- } catch (InterruptedException e ) {
224- LOG .warn ("Interrupted while waiting for clientExecutor" +
225- " to stop" );
226- clientExecutor .shutdownNow ();
227- Thread .currentThread ().interrupt ();
228- }
229- clientExecutor = null ;
230- }
231-
232- return clientExecutor ;
233- }
234- }
235-
236169 /**
237170 * set the ping interval value in configuration
238171 *
@@ -301,11 +234,6 @@ public static final void setConnectTimeout(Configuration conf, int timeout) {
301234 conf .setInt (CommonConfigurationKeys .IPC_CLIENT_CONNECT_TIMEOUT_KEY , timeout );
302235 }
303236
304- @ VisibleForTesting
305- public static final ExecutorService getClientExecutor () {
306- return Client .clientExcecutorFactory .clientExecutor ;
307- }
308-
309237 /**
310238 * Increment this client's reference count
311239 */
@@ -462,8 +390,10 @@ private class Connection extends Thread {
462390 private AtomicLong lastActivity = new AtomicLong ();// last I/O activity time
463391 private AtomicBoolean shouldCloseConnection = new AtomicBoolean (); // indicate if the connection is closed
464392 private IOException closeException ; // close reason
465-
466- private final Object sendRpcRequestLock = new Object ();
393+
394+ private final Thread rpcRequestThread ;
395+ private final SynchronousQueue <Pair <Call , ResponseBuffer >> rpcRequestQueue =
396+ new SynchronousQueue <>(true );
467397
468398 private AtomicReference <Thread > connectingThread = new AtomicReference <>();
469399 private final Consumer <Connection > removeMethod ;
@@ -472,6 +402,9 @@ private class Connection extends Thread {
472402 Consumer <Connection > removeMethod ) {
473403 this .remoteId = remoteId ;
474404 this .server = remoteId .getAddress ();
405+ this .rpcRequestThread = new Thread (new RpcRequestSender (),
406+ "IPC Parameter Sending Thread for " + remoteId );
407+ this .rpcRequestThread .setDaemon (true );
475408
476409 this .maxResponseLength = remoteId .conf .getInt (
477410 CommonConfigurationKeys .IPC_MAXIMUM_RESPONSE_LENGTH ,
@@ -1150,6 +1083,10 @@ private synchronized void sendPing() throws IOException {
11501083
11511084 @ Override
11521085 public void run () {
1086+ // Don't start the ipc parameter sending thread until we start this
1087+ // thread, because the shutdown logic only gets triggered if this
1088+ // thread is started.
1089+ rpcRequestThread .start ();
11531090 if (LOG .isDebugEnabled ())
11541091 LOG .debug (getName () + ": starting, having connections "
11551092 + connections .size ());
@@ -1173,9 +1110,52 @@ public void run() {
11731110 + connections .size ());
11741111 }
11751112
1113+ /**
1114+ * A thread to write rpc requests to the socket.
1115+ */
1116+ private class RpcRequestSender implements Runnable {
1117+ @ Override
1118+ public void run () {
1119+ while (!shouldCloseConnection .get ()) {
1120+ ResponseBuffer buf = null ;
1121+ try {
1122+ Pair <Call , ResponseBuffer > pair =
1123+ rpcRequestQueue .poll (maxIdleTime , TimeUnit .MILLISECONDS );
1124+ if (pair == null || shouldCloseConnection .get ()) {
1125+ continue ;
1126+ }
1127+ buf = pair .getRight ();
1128+ synchronized (ipcStreams .out ) {
1129+ if (LOG .isDebugEnabled ()) {
1130+ Call call = pair .getLeft ();
1131+ LOG .debug (getName () + "{} sending #{} {}" , getName (), call .id ,
1132+ call .rpcRequest );
1133+ }
1134+ // RpcRequestHeader + RpcRequest
1135+ ipcStreams .sendRequest (buf .toByteArray ());
1136+ ipcStreams .flush ();
1137+ }
1138+ } catch (InterruptedException ie ) {
1139+ // stop this thread
1140+ return ;
1141+ } catch (IOException e ) {
1142+ // exception at this point would leave the connection in an
1143+ // unrecoverable state (eg half a call left on the wire).
1144+ // So, close the connection, killing any outstanding calls
1145+ markClosed (e );
1146+ } finally {
1147+ //the buffer is just an in-memory buffer, but it is still polite to
1148+ // close early
1149+ IOUtils .closeStream (buf );
1150+ }
1151+ }
1152+ }
1153+ }
1154+
11761155 /** Initiates a rpc call by sending the rpc request to the remote server.
1177- * Note: this is not called from the Connection thread, but by other
1178- * threads.
1156+ * Note: this is not called from the current thread, but by another
1157+ * thread, so that if the current thread is interrupted that the socket
1158+ * state isn't corrupted with a partially written message.
11791159 * @param call - the rpc request
11801160 */
11811161 public void sendRpcRequest (final Call call )
@@ -1185,8 +1165,7 @@ public void sendRpcRequest(final Call call)
11851165 }
11861166
11871167 // Serialize the call to be sent. This is done from the actual
1188- // caller thread, rather than the sendParamsExecutor thread,
1189-
1168+ // caller thread, rather than the rpcRequestThread in the connection,
11901169 // so that if the serialization throws an error, it is reported
11911170 // properly. This also parallelizes the serialization.
11921171 //
@@ -1203,51 +1182,7 @@ public void sendRpcRequest(final Call call)
12031182 final ResponseBuffer buf = new ResponseBuffer ();
12041183 header .writeDelimitedTo (buf );
12051184 RpcWritable .wrap (call .rpcRequest ).writeTo (buf );
1206-
1207- synchronized (sendRpcRequestLock ) {
1208- Future <?> senderFuture = sendParamsExecutor .submit (new Runnable () {
1209- @ Override
1210- public void run () {
1211- try {
1212- synchronized (ipcStreams .out ) {
1213- if (shouldCloseConnection .get ()) {
1214- return ;
1215- }
1216- if (LOG .isDebugEnabled ()) {
1217- LOG .debug (getName () + " sending #" + call .id
1218- + " " + call .rpcRequest );
1219- }
1220- // RpcRequestHeader + RpcRequest
1221- ipcStreams .sendRequest (buf .toByteArray ());
1222- ipcStreams .flush ();
1223- }
1224- } catch (IOException e ) {
1225- // exception at this point would leave the connection in an
1226- // unrecoverable state (eg half a call left on the wire).
1227- // So, close the connection, killing any outstanding calls
1228- markClosed (e );
1229- } finally {
1230- //the buffer is just an in-memory buffer, but it is still polite to
1231- // close early
1232- IOUtils .closeStream (buf );
1233- }
1234- }
1235- });
1236-
1237- try {
1238- senderFuture .get ();
1239- } catch (ExecutionException e ) {
1240- Throwable cause = e .getCause ();
1241-
1242- // cause should only be a RuntimeException as the Runnable above
1243- // catches IOException
1244- if (cause instanceof RuntimeException ) {
1245- throw (RuntimeException ) cause ;
1246- } else {
1247- throw new RuntimeException ("unexpected checked exception" , cause );
1248- }
1249- }
1250- }
1185+ rpcRequestQueue .put (Pair .of (call , buf ));
12511186 }
12521187
12531188 /* Receive a response.
@@ -1396,7 +1331,6 @@ public Client(Class<? extends Writable> valueClass, Configuration conf,
13961331 CommonConfigurationKeys .IPC_CLIENT_BIND_WILDCARD_ADDR_DEFAULT );
13971332
13981333 this .clientId = ClientId .getClientId ();
1399- this .sendParamsExecutor = clientExcecutorFactory .refAndGetInstance ();
14001334 this .maxAsyncCalls = conf .getInt (
14011335 CommonConfigurationKeys .IPC_CLIENT_ASYNC_CALLS_MAX_KEY ,
14021336 CommonConfigurationKeys .IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT );
@@ -1440,6 +1374,7 @@ public void stop() {
14401374 // wake up all connections
14411375 for (Connection conn : connections .values ()) {
14421376 conn .interrupt ();
1377+ conn .rpcRequestThread .interrupt ();
14431378 conn .interruptConnectingThread ();
14441379 }
14451380
@@ -1456,7 +1391,6 @@ public void stop() {
14561391 }
14571392 }
14581393 }
1459- clientExcecutorFactory .unrefAndCleanup ();
14601394 }
14611395
14621396 /**
0 commit comments