Conversation

@ajithme (Contributor) commented Mar 19, 2019

## What changes were proposed in this pull request?

When the heartbeat interval is configured via spark.executor.heartbeatInterval without specifying units, the driver and the executor interpret the value with mismatched units: the driver treats it as seconds while the executor treats it as milliseconds.
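To make the mismatch concrete, here is a minimal sketch (assuming the 2.4 call sites discussed below: the driver reads the value via getTimeAsSeconds, the executor via getTimeAsMs):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.executor.heartbeatInterval", "10000")

// driver side (HeartbeatReceiver): a unitless value is parsed as seconds
conf.getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") // 10000 seconds

// executor side (startDriverHeartbeater): the same value is parsed as milliseconds
conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")      // 10000 ms = 10 seconds
```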

## How was this patch tested?

Will add UTs

@ajithme ajithme changed the title [SPARK-27198] Heartbeat interval mismatch in driver and executor [SPARK-27198][core] Heartbeat interval mismatch in driver and executor Mar 19, 2019
@HyukjinKwon (Member) commented:

Why is this against branch-2.4? Looks like it should be against master.

@ajithme (Contributor Author) commented Mar 19, 2019

> Why is this against branch-2.4? Looks like it should be against master.

This was resolved in master by #22473, but lower versions still have this problem.

@MaxGekk (Member) left a comment:

The changes can break existing users' apps when the suffix is not set. Let's say there is an app for Spark 2.4 which sets spark.executor.heartbeatInterval to 10000 (10 sec). After your changes, it becomes 1000 * 10 sec, doesn't it?

Could you write a test for the case when there is no suffix and spark.executor.heartbeatInterval is set to 10000?

@ajithme (Contributor Author) commented Mar 19, 2019

> The changes can break existing users' apps when the suffix is not set. Let's say there is an app for Spark 2.4 which sets spark.executor.heartbeatInterval to 10000 (10 sec). After your changes, it becomes 1000 * 10 sec, doesn't it?
>
> Could you write a test for the case when there is no suffix and spark.executor.heartbeatInterval is set to 10000?

  1. No, it doesn't break. If in 2.4 a user has configured spark.executor.heartbeatInterval=10000, then without my change the driver assumes it is 10000 seconds but the executor assumes it is 10 seconds, so the functionality is already broken.

  2. Will update with a test case.

@cloud-fan (Contributor) commented:

> so the functionality is already broken

I have the same feeling. Do you have a code snippet that exposes this bug? That would make it much easier to convince other people that this is a bug fix.

@ajithme (Contributor Author) commented Mar 19, 2019

> so the functionality is already broken
>
> I have the same feeling. Do you have a code snippet that exposes this bug? That would make it much easier to convince other people that this is a bug fix.

OK, will update with a test snippet shortly.

@ajithme (Contributor Author) commented Mar 19, 2019

Here is a test snippet. Apologies, I didn't find a better way to probe the heartbeat rate between driver and executor, so I used reflection. Please suggest if there is a better way.

package org.apache.spark

import java.util.concurrent.ConcurrentMap

import scala.collection.JavaConverters._

import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef}
import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}

/**
  * A test for the heartbeat behavior between the driver and the executors.
  */
class HeartbeatExpirySuite
  extends SparkFunSuite
    with BeforeAndAfterEach
    with PrivateMethodTester
    with LocalSparkContext {

  test("heartbeat of 5 seconds: driver must not receive heartbeats shorter than 5 seconds") {

    val heartbeatInterval = 5 // assuming this is 5 seconds without units
    val localExecID = "driver" // local exec id

    // driver assumes heartbeatInterval is 5 seconds: validateSettings() rejects a
    // network timeout smaller than the heartbeat interval
    try {
      new SparkConf()
        .set("spark.network.timeout", (heartbeatInterval - 1).toString)
        .set("spark.executor.heartbeatInterval", heartbeatInterval.toString)
        .validateSettings()
      fail("Expected validateSettings() to throw IllegalArgumentException")
    } catch {
      case e: IllegalArgumentException =>
        assert(e.getMessage.contains("spark.executor.heartbeatInterval=5s"))
    }

    // executor assumes heartbeatInterval is 5 milliseconds
    val conf = new SparkConf()
      .setMaster("local[1]")
      .setAppName("test")
      .set("spark.executor.heartbeatInterval", heartbeatInterval.toString)
    sc = new SparkContext(conf)

    // now let's extract the last-seen executor map from the driver's heartbeat receiver
    val execLastSeen = getExecLastSeen

    // nap for a duration < heartbeat interval;
    // take 3 readings to eliminate the overlapping scenario
    val heartbeatReportedTime1 = execLastSeen.get(localExecID)
    Thread.sleep((heartbeatInterval * 1000) / 2) // wait for a duration shorter than heartbeat
    val heartbeatReportedTime2 = execLastSeen.get(localExecID)
    Thread.sleep((heartbeatInterval * 1000) / 2) // wait for a duration shorter than heartbeat
    val heartbeatReportedTime3 = execLastSeen.get(localExecID)

    assert(heartbeatReportedTime1 == heartbeatReportedTime2 ||
      heartbeatReportedTime2 == heartbeatReportedTime3)
  }

  /**
    * Get HeartbeatReceiver#executorLastSeen via reflection.
    *
    * @return map of executor id to last heartbeat time
    */
  private def getExecLastSeen: collection.Map[String, Long] = {
    val nettyRpcClass = Class.forName("org.apache.spark.rpc.netty.NettyRpcEnv")
    val dispatcherField = nettyRpcClass.getDeclaredField("dispatcher")
    dispatcherField.setAccessible(true)
    val dispatcher = dispatcherField.get(sc.env.rpcEnv)
    val dispatcherClass = Class.forName("org.apache.spark.rpc.netty.Dispatcher")
    val endpointRefsField = dispatcherClass.getDeclaredField("endpointRefs")
    endpointRefsField.setAccessible(true)
    val endpointRefs =
      endpointRefsField.get(dispatcher).asInstanceOf[ConcurrentMap[RpcEndpoint, RpcEndpointRef]]
    val heartbeatReceiver = endpointRefs.keySet().asScala
      .filter(_.isInstanceOf[HeartbeatReceiver]).head
    val _executorLastSeen = PrivateMethod[collection.Map[String, Long]]('executorLastSeen)
    heartbeatReceiver.invokePrivate(_executorLastSeen())
  }
}

The test fails with

Some(1553017599197) did not equal Some(1553017601716), and Some(1553017601716) did not equal Some(1553017604240)
ScalaTestFailureLocation: org.apache.spark.HeartbeatExpirySuite$$anonfun$1 at (HeartbeatExpirySuite.scala:68)

Here we can see that

  1. the driver validates heartbeats at a rate of one per 5 seconds
  2. the executor sends heartbeats at a rate of one per 5 milliseconds

(The three reported timestamps differ by roughly 2.5 s each, i.e. a fresh heartbeat arrived within every sub-5-second sleep, so the executor is clearly heartbeating far more often than once per 5 seconds.)

@ajithme (Contributor Author) commented Mar 19, 2019

Also, at the executor, the RPC timeout for the heartbeat is considered in seconds: https://github.com/apache/spark/blob/v2.4.1-rc8/core/src/main/scala/org/apache/spark/executor/Executor.scala#L835

   */
  private def startDriverHeartbeater(): Unit = {
-   val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
+   val intervalMs = conf
@srowen (Member) commented Mar 19, 2019

In branch-2.4 this is accessed both with getTimeAsSeconds and getTimeAsMs, yes. The only question is which interpretation was intended? It's not best practice to leave out the units, but it can happen.

In master, this is interpreted as milliseconds by default. I think we should switch how it's read in SparkConf instead?
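For reference, master (after #22473) defines this entry through the config framework with a millisecond time unit; roughly, as a sketch simplified from the internal config package:

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.internal.config.ConfigBuilder

// sketch: in master this lives in org.apache.spark.internal.config
val EXECUTOR_HEARTBEAT_INTERVAL =
  ConfigBuilder("spark.executor.heartbeatInterval")
    .timeConf(TimeUnit.MILLISECONDS)   // unitless values are read as ms
    .createWithDefaultString("10s")
```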

@ajithme (Contributor Author) replied:

Okay, that seems more reasonable as it also avoids the scenario @MaxGekk mentioned. Will modify the PR accordingly.

@ajithme (Contributor Author) commented Mar 21, 2019

@srowen @cloud-fan @MaxGekk @HyukjinKwon I have updated the PR as per the suggestion. Please review.

@MaxGekk (Member) commented Mar 21, 2019

It looks nice. Let's trigger a build and run tests.

 * Convert a passed time string (e.g. 50s, 100ms, or 250us) to a time count in the given unit.
 * defaultUnit is used for strings which have just a number and no unit mentioned.
 */
public static long timeStringAs(String str, TimeUnit unit, TimeUnit defaultUnit) {
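For context, the existing two-argument parser uses the caller-supplied unit both as the return unit and as the default for unitless input, which is exactly why the two call sites disagree; a small illustration via the public helpers (assuming JavaUtils' current behavior):

```scala
import org.apache.spark.network.util.JavaUtils

// a unitless "10000" inherits whatever unit the caller asks for:
JavaUtils.timeStringAsSec("10000") // 10000, read as seconds
JavaUtils.timeStringAsMs("10000")  // 10000, read as milliseconds
JavaUtils.timeStringAsMs("10s")    // 10000, unambiguous with a suffix
```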
@srowen (Member) commented:

Hm, why do we want all this? I think we want to standardize the behavior w.r.t. how a unitless number is interpreted, even if it means changing behavior. The behavior is currently inconsistent and I think it should be fixed.

A contributor commented:

Without looking at the PR, +1 to Sean's comment. The "no unit" thing was always a backwards compatibility hack. I think it's time we drop support for that since it's confusing.

(I'd also vote for making all time configs return milliseconds but that's a much noisier change.)

@ajithme (Contributor Author) replied:

So I see two options:

  1. Make all other time-related configurations also fall back to milliseconds by default when units are not mentioned.
  2. Or disallow any time-related configuration from being set without specifying a unit.

Please suggest which one I should proceed with.

A member commented:

Let's not disallow unit-less time, not here, though I think that's reasonable to consider separately for Spark 3. Here I think we should just focus on fixing the inconsistency in handling this one parameter, and make it consistent with how it's handled in master. I don't think we need more than a few lines of change for that.

@cloud-fan (Contributor) commented:

ok to test

@SparkQA commented Mar 22, 2019

Test build #103778 has finished for PR 24140 at commit bc39695.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ajithme (Contributor Author) commented Mar 22, 2019

@srowen @attilapiros @MaxGekk @cloud-fan @vanzin Thank you for all your inputs. I have updated the PR per the latest suggestion. Please have a look.

@SparkQA commented Mar 22, 2019

Test build #103806 has finished for PR 24140 at commit 76570b7.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ajithme (Contributor Author) commented Mar 24, 2019

@srowen I have updated as per the comments. Please have a look.

@SparkQA commented Mar 24, 2019

Test build #103869 has finished for PR 24140 at commit 596c9b2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 24, 2019

Test build #103871 has finished for PR 24140 at commit 6c17bf8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 24, 2019

Test build #103870 has finished for PR 24140 at commit 39d9b59.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen (Member) left a comment:

Looks OK pending tests. I guess I don't mind using TimeUnit.

@SparkQA commented Mar 25, 2019

Test build #4664 has finished for PR 24140 at commit 6c17bf8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen (Member) commented Mar 25, 2019

Merged to 2.4

srowen pushed a commit that referenced this pull request Mar 25, 2019
## What changes were proposed in this pull request?

When the heartbeat interval is configured via spark.executor.heartbeatInterval without specifying units, the driver and the executor interpret the value with mismatched units: the driver treats it as seconds while the executor treats it as milliseconds.

## How was this patch tested?

Will add UTs

Closes #24140 from ajithme/intervalissue.

Authored-by: Ajith <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
@cloud-fan cloud-fan closed this Mar 25, 2019
  try {
    val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
-     message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
+     message, new RpcTimeout(heartbeatIntervalInSec, "spark.executor.heartbeatInterval"))
A member commented:

The unit in the master branch is different from the unit in 2.4 after this fix, right?

cc @mukulmurthy @zsxwing

A member commented:

The underlying problem was that it was parsed differently on the driver vs. the executor. That was fixed in a different way already in master. The behavior of "2m" doesn't change at all. But a unitless value like "1000" was interpreted as "1000 seconds" here vs "1000 milliseconds" on the driver. It's a bug fix, and I'm not sure it would ever have worked with a unitless string like "1000", as the driver would be expecting heartbeats 1000x more frequently than the executor sent them.

Hence I don't know if there was a working behavior that changed here. I don't mind adding a release note just to be sure; my only hesitation is loading up the release notes with items that may not actually affect users. If you feel it should, I suggest you add this to "Docs text" in the JIRA:

The value of spark.executor.heartbeatInterval, when specified without time units like "1000", was interpreted differently on the driver and executor. Without units, values are now consistently interpreted as milliseconds. It's best to specify this value with units, like "10s", as this was and is correctly interpreted.

A member commented:

Right now the default time unit in master is ms, which is the same after this fix.

Before this fix, when a time unit was not provided, for example 1000, the behavior was to send the heartbeat every 1000 ms while the timeout for sending the heartbeat message was 1000 s (which I think is a bug introduced in #10365).

I'm +1 for this fix since it has the same behavior as the master branch.

However, I suggest applying the same changes related to spark.executor.heartbeatInterval from 9362c5c#diff-6bdad48cfc34314e89599655442ff210 rather than this patch, to make all places consistent. @ajithme could you submit a follow-up PR to make the change?

A member commented:

I'm not against that so much, but master just has a different implementation of all the configs. I don't know if it helps much to back-port part of it to achieve the same behavior. It won't be exactly the same change no matter what.

A member commented:

Actually, the current fix in 2.4 has a bug. See https://github.com/apache/spark/pull/24140/files#r271067277


val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
val heartbeatIntervalInSec =
  conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds.seconds
A member commented:

After discussing with @gatorsmile, we found there is a bug here. If spark.executor.heartbeatInterval is less than one second, it will always be 0 and time out. (https://github.com/scala/scala/blob/v2.11.12/src/library/scala/concurrent/impl/Promise.scala#L209)

This may break some users' tests that set a small timeout.
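A quick illustration of the truncation, mirroring the expression above with plain scala.concurrent.duration arithmetic:

```scala
import scala.concurrent.duration._

// mirrors conf.getTimeAsMs(...).millis.toSeconds.seconds for a 900 ms setting
val interval = 900L.millis.toSeconds.seconds // == 0.seconds
// an RpcTimeout built from a zero duration fires immediately
```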

@gatorsmile (Member) commented Apr 1, 2019

Unfortunately, the 2.4 release voting has passed. @dbtsai Could we document it in the release note?

A member commented:

@ajithme @srowen We need to fix this ASAP.

A member commented:

I see your point, but that isn't new behavior. This was always parsed as 'seconds' here before, so anything less than a second would have resulted in 0. It's a separate bug but does sound like a problem.

A member commented:

Sorry. I was not clear. I meant, for example, if spark.executor.heartbeatInterval is 900 without a time unit, it will be converted to 0 now.

A member commented:

Yeah I agree that this is a closely-related bug and fix; the master change fixed both but this change just fixes the unit inconsistency, not also the truncation of this value to seconds.

Release notes probably can't hurt, but I'm not sure a setting of < "1000" would ever even have worked in practice.

A member commented:

I'll put it in the release note. Thanks!

A member commented:

Opened #24329 to fix the issue

kai-chi pushed a commit to kai-chi/spark that referenced this pull request Jul 23, 2019

kai-chi pushed a commit to kai-chi/spark that referenced this pull request Jul 25, 2019

kai-chi pushed a commit to kai-chi/spark that referenced this pull request Aug 1, 2019