Skip to content

Commit 82ba6f9

Browse files
author
Michael Allman
committed
[SPARK-17231][CORE] Avoid building debug or trace log messages unless
the respective log level is enabled
1 parent 29952ed commit 82ba6f9

File tree

9 files changed

+55
-45
lines changed

9 files changed

+55
-45
lines changed

common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
import org.apache.spark.network.protocol.RpcRequest;
4444
import org.apache.spark.network.protocol.StreamChunkId;
4545
import org.apache.spark.network.protocol.StreamRequest;
46-
import org.apache.spark.network.util.NettyUtils;
46+
import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
4747

4848
/**
4949
* Client for fetching consecutive chunks of a pre-negotiated stream. This API is intended to allow
@@ -135,9 +135,10 @@ public void fetchChunk(
135135
long streamId,
136136
final int chunkIndex,
137137
final ChunkReceivedCallback callback) {
138-
final String serverAddr = NettyUtils.getRemoteAddress(channel);
139138
final long startTime = System.currentTimeMillis();
140-
logger.debug("Sending fetch chunk request {} to {}", chunkIndex, serverAddr);
139+
if (logger.isDebugEnabled()) {
140+
logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
141+
}
141142

142143
final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
143144
handler.addFetchRequest(streamChunkId, callback);
@@ -148,11 +149,13 @@ public void fetchChunk(
148149
public void operationComplete(ChannelFuture future) throws Exception {
149150
if (future.isSuccess()) {
150151
long timeTaken = System.currentTimeMillis() - startTime;
151-
logger.trace("Sending request {} to {} took {} ms", streamChunkId, serverAddr,
152-
timeTaken);
152+
if (logger.isTraceEnabled()) {
153+
logger.trace("Sending request {} to {} took {} ms", streamChunkId, getRemoteAddress(channel),
154+
timeTaken);
155+
}
153156
} else {
154157
String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
155-
serverAddr, future.cause());
158+
getRemoteAddress(channel), future.cause());
156159
logger.error(errorMsg, future.cause());
157160
handler.removeFetchRequest(streamChunkId);
158161
channel.close();
@@ -173,9 +176,10 @@ public void operationComplete(ChannelFuture future) throws Exception {
173176
* @param callback Object to call with the stream data.
174177
*/
175178
public void stream(final String streamId, final StreamCallback callback) {
176-
final String serverAddr = NettyUtils.getRemoteAddress(channel);
177179
final long startTime = System.currentTimeMillis();
178-
logger.debug("Sending stream request for {} to {}", streamId, serverAddr);
180+
if (logger.isDebugEnabled()) {
181+
logger.debug("Sending stream request for {} to {}", streamId, getRemoteAddress(channel));
182+
}
179183

180184
// Need to synchronize here so that the callback is added to the queue and the RPC is
181185
// written to the socket atomically, so that callbacks are called in the right order
@@ -188,11 +192,13 @@ public void stream(final String streamId, final StreamCallback callback) {
188192
public void operationComplete(ChannelFuture future) throws Exception {
189193
if (future.isSuccess()) {
190194
long timeTaken = System.currentTimeMillis() - startTime;
191-
logger.trace("Sending request for {} to {} took {} ms", streamId, serverAddr,
192-
timeTaken);
195+
if (logger.isTraceEnabled()) {
196+
logger.trace("Sending request for {} to {} took {} ms", streamId, getRemoteAddress(channel),
197+
timeTaken);
198+
}
193199
} else {
194200
String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
195-
serverAddr, future.cause());
201+
getRemoteAddress(channel), future.cause());
196202
logger.error(errorMsg, future.cause());
197203
channel.close();
198204
try {
@@ -215,9 +221,10 @@ public void operationComplete(ChannelFuture future) throws Exception {
215221
* @return The RPC's id.
216222
*/
217223
public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
218-
final String serverAddr = NettyUtils.getRemoteAddress(channel);
219224
final long startTime = System.currentTimeMillis();
220-
logger.trace("Sending RPC to {}", serverAddr);
225+
if (logger.isTraceEnabled()) {
226+
logger.trace("Sending RPC to {}", getRemoteAddress(channel));
227+
}
221228

222229
final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
223230
handler.addRpcRequest(requestId, callback);
@@ -228,10 +235,12 @@ public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
228235
public void operationComplete(ChannelFuture future) throws Exception {
229236
if (future.isSuccess()) {
230237
long timeTaken = System.currentTimeMillis() - startTime;
231-
logger.trace("Sending request {} to {} took {} ms", requestId, serverAddr, timeTaken);
238+
if (logger.isTraceEnabled()) {
239+
logger.trace("Sending request {} to {} took {} ms", requestId, getRemoteAddress(channel), timeTaken);
240+
}
232241
} else {
233242
String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
234-
serverAddr, future.cause());
243+
getRemoteAddress(channel), future.cause());
235244
logger.error(errorMsg, future.cause());
236245
handler.removeRpcRequest(requestId);
237246
channel.close();

common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ public TransportClient createUnmanagedClient(String remoteHost, int remotePort)
195195

196196
/** Create a completely new {@link TransportClient} to the remote address. */
197197
private TransportClient createClient(InetSocketAddress address) throws IOException {
198-
logger.debug("Creating new connection to " + address);
198+
logger.debug("Creating new connection to {}", address);
199199

200200
Bootstrap bootstrap = new Bootstrap();
201201
bootstrap.group(workerGroup)

common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import org.apache.spark.network.protocol.StreamFailure;
3939
import org.apache.spark.network.protocol.StreamResponse;
4040
import org.apache.spark.network.server.MessageHandler;
41-
import org.apache.spark.network.util.NettyUtils;
41+
import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
4242
import org.apache.spark.network.util.TransportFrameDecoder;
4343

4444
/**
@@ -122,7 +122,7 @@ public void channelActive() {
122122
@Override
123123
public void channelInactive() {
124124
if (numOutstandingRequests() > 0) {
125-
String remoteAddress = NettyUtils.getRemoteAddress(channel);
125+
String remoteAddress = getRemoteAddress(channel);
126126
logger.error("Still have {} requests outstanding when connection from {} is closed",
127127
numOutstandingRequests(), remoteAddress);
128128
failOutstandingRequests(new IOException("Connection from " + remoteAddress + " closed"));
@@ -132,7 +132,7 @@ public void channelInactive() {
132132
@Override
133133
public void exceptionCaught(Throwable cause) {
134134
if (numOutstandingRequests() > 0) {
135-
String remoteAddress = NettyUtils.getRemoteAddress(channel);
135+
String remoteAddress = getRemoteAddress(channel);
136136
logger.error("Still have {} requests outstanding when connection from {} is closed",
137137
numOutstandingRequests(), remoteAddress);
138138
failOutstandingRequests(cause);
@@ -141,13 +141,12 @@ public void exceptionCaught(Throwable cause) {
141141

142142
@Override
143143
public void handle(ResponseMessage message) throws Exception {
144-
String remoteAddress = NettyUtils.getRemoteAddress(channel);
145144
if (message instanceof ChunkFetchSuccess) {
146145
ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
147146
ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
148147
if (listener == null) {
149148
logger.warn("Ignoring response for block {} from {} since it is not outstanding",
150-
resp.streamChunkId, remoteAddress);
149+
resp.streamChunkId, getRemoteAddress(channel));
151150
resp.body().release();
152151
} else {
153152
outstandingFetches.remove(resp.streamChunkId);
@@ -159,7 +158,7 @@ public void handle(ResponseMessage message) throws Exception {
159158
ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
160159
if (listener == null) {
161160
logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding",
162-
resp.streamChunkId, remoteAddress, resp.errorString);
161+
resp.streamChunkId, getRemoteAddress(channel), resp.errorString);
163162
} else {
164163
outstandingFetches.remove(resp.streamChunkId);
165164
listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException(
@@ -170,7 +169,7 @@ public void handle(ResponseMessage message) throws Exception {
170169
RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
171170
if (listener == null) {
172171
logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
173-
resp.requestId, remoteAddress, resp.body().size());
172+
resp.requestId, getRemoteAddress(channel), resp.body().size());
174173
} else {
175174
outstandingRpcs.remove(resp.requestId);
176175
try {
@@ -184,7 +183,7 @@ public void handle(ResponseMessage message) throws Exception {
184183
RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
185184
if (listener == null) {
186185
logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
187-
resp.requestId, remoteAddress, resp.errorString);
186+
resp.requestId, getRemoteAddress(channel), resp.errorString);
188187
} else {
189188
outstandingRpcs.remove(resp.requestId);
190189
listener.onFailure(new RuntimeException(resp.errorString));

common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
3939
Message.Type msgType = Message.Type.decode(in);
4040
Message decoded = decode(msgType, in);
4141
assert decoded.type() == msgType;
42-
logger.trace("Received message " + msgType + ": " + decoded);
42+
logger.trace("Received message {}: {}", msgType, decoded);
4343
out.add(decoded);
4444
}
4545

common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.apache.spark.network.protocol.Message;
3030
import org.apache.spark.network.protocol.RequestMessage;
3131
import org.apache.spark.network.protocol.ResponseMessage;
32-
import org.apache.spark.network.util.NettyUtils;
32+
import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
3333

3434
/**
3535
* The single Transport-level Channel handler which is used for delegating requests to the
@@ -76,7 +76,7 @@ public TransportClient getClient() {
7676

7777
@Override
7878
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
79-
logger.warn("Exception in connection from " + NettyUtils.getRemoteAddress(ctx.channel()),
79+
logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()),
8080
cause);
8181
requestHandler.exceptionCaught(cause);
8282
responseHandler.exceptionCaught(cause);
@@ -139,7 +139,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
139139
System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
140140
if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
141141
if (responseHandler.numOutstandingRequests() > 0) {
142-
String address = NettyUtils.getRemoteAddress(ctx.channel());
142+
String address = getRemoteAddress(ctx.channel());
143143
logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
144144
"requests. Assuming connection is dead; please adjust spark.network.timeout if " +
145145
"this is wrong.", address, requestTimeoutNs / 1000 / 1000);

common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.network.server;
1919

20+
import java.net.SocketAddress;
2021
import java.nio.ByteBuffer;
2122

2223
import com.google.common.base.Throwables;
@@ -42,7 +43,7 @@
4243
import org.apache.spark.network.protocol.StreamFailure;
4344
import org.apache.spark.network.protocol.StreamRequest;
4445
import org.apache.spark.network.protocol.StreamResponse;
45-
import org.apache.spark.network.util.NettyUtils;
46+
import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
4647

4748
/**
4849
* A handler that processes requests from clients and writes chunk data back. Each handler is
@@ -114,9 +115,9 @@ public void handle(RequestMessage request) {
114115
}
115116

116117
private void processFetchRequest(final ChunkFetchRequest req) {
117-
final String client = NettyUtils.getRemoteAddress(channel);
118-
119-
logger.trace("Received req from {} to fetch block {}", client, req.streamChunkId);
118+
if (logger.isTraceEnabled()) {
119+
logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), req.streamChunkId);
120+
}
120121

121122
ManagedBuffer buf;
122123
try {
@@ -125,7 +126,7 @@ private void processFetchRequest(final ChunkFetchRequest req) {
125126
buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
126127
} catch (Exception e) {
127128
logger.error(String.format(
128-
"Error opening block %s for request from %s", req.streamChunkId, client), e);
129+
"Error opening block %s for request from %s", req.streamChunkId, getRemoteAddress(channel)), e);
129130
respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e)));
130131
return;
131132
}
@@ -134,13 +135,12 @@ private void processFetchRequest(final ChunkFetchRequest req) {
134135
}
135136

136137
private void processStreamRequest(final StreamRequest req) {
137-
final String client = NettyUtils.getRemoteAddress(channel);
138138
ManagedBuffer buf;
139139
try {
140140
buf = streamManager.openStream(req.streamId);
141141
} catch (Exception e) {
142142
logger.error(String.format(
143-
"Error opening stream %s for request from %s", req.streamId, client), e);
143+
"Error opening stream %s for request from %s", req.streamId, getRemoteAddress(channel)), e);
144144
respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e)));
145145
return;
146146
}
@@ -189,13 +189,13 @@ private void processOneWayMessage(OneWayMessage req) {
189189
* it will be logged and the channel closed.
190190
*/
191191
private void respond(final Encodable result) {
192-
final String remoteAddress = channel.remoteAddress().toString();
192+
final SocketAddress remoteAddress = channel.remoteAddress();
193193
channel.writeAndFlush(result).addListener(
194194
new ChannelFutureListener() {
195195
@Override
196196
public void operationComplete(ChannelFuture future) throws Exception {
197197
if (future.isSuccess()) {
198-
logger.trace(String.format("Sent result %s to client %s", result, remoteAddress));
198+
logger.trace("Sent result {} to client {}", result, remoteAddress);
199199
} else {
200200
logger.error(String.format("Error sending result %s to %s; closing connection",
201201
result, remoteAddress), future.cause());

common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
130130
channelFuture.syncUninterruptibly();
131131

132132
port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
133-
logger.debug("Shuffle server started on port :" + port);
133+
logger.debug("Shuffle server started on port: {}", port);
134134
}
135135

136136
@Override

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
import org.apache.spark.network.server.StreamManager;
4343
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
4444
import org.apache.spark.network.shuffle.protocol.*;
45-
import org.apache.spark.network.util.NettyUtils;
45+
import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
4646
import org.apache.spark.network.util.TransportConf;
4747

4848

@@ -101,11 +101,13 @@ protected void handleMessage(
101101
blocks.add(block);
102102
}
103103
long streamId = streamManager.registerStream(client.getClientId(), blocks.iterator());
104-
logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
105-
streamId,
106-
msg.blockIds.length,
107-
client.getClientId(),
108-
NettyUtils.getRemoteAddress(client.getChannel()));
104+
if (logger.isTraceEnabled()) {
105+
logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
106+
streamId,
107+
msg.blockIds.length,
108+
client.getClientId(),
109+
getRemoteAddress(client.getChannel()));
110+
}
109111
callback.onSuccess(new StreamHandle(streamId, msg.blockIds.length).toByteBuffer());
110112
metrics.blockTransferRateBytes.mark(totalBlockSize);
111113
} finally {

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ private void deleteExecutorDirs(String[] dirs) {
267267
for (String localDir : dirs) {
268268
try {
269269
JavaUtils.deleteRecursively(new File(localDir));
270-
logger.debug("Successfully cleaned up directory: " + localDir);
270+
logger.debug("Successfully cleaned up directory: {}", localDir);
271271
} catch (Exception e) {
272272
logger.error("Failed to delete directory: " + localDir, e);
273273
}

0 commit comments

Comments
 (0)