core/src/main/scala/org/apache/spark/SparkConf.scala (3 additions, 1 deletion)

```diff
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap

 import scala.collection.JavaConverters._
 import scala.collection.mutable.LinkedHashSet
+import scala.concurrent.duration._

 import org.apache.avro.{Schema, SchemaNormalization}

@@ -610,7 +611,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {
       s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")

     val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s")
-    val executorHeartbeatInterval = getTimeAsSeconds("spark.executor.heartbeatInterval", "10s")
+    val executorHeartbeatInterval =
+      getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds
     // If spark.executor.heartbeatInterval bigger than spark.network.timeout,
     // it will almost always cause ExecutorLostFailure. See SPARK-22754.
     require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of " +
```
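A small standalone sketch of the validation this hunk performs. The two Spark config lookups are replaced here by hard-coded values equal to the defaults ("120s" and "10s"), which is an assumption for illustration; Spark's own `getTimeAsSeconds`/`getTimeAsMs` parsers are not used.

```scala
import scala.concurrent.duration._

object ValidationSketch extends App {
  // Stand-in for getTimeAsSeconds("spark.network.timeout", "120s").
  val executorTimeoutThreshold: Long = 120 // seconds

  // Stand-in for getTimeAsMs("spark.executor.heartbeatInterval", "10s"),
  // converted to seconds the same way the patched line does.
  val executorHeartbeatInterval: Long = 10000L.millis.toSeconds

  // Mirrors the require(...) in the diff: the heartbeat interval must be
  // strictly smaller than the network timeout, otherwise heartbeats can
  // miss the timeout window and cause ExecutorLostFailure (SPARK-22754).
  require(executorTimeoutThreshold > executorHeartbeatInterval,
    "spark.executor.heartbeatInterval must be less than spark.network.timeout")
}
```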
core/src/main/scala/org/apache/spark/executor/Executor.scala (4 additions, 1 deletion)

```diff
@@ -28,6 +28,7 @@ import javax.annotation.concurrent.GuardedBy

 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
+import scala.concurrent.duration._
 import scala.util.control.NonFatal

 import com.google.common.util.concurrent.ThreadFactoryBuilder

@@ -831,9 +832,11 @@ private[spark] class Executor(
       }

       val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
+      val heartbeatIntervalInSec =
+        conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds.seconds
```
Comment from a Member:

After discussing with @gatorsmile, we found there is a bug here. If spark.executor.heartbeatInterval is less than one second, it will always be 0 and time out. (https://github.com/scala/scala/blob/v2.11.12/src/library/scala/concurrent/impl/Promise.scala#L209)

This may break some users' tests that set a small timeout.
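A minimal sketch of the truncation being described, using plain scala.concurrent.duration outside of Spark: converting a sub-second interval to seconds floors it to zero, so any timeout built from it fires immediately.

```scala
import scala.concurrent.duration._

object TruncationSketch extends App {
  // A value like "900" is parsed as 900 ms, then converted the way the
  // patched code does with .millis.toSeconds: the result floors to 0.
  val subSecond: Long = 900L.millis.toSeconds
  assert(subSecond == 0L) // a timeout of 0 seconds fires immediately

  // Values of one second or more survive the round trip.
  val tenSeconds: Long = 10000L.millis.toSeconds
  assert(tenSeconds == 10L)
}
```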

Comment from @gatorsmile (Member), Apr 1, 2019:

Unfortunately, the 2.4 release voting has passed. @dbtsai Could we document it in the release notes?

Comment from a Member:

@ajithme @srowen We need to fix this ASAP.

Comment from a Member:

I see your point, but that isn't new behavior. This was always parsed as 'seconds' here before, so anything less than a second would have resulted in 0. It's a separate bug, but it does sound like a problem.

Comment from a Member:

Sorry, I was not clear. I meant, for example, that if spark.executor.heartbeatInterval is 900 without a time unit, it will now be converted to 0.

Comment from a Member:

Yeah, I agree that this is a closely related bug and fix; the master change fixed both, but this change just fixes the unit inconsistency, not also the truncation of this value to seconds.

Release notes probably can't hurt, but I am not sure a setting of < "1000" would ever have worked in practice.

Comment from a Member:

I'll put it in the release note. Thanks!

Comment from a Member:

Opened #24329 to fix the issue.

```diff
       try {
         val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
-          message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
+          message, new RpcTimeout(heartbeatIntervalInSec, "spark.executor.heartbeatInterval"))
```
Comment from a Member:

The unit in the master branch is different from the unit in 2.4 after this fix, right?

cc @mukulmurthy @zsxwing

Comment from a Member:

The underlying problem was that it was parsed differently on the driver vs. the executor. That was fixed in a different way already in master. The behavior of "2m" doesn't change at all, but a unitless value like "1000" was interpreted as "1000 seconds" here vs. "1000 milliseconds" on the driver. It's a bug fix, and I'm not sure it would ever have worked with a unitless string like "1000", as the driver would be expecting heartbeats 1000x more frequently than the executor sent them.

Hence I don't know if there was a working behavior that changed here. I don't mind adding a release note just to be sure; my only hesitation is loading up the release notes with items that may not actually affect users. If you feel it should, I suggest you add this to "Docs text" in the JIRA:

The value of spark.executor.heartbeatInterval, when specified without time units like "1000", was interpreted differently on the driver and executor. Without units, values are now consistently interpreted as milliseconds. It's best to specify this value with units, like "10s", as this was and is correctly interpreted.
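To make the driver/executor discrepancy concrete, here is a hedged sketch using plain scala.concurrent.duration. The default units are applied by hand; Spark's own config parser is not invoked, which is an assumption for illustration.

```scala
import scala.concurrent.duration._

object UnitMismatchSketch extends App {
  // A unitless "1000" in the config. The executor side used to apply a
  // default unit of seconds, while the driver side applied milliseconds.
  val raw = 1000L
  val executorView = raw.seconds // interpreted as 1000 s (~16.7 minutes)
  val driverView   = raw.millis  // interpreted as 1000 ms (1 second)

  // The two interpretations disagree by a factor of 1000, so the driver
  // would expect heartbeats 1000x more often than the executor sent them.
  assert(executorView.toMillis / driverView.toMillis == 1000L)
}
```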

Comment from a Member:

Right now the default time unit in master is ms, which is the same after this fix.

Before this fix, when a time unit was not provided (for example, 1000), the behavior was to send the heartbeat every 1000 ms while the timeout for sending the heartbeat message was 1000 s (which I think is a bug introduced in #10365).

I'm +1 for this fix since it has the same behavior as the master branch.

However, I suggest applying the same changes related to spark.executor.heartbeatInterval from 9362c5c#diff-6bdad48cfc34314e89599655442ff210 rather than this patch, to make all places consistent. @ajithme, could you submit a follow-up PR to make the change?

Comment from a Member:

I'm not against that so much, but master just has a different implementation of all the configs. I don't know if it helps much to back-port part of it to achieve the same behavior. It won't be exactly the same change no matter what.

Comment from a Member:

Actually, the current fix in 2.4 has a bug. See https://github.com/apache/spark/pull/24140/files#r271067277

```diff
       if (response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()
```