Skip to content

Commit 83188ce

Browse files
author
Marcelo Vanzin
committed
Explicitly mark the client as timed out.
Since ctx.close() is asynchronous, this ensures that threads checking for the client being alive get the right result.
1 parent f836391 commit 83188ce

File tree

2 files changed

+10
-1
lines changed

2 files changed

+10
-1
lines changed

network/common/src/main/java/org/apache/spark/network/client/TransportClient.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,18 +73,20 @@ public class TransportClient implements Closeable {
7373
private final Channel channel;
7474
private final TransportResponseHandler handler;
7575
@Nullable private String clientId;
76+
private volatile boolean timedOut;
7677

7778
public TransportClient(Channel channel, TransportResponseHandler handler) {
7879
this.channel = Preconditions.checkNotNull(channel);
7980
this.handler = Preconditions.checkNotNull(handler);
81+
this.timedOut = false;
8082
}
8183

8284
public Channel getChannel() {
8385
return channel;
8486
}
8587

8688
public boolean isActive() {
87-
return channel.isOpen() || channel.isActive();
89+
return !timedOut && (channel.isOpen() || channel.isActive());
8890
}
8991

9092
public SocketAddress getSocketAddress() {
@@ -263,6 +265,11 @@ public void onFailure(Throwable e) {
263265
}
264266
}
265267

268+
/** Mark this channel as having timed out. */
269+
public void timeOut() {
270+
this.timedOut = true;
271+
}
272+
266273
@Override
267274
public void close() {
268275
// close is a local operation and should finish with milliseconds; timeout just to be safe

network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,11 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
128128
logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
129129
"requests. Assuming connection is dead; please adjust spark.network.timeout if this " +
130130
"is wrong.", address, requestTimeoutNs / 1000 / 1000);
131+
client.timeOut();
131132
ctx.close();
132133
} else if (closeIdleConnections) {
133134
// While CloseIdleConnections is enable, we also close idle connection
135+
client.timeOut();
134136
ctx.close();
135137
}
136138
}

0 commit comments

Comments
 (0)