Conversation

@alex-balikov
Contributor

What changes were proposed in this pull request?

This PR addresses the issue raised in https://issues.apache.org/jira/browse/SPARK-39983: broadcast relations should not be cached on the driver, as they are not needed there and can cause significant memory pressure (in one case the relation was 60 MB).

The PR adds a new SparkContext.broadcastInternal method with a serializedOnly parameter, allowing the caller to specify that the broadcasted object should be stored only in serialized form. The current behavior is to also cache an unserialized form of the object.

The PR changes the broadcast implementation in TorrentBroadcast to honor the serializedOnly flag and not store the unserialized value, unless the execution is in local mode (single process). In that case the broadcast cache is effectively shared between the driver and the executors, so the unserialized value still needs to be cached to satisfy the executor side of the functionality.
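
For illustration only, here is a minimal sketch of that caching policy (a hypothetical class, not the actual TorrentBroadcast code; it ignores block splitting and the local-mode carve-out described above): the serialized form is always kept, and the deserialized form is cached only when serializedOnly is false.

import java.io._

// Hypothetical, simplified sketch of the caching policy; not Spark code.
class SerializedOnlyCache[T <: Serializable](value: T, serializedOnly: Boolean) {

  // The serialized form is always kept, since executors need it either way.
  private val serializedBytes: Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(value)
    oos.close()
    bos.toByteArray
  }

  // The deserialized form is cached only when serializedOnly is false.
  private val cachedValue: Option[T] = if (serializedOnly) None else Some(value)

  // Return the cached value if present, otherwise deserialize a fresh copy.
  def get: T = cachedValue.getOrElse {
    val ois = new ObjectInputStream(new ByteArrayInputStream(serializedBytes))
    try ois.readObject().asInstanceOf[T] finally ois.close()
  }
}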

Why are the changes needed?

Broadcast relations can be fairly large (a 60 MB one was observed) and are not needed in unserialized form on the driver.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a new unit test to BroadcastSuite verifying the low-level broadcast functionality with respect to the serializedOnly flag.
Added a new unit test to BroadcastExchangeSuite verifying that broadcasted relations are not cached on the driver.

@alex-balikov alex-balikov changed the title [SPARK-39983][CORE} Do not cache unserialized broadcast relations on the driver [SPARK-39983][CORE] Do not cache unserialized broadcast relations on the driver Aug 5, 2022
@mridulm
Contributor

mridulm commented Aug 5, 2022

Doesn't this break in local mode?

@JoshRosen
Contributor

@mridulm, it doesn't break local mode because there's a carve-out to preserve the existing behavior in that case: in both places where the serializedOnly check changes behavior, there's an isLocalMaster check:

We'll still store the original object in the driver block manager at write time in local mode:

private def writeBlocks(value: T): Int = {
  import StorageLevel._
  val blockManager = SparkEnv.get.blockManager
  if (serializedOnly && !isLocalMaster) {

There's a similar carve-out in readBroadcastBlock (although I don't think we'd ever actually hit that branch in local mode given that we would have already stored the re-assembled broadcast block in writeBlocks):

if (!serializedOnly || isLocalMaster || Utils.isInRunningSparkTask) {
  // Store the merged copy in BlockManager so other tasks on this executor don't
  // need to re-fetch it.
  val storageLevel = StorageLevel.MEMORY_AND_DISK
  if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
    throw new SparkException(s"Failed to store $broadcastId in BlockManager")
  }
}

@JoshRosen JoshRosen changed the title [SPARK-39983][CORE] Do not cache unserialized broadcast relations on the driver [SPARK-39983][CORE][SQL] Do not cache unserialized broadcast relations on the driver Aug 5, 2022
@mridulm
Contributor

mridulm commented Aug 5, 2022

@JoshRosen Ah yes, missed out on that - should have taken a more detailed look.

@AmplabJenkins

Can one of the admins verify this patch?

Contributor

@JoshRosen JoshRosen left a comment

LGTM. I'll leave this open for a day or two to see if anyone has additional comments or requested changes, then will merge it.


I think this is a useful and sound optimization:

  • It doesn't impact local mode.
  • It doesn't impact user-created broadcast variables.
  • It only impacts Spark-internal broadcast variables when .value is accessed on the driver (e.g. in BroadcastHashJoinExec, where the value is accessed to see if the relation contains all unique keys).
    • These accesses occur very soon after the broadcast variables are created. Most likely, the _value SoftReference assigned in writeBlocks() will still be alive.
    • If the SoftReference is cleaned up by the time that .value is called, then Spark will deserialize a fresh copy but won't store it in the driver's block manager (sketched below). This (internal) change in behavior means that concurrent or subsequent driver-side .value accesses could be ever-so-slightly more likely to deserialize a duplicated copy of the broadcasted value, but I think this could only occur when the soft reference and the broadcastCache have been cleaned up but the block manager hasn't spilled blocks. I expect this situation to be rare, especially compared to the situations where this patch's changes are beneficial.
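
A minimal sketch of that driver-side read path (an illustration only, not the actual TorrentBroadcast code; deserializeFromBlocks is a made-up stand-in for re-assembling the value from the stored serialized chunks):

import java.lang.ref.SoftReference

// Illustrative sketch only: serving .value on the driver for a serialized-only
// broadcast without ever putting the deserialized object into the block manager.
class DriverSideValue[T <: AnyRef](initial: T, deserializeFromBlocks: () => T) {

  // Assigned at write time; the GC may clear it under memory pressure.
  private val valueRef = new SoftReference[T](initial)

  def value: T = Option(valueRef.get()).getOrElse {
    // The soft reference was cleared: deserialize a fresh copy, but do NOT
    // re-store it in the block manager, so the driver never re-caches the
    // large deserialized relation.
    deserializeFromBlocks()
  }
}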

Compared to executors, the driver has a much higher proportion of "unmanaged" memory usage: Spark tasks explicitly request their "execution memory" and this can trigger eviction of "storage memory", but the driver lacks a clean mechanism to trigger spilling of storage memory in response to "unmanaged" memory pressure (from user code or the analyzer/optimizer, for example). As a result, storing large broadcasted relations in the driver's block manager can create memory pressure that leads to high GC times or OOMs.

This patch approaches this problem by simply skipping the storage in cases where we know that it's not necessary. I expect this to deliver large benefits for broadcast relations.

In theory, we might also experience memory pressure from the storage of the serialized pieces of the broadcast variable (i.e. the other blocks created in writeBlocks()). We can't skip the storage of these blocks because executors need to read them. These blocks are compressed so they're probably much smaller than the deserialized broadcast relation itself, so in practice this might be a less severe issue. If it turns out that the broadcasted chunks are also causing excessive memory pressure then we'll have to consider additional fixes, such as a mechanism for "unmanaged memory pressure" to trigger storage memory spilling on the driver. Let's only do that if we find that it's needed in practice, though, since that potentially adds a lot of complexity.

@mridulm
Contributor

mridulm commented Aug 8, 2022

QQ @JoshRosen, @alex-balikov - if the expectation is that the variable can be recreated (if missing) on the driver - with that being a remote possibility - do we want to make it a WeakReference instead of a SoftReference?
SoftReferences are kept around as long as possible by the JVM (usually until there is a risk of OOM), unlike WeakReferences.

@JoshRosen
Contributor

> QQ @JoshRosen, @alex-balikov - if the expectation is that the variable can be recreated (if missing) on the driver - with that being a remote possibility - do we want to make it a WeakReference instead of a SoftReference? SoftReferences are kept around as long as possible by the JVM (usually until there is a risk of OOM), unlike WeakReferences.

Good question. It looks like PR #22995 originally proposed to make this a WeakReference but ended up using a SoftReference in an attempt to reduce the likelihood that a broadcast variable would be cleared on GC. I'm slightly hesitant to change behavior that could impact user-created broadcast variables (although maybe I'm being overly conservative).

I don't think that changing SoftReference to WeakReference would negatively impact broadcast hash joins or other SQL operators that use internal broadcast variables: each of those operators retrieves the broadcast value and stores a strong reference to it for the duration of the task, so as long as one task is running, the WeakReference here and in the cache added in #20183 should still be alive and the cache will be effective in keeping the broadcast variable value from being duplicated.

I agree that the WeakReference could make sense for these internal broadcast variables on the driver, though. It looks like SoftReference and WeakReference share a common superclass, so we can choose the reference type based on whether this is an internal or external broadcast variable. Here's a patch implementing that idea: JoshRosen@01fd8be
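
For illustration, here is a minimal sketch of that idea (hypothetical names, not the actual JoshRosen@01fd8be patch): both reference classes extend java.lang.ref.Reference, so the broadcast can hold a Reference[T] and pick the concrete type per variable.

import java.lang.ref.{Reference, SoftReference, WeakReference}

// Illustrative sketch; the object and flag names are made up.
object BroadcastValueRef {
  def apply[T <: AnyRef](value: T, isInternal: Boolean): Reference[T] =
    if (isInternal) {
      // Internal broadcasts (e.g. broadcast relations) can be re-created from the
      // serialized blocks if needed, so let the GC reclaim them eagerly.
      new WeakReference[T](value)
    } else {
      // User-created broadcasts keep the existing SoftReference behavior so they
      // are not cleared by ordinary GC cycles.
      new SoftReference[T](value)
    }
}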

@alex-balikov, @mridulm, WDYT? If that WeakRef change makes sense to you, let's cherry-pick it into this PR.

@mridulm
Contributor

mridulm commented Aug 9, 2022

The proposed change looks good to me @JoshRosen

@alex-balikov
Contributor Author

I applied @JoshRosen's change.

Contributor

@mridulm mridulm left a comment

Looks good to me.

Contributor

@JoshRosen JoshRosen left a comment

LGTM as well, so I'm merging this to master (Spark 3.4.0). Thanks!

@JoshRosen JoshRosen closed this in e17d8ec Aug 11, 2022
@sos3k

sos3k commented Sep 2, 2022

[Attached screenshot: 2022-09-02 at 13:57:44]

Hi everyone, I am Radek from HuuugeGames. We use Databricks runtime 10.4 LTS, and I wanted to let you know that after these changes were included in the runtime (Databricks did that on 26.08 during their maintenance), our job started to behave inconsistently: from time to time, dynamic file pruning (DFP) prunes all of the source files during the S3 scan. I attached a screenshot that shows the loss of broadcasted data during DFP (1 record read from the Reuse Exchange, where normally there should be 97), which results in no records being read from S3. We are joining a small dim table with an events table, and something is definitely going wrong here. After disabling DFP, the plan changed and the process looks stable. We also went back to the previous Databricks runtime image without these changes, and the process looks good even when DFP is enabled.

@JoshRosen
Contributor

Hi @sos3k,

We investigated your bug report and determined that the root cause was a latent bug in UnsafeHashedRelation that was triggered more frequently following the backport of this PR.

PR #35836 fixed a bug related to driver-side deserialization of UnsafeHashedRelation, but that fix was missing from DBR 9.1 and 10.4 (but is present in all newer versions). We have deployed a hotfix to backport #35836 into DBR 9.1 and 10.4.
