+ * On the first pass, it processes the connectionHeader, + * connectionContext (an outOfBand RPC) and at most one RPC request that + * follows that. On future passes it will process at most one RPC request. + *
+ * Quirky things: dataLengthBuffer (4 bytes) is used to read "hrpc" OR
+ * rpc request length.
+ *
+ * @return -1 in case of error, else num bytes read so far
+ * @throws IOException - internal error that should not be returned to
+ * client, typically failure to respond to client
+ * @throws InterruptedException - if the thread is interrupted.
+ */
+ public int doRead(Object in) throws InterruptedException {
+ setLastContact(Time.now());
+
+ int count;
+ try {
+ count = readAndProcess(in);
+ } catch (InterruptedException ie) {
+ Server.LOG.info(Thread.currentThread().getName() +
+ ": readAndProcess caught InterruptedException", ie);
+ throw ie;
+ } catch (Exception e) {
+ // Any exceptions that reach here are fatal unexpected internal errors
+ // that could not be sent to the client.
+ Server.LOG.info(Thread.currentThread().getName() +
+ ": readAndProcess from client " + this +
+ " threw exception [" + e + "]", e);
+ count = -1; //so that the (count < 0) block is executed
+ }
+ // setupResponse will signal the connection should be closed when a
+ // fatal response is sent.
+ if (count < 0 || shouldClose()) {
+ server.closeConnection(this);
+ } else {
+ setLastContact(Time.now());
+ }
+ return count;
+ }
+
+ private int readAndProcess(Object in)
+ throws IOException, InterruptedException {
+ while (server.running &&
+ !shouldClose()) { // stop if a fatal response has been sent.
+ // dataLengthBuffer is used to read "hrpc" or the rpc-packet length
+ int count = -1;
+ if (dataLengthBuffer.remaining() > 0) {
+ count = bufferRead(in, dataLengthBuffer);
+ if (count < 0 || dataLengthBuffer.remaining() > 0) {
+ return count;
+ }
+ }
+
+ if (!connectionHeaderRead) {
+ // Every connection is expected to send the header;
+ // so far we read "hrpc" of the connection header.
+ if (connectionHeaderBuf == null) {
+ // for the bytes that follow "hrpc", in the connection header
+ connectionHeaderBuf = ByteBuffer.allocate(HEADER_LEN_AFTER_HRPC_PART);
+ }
+ count = bufferRead(in, connectionHeaderBuf);
+ if (count < 0 || connectionHeaderBuf.remaining() > 0) {
+ return count;
+ }
+ int version = connectionHeaderBuf.get(0);
+ // TODO we should add handler for service class later
+ this.setServiceClass(connectionHeaderBuf.get(1));
+ dataLengthBuffer.flip();
+
+ // Check if it looks like the user is hitting an IPC port
+ // with an HTTP GET - this is a common error, so we can
+ // send back a simple string indicating as much.
+ if (Server.HTTP_GET_BYTES.equals(dataLengthBuffer)) {
+ setupHttpRequestOnIpcPortResponse();
+ return -1;
+ }
+
+ if (!RpcConstants.HEADER.equals(dataLengthBuffer)) {
+ Server.LOG.warn("Incorrect RPC Header length from {}:{} "
+ + "expected length: {} got length: {}",
+ hostAddress, remotePort, RpcConstants.HEADER, dataLengthBuffer);
+ setupBadVersionResponse(version);
+ return -1;
+ }
+ if (version != CURRENT_VERSION) {
+ //Warning is ok since this is not supposed to happen.
+ Server.LOG.warn("Version mismatch from " +
+ hostAddress + ":" + remotePort +
+ " got version " + version +
+ " expected version " + CURRENT_VERSION);
+ setupBadVersionResponse(version);
+ return -1;
+ }
+
+ // this may switch us into SIMPLE
+ authProtocol = initializeAuthContext(connectionHeaderBuf.get(2));
+
+ dataLengthBuffer.clear(); // clear to next read rpc packet len
+ connectionHeaderBuf = null;
+ connectionHeaderRead = true;
+ continue; // connection header read, now read 4 bytes rpc packet len
+ }
+
+ if (data == null) { // just read 4 bytes - length of RPC packet
+ dataLengthBuffer.flip();
+ dataLength = dataLengthBuffer.getInt();
+ checkDataLength(dataLength);
+ // Set buffer for reading EXACTLY the RPC-packet length and no more.
+ data = ByteBuffer.allocate(dataLength);
+ }
+ // Now read the RPC packet
+ count = bufferRead(in, data);
+
+ if (data.remaining() == 0) {
+ dataLengthBuffer.clear(); // to read length of future rpc packets
+ data.flip();
+ ByteBuffer requestData = data;
+ data = null; // null out in case processOneRpc throws.
+ boolean isHeaderRead = connectionContextRead;
+ processOneRpc(requestData);
+ // the last rpc-request we processed could have simply been the
+ // connectionContext; if so continue to read the first RPC.
+ if (!isHeaderRead) {
+ continue;
+ }
+ }
+ return count;
+ }
+ return -1;
+ }
+
+ private Server.AuthProtocol initializeAuthContext(int authType)
+ throws IOException {
+ Server.AuthProtocol authProtocol = Server.AuthProtocol.valueOf(authType);
+ if (authProtocol == null) {
+ IOException ioe = new IpcException("Unknown auth protocol:" + authType);
+ doSaslReply(ioe);
+ throw ioe;
+ }
+ boolean isSimpleEnabled = server.enabledAuthMethods.contains(
+ SaslRpcServer.AuthMethod.SIMPLE);
+ switch (authProtocol) {
+ case NONE: {
+ // don't reply if client is simple and server is insecure
+ if (!isSimpleEnabled) {
+ IOException ioe = new AccessControlException(
+ "SIMPLE authentication is not enabled."
+ + " Available:" + server.enabledAuthMethods);
+ doSaslReply(ioe);
+ throw ioe;
+ }
+ break;
+ }
+ default: {
+ break;
+ }
+ }
+ return authProtocol;
+ }
+
+ /**
+ * Process the Sasl's Negotiate request, including the optimization of
+ * accelerating token negotiation.
+ *
+ * @return the response to Negotiate request - the list of enabled
+ * authMethods and challenge if the TOKENS are supported.
+ * @throws SaslException - if attempt to generate challenge fails.
+ * @throws IOException - if it fails to create the SASL server for Tokens
+ */
+ private RpcHeaderProtos.RpcSaslProto buildSaslNegotiateResponse()
+ throws InterruptedException, SaslException, IOException {
+ RpcHeaderProtos.RpcSaslProto negotiateMessage = server.negotiateResponse;
+ // accelerate token negotiation by sending initial challenge
+ // in the negotiation response
+ if (server.enabledAuthMethods.contains(SaslRpcServer.AuthMethod.TOKEN)) {
+ saslServer = createSaslServer(SaslRpcServer.AuthMethod.TOKEN);
+ byte[] challenge = saslServer.evaluateResponse(new byte[0]);
+ RpcHeaderProtos.RpcSaslProto.Builder negotiateBuilder =
+ RpcHeaderProtos.RpcSaslProto.newBuilder(server.negotiateResponse);
+ negotiateBuilder.getAuthsBuilder(0) // TOKEN is always first
+ .setChallenge(ByteString.copyFrom(challenge));
+ negotiateMessage = negotiateBuilder.build();
+ }
+ sentNegotiate = true;
+ return negotiateMessage;
+ }
+
+ private SaslServer createSaslServer(SaslRpcServer.AuthMethod authMethod)
+ throws IOException, InterruptedException {
+ final Map
+ * Prior to this call the connectionHeader ("hrpc...") has been handled and
+ * if SASL then SASL has been established and the buf we are passed
+ * has been unwrapped from SASL.
+ *
+ * @param bb - contains the RPC request header and the rpc request
+ * @throws IOException - internal error that should not be returned to
+ * client, typically failure to respond to client
+ * @throws InterruptedException
+ */
+ private void processOneRpc(ByteBuffer bb)
+ throws IOException, InterruptedException {
+ // exceptions that escape this method are fatal to the connection.
+ // setupResponse will use the rpc status to determine if the connection
+ // should be closed.
+ int callId = -1;
+ int retry = RpcConstants.INVALID_RETRY_COUNT;
+ try {
+ final RpcWritable.Buffer buffer = RpcWritable.Buffer.wrap(bb);
+ final RpcHeaderProtos.RpcRequestHeaderProto header =
+ getMessage(RpcHeaderProtos.RpcRequestHeaderProto.getDefaultInstance(),
+ buffer);
+ callId = header.getCallId();
+ retry = header.getRetryCount();
+ if (Server.LOG.isDebugEnabled()) {
+ Server.LOG.debug(" got #" + callId);
+ }
+ checkRpcHeaders(header);
+
+ if (callId < 0) { // callIds typically used during connection setup
+ processRpcOutOfBandRequest(header, buffer);
+ } else if (!connectionContextRead) {
+ throw new Server.FatalRpcServerException(
+ RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ "Connection context not established");
+ } else {
+ processRpcRequest(header, buffer);
+ }
+ } catch (RpcServerException rse) {
+ // inform client of error, but do not rethrow else non-fatal
+ // exceptions will close connection!
+ if (Server.LOG.isDebugEnabled()) {
+ Server.LOG.debug(Thread.currentThread().getName() +
+ ": processOneRpc from client " + this +
+ " threw exception [" + rse + "]");
+ }
+ // use the wrapped exception if there is one.
+ Throwable t = (rse.getCause() != null) ? rse.getCause() : rse;
+ final Server.RpcCall call = server.getRpcCall(this, callId, retry);
+ server.setupResponse(call,
+ rse.getRpcStatusProto(), rse.getRpcErrorCodeProto(), null,
+ t.getClass().getName(), t.getMessage());
+ sendResponse(call);
+ }
+ }
+
+ /**
+ * Verify RPC header is valid
+ *
+ * @param header - RPC request header
+ * @throws RpcServerException - header contains invalid values
+ */
+ private void checkRpcHeaders(RpcHeaderProtos.RpcRequestHeaderProto header)
+ throws RpcServerException {
+ if (!header.hasRpcOp()) {
+ String err = " IPC Server: No rpc op in rpcRequestHeader";
+ throw new Server.FatalRpcServerException(
+ RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ err);
+ }
+ if (header.getRpcOp() !=
+ RpcHeaderProtos.RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET) {
+ String err = "IPC Server does not implement rpc header operation" +
+ header.getRpcOp();
+ throw new Server.FatalRpcServerException(
+ RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ err);
+ }
+ // If we know the rpc kind, get its class so that we can deserialize
+ // (Note it would make more sense to have the handler deserialize but
+ // we continue with this original design.
+ if (!header.hasRpcKind()) {
+ String err = " IPC Server: No rpc kind in rpcRequestHeader";
+ throw new Server.FatalRpcServerException(
+ RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ err);
+ }
+ }
+
+ /**
+ * Process an RPC Request
+ * - the connection headers and context must have been already read.
+ * - Based on the rpcKind, decode the rpcRequest.
+ * - A successfully decoded RpcCall will be deposited in RPC-Q and
+ * its response will be sent later when the request is processed.
+ *
+ * @param header - RPC request header
+ * @param buffer - stream to request payload
+ * @throws RpcServerException - generally due to fatal rpc layer issues
+ * such as invalid header or deserialization error. The call queue
+ * may also throw a fatal or non-fatal exception on overflow.
+ * @throws IOException - fatal internal error that should/could not
+ * be sent to client.
+ * @throws InterruptedException
+ */
+ private void processRpcRequest(RpcHeaderProtos.RpcRequestHeaderProto header,
+ RpcWritable.Buffer buffer)
+ throws RpcServerException,
+ InterruptedException {
+ Class extends Writable> rpcRequestClass =
+ server.getRpcRequestWrapper(header.getRpcKind());
+ if (rpcRequestClass == null) {
+ Server.LOG.warn("Unknown rpc kind " + header.getRpcKind() +
+ " from client " + getHostAddress());
+ final String err = "Unknown rpc kind in rpc header" +
+ header.getRpcKind();
+ throw new Server.FatalRpcServerException(
+ RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ err);
+ }
+ Writable rpcRequest;
+ try { //Read the rpc request
+ rpcRequest = buffer.newInstance(rpcRequestClass, server.conf);
+ } catch (RpcServerException rse) { // lets tests inject failures.
+ throw rse;
+ } catch (Throwable t) { // includes runtime exception from newInstance
+ Server.LOG.warn("Unable to read call parameters for client " +
+ getHostAddress() + "on connection protocol " +
+ this.protocolName + " for rpcKind " + header.getRpcKind(), t);
+ String err =
+ "IPC server unable to read call parameters: " + t.getMessage();
+ throw new Server.FatalRpcServerException(
+ RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
+ err);
+ }
+
+ Span span = null;
+ if (header.hasTraceInfo()) {
+ RpcHeaderProtos.RPCTraceInfoProto traceInfoProto = header.getTraceInfo();
+ if (traceInfoProto.hasSpanContext()) {
+ if (server.tracer == null) {
+ server.setTracer(Tracer.curThreadTracer());
+ }
+ if (server.tracer != null) {
+ // If the incoming RPC included tracing info, always continue the
+ // trace
+ SpanContext spanCtx = TraceUtils.byteStringToSpanContext(
+ traceInfoProto.getSpanContext());
+ if (spanCtx != null) {
+ span = server.tracer.newSpan(
+ RpcClientUtil.toTraceName(rpcRequest.toString()), spanCtx);
+ }
+ }
+ }
+ }
+
+ CallerContext callerContext = null;
+ if (header.hasCallerContext()) {
+ callerContext =
+ new CallerContext.Builder(header.getCallerContext().getContext())
+ .setSignature(header.getCallerContext().getSignature()
+ .toByteArray())
+ .build();
+ }
+
+ Server.RpcCall call = server.getRpcCall(this, header.getCallId(),
+ header.getRetryCount(), rpcRequest,
+ ProtoUtil.convert(header.getRpcKind()),
+ header.getClientId().toByteArray(), span, callerContext);
+
+ // Save the priority level assignment by the scheduler
+ call.setPriorityLevel(server.callQueue.getPriorityLevel(call));
+ call.markCallCoordinated(false);
+ if (server.alignmentContext != null && call.rpcRequest != null &&
+ (call.rpcRequest instanceof ProtobufRpcEngine2.RpcProtobufRequest)) {
+ // if call.rpcRequest is not RpcProtobufRequest, will skip the following
+ // step and treat the call as uncoordinated. As currently only certain
+ // ClientProtocol methods request made through RPC protobuf needs to be
+ // coordinated.
+ String methodName;
+ String protoName;
+ ProtobufRpcEngine2.RpcProtobufRequest req =
+ (ProtobufRpcEngine2.RpcProtobufRequest) call.rpcRequest;
+ try {
+ methodName = req.getRequestHeader().getMethodName();
+ protoName = req.getRequestHeader().getDeclaringClassProtocolName();
+ if (server.alignmentContext.isCoordinatedCall(protoName, methodName)) {
+ call.markCallCoordinated(true);
+ long stateId;
+ stateId = server.alignmentContext.receiveRequestState(
+ header, server.getMaxIdleTime());
+ call.setClientStateId(stateId);
+ }
+ } catch (IOException ioe) {
+ throw new RpcServerException("Processing RPC request caught ", ioe);
+ }
+ }
+
+ try {
+ server.internalQueueCall(call);
+ } catch (RpcServerException rse) {
+ throw rse;
+ } catch (IOException ioe) {
+ throw new Server.FatalRpcServerException(
+ RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.ERROR_RPC_SERVER,
+ ioe);
+ }
+ incRpcCount(); // Increment the rpc count
+ }
+
+ /**
+ * Establish RPC connection setup by negotiating SASL if required, then
+ * reading and authorizing the connection header
+ *
+ * @param header - RPC header
+ * @param buffer - stream to request payload
+ * @throws RpcServerException - setup failed due to SASL
+ * negotiation failure, premature or invalid connection context,
+ * or other state errors. This exception needs to be sent to the
+ * client.
+ * @throws IOException - failed to send a response back to the client
+ * @throws InterruptedException
+ */
+ private void processRpcOutOfBandRequest(
+ RpcHeaderProtos.RpcRequestHeaderProto header,
+ RpcWritable.Buffer buffer) throws RpcServerException,
+ IOException, InterruptedException {
+ final int callId = header.getCallId();
+ if (callId == CONNECTION_CONTEXT_CALL_ID) {
+ // SASL must be established prior to connection context
+ if (authProtocol == Server.AuthProtocol.SASL && !saslContextEstablished) {
+ throw new Server.FatalRpcServerException(
+ RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ "Connection header sent during SASL negotiation");
+ }
+ // read and authorize the user
+ processConnectionContext(buffer);
+ } else if (callId == Server.AuthProtocol.SASL.callId) {
+ // if client was switched to simple, ignore first SASL message
+ if (authProtocol != Server.AuthProtocol.SASL) {
+ throw new Server.FatalRpcServerException(
+ RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ "SASL protocol not requested by client");
+ }
+ saslReadAndProcess(buffer);
+ } else if (callId == PING_CALL_ID) {
+ Server.LOG.debug("Received ping message");
+ } else {
+ throw new Server.FatalRpcServerException(
+ RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
+ "Unknown out of band call #" + callId);
+ }
+ }
+
+ /**
+ * Authorize proxy users to access this server
+ *
+ * @throws RpcServerException - user is not allowed to proxy
+ */
+ private void authorizeConnection() throws RpcServerException {
+ try {
+ // If auth method is TOKEN, the token was obtained by the
+ // real user for the effective user, therefore not required to
+ // authorize real user. doAs is allowed only for simple or kerberos
+ // authentication
+ if (user != null && user.getRealUser() != null
+ && (authMethod != SaslRpcServer.AuthMethod.TOKEN)) {
+ ProxyUsers.authorize(user, this.getHostAddress());
+ }
+ server.authorize(user, protocolName, getHostInetAddress());
+ if (Server.LOG.isDebugEnabled()) {
+ Server.LOG.debug("Successfully authorized " + connectionContext);
+ }
+ server.rpcMetrics.incrAuthorizationSuccesses();
+ } catch (AuthorizationException ae) {
+ Server.LOG.info("Connection from " + this
+ + " for protocol " + connectionContext.getProtocol()
+ + " is unauthorized for user " + user);
+ server.rpcMetrics.incrAuthorizationFailures();
+ throw new Server.FatalRpcServerException(
+ RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_UNAUTHORIZED,
+ ae);
+ }
+ }
+
+ /**
+ * Decode the a protobuf from the given input stream
+ *
+ * @return Message - decoded protobuf
+ * @throws RpcServerException - deserialization failed
+ */
+ @SuppressWarnings("unchecked")
+