Skip to content

Commit f8ff786

Browse files
weixiulisrowen
authored andcommitted
[SPARK-37984][SHUFFLE] Avoid calculating all outstanding requests to improve performance
### What changes were proposed in this pull request? Avoid calculating all outstanding requests to improve performance. ### Why are the changes needed? Follow the comment (#34711 (review)) , we can implement a "has outstanding requests" method in the response handler that doesn't even need to get a count,let's do this with PR. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Exist unittests. Closes #35276 from weixiuli/SPARK-37984. Authored-by: weixiuli <[email protected]> Signed-off-by: Sean Owen <[email protected]>
1 parent 3922b9b commit f8ff786

File tree

2 files changed

+9
-4
lines changed

2 files changed

+9
-4
lines changed

common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ public void channelActive() {
140140

141141
@Override
142142
public void channelInactive() {
143-
if (numOutstandingRequests() > 0) {
143+
if (hasOutstandingRequests()) {
144144
String remoteAddress = getRemoteAddress(channel);
145145
logger.error("Still have {} requests outstanding when connection from {} is closed",
146146
numOutstandingRequests(), remoteAddress);
@@ -150,7 +150,7 @@ public void channelInactive() {
150150

151151
@Override
152152
public void exceptionCaught(Throwable cause) {
153-
if (numOutstandingRequests() > 0) {
153+
if (hasOutstandingRequests()) {
154154
String remoteAddress = getRemoteAddress(channel);
155155
logger.error("Still have {} requests outstanding when connection from {} is closed",
156156
numOutstandingRequests(), remoteAddress);
@@ -275,6 +275,12 @@ public int numOutstandingRequests() {
275275
(streamActive ? 1 : 0);
276276
}
277277

278+
/** Check if there are any outstanding requests (fetch requests + rpcs) */
279+
public Boolean hasOutstandingRequests() {
280+
return streamActive || !outstandingFetches.isEmpty() || !outstandingRpcs.isEmpty() ||
281+
!streamCallbacks.isEmpty();
282+
}
283+
278284
/** Returns the time in nanoseconds of when the last request was sent out. */
279285
public long getTimeOfLastRequestNs() {
280286
return timeOfLastRequestNs.get();

common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
161161
boolean isActuallyOverdue =
162162
System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
163163
if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
164-
boolean hasInFlightRequests = responseHandler.numOutstandingRequests() > 0;
165-
if (hasInFlightRequests) {
164+
if (responseHandler.hasOutstandingRequests()) {
166165
String address = getRemoteAddress(ctx.channel());
167166
logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
168167
"requests. Assuming connection is dead; please adjust" +

0 commit comments

Comments
 (0)