Skip to content

Commit 177e809

Browse files
committed
MAPREDUCE-6156. Fetcher - connect() doesn't handle connection refused correctly. Contributed by Junping Du
1 parent 7dae5b5 commit 177e809

File tree

2 files changed

+33
-9
lines changed
  • hadoop-mapreduce-project
    • hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce

2 files changed

+33
-9
lines changed

hadoop-mapreduce-project/CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,9 @@ Release 2.6.0 - 2014-11-15
465465
MAPREDUCE-5958. Wrong reduce task progress if map output is compressed
466466
(Emilio Coppa and jlowe via kihwal)
467467

468+
MAPREDUCE-6156. Fetcher - connect() doesn't handle connection refused
469+
correctly (Junping Du via jlowe)
470+
468471
Release 2.5.2 - 2014-11-10
469472

470473
INCOMPATIBLE CHANGES

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ private void openConnectionWithRetry(MapHost host,
407407
}
408408
if ((Time.monotonicNow() - startTime) >= this.fetchRetryTimeout) {
409409
LOG.warn("Failed to connect to host: " + url + "after "
410-
+ fetchRetryTimeout + "milliseconds.");
410+
+ fetchRetryTimeout + " milliseconds.");
411411
throw e;
412412
}
413413
try {
@@ -596,7 +596,7 @@ private void checkTimeoutOrRetry(MapHost host, IOException ioe)
596596
} else {
597597
// timeout, prepare to be failed.
598598
LOG.warn("Timeout for copying MapOutput with retry on host " + host
599-
+ "after " + fetchRetryTimeout + "milliseconds.");
599+
+ "after " + fetchRetryTimeout + " milliseconds.");
600600

601601
}
602602
}
@@ -678,28 +678,49 @@ private void connect(URLConnection connection, int connectionTimeout)
678678
} else if (connectionTimeout > 0) {
679679
unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
680680
}
681+
long startTime = Time.monotonicNow();
682+
long lastTime = startTime;
683+
int attempts = 0;
681684
// set the connect timeout to the unit-connect-timeout
682685
connection.setConnectTimeout(unit);
683686
while (true) {
684687
try {
688+
attempts++;
685689
connection.connect();
686690
break;
687691
} catch (IOException ioe) {
688-
// update the total remaining connect-timeout
689-
connectionTimeout -= unit;
690-
692+
long currentTime = Time.monotonicNow();
693+
long retryTime = currentTime - startTime;
694+
long leftTime = connectionTimeout - retryTime;
695+
long timeSinceLastIteration = currentTime - lastTime;
691696
// throw an exception if we have waited for timeout amount of time
692697
// note that the updated value of timeout is used here
693-
if (connectionTimeout == 0) {
698+
if (leftTime <= 0) {
699+
int retryTimeInSeconds = (int) retryTime/1000;
700+
LOG.error("Connection retry failed with " + attempts +
701+
" attempts in " + retryTimeInSeconds + " seconds");
694702
throw ioe;
695703
}
696-
697704
// reset the connect timeout for the last try
698-
if (connectionTimeout < unit) {
699-
unit = connectionTimeout;
705+
if (leftTime < unit) {
706+
unit = (int)leftTime;
700707
// reset the connect time out for the final connect
701708
connection.setConnectTimeout(unit);
702709
}
710+
711+
if (timeSinceLastIteration < unit) {
712+
try {
713+
// sleep for the remainder of the current unit interval
714+
sleep(unit - timeSinceLastIteration);
715+
} catch (InterruptedException e) {
716+
LOG.warn("Sleep in connection retry get interrupted.");
717+
if (stopped) {
718+
return;
719+
}
720+
}
721+
}
722+
// record when this iteration finished; used to compute timeSinceLastIteration
723+
lastTime = Time.monotonicNow();
703724
}
704725
}
705726
}

0 commit comments

Comments
 (0)