Skip to content

Conversation

@ho3rexqj
Copy link
Contributor

@ho3rexqj ho3rexqj commented Jan 8, 2018

When resources happen to be constrained on an executor the first time a broadcast variable is instantiated it is persisted to disk by the BlockManager. Consequently, every subsequent call to TorrentBroadcast::readBroadcastBlock from other instances of that broadcast variable spawns another instance of the underlying value. That is, broadcast variables are spawned once per executor unless memory is constrained, in which case every instance of a broadcast variable is provided with a unique copy of the underlying value.

This patch fixes the above by explicitly caching the underlying values using weak references in a ReferenceMap.

Copy link
Contributor

@jiangxb1987 jiangxb1987 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be useful, cc @cloud-fan to also take a look.


test("One broadcast value instance per executor") {
val conf = new SparkConf()
.setMaster("local[10]")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: normally local[4] is sufficient.

@jiangxb1987
Copy link
Contributor

Please also update the PR title and description. Thanks!

@ho3rexqj ho3rexqj changed the title [SPARK-22986][Core] Fix/cache broadcast values [SPARK-22986][Core] Use a local cache to avoid instantiating multiple instances of broadcast variable values Jan 11, 2018
@ho3rexqj ho3rexqj changed the title [SPARK-22986][Core] Use a local cache to avoid instantiating multiple instances of broadcast variable values [SPARK-22986][Core] Use a cache to avoid instantiating multiple instances of broadcast variable values Jan 11, 2018
@ho3rexqj
Copy link
Contributor Author

Updated the above, thanks!

private val nextBroadcastId = new AtomicLong(0)

private[broadcast] val cachedValues = {
new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is the key not a weak reference?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suppose the first thread to request the broadcast variable's value destroyed it's instance of the broadcast variable (which, I believe, is what will happen when that thread finishes processing it's partition) - if the key were a weak reference in the above cache it would become eligible for GC at that point. I'm reasonably certain at that point the associated key/value pair would be removed from the cache; in other words, if the key were a weak reference the key/value pair would be removed as soon as the key or value was garbage collected.

Note that I haven't used ReferenceMap extensively, so I could be wrong about the above - feel free to correct me if that's the case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the difference between this and "weak key, hard value"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the state of an executor at some point in time:

Cache: IdInstance1 => ValueInstance1
Thread1: TorrentBroadcastInstance1(broadcastId = IdInstance1, value = ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = ValueInstance1)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

After some time Thread1 finishes process the partition it's working on and starts on the next - the state becomes:

Cache: IdInstance1 => ValueInstance1
Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = ValueInstance1)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

At some point the GC destroys TorrentBroadcastInstance1. Now, if the key is a weak reference, the state becomes:

Cache: Empty
Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = ValueInstance1)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

The next thread to finish processing a partition then creates a new instance of the broadcast value:

Cache: IdInstance6 => ValueInstance2
Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
Thread3: TorrentBroadcastInstance6(broadcastId = IdInstance6, value = ValueInstance2)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

On the other hand, if the key is a strong reference the the value is weak, the cached value isn't eligible for GC above. As such, when Thread3 finishes processing it's partition and starts the next, the state becomes:

Cache: IdInstance1 => ValueInstance1
Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
Thread3: TorrentBroadcastInstance6(broadcastId = IdInstance6, value = ValueInstance1)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah i see, because we never get the key reference outside of the map, makes sense.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what I found from the doc:

Hash-based Map implementation that allows mappings to be removed by the garbage collector.
When you construct a ReferenceMap, you can specify what kind of references are used to store the map's keys and values. If non-hard references are used, then the garbage collector can remove mappings if a key or value becomes unreachable, or if the JVM's memory is running low. For information on how the different reference types behave, see Reference.

It only mentions that non-hard references can be removed by GC, please correct me if I'm wrong.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That should be fine even if that hard references not removed, since the memory consumption should be quite minor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW - welcome back @jerryshao ! long time no see!

Copy link
Contributor

@cloud-fan cloud-fan Jan 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can remove mappings... That's why I say entries can be removed automatically.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ohh, I see. Thanks!

throw new SparkException(s"Failed to store $broadcastId in BlockManager")
val broadcastCache = SparkEnv.get.broadcastManager.cachedValues

Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse({
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: code style

...getOrElse {
  xxx
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, thanks!

val broadcastCache = SparkEnv.get.broadcastManager.cachedValues

Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse({
setConf(SparkEnv.get.conf)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to confirm, the following code is exactly same as before, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - everything within the getOrElse block is unchanged.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, sorry - the cache update takes place within that block. With the exception of those blocks (lines 220-222 and lines 244-246), yes.

@cloud-fan
Copy link
Contributor

ok to test

throw new SparkException(s"Failed to store $broadcastId in BlockManager")
val broadcastCache = SparkEnv.get.broadcastManager.cachedValues

Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this ReferenceMap thread safe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReferenceMap is not thread safe, no - however, all operations on broadcastCache occur within the context of a synchronized block; TorrentBroadcast.scala lines 208-254.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thanks for pointing out.

@SparkQA
Copy link

SparkQA commented Jan 12, 2018

Test build #86010 has finished for PR 20183 at commit 8e13585.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 12, 2018

Test build #86016 has finished for PR 20183 at commit a6e5e81.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

thanks, merging to master/2.3!

asfgit pushed a commit that referenced this pull request Jan 12, 2018
…nces of broadcast variable values

When resources happen to be constrained on an executor the first time a broadcast variable is instantiated it is persisted to disk by the BlockManager. Consequently, every subsequent call to TorrentBroadcast::readBroadcastBlock from other instances of that broadcast variable spawns another instance of the underlying value. That is, broadcast variables are spawned once per executor **unless** memory is constrained, in which case every instance of a broadcast variable is provided with a unique copy of the underlying value.

This patch fixes the above by explicitly caching the underlying values using weak references in a ReferenceMap.

Author: ho3rexqj <[email protected]>

Closes #20183 from ho3rexqj/fix/cache-broadcast-values.

(cherry picked from commit cbe7c6f)
Signed-off-by: Wenchen Fan <[email protected]>
@asfgit asfgit closed this in cbe7c6f Jan 12, 2018
@ho3rexqj ho3rexqj deleted the fix/cache-broadcast-values branch January 12, 2018 14:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants