
Commit f209310

Michael Allman authored and zsxwing committed
[SPARK-17231][CORE] Avoid building debug or trace log messages unless the respective log level is enabled
(This PR addresses https://issues.apache.org/jira/browse/SPARK-17231)

## What changes were proposed in this pull request?

While debugging the performance of a large GraphX connected components computation, we found several places in the `network-common` and `network-shuffle` code bases where trace or debug log messages are constructed even if the respective log level is disabled. According to YourKit, these constructions were creating substantial churn in the eden region. Refactoring the respective code to avoid these unnecessary constructions except where necessary led to a modest but measurable reduction in our job's task time, GC time and the ratio thereof.

## How was this patch tested?

We computed the connected components of a graph with about 2.6 billion vertices and 1.7 billion edges four times. We used four different EC2 clusters, each with 8 r3.8xl worker nodes. Two test runs used Spark master. Two used Spark master + this PR. The results from the first test run, master and master+PR:

![master](https://cloud.githubusercontent.com/assets/833693/17951634/7471cbca-6a18-11e6-9c26-78afe9319685.jpg)
![logging_perf_improvements](https://cloud.githubusercontent.com/assets/833693/17951632/7467844e-6a18-11e6-9a0e-053dc7650413.jpg)

The results from the second test run, master and master+PR:

![master 2](https://cloud.githubusercontent.com/assets/833693/17951633/746dd6aa-6a18-11e6-8e27-606680b3f105.jpg)
![logging_perf_improvements 2](https://cloud.githubusercontent.com/assets/833693/17951631/74488710-6a18-11e6-8a32-08692f373386.jpg)

Though modest, I believe these results are significant.

Author: Michael Allman <[email protected]>

Closes #14798 from mallman/spark-17231-logging_perf_improvements.
1 parent d2ae639 commit f209310
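
To make the pattern concrete before diving into the per-file diffs, here is a minimal sketch of the refactoring this commit applies throughout `network-common` and `network-shuffle`. The class and the `remoteAddress` helper below are hypothetical stand-ins, not code from the commit; only the guard idiom itself (`logger.isDebugEnabled()` / `logger.isTraceEnabled()` around SLF4J calls with expensive arguments) is what the diffs implement.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical class illustrating the guard idiom; not part of the commit.
public class GuardedLoggingSketch {
  private static final Logger logger = LoggerFactory.getLogger(GuardedLoggingSketch.class);

  public void sendChunkRequest(int chunkIndex, Object channel) {
    // Before: even with debug logging off, remoteAddress(channel) runs and
    // allocates a String on every call, churning the eden region.
    //   logger.debug("Sending fetch chunk request {} to {}", chunkIndex, remoteAddress(channel));

    // After: the expensive argument is only computed when debug is enabled.
    if (logger.isDebugEnabled()) {
      logger.debug("Sending fetch chunk request {} to {}", chunkIndex, remoteAddress(channel));
    }
  }

  // Stand-in for NettyUtils.getRemoteAddress: formatting the peer address
  // is exactly the kind of per-call allocation the guard avoids.
  private String remoteAddress(Object channel) {
    return "/192.168.0.1:7077"; // placeholder value for illustration
  }
}
```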

File tree

9 files changed: +55 −45 lines changed

common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java

Lines changed: 24 additions & 15 deletions

@@ -43,7 +43,7 @@
 import org.apache.spark.network.protocol.RpcRequest;
 import org.apache.spark.network.protocol.StreamChunkId;
 import org.apache.spark.network.protocol.StreamRequest;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;

 /**
  * Client for fetching consecutive chunks of a pre-negotiated stream. This API is intended to allow
@@ -135,9 +135,10 @@ public void fetchChunk(
       long streamId,
       final int chunkIndex,
       final ChunkReceivedCallback callback) {
-    final String serverAddr = NettyUtils.getRemoteAddress(channel);
     final long startTime = System.currentTimeMillis();
-    logger.debug("Sending fetch chunk request {} to {}", chunkIndex, serverAddr);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
+    }

     final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
     handler.addFetchRequest(streamChunkId, callback);
@@ -148,11 +149,13 @@ public void fetchChunk(
         public void operationComplete(ChannelFuture future) throws Exception {
           if (future.isSuccess()) {
             long timeTaken = System.currentTimeMillis() - startTime;
-            logger.trace("Sending request {} to {} took {} ms", streamChunkId, serverAddr,
-              timeTaken);
+            if (logger.isTraceEnabled()) {
+              logger.trace("Sending request {} to {} took {} ms", streamChunkId, getRemoteAddress(channel),
+                timeTaken);
+            }
           } else {
             String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
-              serverAddr, future.cause());
+              getRemoteAddress(channel), future.cause());
             logger.error(errorMsg, future.cause());
             handler.removeFetchRequest(streamChunkId);
             channel.close();
@@ -173,9 +176,10 @@ public void operationComplete(ChannelFuture future) throws Exception {
    * @param callback Object to call with the stream data.
    */
  public void stream(final String streamId, final StreamCallback callback) {
-    final String serverAddr = NettyUtils.getRemoteAddress(channel);
     final long startTime = System.currentTimeMillis();
-    logger.debug("Sending stream request for {} to {}", streamId, serverAddr);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Sending stream request for {} to {}", streamId, getRemoteAddress(channel));
+    }

     // Need to synchronize here so that the callback is added to the queue and the RPC is
     // written to the socket atomically, so that callbacks are called in the right order
@@ -188,11 +192,13 @@ public void stream(final String streamId, final StreamCallback callback) {
         public void operationComplete(ChannelFuture future) throws Exception {
           if (future.isSuccess()) {
             long timeTaken = System.currentTimeMillis() - startTime;
-            logger.trace("Sending request for {} to {} took {} ms", streamId, serverAddr,
-              timeTaken);
+            if (logger.isTraceEnabled()) {
+              logger.trace("Sending request for {} to {} took {} ms", streamId, getRemoteAddress(channel),
+                timeTaken);
+            }
           } else {
             String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
-              serverAddr, future.cause());
+              getRemoteAddress(channel), future.cause());
             logger.error(errorMsg, future.cause());
             channel.close();
             try {
@@ -215,9 +221,10 @@ public void operationComplete(ChannelFuture future) throws Exception {
    * @return The RPC's id.
    */
  public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
-    final String serverAddr = NettyUtils.getRemoteAddress(channel);
     final long startTime = System.currentTimeMillis();
-    logger.trace("Sending RPC to {}", serverAddr);
+    if (logger.isTraceEnabled()) {
+      logger.trace("Sending RPC to {}", getRemoteAddress(channel));
+    }

     final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
     handler.addRpcRequest(requestId, callback);
@@ -228,10 +235,12 @@ public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
         public void operationComplete(ChannelFuture future) throws Exception {
           if (future.isSuccess()) {
             long timeTaken = System.currentTimeMillis() - startTime;
-            logger.trace("Sending request {} to {} took {} ms", requestId, serverAddr, timeTaken);
+            if (logger.isTraceEnabled()) {
+              logger.trace("Sending request {} to {} took {} ms", requestId, getRemoteAddress(channel), timeTaken);
+            }
           } else {
             String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
-              serverAddr, future.cause());
+              getRemoteAddress(channel), future.cause());
             logger.error(errorMsg, future.cause());
             handler.removeRpcRequest(requestId);
             channel.close();
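
Worth noting why the guard helps even though these calls already used `{}` placeholders: with three or more arguments, SLF4J's `trace(String, Object...)` overload allocates a varargs `Object[]` per call and boxes primitives like `timeTaken` before the logger's internal level check ever runs. A sketch, assuming standard SLF4J semantics (hypothetical class, not code from this commit):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical snippet: shows the hidden allocations a disabled trace call
// can still incur when three or more arguments are passed.
class VarargsCostSketch {
  private static final Logger logger = LoggerFactory.getLogger(VarargsCostSketch.class);

  void onComplete(Object streamChunkId, String serverAddr, long timeTaken) {
    // Compiles to trace(String, Object...): a new Object[3] is allocated and
    // timeTaken is boxed to a Long on every call, even with trace disabled.
    logger.trace("Sending request {} to {} took {} ms", streamChunkId, serverAddr, timeTaken);

    // The guard keeps the hot path allocation-free when trace is off.
    if (logger.isTraceEnabled()) {
      logger.trace("Sending request {} to {} took {} ms", streamChunkId, serverAddr, timeTaken);
    }
  }
}
```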

common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java

Lines changed: 1 addition & 1 deletion

@@ -195,7 +195,7 @@ public TransportClient createUnmanagedClient(String remoteHost, int remotePort)

   /** Create a completely new {@link TransportClient} to the remote address. */
   private TransportClient createClient(InetSocketAddress address) throws IOException {
-    logger.debug("Creating new connection to " + address);
+    logger.debug("Creating new connection to {}", address);

     Bootstrap bootstrap = new Bootstrap();
     bootstrap.group(workerGroup)
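
A note on one-line changes like the one above: switching from string concatenation to SLF4J's `{}` placeholders defers message formatting to the logger, so the full string is only built when the level is enabled. A hedged sketch of the contrast (hypothetical wrapper class, not Spark code):

```java
import java.net.InetSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical wrapper contrasting eager concatenation with SLF4J's
// deferred, parameterized formatting.
class ParameterizedLoggingSketch {
  private static final Logger logger =
    LoggerFactory.getLogger(ParameterizedLoggingSketch.class);

  void connect(InetSocketAddress address) {
    // Eager: the message String is concatenated before debug() is invoked,
    // whether or not debug logging is enabled.
    logger.debug("Creating new connection to " + address);

    // Deferred: SLF4J combines template and argument only if debug is on.
    // The arguments themselves are still evaluated at the call site, which
    // is why the commit adds isDebugEnabled()/isTraceEnabled() guards where
    // computing an argument (e.g. getRemoteAddress(channel)) is the costly
    // part.
    logger.debug("Creating new connection to {}", address);
  }
}
```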

common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java

Lines changed: 7 additions & 8 deletions

@@ -38,7 +38,7 @@
 import org.apache.spark.network.protocol.StreamFailure;
 import org.apache.spark.network.protocol.StreamResponse;
 import org.apache.spark.network.server.MessageHandler;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
 import org.apache.spark.network.util.TransportFrameDecoder;

 /**
@@ -122,7 +122,7 @@ public void channelActive() {
   @Override
   public void channelInactive() {
     if (numOutstandingRequests() > 0) {
-      String remoteAddress = NettyUtils.getRemoteAddress(channel);
+      String remoteAddress = getRemoteAddress(channel);
       logger.error("Still have {} requests outstanding when connection from {} is closed",
         numOutstandingRequests(), remoteAddress);
       failOutstandingRequests(new IOException("Connection from " + remoteAddress + " closed"));
@@ -132,7 +132,7 @@ public void channelInactive() {
   @Override
   public void exceptionCaught(Throwable cause) {
     if (numOutstandingRequests() > 0) {
-      String remoteAddress = NettyUtils.getRemoteAddress(channel);
+      String remoteAddress = getRemoteAddress(channel);
       logger.error("Still have {} requests outstanding when connection from {} is closed",
         numOutstandingRequests(), remoteAddress);
       failOutstandingRequests(cause);
@@ -141,13 +141,12 @@ public void exceptionCaught(Throwable cause) {

   @Override
   public void handle(ResponseMessage message) throws Exception {
-    String remoteAddress = NettyUtils.getRemoteAddress(channel);
     if (message instanceof ChunkFetchSuccess) {
       ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
       ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
       if (listener == null) {
         logger.warn("Ignoring response for block {} from {} since it is not outstanding",
-          resp.streamChunkId, remoteAddress);
+          resp.streamChunkId, getRemoteAddress(channel));
         resp.body().release();
       } else {
         outstandingFetches.remove(resp.streamChunkId);
@@ -159,7 +158,7 @@ public void handle(ResponseMessage message) throws Exception {
       ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
       if (listener == null) {
         logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding",
-          resp.streamChunkId, remoteAddress, resp.errorString);
+          resp.streamChunkId, getRemoteAddress(channel), resp.errorString);
       } else {
         outstandingFetches.remove(resp.streamChunkId);
         listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException(
@@ -170,7 +169,7 @@ public void handle(ResponseMessage message) throws Exception {
       RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
       if (listener == null) {
         logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
-          resp.requestId, remoteAddress, resp.body().size());
+          resp.requestId, getRemoteAddress(channel), resp.body().size());
       } else {
         outstandingRpcs.remove(resp.requestId);
         try {
@@ -184,7 +183,7 @@ public void handle(ResponseMessage message) throws Exception {
       RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
       if (listener == null) {
         logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
-          resp.requestId, remoteAddress, resp.errorString);
+          resp.requestId, getRemoteAddress(channel), resp.errorString);
       } else {
         outstandingRpcs.remove(resp.requestId);
         listener.onFailure(new RuntimeException(resp.errorString));

common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java

Lines changed: 1 addition & 1 deletion

@@ -39,7 +39,7 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
     Message.Type msgType = Message.Type.decode(in);
     Message decoded = decode(msgType, in);
     assert decoded.type() == msgType;
-    logger.trace("Received message " + msgType + ": " + decoded);
+    logger.trace("Received message {}: {}", msgType, decoded);
     out.add(decoded);
   }

common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java

Lines changed: 3 additions & 3 deletions

@@ -29,7 +29,7 @@
 import org.apache.spark.network.protocol.Message;
 import org.apache.spark.network.protocol.RequestMessage;
 import org.apache.spark.network.protocol.ResponseMessage;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;

 /**
  * The single Transport-level Channel handler which is used for delegating requests to the
@@ -76,7 +76,7 @@ public TransportClient getClient() {

   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-    logger.warn("Exception in connection from " + NettyUtils.getRemoteAddress(ctx.channel()),
+    logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()),
       cause);
     requestHandler.exceptionCaught(cause);
     responseHandler.exceptionCaught(cause);
@@ -139,7 +139,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
       System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
     if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
       if (responseHandler.numOutstandingRequests() > 0) {
-        String address = NettyUtils.getRemoteAddress(ctx.channel());
+        String address = getRemoteAddress(ctx.channel());
         logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
           "requests. Assuming connection is dead; please adjust spark.network.timeout if " +
           "this is wrong.", address, requestTimeoutNs / 1000 / 1000);

common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java

Lines changed: 9 additions & 9 deletions

@@ -17,6 +17,7 @@

 package org.apache.spark.network.server;

+import java.net.SocketAddress;
 import java.nio.ByteBuffer;

 import com.google.common.base.Throwables;
@@ -42,7 +43,7 @@
 import org.apache.spark.network.protocol.StreamFailure;
 import org.apache.spark.network.protocol.StreamRequest;
 import org.apache.spark.network.protocol.StreamResponse;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;

 /**
  * A handler that processes requests from clients and writes chunk data back. Each handler is
@@ -114,9 +115,9 @@ public void handle(RequestMessage request) {
   }

   private void processFetchRequest(final ChunkFetchRequest req) {
-    final String client = NettyUtils.getRemoteAddress(channel);
-
-    logger.trace("Received req from {} to fetch block {}", client, req.streamChunkId);
+    if (logger.isTraceEnabled()) {
+      logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), req.streamChunkId);
+    }

     ManagedBuffer buf;
     try {
@@ -125,7 +126,7 @@ private void processFetchRequest(final ChunkFetchRequest req) {
       buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
     } catch (Exception e) {
       logger.error(String.format(
-        "Error opening block %s for request from %s", req.streamChunkId, client), e);
+        "Error opening block %s for request from %s", req.streamChunkId, getRemoteAddress(channel)), e);
       respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e)));
       return;
     }
@@ -134,13 +135,12 @@ private void processFetchRequest(final ChunkFetchRequest req) {
   }

   private void processStreamRequest(final StreamRequest req) {
-    final String client = NettyUtils.getRemoteAddress(channel);
     ManagedBuffer buf;
     try {
       buf = streamManager.openStream(req.streamId);
     } catch (Exception e) {
       logger.error(String.format(
-        "Error opening stream %s for request from %s", req.streamId, client), e);
+        "Error opening stream %s for request from %s", req.streamId, getRemoteAddress(channel)), e);
       respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e)));
       return;
     }
@@ -189,13 +189,13 @@ private void processOneWayMessage(OneWayMessage req) {
    * it will be logged and the channel closed.
    */
   private void respond(final Encodable result) {
-    final String remoteAddress = channel.remoteAddress().toString();
+    final SocketAddress remoteAddress = channel.remoteAddress();
     channel.writeAndFlush(result).addListener(
       new ChannelFutureListener() {
         @Override
         public void operationComplete(ChannelFuture future) throws Exception {
           if (future.isSuccess()) {
-            logger.trace(String.format("Sent result %s to client %s", result, remoteAddress));
+            logger.trace("Sent result {} to client {}", result, remoteAddress);
           } else {
             logger.error(String.format("Error sending result %s to %s; closing connection",
               result, remoteAddress), future.cause());
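
The `respond()` change above combines both ideas: it stops eagerly stringifying the remote address and drops the unconditional `String.format`. A minimal before/after sketch, assuming a Netty `Channel` and an SLF4J logger (hypothetical class, not the commit's code):

```java
import java.net.SocketAddress;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical sketch of the respond() change; not the commit's code.
class RespondSketch {
  private static final Logger logger = LoggerFactory.getLogger(RespondSketch.class);

  void respond(Channel channel, Object result) {
    // Before: the address was stringified up front, and String.format built
    // the trace message on every response, traced or not:
    //   final String remoteAddress = channel.remoteAddress().toString();
    //   logger.trace(String.format("Sent result %s to client %s", result, remoteAddress));

    // After: keep the raw SocketAddress; SLF4J calls toString() on it only
    // when trace logging is actually enabled.
    final SocketAddress remoteAddress = channel.remoteAddress();
    logger.trace("Sent result {} to client {}", result, remoteAddress);
  }
}
```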

common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java

Lines changed: 1 addition & 1 deletion

@@ -130,7 +130,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
     channelFuture.syncUninterruptibly();

     port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
-    logger.debug("Shuffle server started on port :" + port);
+    logger.debug("Shuffle server started on port: {}", port);
   }

   @Override

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java

Lines changed: 8 additions & 6 deletions

@@ -42,7 +42,7 @@
 import org.apache.spark.network.server.StreamManager;
 import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
 import org.apache.spark.network.shuffle.protocol.*;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
 import org.apache.spark.network.util.TransportConf;


@@ -101,11 +101,13 @@ protected void handleMessage(
         blocks.add(block);
       }
       long streamId = streamManager.registerStream(client.getClientId(), blocks.iterator());
-      logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
-        streamId,
-        msg.blockIds.length,
-        client.getClientId(),
-        NettyUtils.getRemoteAddress(client.getChannel()));
+      if (logger.isTraceEnabled()) {
+        logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
+          streamId,
+          msg.blockIds.length,
+          client.getClientId(),
+          getRemoteAddress(client.getChannel()));
+      }
       callback.onSuccess(new StreamHandle(streamId, msg.blockIds.length).toByteBuffer());
       metrics.blockTransferRateBytes.mark(totalBlockSize);
     } finally {

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 1 addition & 1 deletion

@@ -267,7 +267,7 @@ private void deleteExecutorDirs(String[] dirs) {
     for (String localDir : dirs) {
       try {
         JavaUtils.deleteRecursively(new File(localDir));
-        logger.debug("Successfully cleaned up directory: " + localDir);
+        logger.debug("Successfully cleaned up directory: {}", localDir);
       } catch (Exception e) {
         logger.error("Failed to delete directory: " + localDir, e);
       }