
Commit bc39695

handle non-unit time as millisecond

1 parent 5755b31

3 files changed: +32, -7 lines

common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java

Lines changed: 9 additions & 1 deletion
@@ -226,6 +226,14 @@ private static boolean isSymlink(File file) throws IOException {
    * The unit is also considered the default if the given string does not specify a unit.
    */
   public static long timeStringAs(String str, TimeUnit unit) {
+    return timeStringAs(str, unit, unit);
+  }
+
+  /**
+   * Convert a passed time string (e.g. 50s, 100ms, or 250us) to a time count in the given unit.
+   * defaultUnit is used for strings that have just a number and no unit mentioned.
+   */
+  public static long timeStringAs(String str, TimeUnit unit, TimeUnit defaultUnit) {
     String lower = str.toLowerCase(Locale.ROOT).trim();
 
     try {
@@ -243,7 +251,7 @@ public static long timeStringAs(String str, TimeUnit unit) {
       }
 
       // If suffix is valid use that, otherwise none was provided and use the default passed
-      return unit.convert(val, suffix != null ? timeSuffixes.get(suffix) : unit);
+      return unit.convert(val, suffix != null ? timeSuffixes.get(suffix) : defaultUnit);
     } catch (NumberFormatException e) {
       String timeError = "Time must be specified as seconds (s), " +
         "milliseconds (ms), microseconds (us), minutes (m or min), hour (h), or day (d). " +

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 14 additions & 2 deletions
@@ -18,7 +18,7 @@
 package org.apache.spark
 
 import java.util.{Map => JMap}
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.LinkedHashSet
@@ -28,6 +28,7 @@ import org.apache.avro.{Schema, SchemaNormalization}
 import org.apache.spark.deploy.history.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.util.Utils
 
@@ -280,6 +281,16 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
     Utils.timeStringAsSeconds(get(key, defaultValue))
   }
 
+  /**
+   * Get a time parameter as seconds, falling back to a default if not set. If no
+   * suffix is provided then defaultUnit is assumed.
+   * @throws NumberFormatException If the value cannot be interpreted as seconds
+   */
+  def getTimeAsSeconds(key: String, defaultValue: String, defaultUnit: TimeUnit): Long =
+    catchIllegalValue(key) {
+      JavaUtils.timeStringAs(get(key, defaultValue), TimeUnit.SECONDS, defaultUnit)
+    }
+
   /**
    * Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no
    * suffix is provided then milliseconds are assumed.
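
A usage sketch for the new SparkConf overload (the config value here is illustrative, standing in for a legacy unit-less setting):

  import java.util.concurrent.TimeUnit
  import org.apache.spark.SparkConf

  val conf = new SparkConf()
  conf.set("spark.executor.heartbeatInterval", "5000")  // legacy style: no unit suffix

  // The bare "5000" is read as milliseconds and converted to seconds.
  // The two-argument getTimeAsSeconds would have read it as 5000 seconds.
  conf.getTimeAsSeconds("spark.executor.heartbeatInterval", "10s", TimeUnit.MILLISECONDS)  // => 5
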
@@ -610,7 +621,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
         s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
 
     val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s")
-    val executorHeartbeatInterval = getTimeAsSeconds("spark.executor.heartbeatInterval", "10s")
+    val executorHeartbeatInterval =
+      getTimeAsSeconds("spark.executor.heartbeatInterval", "10s", TimeUnit.MILLISECONDS)
     // If spark.executor.heartbeatInterval bigger than spark.network.timeout,
     // it will almost always cause ExecutorLostFailure. See SPARK-22754.
     require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of " +
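
This call site is the behavioral point of the commit: a heartbeat interval set as a bare number has conventionally meant milliseconds (as the commit title says), so reading it as seconds would inflate it a thousandfold and trip the require against spark.network.timeout. A worked comparison under that assumption, with illustrative values:

  // Suppose spark.executor.heartbeatInterval = "10000", intended as 10000 ms = 10 s.
  val timeoutThresholdSec = 120L   // parsed from "120s"
  val oldReadingSec = 10000L       // bare value read as seconds: require(...) fails
  val newReadingSec = 10L          // bare value read as milliseconds: require(...) passes
  assert(timeoutThresholdSec > newReadingSec)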

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 9 additions & 4 deletions
@@ -28,6 +28,7 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
+import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -71,6 +72,11 @@ private[spark] class Executor(
 
   private val conf = env.conf
 
+  private val HEARTBEAT_INTERVAL_KEY = "spark.executor.heartbeatInterval"
+
+  private val heartbeatIntervalInSec =
+    conf.getTimeAsSeconds(HEARTBEAT_INTERVAL_KEY, "10s", TimeUnit.MILLISECONDS).seconds
+
   // No ip or host:port - just hostname
   Utils.checkHost(executorHostname)
   // must not have port specified.
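
The scala.concurrent.duration._ import added above is what makes the .seconds suffix compile: the DurationLong implicit turns the parsed Long into a FiniteDuration, which the call sites below consume. A small sketch of that chain, using the 10s default:

  import scala.concurrent.duration._

  val parsedSec: Long = 10                          // result of conf.getTimeAsSeconds(...)
  val interval: FiniteDuration = parsedSec.seconds  // via the DurationLong implicit
  interval.toMillis                                 // => 10000, as used in startDriverHeartbeater
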
@@ -832,8 +838,9 @@ private[spark] class Executor(
 
     val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
     try {
+
       val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
-          message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
+          message, new RpcTimeout(heartbeatIntervalInSec, HEARTBEAT_INTERVAL_KEY))
       if (response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()
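
Constructing the RpcTimeout directly from the already-parsed heartbeatIntervalInSec, rather than through the RpcTimeout(conf, key, default) factory, matters because the factory re-parses the conf value with seconds as the bare-number fallback (an assumption about org.apache.spark.rpc.RpcTimeout, which this diff does not show), which would silently undo the millisecond handling. In sketch form:

  // Before: re-parses the raw conf value, bare numbers read as seconds.
  //   RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s")
  // After: reuses the FiniteDuration parsed once with the millisecond fallback.
  //   new RpcTimeout(heartbeatIntervalInSec, HEARTBEAT_INTERVAL_KEY)
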
@@ -855,9 +862,7 @@ private[spark] class Executor(
    * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
    */
   private def startDriverHeartbeater(): Unit = {
-    val intervalMs = conf
-      .getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") *
-      TimeUnit.SECONDS.toMillis(1)
+    val intervalMs = heartbeatIntervalInSec.toMillis
 
     // Wait a random interval so the heartbeats don't end up in sync
     val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
