@@ -733,6 +733,11 @@ public void onFailure(Exception e) {
733733 "failed to notify channel of error message for action [{}]" , action ), inner );
734734 }
735735 }
736+
737+ @ Override
738+ public String toString () {
739+ return "processing of [" + requestId + "][" + action + "]: " + request ;
740+ }
736741 });
737742 }
738743
@@ -946,7 +951,7 @@ private void checkForTimeout(long requestId) {
946951 assert responseHandlers .contains (requestId ) == false ;
947952 TimeoutInfoHolder timeoutInfoHolder = timeoutInfoHandlers .remove (requestId );
948953 if (timeoutInfoHolder != null ) {
949- long time = System . currentTimeMillis ();
954+ long time = threadPool . relativeTimeInMillis ();
950955 logger .warn ("Received response for a request that has timed out, sent [{}ms] ago, timed out [{}ms] ago, " +
951956 "action [{}], node [{}], id [{}]" , time - timeoutInfoHolder .sentTime (), time - timeoutInfoHolder .timeoutTime (),
952957 timeoutInfoHolder .action (), timeoutInfoHolder .node (), requestId );
@@ -1009,7 +1014,7 @@ protected void traceRequestSent(DiscoveryNode node, long requestId, String actio
10091014 final class TimeoutHandler implements Runnable {
10101015
10111016 private final long requestId ;
1012- private final long sentTime = System . currentTimeMillis ();
1017+ private final long sentTime = threadPool . relativeTimeInMillis ();
10131018 private final String action ;
10141019 private final DiscoveryNode node ;
10151020 volatile ScheduledFuture future ;
@@ -1023,7 +1028,7 @@ final class TimeoutHandler implements Runnable {
10231028 @ Override
10241029 public void run () {
10251030 if (responseHandlers .contains (requestId )) {
1026- long timeoutTime = System . currentTimeMillis ();
1031+ long timeoutTime = threadPool . relativeTimeInMillis ();
10271032 timeoutInfoHandlers .put (requestId , new TimeoutInfoHolder (node , action , sentTime , timeoutTime ));
10281033 // now that we have the information visible via timeoutInfoHandlers, we try to remove the request id
10291034 final Transport .ResponseContext holder = responseHandlers .remove (requestId );
@@ -1049,6 +1054,11 @@ public void cancel() {
10491054 "cancel must be called after the requestId [" + requestId + "] has been removed from clientHandlers" ;
10501055 FutureUtils .cancel (future );
10511056 }
1057+
1058+ @ Override
1059+ public String toString () {
1060+ return "timeout handler for [" + requestId + "][" + action + "]" ;
1061+ }
10521062 }
10531063
10541064 static class TimeoutInfoHolder {
@@ -1176,7 +1186,17 @@ public void sendResponse(final TransportResponse response, TransportResponseOpti
11761186 if (ThreadPool .Names .SAME .equals (executor )) {
11771187 processResponse (handler , response );
11781188 } else {
1179- threadPool .executor (executor ).execute (() -> processResponse (handler , response ));
1189+ threadPool .executor (executor ).execute (new Runnable () {
1190+ @ Override
1191+ public void run () {
1192+ processResponse (handler , response );
1193+ }
1194+
1195+ @ Override
1196+ public String toString () {
1197+ return "delivery of response to [" + requestId + "][" + action + "]: " + response ;
1198+ }
1199+ });
11801200 }
11811201 }
11821202 }
@@ -1201,7 +1221,17 @@ public void sendResponse(Exception exception) throws IOException {
12011221 if (ThreadPool .Names .SAME .equals (executor )) {
12021222 processException (handler , rtx );
12031223 } else {
1204- threadPool .executor (handler .executor ()).execute (() -> processException (handler , rtx ));
1224+ threadPool .executor (handler .executor ()).execute (new Runnable () {
1225+ @ Override
1226+ public void run () {
1227+ processException (handler , rtx );
1228+ }
1229+
1230+ @ Override
1231+ public String toString () {
1232+ return "delivery of failure response to [" + requestId + "][" + action + "]: " + exception ;
1233+ }
1234+ });
12051235 }
12061236 }
12071237 }
0 commit comments