[SPARK-25998] [CORE] Change TorrentBroadcast to hold weak reference of broadcast object #22995
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
| package org.apache.spark.broadcast | ||
|
|
||
| import java.io._ | ||
| import java.lang.ref.SoftReference | ||
| import java.nio.ByteBuffer | ||
| import java.util.zip.Adler32 | ||
|
|
||
|
|
@@ -61,9 +62,11 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) | |
| * Value of the broadcast object on executors. This is reconstructed by [[readBroadcastBlock]], | ||
| * which builds this value by reading blocks from the driver and/or other executors. | ||
| * | ||
| * On the driver, if the value is required, it is read lazily from the block manager. | ||
| * On the driver, if the value is required, it is read lazily from the block manager. We hold | ||
| * a soft reference so that it can be garbage collected if required, as we can always reconstruct | ||
| * in the future. | ||
| */ | ||
| @transient private lazy val _value: T = readBroadcastBlock() | ||
| @transient private var _value: SoftReference[T] = _ | ||
|
|
||
| /** The compression codec to use, or None if compression is disabled */ | ||
| @transient private var compressionCodec: Option[CompressionCodec] = _ | ||
|
|
@@ -92,8 +95,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) | |
| /** The checksum for all the blocks. */ | ||
| private var checksums: Array[Int] = _ | ||
|
|
||
| override protected def getValue() = { | ||
| _value | ||
| override protected def getValue() = synchronized { | ||
| val memoized: T = if (_value == null) null.asInstanceOf[T] else _value.get | ||
|
Member
I suppose there is a race condition here, in that several threads could end up simultaneously setting the reference. It won't be incorrect, as the data ought to be the same. I am not sure of the access pattern for this object; maybe it's always single-threaded. But if lots are reading, you can imagine them all causing a call to Introducing another object to lock on is safe and not too much extra legwork. Might be worth it. Isn't WeakReference cleared on any GC? Would SoftReference be better, holding out until memory is exhausted, to avoid re-reading? There's a tradeoff there. Good idea, just surprisingly full of possible gotchas. Nit: isn't
Author
Good catch. I'll make it synchronized, so it only loads one at a time. Re: WeakReference, sure, I can change it to SoftReference. That'll be closer to the original behavior, and should still give the improvement we want. When I try with
Member
Hm, weird, I thought it would work based on a little local example, but yeah, leave the cast in of course. |
||
| if (memoized != null) { | ||
| memoized | ||
| } else { | ||
| val newlyRead = readBroadcastBlock() | ||
| _value = new SoftReference[T](newlyRead) | ||
| newlyRead | ||
| } | ||
| } | ||
|
|
||
| private def calcChecksum(block: ByteBuffer): Int = { | ||
|
|
@@ -205,8 +215,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) | |
| } | ||
|
|
||
| private def readBroadcastBlock(): T = Utils.tryOrIOException { | ||
| TorrentBroadcast.synchronized { | ||
| val broadcastCache = SparkEnv.get.broadcastManager.cachedValues | ||
| val broadcastCache = SparkEnv.get.broadcastManager.cachedValues | ||
| broadcastCache.synchronized { | ||
|
|
||
| Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse { | ||
| setConf(SparkEnv.get.conf) | ||
|
|
||
I noticed that below, the `readBroadcastBlock()` method is synchronized on the companion object, which makes me nervous. However, a lazy val is also implemented with `this.synchronized`, so I suspect this is fine. We pay the cost of synchronization on every call now, but I think that is OK here, as a simple flag won't help us figure out whether the SoftReference has been cleared.
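The trade-off described in this comment can be illustrated outside Spark. The following is a minimal sketch, not the PR's code: `SoftMemo`, `loadCount`, and `clearForDemo` are hypothetical names invented for illustration, and `load` stands in for an expensive re-read such as `readBroadcastBlock()`. It shows why every read takes the lock: the only way to learn whether the soft reference was cleared is to dereference it.

```scala
import java.lang.ref.SoftReference

// Hypothetical helper (not part of Spark): memoizes a reloadable value
// behind a SoftReference so the GC may reclaim it under memory pressure.
final class SoftMemo[T <: AnyRef](load: () => T) {
  // null until the first read; afterwards the referent may be cleared by the GC
  private var ref: SoftReference[T] = null
  private var loads = 0

  // Synchronized on every call: a boolean "initialized" flag could not tell us
  // whether the referent has been cleared since it was set.
  def get: T = synchronized {
    val cached: T = if (ref == null) null.asInstanceOf[T] else ref.get
    if (cached != null) {
      cached
    } else {
      val fresh = load()
      loads += 1
      ref = new SoftReference[T](fresh)
      fresh // returned via a strong local, so it cannot vanish before the caller sees it
    }
  }

  // How many times `load` has run (for the demo only).
  def loadCount: Int = synchronized(loads)

  // Simulates the GC clearing the referent (for the demo only).
  def clearForDemo(): Unit = synchronized { if (ref != null) ref.clear() }
}
```

Calling `get` twice in a row runs `load` once; after `clearForDemo()`, the next `get` runs it again. Because the getter is synchronized, concurrent readers that find the reference cleared reload one at a time rather than all at once.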
I think we can remove the `TorrentBroadcast.synchronized` in `readBroadcastBlock`, since we're already synchronizing in its only caller? Though I'm not sure why it was necessary in the first place, as `readBroadcastBlock` should only have been called once before this PR.
Regardless, I agree that the perf hit should be ok. Let me know if you want any of this changed.
Hm, that `TorrentBroadcast.synchronized` is from a very old version of the code in 2013, when more things used that lock. I'm pretty certain it's obsolete. However, this code accesses `broadcastCache` and that needs synchronization. (It's kind of unfortunate where this object is.) I think we could actually improve this by locking on `broadcastCache` for basically the whole block. I think that's a safe improvement, as nothing else uses `broadcastCache`, nothing else is synchronized here, and it should behave just the same.
Do you mean switching `TorrentBroadcast.synchronized` to `broadcastCache.synchronized` inside `readBroadcastBlock`, or changing `this.synchronized` to `broadcastCache.synchronized` inside `getValue()` (and getting rid of the lock in `readBroadcastBlock`)?
We need two locks, one to protect the local reference to the block, and one to protect the shared cache object. I was thinking of the former.
done
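For readers following the thread, the two-lock arrangement discussed above might look roughly like the sketch below. It is an illustration under stated assumptions, not the merged change: `SharedCache`, `CachedHandle`, and `fetch` are hypothetical names, and a plain `mutable.Map` stands in for the shared cache, whose real type in Spark may differ. The instance lock guards the local soft reference; the lock on the cache object guards the shared map.

```scala
import java.lang.ref.SoftReference
import scala.collection.mutable

// Hypothetical stand-in for a process-wide cache shared by all handles.
// Assumption: a plain map, which keeps values strongly reachable; this is a
// simplification for the demo.
object SharedCache {
  val values: mutable.Map[Long, AnyRef] = mutable.Map.empty
}

// Hypothetical handle: `fetch` stands in for the expensive block read.
final class CachedHandle[T <: AnyRef](id: Long, fetch: () => T) {
  private var local: SoftReference[T] = null

  // Lock 1 (this): protects the per-instance soft reference.
  def value: T = synchronized {
    val cached: T = if (local == null) null.asInstanceOf[T] else local.get
    if (cached != null) {
      cached
    } else {
      val fresh = readShared()
      local = new SoftReference[T](fresh)
      fresh
    }
  }

  // Lock 2 (the cache object): protects the shared map for the whole
  // lookup-or-populate step.
  private def readShared(): T = {
    val cache = SharedCache.values
    cache.synchronized {
      cache.get(id) match {
        case Some(v) => v.asInstanceOf[T]
        case None =>
          val v = fetch()
          cache.put(id, v)
          v
      }
    }
  }
}
```

In this sketch the locks are always taken in the same order (instance first, then cache), so two handles cannot deadlock against each other. Two handles with the same `id` share one fetched value through the cache.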