Conversation

@robreeves
Contributor

@robreeves robreeves commented Feb 23, 2022

What changes were proposed in this pull request?

Background

In PR #26508 (SPARK-26260) the SHS stage metric percentiles were updated to only include successful tasks when using disk storage. It did this by making the values for each metric negative when the task is not in a successful state. This approach was chosen to avoid breaking changes to disk storage. See this comment for context.

To compute the percentiles, it reads the metric values in ascending order, starting at 0. This filters out all tasks that are not successful, because their values are less than 0. To get the percentile values, it scales each percentile to an index into the list of successful tasks. For example, if there are 200 tasks and you want percentiles [0, 25, 50, 75, 100], the lookup indexes in the task collection are [0, 50, 100, 150, 199].
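The index scaling described above can be sketched as follows (an illustrative simplification, not the actual AppStatusStore code; the method name and clamping are assumptions):

```scala
// Map each requested quantile to an index into the ascending-sorted list of
// successful-task metric values (sketch only).
def lookupIndices(quantiles: Seq[Double], successfulTaskCount: Int): Seq[Int] =
  quantiles.map { q =>
    // Clamp so quantile 1.0 maps to the last element rather than one past it.
    math.min((q * successfulTaskCount).toInt, successfulTaskCount - 1)
  }

// 200 successful tasks, quantiles 0/25/50/75/100 -> indices 0, 50, 100, 150, 199
val indices = lookupIndices(Seq(0.0, 0.25, 0.5, 0.75, 1.0), 200)
```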

Issue

For two metrics, 1) shuffle total reads and 2) shuffle total blocks, PR #26508 incorrectly converts the index values back to positive, so tasks that are not successful are included in the percentile calculations. The percentile lookup index calculation is still based on the number of successful tasks, so the wrong task metric is returned for a given percentile. This was not caught because the unit test only verified values for one metric, executorRunTime.

Fix

The index values for SHUFFLE_TOTAL_READS and SHUFFLE_TOTAL_BLOCKS should not convert back to positive metric values for tasks that are not successful. I believe this was done because these metric values are summed from two other metrics. Using the raw (signed) values still produces the desired outcome: negative + negative = negative and positive + positive = positive. Because both components come from the same task, there is no case where one will be negative and the other positive. I also verified that these two metrics are only used in the percentile calculations, where only successful tasks are considered.
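The sign argument can be illustrated with a small sketch. The exact negative encoding used in TaskDataWrapper is assumed here to be `-1 - value`; the point is only that both components of a composite index share the task's success state, so their sum keeps the same sign:

```scala
// Hypothetical encoding: a successful task stores the raw metric (>= 0),
// a non-successful task stores a strictly negative value.
def encode(value: Long, succeeded: Boolean): Long =
  if (succeeded) value else -1L - value

// Both components of the composite index come from the same (failed) task,
// so both are negative and the sum stays negative.
val localBytes  = encode(100L, succeeded = false)  // -101
val remoteBytes = encode(200L, succeeded = false)  // -201
val total = localBytes + remoteBytes               // -302: still < 0, so the
                                                   // task stays filtered out
```

Note the sum of two encoded values is not itself the encoding of the summed metric; only the sign matters, because the percentile scan discards everything below 0.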

Why are the changes needed?

This change is required so that the SHS stage percentile metrics for shuffle read bytes and shuffle total blocks are correct.

Does this PR introduce any user-facing change?

Yes. The user will see the correct percentile values for the stage summary shuffle read bytes and shuffle total blocks.

How was this patch tested?

I updated the unit test to verify the percentile values for every task metric. I also modified the unit test to use unique values for every metric. Previously the test used the same value for every field, which would not catch bugs like the wrong field being read by accident.

I manually validated the fix in the UI.

BEFORE
![image](https://user-images.githubusercontent.com/5604993/155433460-322078c5-1821-4f2e-8e53-8fc3902eb7fe.png)

AFTER
![image](https://user-images.githubusercontent.com/5604993/155433491-25ce3acf-290b-4b83-a0a9-0f9b71c7af04.png)

I manually validated the fix in the task summary API (/api/v1/applications/application_123/1/stages/14/0/taskSummary\?quantiles\=0,0.25,0.5,0.75,1.0). See shuffleReadMetrics.readBytes and shuffleReadMetrics.totalBlocksFetched.

Before:

{
   "quantiles":[
      0.0,
      0.25,
      0.5,
      0.75,
      1.0
   ],
   "shuffleReadMetrics":{
      "readBytes":[
         -2.0,
         -2.0,
         -2.0,
         -2.0,
         5.63718681E8
      ],
      "totalBlocksFetched":[
         -2.0,
         -2.0,
         -2.0,
         -2.0,
         2.0
      ],
      ...
   },
   ...
}

After:

{
   "quantiles":[
      0.0,
      0.25,
      0.5,
      0.75,
      1.0
   ],
   "shuffleReadMetrics":{
      "readBytes":[
         5.62865286E8,
         5.63779421E8,
         5.63941681E8,
         5.64327925E8,
         5.7674183E8
      ],
      "totalBlocksFetched":[
         2.0,
         2.0,
         2.0,
         2.0,
         2.0
      ],
      ...
   }
   ...
}

@github-actions github-actions bot added the CORE label Feb 23, 2022
@robreeves robreeves marked this pull request as ready for review February 24, 2022 01:19
@AmplabJenkins

Can one of the admins verify this patch?

@srowen
Member

srowen commented Feb 24, 2022

Just out of curiosity, can you explain again the idea that negative = negative + negative? why add the same thing to itself? it looks funny but I'm sure there's a reason it works - may be worth a comment

@robreeves
Contributor Author

Just out of curiosity, can you explain again the idea that negative = negative + negative? why add the same thing to itself? it looks funny but I'm sure there's a reason it works - may be worth a comment

This doesn't add the same number together twice. This refers to the sign (positive or negative) for the composite indices modified in this PR, TaskDataWrapper.shuffleTotalReads and TaskDataWrapper.shuffleTotalBlocks. The sign is important because tasks that aren't successful should have a negative sign (implemented in PR #26508).

@srowen
Member

srowen commented Feb 24, 2022

Oh, disregard - I read the new code as adding the same variable to itself; they're local vs. remote values. I got it.
I don't know quite enough to say I can evaluate the change, but it looks reasonable and it sounds like you've researched this.
@shahidki31 WDYT?

@robreeves
Contributor Author

@srowen if @shahidki31 doesn't answer, is there anyone else we can ping to verify the changes? Let me know if I can help.

Contributor

@mridulm mridulm left a comment


Can we add a specific unit test to validate that the sign is not flipping?

private def getTaskMetrics(seed: Int): TaskMetrics = {
  val random = new Random(seed)
  val randomMax = 1000
  def nextInt(): Int = random.nextInt(randomMax)
Contributor


Note: This can end up becoming zero.

Contributor Author


This should have no impact on the validity of the tests here.

Contributor Author


I think you might have mentioned this due to the metric value being made negative for tasks that aren't successful. Zero is accounted for here.
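For reference, with an offset-style encoding (assumed here to be `-1 - value`; the actual code linked above may differ), a zero metric on a non-successful task still maps to a strictly negative number, so it cannot be confused with a successful task's zero:

```scala
// Assumed offset encoding: shifts non-successful values below zero,
// including value == 0.
def encode(value: Long, succeeded: Boolean): Long =
  if (succeeded) value else -1L - value

val failedZero  = encode(0L, succeeded = false) // -1: still excluded by the >= 0 scan
val successZero = encode(0L, succeeded = true)  //  0: still included
```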

@robreeves
Contributor Author

robreeves commented Mar 1, 2022

Can we add a specific unit test to validate that the sign is not flipping?

The test "SPARK-26260: summary should contain only successful tasks' metrics (store = disk leveldb)" in AppStatusStoreSuite would catch this, because the bug means a task is either included or excluded when it shouldn't be. This test is more like a mini-integration test.

After researching if we could write a lower level test directly for the index value I am leaning towards the above test being the appropriate level to test at, but am open to changing this if you disagree.

  • TaskDataWrapper is only a data class so we could pass in whatever field values we want for the test, including making the sign flip. A test here would not really test any meaningful behavior. Additionally, we'd be making the fields shuffleTotalReads and shuffleTotalBlocks public just for testing. Not a huge deal and I am not dogmatic on this, but I generally consider exposing data/logic just for testing an anti-pattern and feel unit tests should focus on what a class, or classes, needs to expose publicly.
  • LiveTask is also private. I could change the access level of LiveTask and TaskDataWrapper to test it, but this behavior is well covered by the AppStatusStoreSuite test. Since the behavior in AppStatusStore is what we are really concerned about, and these are more helper classes, I felt this is acceptable.

@mridulm
Contributor

mridulm commented Mar 4, 2022

Since the existing test was not catching this issue, I want to make sure that we are testing for this behavior.
Given that we now exhaustively test the current metrics, including the one which had an issue, this looks fine to me.

Contributor

@mridulm mridulm left a comment


Looks good to me.

Any other thoughts @srowen ?
+CC @shahidki31

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-38309][Core] Fix incorrect SHS stage percentile metrics for shuffle read bytes and shuffle total blocks [SPARK-38309][CORE] Fix SHS shuffleTotalReads and shuffleTotalBlocks percentile metrics Mar 7, 2022
Member

@dongjoon-hyun dongjoon-hyun left a comment


+1, LGTM, too.

@srowen
Member

srowen commented Mar 7, 2022

Backport to 3.2/3.1 I presume?

@shahidki31
Contributor

shahidki31 commented Mar 7, 2022

@robreeves The PR #26508 was originally intended to compute summary metrics for only successful tasks. That is why all the non-successful tasks' metrics were made negative initially. So do you see shuffleTotalReads and shuffleTotalBlocks negative even for successful tasks?

@robreeves
Contributor Author

@robreeves The PR #26508 was originally intended to compute summary metrics for only successful tasks. That is why all the non-successful tasks' metrics were made negative initially. So do you see shuffleTotalReads and shuffleTotalBlocks negative even for successful tasks?

@shahidki31 The problem for shuffleTotalReads and shuffleTotalBlocks is that the index values are never negative, because getMetricValue is called within the index method. Because of this, non-successful tasks are included in the percentile computations for these metrics.

@shahidki31
Contributor

shahidki31 commented Mar 7, 2022

Ok. We will be converting to the actual shuffle metrics value here.

getMetricValue(shuffleRemoteBlocksFetched),
getMetricValue(shuffleLocalBlocksFetched),
getMetricValue(shuffleFetchWaitTime),
getMetricValue(shuffleRemoteBytesRead),
getMetricValue(shuffleRemoteBytesReadToDisk),
getMetricValue(shuffleLocalBytesRead),
getMetricValue(shuffleRecordsRead)),

But this method is used only for computing the summary metrics.

@JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_READS, parent = TaskIndexNames.STAGE)
private def shuffleTotalReads: Long = {
  if (hasMetrics) {
    shuffleLocalBytesRead + shuffleRemoteBytesRead
  } else {
    -1L
  }
}

LGTM.

@asfgit asfgit closed this in 0ad7677 Mar 8, 2022
asfgit pushed a commit that referenced this pull request Mar 8, 2022
Closes #35637 from robreeves/SPARK-38309.

Authored-by: Rob Reeves <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 0ad7677)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
asfgit pushed a commit that referenced this pull request Mar 8, 2022
Closes #35637 from robreeves/SPARK-38309.

Authored-by: Rob Reeves <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 0ad7677)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
@mridulm
Contributor

mridulm commented Mar 8, 2022

Merged to master/branch-3.2/branch-3.1

Thanks for working on this @robreeves !
Thanks for the reviews @dongjoon-hyun, @shahidki31 :-)

kazuyukitanimura pushed a commit to kazuyukitanimura/spark that referenced this pull request Aug 10, 2022
Closes apache#35637 from robreeves/SPARK-38309.

Authored-by: Rob Reeves <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 0ad7677)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
(cherry picked from commit e067b12)
Signed-off-by: Dongjoon Hyun <[email protected]>