Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,8 @@ public static void registerProtocolEngine(RPC.RpcKind rpcKind,
throw new IllegalArgumentException("ReRegistration of rpcKind: " +
rpcKind);
}
if (LOG.isDebugEnabled()) {
LOG.debug("rpcKind=" + rpcKind +
", rpcRequestWrapperClass=" + rpcRequestWrapperClass +
", rpcInvoker=" + rpcInvoker);
}
LOG.debug("rpcKind={}, rpcRequestWrapperClass={}, rpcInvoker={}.",
rpcKind, rpcRequestWrapperClass, rpcInvoker);
}

public Class<? extends Writable> getRpcRequestWrapper(
Expand Down Expand Up @@ -1212,9 +1209,7 @@ public Void run() throws Exception {
deltaNanos = Time.monotonicNowNanos() - startNanos;
details.set(Timing.RESPONSE, deltaNanos, TimeUnit.NANOSECONDS);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Deferring response for callId: " + this.callId);
}
LOG.debug("Deferring response for callId: {}", this.callId);
}
return null;
}
Expand Down Expand Up @@ -1711,9 +1706,7 @@ private void doRunLoop() {
// If there were some calls that have not been sent out for a
// long time, discard them.
//
if(LOG.isDebugEnabled()) {
LOG.debug("Checking for old call responses.");
}
LOG.debug("Checking for old call responses.");
ArrayList<RpcCall> calls;

// get the list of channels from list of keys.
Expand Down Expand Up @@ -1813,9 +1806,8 @@ private boolean processResponse(LinkedList<RpcCall> responseQueue,
//
call = responseQueue.removeFirst();
SocketChannel channel = call.connection.channel;
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": responding to " + call);
}

LOG.debug("{}: responding to {}.", Thread.currentThread().getName(), call);
//
// Send as much data as we can in the non-blocking fashion
//
Expand All @@ -1832,10 +1824,8 @@ private boolean processResponse(LinkedList<RpcCall> responseQueue,
} else {
done = false; // more calls pending to be sent.
}
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": responding to " + call
+ " Wrote " + numBytes + " bytes.");
}
LOG.debug("{}: responding to {} Wrote {} bytes.",
Thread.currentThread().getName(), call, numBytes);
} else {
//
// If we were unable to write the entire response out, then
Expand All @@ -1860,10 +1850,8 @@ private boolean processResponse(LinkedList<RpcCall> responseQueue,
decPending();
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": responding to " + call
+ " Wrote partial " + numBytes + " bytes.");
}
LOG.debug("{}: responding to {} Wrote partial {} bytes.",
Thread.currentThread().getName(), call, numBytes);
}
error = false; // everything went off well
}
Expand Down Expand Up @@ -2209,13 +2197,11 @@ private void saslProcess(RpcSaslProto saslMessage)

