Conversation

@JoshRosen (Contributor) commented Nov 3, 2016

What changes were proposed in this pull request?

When profiling heap dumps from the HistoryServer and live Spark web UIs, I found a large amount of memory being wasted on duplicated objects and strings. This patch's changes remove most of this duplication, resulting in over 40% memory savings for some benchmarks.

  • Task metrics (6441f06): previously, every TaskUIData object would have its own instances of InputMetricsUIData, OutputMetricsUIData, ShuffleReadMetrics, and ShuffleWriteMetrics, but for many tasks these metrics are irrelevant because they're all zero. This patch changes how we construct these metrics in order to re-use a single immutable "empty" value for the cases where these metrics are empty (see the sketch after this list).
  • TaskInfo.accumulables (ade86db): Previously, every TaskInfo object had its own empty ListBuffer for holding updates from named accumulators. Tasks which didn't use named accumulators still paid for the cost of allocating and storing this empty buffer. To avoid this overhead, I changed the val with a mutable buffer into a var which holds an immutable Scala list, allowing tasks which do not have named accumulator updates to share the same singleton Nil object.
  • String.intern() in JSONProtocol (7e05630): in the HistoryServer, executor hostnames and ids are deserialized from JSON, leading to massive duplication of these string objects. By calling String.intern() on the deserialized values we can remove all of this duplication. Since Spark now requires Java 7+ we don't have to worry about string interning exhausting the permgen (see http://java-performance.info/string-intern-in-java-6-7-8/).
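A minimal, self-contained Scala sketch of that shared "empty" value pattern from the first bullet (the InputMetrics stand-in here is a simplified assumption, not the real Spark class):

```scala
// Simplified stand-in for the raw metrics reported by a task (hypothetical).
case class InputMetrics(bytesRead: Long, recordsRead: Long)

case class InputMetricsUIData(bytesRead: Long, recordsRead: Long)

object InputMetricsUIData {
  // One shared immutable instance for the common all-zero case.
  private val EMPTY = new InputMetricsUIData(0, 0)

  def apply(metrics: InputMetrics): InputMetricsUIData = {
    if (metrics.bytesRead == 0 && metrics.recordsRead == 0) {
      EMPTY // no per-task allocation when the metrics are empty
    } else {
      InputMetricsUIData(metrics.bytesRead, metrics.recordsRead)
    }
  }
}
```

Every task whose input metrics are all zero then points at the single EMPTY instance instead of allocating its own object.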

How was this patch tested?

I ran

```
sc.parallelize(1 to 100000, 100000).count()
```

in spark-shell with event logging enabled, then loaded that event log in the HistoryServer, performed a full GC, and took a heap dump. According to YourKit, the changes in this patch reduced memory consumption by roughly 28 megabytes (or 770k Java objects):

![image](https://cloud.githubusercontent.com/assets/50748/19953276/4f3a28aa-a129-11e6-93df-d7fa91396f66.png)

Here's a table illustrating the drop in objects due to deduplication (the drop is <100k for some objects because some events were dropped from the listener bus; this is a separate, existing bug that I'll address separately after CPU-profiling):

![image](https://cloud.githubusercontent.com/assets/50748/19953290/6a271290-a129-11e6-93ad-b825f1448886.png)

SparkQA commented Nov 3, 2016

Test build #68038 has finished for PR 15743 at commit 7e05630.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 3, 2016

Test build #68041 has finished for PR 15743 at commit 738cb5a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen (Member) commented Nov 3, 2016

Aside: if this and other processes see a lot of memory used by duplicate strings, then maybe we should recommend trying -XX:+UseStringDeduplication for a related and automatic win.
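For reference, one way such a recommendation might look in spark-defaults.conf (a hedged illustration, not part of this PR; the flag requires the G1 collector and JDK 8u20 or newer):

```
spark.driver.extraJavaOptions    -XX:+UseG1GC -XX:+UseStringDeduplication
spark.executor.extraJavaOptions  -XX:+UseG1GC -XX:+UseStringDeduplication
```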

@mridulm (Contributor) commented Nov 3, 2016

+1 on UseStringDeduplication! Though it's JDK 8 specific...

```diff
-val executorId = (json \ "Executor ID").extract[String]
-val host = (json \ "Host").extract[String]
+val executorId = (json \ "Executor ID").extract[String].intern()
+val host = (json \ "Host").extract[String].intern()
```
Contributor:

In general, interning can be dangerous. I don't expect issues with host, but executorId (for long-running jobs) could essentially OOM the driver.

Contributor:

Scratch that; the JDK 7 improvements help.

Member:

From reading http://java-performance.info/string-intern-in-java-6-7-8/, it seems significantly safer in Java 7 (I also had the impression from tribal lore that intern() had been a bad idea in older JVMs), but not necessarily great for performance if you're going to intern hundreds of thousands of strings. But if this is a pretty targeted use for a hotspot, it seems OK.
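For anyone following along, the mechanism under discussion is just canonicalization of equal strings (plain Scala, with hypothetical values):

```scala
val a = new String("executor-1") // force a fresh heap object
val b = new String("executor-1") // a second, distinct object

a == b                   // true: equal contents
a eq b                   // false: two separate objects on the heap
a.intern() eq b.intern() // true: both map to one canonical instance
```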

```diff
 if (acc.name.isDefined && !updates.isZero) {
   stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value))
-  event.taskInfo.accumulables += acc.toInfo(Some(updates.value), Some(acc.value))
+  event.taskInfo.accumulables ::= acc.toInfo(Some(updates.value), Some(acc.value))
```
Contributor:

This should be equivalent, no?
ListBuffer.+= essentially does a :: on the last element.

Contributor:

Ok, type change ...

Contributor (Author):

Something I was a bit unsure of but want to revisit now:

It looks like the loop

```scala
for (accumulableInfo <- info.accumulables) {
  stageData.accumulables(accumulableInfo.id) = accumulableInfo
}
```

in JobProgressListener.onTaskEnd() may assume that accumulator updates are appended to the list so that the newest update for a particular accumulator appears last, but prepending here will change that. I don't think that this behavior is covered by any existing unit tests, though. Instead of using a list here, I might want to use an immutable Queue. Let me go ahead and make this change now.
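To make the ordering concern concrete, here is a small hypothetical illustration (not code from this PR): += appends while ::= prepends, so switching between them reverses the order in which updates are traversed.

```scala
import scala.collection.mutable.ListBuffer

val buf = ListBuffer(1, 2)
buf += 3             // ListBuffer(1, 2, 3): newest update last

var lst = List(1, 2)
lst ::= 3            // List(3, 1, 2): newest update first
```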

Contributor (Author):

On the other hand, code on the StagePage uses accumulables like this:

```scala
val externalAccumulableReadable = info.accumulables
  .filterNot(_.internal)
  .flatMap { a =>
    (a.name, a.update) match {
      case (Some(name), Some(update)) => Some(StringEscapeUtils.escapeHtml4(s"$name: $update"))
      case _ => None
    }
  }
```

This seems to be assuming that you'll only have one update per task.

Contributor (Author):

Upon further inspection, however, it appears that this updateAccumulators method is only called once per task upon task completion or failure. The accumulator updates themselves come from Task.collectAccumulatorUpdates(), which is run on the executor and only produces one update per accumulator. Based on this, I'm pretty sure that we'll never get duplicates in TaskInfo.accumulables and therefore believe that the current code should be safe.

```scala
      recordsRead = metrics.recordsRead)
  }
}
val EMPTY = InputMetricsUIData(0, 0)
```
Contributor:

private?

Contributor (Author):

Done.

```scala
case class OutputMetricsUIData(bytesWritten: Long, recordsWritten: Long)

object OutputMetricsUIData {
  def apply(metrics: OutputMetrics): OutputMetricsUIData = {
    if (metrics.bytesWritten == 0 && metrics.recordsWritten == 0) {
```
Contributor:

I assume the else block is more common?
If so, it would be good to invert the condition.

Contributor (Author):

For OutputMetrics, I'd actually assume the opposite: these metrics refer to bytes written to an external system, not bytes written to shuffle, so the majority of tasks won't have non-zero values for this metric (all but the last stage in a multi-stage job, for example).

Contributor:

sounds good

@mridulm (Contributor) commented Nov 3, 2016

Looks good to me, some minor queries and comments.

@JoshRosen (Contributor, Author):

@srowen, that's a great tip about UseStringDeduplication; I'll give this a try in my deployment.

SparkQA commented Nov 3, 2016

Test build #68075 has finished for PR 15743 at commit f8aee5d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen (Contributor, Author):

Jenkins, retest this please

```scala
 * accumulable to be updated multiple times in a single task or for two accumulables with the
 * same name but different IDs to exist in a task.
 */
val accumulables = ListBuffer[AccumulableInfo]()
```
Contributor (Author):

In order to maintain binary compatibility, I could rewrite this to be a lazy val that returns a ListBuffer formed from the "real" accumulables which can remain private. I might go ahead and do that just to avoid any chance of incompatibility-related problems, although I don't anticipate this being an issue in practice.

SparkQA commented Nov 3, 2016

Test build #68082 has finished for PR 15743 at commit f8aee5d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 3, 2016

Test build #68086 has finished for PR 15743 at commit 4c867f1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen (Contributor, Author):

Jenkins, retest this please

@ajbozarth (Member):

Looking through the code, this looks good to me, but I can only speak to the code for the first two bullets; I don't know enough about the intern function to comment on its use. This will give us some great speed-ups to go with our other recent SHS updates.

SparkQA commented Nov 4, 2016

Test build #68095 has finished for PR 15743 at commit 4c867f1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```diff
  * same name but different IDs to exist in a task.
  */
-val accumulables = ListBuffer[AccumulableInfo]()
+lazy val accumulables: ListBuffer[AccumulableInfo] = ListBuffer(_accumulables: _*)
```
Contributor:

Isn't a def here the right thing? Otherwise, if _accumulables changes after this lazy val is evaluated, things will get out of sync.

Or given how the semantics here now are getting pretty weird, maybe changing it to an immutable list and just breaking compatibility would be better. Then you can just return _accumulables.
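A tiny hypothetical demonstration of that desync (class and member names invented for illustration):

```scala
import scala.collection.mutable.ListBuffer

class Holder {
  private[this] var _accumulables: List[Int] = Nil
  def add(x: Int): Unit = { _accumulables ::= x }
  // Snapshot is taken once, the first time the lazy val is forced.
  lazy val accumulables: ListBuffer[Int] = ListBuffer(_accumulables: _*)
}

val h = new Holder
h.add(1)
h.accumulables // ListBuffer(1): forces the lazy val
h.add(2)
h.accumulables // still ListBuffer(1): now out of sync with _accumulables
```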

Contributor (Author):

Meh, kind of a toss-up. My original patch broke compatibility and I did this in 4c867f1 to restore binary compatibility, but it risks changing semantics. I think I'll just revert 4c867f1 and add the minor compatibility break to a lesser-used @DeveloperApi.

Contributor (Author):

Actually, though, if we're going to expose a public API for this at all then I think it should be read-only. Let me go ahead and refactor this to expose a read-only Seq.
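A sketch of the shape this could take, reading the comment above (hedged; the member names here are assumptions, not necessarily the exact code that was pushed):

```scala
// Inside TaskInfo: the mutable state stays private, callers see a read-only Seq.
private[this] var _accumulables: Seq[AccumulableInfo] = Nil

def accumulables: Seq[AccumulableInfo] = _accumulables

private[spark] def setAccumulables(newAccumulables: Seq[AccumulableInfo]): Unit = {
  _accumulables = newAccumulables
}
```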

@JoshRosen (Contributor, Author):

Alright, just pushed an update which re-introduces the binary compatibility break but chooses to expose a nicer + narrower public interface.

@vanzin (Contributor) left a comment:

LGTM.

```scala
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_="),

// [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables")
```
Contributor:

Hmm, I guess MiMa still fails to ignore @DeveloperApi changes?

Contributor (Author):

Yep, that's intentional; we removed the logic for excluding that annotation in #11751. The rationale is discussed in https://issues.apache.org/jira/browse/SPARK-13920:

Our MIMA binary compatibility checks currently ignore APIs which are marked as @Experimental or @DeveloperApi, but I don't think this makes sense. Even if those annotations reserve the right to break binary compatibility, we should still avoid compatibility breaks whenever possible and should be informed explicitly when compatibility breaks.

SparkQA commented Nov 7, 2016

Test build #68290 has finished for PR 15743 at commit 4c7067e.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mridulm (Contributor) commented Nov 7, 2016

LGTM

@ajbozarth (Member):

LGTM

SparkQA commented Nov 7, 2016

Test build #68292 has finished for PR 15743 at commit 9662163.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen (Contributor, Author):

Thanks for reviewing! I'm going to merge this to master.

@asfgit asfgit closed this in 3a710b9 Nov 8, 2016
@JoshRosen JoshRosen deleted the spark-ui-memory-usage branch November 8, 2016 00:18
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
yoonlee95 pushed a commit to yoonlee95/spark that referenced this pull request Aug 17, 2017