3131import org .apache .hadoop .fs .CommonConfigurationKeysPublic ;
3232import org .apache .hadoop .io .IOUtils ;
3333import org .apache .hadoop .io .Writable ;
34- import org .apache .hadoop .io .WritableUtils ;
3534import org .apache .hadoop .io .retry .RetryPolicies ;
3635import org .apache .hadoop .io .retry .RetryPolicy ;
3736import org .apache .hadoop .io .retry .RetryPolicy .RetryAction ;
37+ import org .apache .hadoop .ipc .netty .client .IpcStreams ;
3838import org .apache .hadoop .ipc .RPC .RpcKind ;
3939import org .apache .hadoop .ipc .Server .AuthProtocol ;
4040import org .apache .hadoop .ipc .protobuf .IpcConnectionContextProtos .IpcConnectionContextProto ;
6262import javax .net .SocketFactory ;
6363import javax .security .sasl .Sasl ;
6464import javax .security .sasl .SaslException ;
65- import java .io .*;
66- import java .net .*;
65+ import java .io .DataOutputStream ;
66+ import java .io .EOFException ;
67+ import java .io .FilterInputStream ;
68+ import java .io .IOException ;
69+ import java .io .InputStream ;
70+ import java .io .InterruptedIOException ;
71+ import java .net .InetAddress ;
72+ import java .net .InetSocketAddress ;
73+ import java .net .Socket ;
74+ import java .net .SocketTimeoutException ;
75+ import java .net .UnknownHostException ;
6776import java .nio .ByteBuffer ;
6877import java .security .PrivilegedExceptionAction ;
69- import java .util .*;
78+ import java .util .Arrays ;
79+ import java .util .Hashtable ;
80+ import java .util .Iterator ;
7081import java .util .Map .Entry ;
71- import java .util .concurrent .*;
82+ import java .util .Random ;
83+ import java .util .Set ;
84+ import java .util .concurrent .ConcurrentHashMap ;
85+ import java .util .concurrent .ConcurrentMap ;
86+ import java .util .concurrent .ExecutionException ;
87+ import java .util .concurrent .ExecutorService ;
88+ import java .util .concurrent .Executors ;
89+ import java .util .concurrent .Future ;
90+ import java .util .concurrent .RejectedExecutionException ;
91+ import java .util .concurrent .TimeUnit ;
92+ import java .util .concurrent .TimeoutException ;
7293import java .util .concurrent .atomic .AtomicBoolean ;
7394import java .util .concurrent .atomic .AtomicInteger ;
7495import java .util .concurrent .atomic .AtomicLong ;
7798
7899import static org .apache .hadoop .ipc .RpcConstants .CONNECTION_CONTEXT_CALL_ID ;
79100import static org .apache .hadoop .ipc .RpcConstants .PING_CALL_ID ;
101+ import static org .apache .hadoop .ipc .netty .client .IpcStreams .*;
80102
81103/** A client for an IPC service. IPC calls take a single {@link Writable} as a
82104 * parameter, and return a {@link Writable} as their value. A service runs on
@@ -150,13 +172,7 @@ public static void setCallIdAndRetryCount(int cid, int rc,
150172 private final int maxAsyncCalls ;
151173 private final AtomicInteger asyncCallCounter = new AtomicInteger (0 );
152174
153- /**
154- * Executor on which IPC calls' parameters are sent.
155- * Deferring the sending of parameters to a separate
156- * thread isolates them from thread interruptions in the
157- * calling code.
158- */
159- private final ExecutorService sendParamsExecutor ;
175+
160176 private final static ClientExecutorServiceFactory clientExcecutorFactory =
161177 new ClientExecutorServiceFactory ();
162178
@@ -437,6 +453,7 @@ private class Connection extends Thread {
437453 private final boolean tcpNoDelay ; // if T then disable Nagle's Algorithm
438454 private final boolean tcpLowLatency ; // if T then use low-delay QoS
439455 private final boolean doPing ; //do we need to send ping message
456+ private final boolean useNettySSL ; // do we need SSL on in the implementation
440457 private final int pingInterval ; // how often sends ping to the server
441458 private final int soTimeout ; // used by ipc ping and rpc timeout
442459 private byte [] pingRequest ; // ping message
@@ -460,6 +477,9 @@ private class Connection extends Thread {
460477 this .maxResponseLength = remoteId .conf .getInt (
461478 CommonConfigurationKeys .IPC_MAXIMUM_RESPONSE_LENGTH ,
462479 CommonConfigurationKeys .IPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT );
480+ this .useNettySSL = remoteId .conf .getBoolean (
481+ CommonConfigurationKeys .IPC_SSL_KEY ,
482+ CommonConfigurationKeys .IPC_SSL_DEFAULT );
463483 this .rpcTimeout = remoteId .getRpcTimeout ();
464484 this .maxIdleTime = remoteId .getMaxIdleTime ();
465485 this .connectionRetryPolicy = remoteId .connectionRetryPolicy ;
@@ -837,7 +857,8 @@ private synchronized void setupIOstreams(
837857 Random rand = null ;
838858 while (true ) {
839859 setupConnection (ticket );
840- ipcStreams = new IpcStreams (socket , maxResponseLength );
860+ ipcStreams = newInstance (socket , maxResponseLength ,
861+ remoteId .conf );
841862 writeConnectionHeader (ipcStreams );
842863 if (authProtocol == AuthProtocol .SASL ) {
843864 try {
@@ -944,14 +965,17 @@ private void closeConnection() {
944965 return ;
945966 }
946967 // close the current connection
947- try {
948- socket .close ();
949- } catch (IOException e ) {
950- LOG .warn ("Not able to close a socket" , e );
951- }
968+ IOUtils .cleanupWithLogger (LOG , ipcStreams , socket );
952969 // set socket to null so that the next call to setupIOstreams
953970 // can start the process of connect all over again.
954971 socket = null ;
972+ // TODO: This change causes TestApplicationClientProtocolOnHA to throw
973+ // a NullPointerException on synchronized(ipcStreams.out). This
974+ // happens because closeConnection is called before the thread that
975+ // handles client.submit finishes. This change was introduced as
976+ // part of the SSL changes. Revisit later for further investigation
977+ // and possibly better handling.
978+ // ipcStreams = null;
955979 }
956980
957981 /* Handle connection failures due to timeout on connect
@@ -1165,11 +1189,8 @@ public void sendRpcRequest(final Call call)
11651189 return ;
11661190 }
11671191
1168- // Serialize the call to be sent. This is done from the actual
1169- // caller thread, rather than the sendParamsExecutor thread,
1170-
1171- // so that if the serialization throws an error, it is reported
1172- // properly. This also parallelizes the serialization.
1192+ // Serialize the call to be sent so that if the serialization throws an
1193+ // error, it is reported properly.
11731194 //
11741195 // Format of a call on the wire:
11751196 // 0) Length of rest below (1 + 2)
@@ -1186,7 +1207,7 @@ public void sendRpcRequest(final Call call)
11861207 RpcWritable .wrap (call .rpcRequest ).writeTo (buf );
11871208
11881209 synchronized (sendRpcRequestLock ) {
1189- Future <?> senderFuture = sendParamsExecutor .submit (new Runnable () {
1210+ Future <?> senderFuture = ipcStreams .submit (new Runnable () {
11901211 @ Override
11911212 public void run () {
11921213 try {
@@ -1377,7 +1398,11 @@ public Client(Class<? extends Writable> valueClass, Configuration conf,
13771398 CommonConfigurationKeys .IPC_CLIENT_BIND_WILDCARD_ADDR_DEFAULT );
13781399
13791400 this .clientId = ClientId .getClientId ();
1380- this .sendParamsExecutor = clientExcecutorFactory .refAndGetInstance ();
1401+ // TODO: This call can be moved to the place where getClientExecutor is
1402+ // invoked. However this move will change the general behaviour of
1403+ // the client, which initializes the factory in the constructor. This
1404+ // should be done with additional testing.
1405+ clientExcecutorFactory .refAndGetInstance ();
13811406 this .maxAsyncCalls = conf .getInt (
13821407 CommonConfigurationKeys .IPC_CLIENT_ASYNC_CALLS_MAX_KEY ,
13831408 CommonConfigurationKeys .IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT );
@@ -1900,79 +1925,4 @@ public static int nextCallId() {
19001925 public void close () throws Exception {
19011926 stop ();
19021927 }
1903-
1904- /** Manages the input and output streams for an IPC connection.
1905- * Only exposed for use by SaslRpcClient.
1906- */
1907- @ InterfaceAudience .Private
1908- public static class IpcStreams implements Closeable , Flushable {
1909- private DataInputStream in ;
1910- public DataOutputStream out ;
1911- private int maxResponseLength ;
1912- private boolean firstResponse = true ;
1913-
1914- IpcStreams (Socket socket , int maxResponseLength ) throws IOException {
1915- this .maxResponseLength = maxResponseLength ;
1916- setInputStream (
1917- new BufferedInputStream (NetUtils .getInputStream (socket )));
1918- setOutputStream (
1919- new BufferedOutputStream (NetUtils .getOutputStream (socket )));
1920- }
1921-
1922- void setSaslClient (SaslRpcClient client ) throws IOException {
1923- // Wrap the input stream in a BufferedInputStream to fill the buffer
1924- // before reading its length (HADOOP-14062).
1925- setInputStream (new BufferedInputStream (client .getInputStream (in )));
1926- setOutputStream (client .getOutputStream (out ));
1927- }
1928-
1929- private void setInputStream (InputStream is ) {
1930- this .in = (is instanceof DataInputStream )
1931- ? (DataInputStream )is : new DataInputStream (is );
1932- }
1933-
1934- private void setOutputStream (OutputStream os ) {
1935- this .out = (os instanceof DataOutputStream )
1936- ? (DataOutputStream )os : new DataOutputStream (os );
1937- }
1938-
1939- public ByteBuffer readResponse () throws IOException {
1940- int length = in .readInt ();
1941- if (firstResponse ) {
1942- firstResponse = false ;
1943- // pre-rpcv9 exception, almost certainly a version mismatch.
1944- if (length == -1 ) {
1945- in .readInt (); // ignore fatal/error status, it's fatal for us.
1946- throw new RemoteException (WritableUtils .readString (in ),
1947- WritableUtils .readString (in ));
1948- }
1949- }
1950- if (length <= 0 ) {
1951- throw new RpcException (String .format ("RPC response has " +
1952- "invalid length of %d" , length ));
1953- }
1954- if (maxResponseLength > 0 && length > maxResponseLength ) {
1955- throw new RpcException (String .format ("RPC response has a " +
1956- "length of %d exceeds maximum data length" , length ));
1957- }
1958- ByteBuffer bb = ByteBuffer .allocate (length );
1959- in .readFully (bb .array ());
1960- return bb ;
1961- }
1962-
1963- public void sendRequest (byte [] buf ) throws IOException {
1964- out .write (buf );
1965- }
1966-
1967- @ Override
1968- public void flush () throws IOException {
1969- out .flush ();
1970- }
1971-
1972- @ Override
1973- public void close () {
1974- IOUtils .closeStream (out );
1975- IOUtils .closeStream (in );
1976- }
1977- }
19781928}
0 commit comments