Conversation

@advancedxy
Contributor

What changes were proposed in this pull request?

The ultimate goal is for onTaskEnd listeners to receive metrics when a task is killed intentionally, since that data is currently just thrown away. This is already done for ExceptionFailure, so this change just copies the same approach.

How was this patch tested?

Updated existing tests.

This is a rework of #17422; all credit should go to @noodle-fb
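For context, a minimal sketch of the intended shape of TaskKilled after this change, mirroring what ExceptionFailure already does (the fields follow the diff discussed below; it is assumed to sit alongside the other TaskEndReasons in core, so the usual imports and annotations from that file apply):

@DeveloperApi
case class TaskKilled(
    reason: String,
    accumUpdates: Seq[AccumulableInfo] = Seq.empty,
    private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil)
  extends TaskFailedReason {

  // Carry accumulator updates so onTaskEnd listeners still receive metrics
  // for intentionally killed tasks, just as they do for ExceptionFailure.
  override def toErrorString: String = s"TaskKilled ($reason)"
}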

@advancedxy
Contributor Author

cc @squito @jiangxb1987

@squito
Contributor

squito commented Apr 26, 2018

Ok to test

Contributor

@squito left a comment

One small change, otherwise LGTM assuming tests pass.

* 3. Set the finished flag to true and clear current thread's interrupt status
*/
private def collectAccumulatorsAndResetStatusOnFailure(taskStart: Long) = {
reportGCAndExecutorTimeIfPossible(taskStart)
Contributor

I don't think the extra reportGCAndExecutorTimeIfPossible is necessary; you can just inline it. Also, the original if (task != null) is probably easier to follow than Option(task).map.
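A rough sketch of what that inlined version could look like, using the plain null check; the surrounding TaskRunner members (task, startGCTime, computeTotalGcTime, setTaskFinishedAndClearInterruptStatus) are assumed from the existing executor code and are only illustrative here:

private def collectAccumulatorsAndResetStatusOnFailure(taskStart: Long) = {
  if (task != null) {
    // Report executor runtime and JVM gc time (previously done by the
    // separate reportGCAndExecutorTimeIfPossible helper).
    task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
    task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
  }
  // Collect accumulator updates to ship back with the failure reason.
  val accums: Seq[AccumulatorV2[_, _]] =
    if (task != null) task.collectAccumulatorUpdates(taskFailed = true) else Seq.empty
  val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
  // Set the finished flag to true and clear current thread's interrupt status.
  setTaskFinishedAndClearInterruptStatus()
  (accums, accUpdates)
}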

* 2. Collect accumulator updates
* 3. Set the finished flag to true and clear current thread's interrupt status
*/
private def collectAccumulatorsAndResetStatusOnFailure(taskStart: Long) = {
Contributor Author

@squito after addressing your comment, do you think we should come up with a more specific method name?

    private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil)
  extends TaskFailedReason {

  override def toErrorString: String = "TaskKilled ($reason)"
Contributor

s"TaskKilled ($reason)"

Contributor Author

Will do.

Didn't notice the s was missing.

case exceptionFailure: ExceptionFailure =>
// Nothing left to do, already handled above for accumulator updates.

case _: TaskKilled =>
Contributor

nit: combine this with the ExceptionFailure case.
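A minimal sketch of the combined case, assuming the enclosing match is the failure handler shown above:

case _: ExceptionFailure | _: TaskKilled =>
  // Nothing left to do; accumulator updates were already handled above
  // for both of these end reasons.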

Contributor Author

will do.

@jiangxb1987
Contributor

I'm not against the change, but since this changes the semantics of accumulators, we should document the changes in a migration document or something. WDYT @cloud-fan @gatorsmile?

@jiangxb1987
Contributor

Also, please update the PR title:
[Spark-20087][CORE] Attach accumulators / metrics to 'TaskKilled' end reason

@advancedxy advancedxy changed the title Spark 20087: Attach accumulators / metrics to 'TaskKilled' end reason [Spark 20087][CORE] Attach accumulators / metrics to 'TaskKilled' end reason Apr 26, 2018
@advancedxy
Contributor Author

we should document the changes in a migration document or something,

I think documentation is necessary; I will update the documentation tomorrow (Beijing time).

@jiangxb1987
Contributor

It should be [Spark-20087] instead of [Spark 20087] in the title.

@advancedxy advancedxy changed the title [Spark 20087][CORE] Attach accumulators / metrics to 'TaskKilled' end reason [Spark-20087][CORE] Attach accumulators / metrics to 'TaskKilled' end reason Apr 26, 2018
@advancedxy
Contributor Author

advancedxy commented Apr 26, 2018

It should be [Spark-20087] instead of [Spark 20087] in the title.

Thanks. Updated.

@advancedxy
Contributor Author

I added a note about the accumulator updates. Please comment if more documentation is needed.

@cloud-fan
Contributor

cloud-fan commented Apr 27, 2018

I do agree the task-killed event should carry metrics updates, as it's reasonable to count killed tasks for something like how many bytes were read from files.

However, I don't agree that user-side accumulators should get updates from killed tasks; that changes the semantics of accumulators. And I don't think end users need to care about killed tasks. Similarly, when we implemented task metrics we needed to count failed tasks, but user-side accumulators still skip failed tasks. I think we should follow that approach here as well.

I haven't read the PR yet, but please make sure this patch only touches internal accumulators that are used for metrics reporting.

@advancedxy
Contributor Author

However, I don't agree that user-side accumulators should get updates from killed tasks; that changes the semantics of accumulators. And I don't think end users need to care about killed tasks. Similarly, when we implemented task metrics we needed to count failed tasks, but user-side accumulators still skip failed tasks. I think we should follow that approach here as well.

I don't agree that end users don't care about killed tasks. For example, a user may want to record CPU time for every task and get the total CPU time for the application. However, the default behaviour should remain backward compatible with the existing behaviour.

private[spark] case class AccumulatorMetadata(
    id: Long,
    name: Option[String],
    countFailedValues: Boolean) extends Serializable

The metadata has a countFailedValues field; should we use this or add a new field?

However, we don't currently expose this field to end users...
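For a concrete picture of the kind of user-side metric being discussed, here is a small, self-contained example (names and numbers are illustrative, not from this PR) of an accumulator that records per-task elapsed time:

import org.apache.spark.sql.SparkSession

object TaskTimeAccumulatorExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("task-time-accumulator")
      .master("local[2]")
      .getOrCreate()
    val sc = spark.sparkContext

    // User-side accumulator: total wall-clock time spent inside task bodies.
    val taskMillis = sc.longAccumulator("taskMillis")

    sc.parallelize(1 to 10000, 8).foreachPartition { iter =>
      val start = System.currentTimeMillis()
      iter.foreach(_ => ())  // the real per-record work would go here
      taskMillis.add(System.currentTimeMillis() - start)
    }

    // Only updates from successfully completed tasks are reflected here;
    // updates from failed or killed tasks are dropped unless the accumulator
    // is registered with countFailedValues = true.
    println(s"total task time: ${taskMillis.value} ms")
    spark.stop()
  }
}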

@cloud-fan
Contributor

For example, a user may want to record CPU time for every task and get the total CPU time for the application.

The problem is: shall we allow end users to collect metrics via accumulators? Currently only Spark can do that, via internal accumulators that count failed tasks. We need a careful API design for how to expose this ability to end users.

In the meantime, since we already count failed tasks, it makes sense to also count killed tasks for internal metrics collection.

We should not do these two things together, and to me the second one is way simpler to get in, so we should do it first.

@advancedxy
Contributor Author

We should not do these two things together, and to me the second one is way simpler to get in, so we should do it first.

Agreed. For the scope of this PR, let's get killed tasks' accumulators into metrics first. After that, we can discuss exposing this ability if users request it.

but please make sure this patch only touches internal accumulators that are used for metrics reporting.

After a second look, this part is already handled by Task's collectAccumulatorUpdates:

  def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulatorV2[_, _]] = {
    if (context != null) {
      // Note: internal accumulators representing task metrics always count failed values
      context.taskMetrics.nonZeroInternalAccums() ++
        // zero value external accumulators may still be useful, e.g. SQLMetrics, we should not
        // filter them out.
        context.taskMetrics.externalAccums.filter(a => !taskFailed || a.countFailedValues)
    } else {
      Seq.empty
    }
  }

@advancedxy
Contributor Author

@jiangxb1987 @cloud-fan I think it's ready for review.

case class TaskKilled(
    reason: String,
    accumUpdates: Seq[AccumulableInfo] = Seq.empty,
    private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil)
Contributor

Previously we used AccumulableInfo to expose accumulator information to end users. Now that AccumulatorV2 is already a public class, we don't need to do that anymore; I think we can just do

case class TaskKilled(reason: String, accums: Seq[AccumulatorV2[_, _]])

Contributor Author

Yeah, I noticed accumUpdates: Seq[AccumulableInfo] is only used in JsonProtocol. Is there a reason for that?

The current implementation is constructed to be in sync with existing TaskEndReasons such as ExceptionFailure:

@DeveloperApi
case class ExceptionFailure(
    className: String,
    description: String,
    stackTrace: Array[StackTraceElement],
    fullStackTrace: String,
    private val exceptionWrapper: Option[ThrowableSerializationWrapper],
    accumUpdates: Seq[AccumulableInfo] = Seq.empty,
    private[spark] var accums: Seq[AccumulatorV2[_, _]] = Nil)

I'd prefer to keep them in sync, which leaves two options for cleanup:

  1. Leave it as it is, then clean it up together with ExceptionFailure later.
  2. Clean up ExceptionFailure first.

@cloud-fan what do you think?

Contributor

Let's clean up ExceptionFailure at the same time and use only AccumulatorV2 in this PR.

Contributor Author

@cloud-fan After a second look, I don't think we can clean up ExceptionFailure unless we can break JsonProtocol.

Contributor

Now the question is: shall we keep the unnecessary Seq[AccumulableInfo] in new APIs to make the API consistent? I'd prefer not to keep the Seq[AccumulableInfo]; we may deprecate it in the existing APIs in the near future.

Contributor Author

I'm OK with not keeping Seq[AccumulableInfo], but it means inconsistent logic and APIs, and it may make future refactoring a bit more difficult.

Let's see what I can do.

I'd prefer not to keep the Seq[AccumulableInfo]; we may deprecate it in the existing APIs in the near future.

BTW, I think we have already deprecated AccumulableInfo. Unless we are planning to remove it in Spark 3.0 and Spark 3.0 is the next release, AccumulableInfo will be around for a long time.

Contributor Author

Hi @cloud-fan, I have looked at how to remove Seq[AccumulableInfo] tonight.
It turns out that we cannot, because JsonProtocol calls taskEndReasonFromJson to reconstruct TaskEndReasons. Since AccumulatorV2 is an abstract class, we cannot simply construct AccumulatorV2s from JSON.

Even though we are promoting AccumulatorV2, we still need AccumulableInfo when (de)serializing JSON.
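A minimal sketch of why the round trip forces this, using hypothetical, simplified stand-ins for AccumulableInfo and AccumulatorV2 (the real JsonProtocol format and class names differ): a plain data holder can be rebuilt field by field from parsed JSON, but an abstract accumulator type cannot, because the JSON alone does not say which concrete subclass to instantiate.

object AccumJsonSketch {
  // Stand-in for AccumulableInfo: a concrete, flat data holder.
  final case class InfoLike(id: Long, name: Option[String], update: Option[Long])

  // Stand-in for AccumulatorV2: abstract, with many possible concrete subclasses.
  abstract class AccumLike[IN, OUT] {
    def add(v: IN): Unit
    def value: OUT
  }

  // Serializing the data holder is straightforward...
  def toFields(info: InfoLike): Map[String, String] =
    Map("id" -> info.id.toString) ++
      info.name.map("name" -> _) ++
      info.update.map(u => "update" -> u.toString)

  // ...and so is rebuilding it from the parsed fields.
  def fromFields(fields: Map[String, String]): InfoLike =
    InfoLike(fields("id").toLong, fields.get("name"), fields.get("update").map(_.toLong))

  // There is no analogous fromFields for AccumLike: nothing in the parsed
  // fields tells us which concrete subclass (long, collection, user-defined,
  // ...) to construct, which is why taskEndReasonFromJson has to keep the
  // AccumulableInfo representation.
}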

Contributor

I see, that makes sense, let's keep AccumulableInfo.

Contributor Author

@cloud-fan so, could you trigger the test and have a look?

And looks like I am not in the whitelist again...

@advancedxy
Contributor Author

ping @cloud-fan

@cloud-fan
Contributor

ok to test



In new versions of Spark (> 2.3), the semantics of accumulators have been changed a bit: they now include updates from
Contributor

it's not needed now

@SparkQA

SparkQA commented May 14, 2018

Test build #90585 has finished for PR 21165 at commit 05d1d9c.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

private def collectAccumulatorsAndResetStatusOnFailure(taskStart: Long) = {
  // Report executor runtime and JVM gc time
  Option(task).foreach(t => {
    t.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
Contributor

taskStartTime

Contributor Author

Eh, taskStart is already defined previously. Do you think we need to replace all occurrences of taskStart with taskStartTime?

Contributor

We should at least rename it in this method since it's newly added code. We can also update the existing code if it's not a lot of work.

Contributor Author

Will do it.

@cloud-fan
Contributor

LGTM

@SparkQA

SparkQA commented May 14, 2018

Test build #90595 has finished for PR 21165 at commit 945c1d5.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@advancedxy
Contributor Author

It looks like simply adding fields with default values to a case class breaks binary compatibility.
How should we deal with that? Add entries to MimaExcludes, or add the missing methods? @cloud-fan

@cloud-fan
Contributor

I think we can just update MimaExcludes, since it's a developer API. cc @JoshRosen
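For reference, such an entry in MimaExcludes.scala would look roughly like the sketch below; the exact problem kinds and method names come from the MiMa failure report, so these filters are illustrative rather than the actual ones added:

import com.typesafe.tools.mima.core._

object TaskKilledMimaExcludes {
  // Illustrative exclusions for the developer-API case class changes in this PR.
  val filters = Seq(
    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.TaskKilled.apply"),
    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.TaskKilled.copy"),
    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.TaskKilled.this")
  )
}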

Rename taskStart -> taskStartTime in executor
@SparkQA

SparkQA commented May 15, 2018

Test build #90643 has finished for PR 21165 at commit 59c2807.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@advancedxy
Contributor Author

Gently ping @cloud-fan again.

  val exceptionFailure = new ExceptionFailure(
    new SparkException("fondue?"),
-   accumInfo).copy(accums = accumUpdates)
+   accumInfo1).copy(accums = accumUpdates1)
Contributor

Not caused by you, but why do we do a copy instead of passing accumUpdates1 to the constructor directly?

Contributor Author

We can avoid the copy call.

Contributor Author

Ah, this copy call cannot be avoided, as only the two-argument constructor private[spark] def this(e: Throwable, accumUpdates: Seq[AccumulableInfo]) is defined.


  val taskKilled = new TaskKilled(
    "test",
    accumInfo2).copy(accums = accumUpdates2)
Contributor

ditto

Contributor Author

We can avoid this copy call.
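A sketch of what avoiding it could look like for TaskKilled, whose accumulator fields are part of the primary constructor (accumInfo2 and accumUpdates2 are the test values quoted above):

// TaskKilled takes its accumulator fields directly, so the test can pass
// them to the constructor instead of going through copy:
val taskKilled = new TaskKilled("test", accumInfo2, accums = accumUpdates2)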

@cloud-fan
Contributor

LGTM

@SparkQA

SparkQA commented May 21, 2018

Test build #90891 has finished for PR 21165 at commit 74911b7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@advancedxy
Contributor Author

retest this please

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented May 21, 2018

Test build #90901 has finished for PR 21165 at commit 74911b7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in 82fb5bf May 22, 2018
@advancedxy advancedxy deleted the SPARK-20087 branch May 23, 2018 03:12