Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void channelActive() {

@Override
public void channelInactive() {
if (numOutstandingRequests() > 0) {
if (hasOutstandingRequests()) {
String remoteAddress = getRemoteAddress(channel);
logger.error("Still have {} requests outstanding when connection from {} is closed",
numOutstandingRequests(), remoteAddress);
Expand All @@ -150,7 +150,7 @@ public void channelInactive() {

@Override
public void exceptionCaught(Throwable cause) {
if (numOutstandingRequests() > 0) {
if (hasOutstandingRequests()) {
String remoteAddress = getRemoteAddress(channel);
logger.error("Still have {} requests outstanding when connection from {} is closed",
numOutstandingRequests(), remoteAddress);
Expand Down Expand Up @@ -275,6 +275,12 @@ public int numOutstandingRequests() {
(streamActive ? 1 : 0);
}

/** Check if there are any outstanding requests (fetch requests + rpcs) */
public Boolean hasOutstandingRequests() {
return streamActive || !outstandingFetches.isEmpty() || !outstandingRpcs.isEmpty() ||
!streamCallbacks.isEmpty();
}

/** Returns the time in nanoseconds of when the last request was sent out. */
public long getTimeOfLastRequestNs() {
return timeOfLastRequestNs.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
boolean isActuallyOverdue =
System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
boolean hasInFlightRequests = responseHandler.numOutstandingRequests() > 0;
if (hasInFlightRequests) {
if (responseHandler.hasOutstandingRequests()) {
String address = getRemoteAddress(ctx.channel());
logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
"requests. Assuming connection is dead; please adjust" +
Expand Down