if (saslServer != null && saslServer.isComplete()) {
if (LOG.isDebugEnabled()) {
LOG.debug("SASL server context established. Negotiated QoP is "
+ saslServer.getNegotiatedProperty(Sasl.QOP));
LOG.debug("SASL server context established. Negotiated QoP is {}.",
saslServer.getNegotiatedProperty(Sasl.QOP));
}
user = getAuthorizedUgi(saslServer.getAuthorizationID());
if (LOG.isDebugEnabled()) {
LOG.debug("SASL server successfully authenticated client: " + user);
}
LOG.debug("SASL server successfully authenticated client: {}.", user);
rpcMetrics.incrAuthenticationSuccesses();
AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user + " from " + toString());
saslContextEstablished = true;
Expand Down Expand Up @@ -2320,10 +2306,8 @@ private RpcSaslProto processSaslToken(RpcSaslProto saslMessage)
throw new SaslException("Client did not send a token");
}
byte[] saslToken = saslMessage.getToken().toByteArray();
if (LOG.isDebugEnabled()) {
LOG.debug("Have read input token of size " + saslToken.length
+ " for processing by saslServer.evaluateResponse()");
}
LOG.debug("Have read input token of size {} for processing by saslServer.evaluateResponse()",
saslToken.length);
saslToken = saslServer.evaluateResponse(saslToken);
return buildSaslResponse(
saslServer.isComplete() ? SaslState.SUCCESS : SaslState.CHALLENGE,
Expand All @@ -2338,9 +2322,8 @@ private void switchToSimple() {

private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken) {
if (LOG.isDebugEnabled()) {
LOG.debug("Will send " + state + " token of size "
+ ((replyToken != null) ? replyToken.length : null)
+ " from saslServer.");
LOG.debug("Will send {} token of size {} from saslServer.", state,
((replyToken != null) ? replyToken.length : null));
}
RpcSaslProto.Builder response = RpcSaslProto.newBuilder();
response.setState(state);
Expand Down Expand Up @@ -2664,10 +2647,8 @@ private void processConnectionContext(RpcWritable.Buffer buffer)
*/
private void unwrapPacketAndProcessRpcs(byte[] inBuf)
throws IOException, InterruptedException {
if (LOG.isDebugEnabled()) {
LOG.debug("Have read input token of size " + inBuf.length
+ " for processing by saslServer.unwrap()");
}
LOG.debug("Have read input token of size {} for processing by saslServer.unwrap()",
inBuf.length);
inBuf = saslServer.unwrap(inBuf, 0, inBuf.length);
ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
inBuf));
Expand Down Expand Up @@ -2729,9 +2710,7 @@ private void processOneRpc(ByteBuffer bb)
getMessage(RpcRequestHeaderProto.getDefaultInstance(), buffer);
callId = header.getCallId();
retry = header.getRetryCount();
if (LOG.isDebugEnabled()) {
LOG.debug(" got #" + callId);
}
LOG.debug(" got #{}", callId);
checkRpcHeaders(header);

if (callId < 0) { // callIds typically used during connection setup
Expand All @@ -2746,11 +2725,8 @@ private void processOneRpc(ByteBuffer bb)
} catch (RpcServerException rse) {
// inform client of error, but do not rethrow else non-fatal
// exceptions will close connection!
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() +
": processOneRpc from client " + this +
" threw exception [" + rse + "]");
}
LOG.debug("{}: processOneRpc from client {} threw exception [{}]",
Thread.currentThread().getName(), this, rse);
// use the wrapped exception if there is one.
Throwable t = (rse.getCause() != null) ? rse.getCause() : rse;
final RpcCall call = new RpcCall(this, callId, retry);
Expand Down Expand Up @@ -2962,9 +2938,7 @@ private void authorizeConnection() throws RpcServerException {
ProxyUsers.authorize(user, this.getHostAddress());
}
authorize(user, protocolName, getHostInetAddress());
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully authorized " + connectionContext);
}
LOG.debug("Successfully authorized {}.", connectionContext);
rpcMetrics.incrAuthorizationSuccesses();
} catch (AuthorizationException ae) {
LOG.info("Connection from " + this
Expand Down Expand Up @@ -3081,7 +3055,7 @@ public Handler(int instanceNumber) {

@Override
public void run() {
LOG.debug(Thread.currentThread().getName() + ": starting");
LOG.debug("{}: starting", Thread.currentThread().getName());
SERVER.set(Server.this);
while (running) {
TraceScope traceScope = null;
Expand Down Expand Up @@ -3115,9 +3089,7 @@ public void run() {
call = null;
continue;
}
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + call.rpcKind);
}
LOG.debug("{}: {} for RpcKind {}.", Thread.currentThread().getName(), call, call.rpcKind);
CurCall.set(call);
if (call.span != null) {
traceScope = tracer.activateSpan(call.span);
Expand Down Expand Up @@ -3152,15 +3124,14 @@ public void run() {
IOUtils.cleanupWithLogger(LOG, traceScope);
if (call != null) {
updateMetrics(call, startTimeNanos, connDropped);
ProcessingDetails.LOG.debug(
"Served: [{}]{} name={} user={} details={}",
ProcessingDetails.LOG.debug("Served: [{}]{} name={} user={} details={}",
call, (call.isResponseDeferred() ? ", deferred" : ""),
call.getDetailedMetricsName(), call.getRemoteUser(),
call.getProcessingDetails());
}
}
}
LOG.debug(Thread.currentThread().getName() + ": exiting");
LOG.debug("{}: exiting", Thread.currentThread().getName());
}

private void requeueCall(Call call)
Expand Down Expand Up @@ -3389,14 +3360,13 @@ private List<AuthMethod> getAuthMethods(SecretManager<?> secretManager,
" authentication requires a secret manager");
}
} else if (secretManager != null) {
LOG.debug(AuthenticationMethod.TOKEN +
" authentication enabled for secret manager");
LOG.debug("{} authentication enabled for secret manager", AuthenticationMethod.TOKEN);
// most preferred, go to the front of the line!
authMethods.add(AuthenticationMethod.TOKEN.getAuthMethod());
}
authMethods.add(confAuthenticationMethod.getAuthMethod());

LOG.debug("Server accepts auth methods:" + authMethods);
LOG.debug("Server accepts auth methods:{}", authMethods);
return authMethods;
}

Expand Down Expand Up @@ -3556,9 +3526,7 @@ private void wrapWithSasl(RpcCall call) throws IOException {
synchronized (call.connection.saslServer) {
token = call.connection.saslServer.wrap(token, 0, token.length);
}
if (LOG.isDebugEnabled())
LOG.debug("Adding saslServer wrapped token of size " + token.length
+ " as call response.");
LOG.debug("Adding saslServer wrapped token of size {} as call response.", token.length);
// rebuild with sasl header and payload
RpcResponseHeaderProto saslHeader = RpcResponseHeaderProto.newBuilder()
.setCallId(AuthProtocol.SASL.callId)
Expand Down Expand Up @@ -4004,21 +3972,17 @@ Connection register(SocketChannel channel, int ingressPort,
Connection connection = new Connection(channel, Time.now(),
ingressPort, isOnAuxiliaryPort);
add(connection);
if (LOG.isDebugEnabled()) {
LOG.debug("Server connection from " + connection +
"; # active connections: " + size() +
"; # queued calls: " + callQueue.size());
}
LOG.debug("Server connection from {}; # active connections: {}; # queued calls: {}.",
connection, size(), callQueue.size());
return connection;
}

boolean close(Connection connection) {
boolean exists = remove(connection);
if (exists) {
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() +
": disconnecting client " + connection +
". Number of active connections: "+ size());
LOG.debug("{}: disconnecting client {}. Number of active connections: {}.",
Thread.currentThread().getName(), connection, size());
}
// only close if actually removed to avoid double-closing due
// to possible races
Expand Down Expand Up @@ -4080,9 +4044,7 @@ public void run() {
if (!running) {
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName()+": task running");
}
LOG.debug("{}: task running", Thread.currentThread().getName());
try {
closeIdle(false);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,8 +522,7 @@ protected Node chooseRandom(final String scope, String excludedScope,
}
}
if (numOfDatanodes <= 0) {
LOG.debug("Failed to find datanode (scope=\"{}\" excludedScope=\"{}\")."
+ " numOfDatanodes={}",
LOG.debug("Failed to find datanode (scope=\"{}\" excludedScope=\"{}\"). numOfDatanodes={}",
scope, excludedScope, numOfDatanodes);
return null;
}
Expand All @@ -539,10 +538,12 @@ protected Node chooseRandom(final String scope, String excludedScope,
netlock.readLock().unlock();
}
}
LOG.debug("Choosing random from {} available nodes on node {},"
+ " scope={}, excludedScope={}, excludeNodes={}. numOfDatanodes={}.",
availableNodes, innerNode, scope, excludedScope, excludedNodes,
numOfDatanodes);
if (LOG.isDebugEnabled()) {
LOG.debug("Choosing random from {} available nodes on node {}, scope={},"
+ " excludedScope={}, excludeNodes={}. numOfDatanodes={}.",
availableNodes, innerNode, scope, excludedScope, excludedNodes,
numOfDatanodes);
}
Node ret = null;
if (availableNodes > 0) {
ret = chooseRandom(innerNode, node, excludedNodes, numOfDatanodes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,10 +479,9 @@ public void recoverUnfinalizedSegments() throws IOException {
LOG.info("Successfully started new epoch " + loggers.getEpoch());

if (LOG.isDebugEnabled()) {
LOG.debug("newEpoch(" + loggers.getEpoch() + ") responses:\n" +
QuorumCall.mapToString(resps));
LOG.debug("newEpoch({}) responses:\n{}", loggers.getEpoch(), QuorumCall.mapToString(resps));
}

long mostRecentSegmentTxId = Long.MIN_VALUE;
for (NewEpochResponseProto r : resps.values()) {
if (r.hasLastSegmentTxId()) {
Expand Down Expand Up @@ -518,10 +517,7 @@ public void selectInputStreams(Collection<EditLogInputStream> streams,
// the cache used for RPC calls is not enabled; fall back to using the
// streaming mechanism to serve such requests
if (inProgressOk && inProgressTailingEnabled) {
if (LOG.isDebugEnabled()) {
LOG.debug("Tailing edits starting from txn ID " + fromTxnId +
" via RPC mechanism");
}
LOG.debug("Tailing edits starting from txn ID {} via RPC mechanism", fromTxnId);
try {
Collection<EditLogInputStream> rpcStreams = new ArrayList<>();
selectRpcInputStreams(rpcStreams, fromTxnId, onlyDurableTxns);
Expand Down Expand Up @@ -585,8 +581,8 @@ private void selectRpcInputStreams(Collection<EditLogInputStream> streams,
int maxAllowedTxns = !onlyDurableTxns ? highestTxnCount :
responseCounts.get(responseCounts.size() - loggers.getMajoritySize());
if (maxAllowedTxns == 0) {
LOG.debug("No new edits available in logs; requested starting from " +
"ID {}", fromTxnId);
LOG.debug("No new edits available in logs; requested starting from ID {}",
fromTxnId);
return;
}
LogAction logAction = selectInputStreamLogHelper.record(fromTxnId);
Expand Down
Loading