From a2683b62985fc9c7d15fb92f3bb170a4b5225058 Mon Sep 17 00:00:00 2001 From: Brandon Krieger Date: Thu, 8 Nov 2018 18:04:06 -0500 Subject: [PATCH 1/8] use weak reference for torrent broadcast --- .../spark/broadcast/TorrentBroadcast.scala | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index cbd49e070f2eb..1b6356ee6533d 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -22,6 +22,7 @@ import java.nio.ByteBuffer import java.util.zip.Adler32 import scala.collection.JavaConverters._ +import scala.ref.WeakReference import scala.reflect.ClassTag import scala.util.Random @@ -61,9 +62,11 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) * Value of the broadcast object on executors. This is reconstructed by [[readBroadcastBlock]], * which builds this value by reading blocks from the driver and/or other executors. * - * On the driver, if the value is required, it is read lazily from the block manager. + * On the driver, if the value is required, it is read lazily from the block manager. We hold + * a weak reference so that it can be garbage collected if required, as we can always reconstruct + * in the future. */ - @transient private lazy val _value: T = readBroadcastBlock() + @transient private var _value: WeakReference[T] = new WeakReference() /** The compression codec to use, or None if compression is disabled */ @transient private var compressionCodec: Option[CompressionCodec] = _ @@ -93,7 +96,14 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) private var checksums: Array[Int] = _ override protected def getValue() = { - _value + val memoized: Option[T] = _value.get + if (memoized.isDefined) { + memoized.get + } else { + val newlyRead = readBroadcastBlock() + _value = new WeakReference(newlyRead) + newlyRead + } } private def calcChecksum(block: ByteBuffer): Int = { From 99fbeecf43a289648a56d178fa55e188ce75bdb7 Mon Sep 17 00:00:00 2001 From: Brandon Krieger Date: Fri, 9 Nov 2018 16:04:51 -0500 Subject: [PATCH 2/8] fix compile --- .../org/apache/spark/broadcast/TorrentBroadcast.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 1b6356ee6533d..5c791b4b32ba7 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -66,7 +66,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) * a weak reference so that it can be garbage collected if required, as we can always reconstruct * in the future. */ - @transient private var _value: WeakReference[T] = new WeakReference() + @transient private var _value: WeakReference[T] = _ /** The compression codec to use, or None if compression is disabled */ @transient private var compressionCodec: Option[CompressionCodec] = _ @@ -96,12 +96,16 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) private var checksums: Array[Int] = _ override protected def getValue() = { - val memoized: Option[T] = _value.get + var memoized : Option[T] = None + if (_value != null) { + memoized = _value.get + } + if (memoized.isDefined) { memoized.get } else { val newlyRead = readBroadcastBlock() - _value = new WeakReference(newlyRead) + _value = new WeakReference[T](newlyRead) newlyRead } } From 5e0a179c168a70b0166abe4bb51a1d26a2f1d666 Mon Sep 17 00:00:00 2001 From: Brandon Krieger Date: Fri, 9 Nov 2018 16:33:22 -0500 Subject: [PATCH 3/8] fix --- .../spark/broadcast/TorrentBroadcast.scala | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 5c791b4b32ba7..0c89b80ce812e 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -18,14 +18,13 @@ package org.apache.spark.broadcast import java.io._ +import java.lang.ref.WeakReference import java.nio.ByteBuffer import java.util.zip.Adler32 import scala.collection.JavaConverters._ -import scala.ref.WeakReference import scala.reflect.ClassTag import scala.util.Random - import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.io.CompressionCodec @@ -66,7 +65,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) * a weak reference so that it can be garbage collected if required, as we can always reconstruct * in the future. */ - @transient private var _value: WeakReference[T] = _ + @transient private var _value: WeakReference[T] = new WeakReference(null.asInstanceOf[T]) /** The compression codec to use, or None if compression is disabled */ @transient private var compressionCodec: Option[CompressionCodec] = _ @@ -96,17 +95,13 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) private var checksums: Array[Int] = _ override protected def getValue() = { - var memoized : Option[T] = None - if (_value != null) { - memoized = _value.get - } - - if (memoized.isDefined) { - memoized.get - } else { + val memoized = _value.get() + if (memoized == null) { val newlyRead = readBroadcastBlock() _value = new WeakReference[T](newlyRead) newlyRead + } else { + memoized } } From 1908b5b8dfa6c0b55db3bd9a90e21ca713e5bf25 Mon Sep 17 00:00:00 2001 From: Brandon Krieger Date: Fri, 9 Nov 2018 16:48:44 -0500 Subject: [PATCH 4/8] no npe --- .../org/apache/spark/broadcast/TorrentBroadcast.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 0c89b80ce812e..c8c37a0c51bb6 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -25,6 +25,7 @@ import java.util.zip.Adler32 import scala.collection.JavaConverters._ import scala.reflect.ClassTag import scala.util.Random + import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.io.CompressionCodec @@ -65,7 +66,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) * a weak reference so that it can be garbage collected if required, as we can always reconstruct * in the future. */ - @transient private var _value: WeakReference[T] = new WeakReference(null.asInstanceOf[T]) + @transient private var _value: WeakReference[T] = _ /** The compression codec to use, or None if compression is disabled */ @transient private var compressionCodec: Option[CompressionCodec] = _ @@ -95,13 +96,13 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) private var checksums: Array[Int] = _ override protected def getValue() = { - val memoized = _value.get() - if (memoized == null) { + val memoized = Option.apply(_value).flatMap(x => Option.apply(x.get)) + if (memoized.isDefined) { + memoized.get + } else { val newlyRead = readBroadcastBlock() _value = new WeakReference[T](newlyRead) newlyRead - } else { - memoized } } From 24183e5b8b63e0b4e117856ab4de7eb1b0ea6c9a Mon Sep 17 00:00:00 2001 From: Brandon Krieger Date: Fri, 9 Nov 2018 16:52:21 -0500 Subject: [PATCH 5/8] no option --- .../scala/org/apache/spark/broadcast/TorrentBroadcast.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index c8c37a0c51bb6..9fc5b0b607490 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -96,9 +96,9 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) private var checksums: Array[Int] = _ override protected def getValue() = { - val memoized = Option.apply(_value).flatMap(x => Option.apply(x.get)) - if (memoized.isDefined) { - memoized.get + val memoized: T = if (value == null) null.asInstanceOf[T] else _value.get + if (memoized != null) { + memoized } else { val newlyRead = readBroadcastBlock() _value = new WeakReference[T](newlyRead) From f212da322242386ce3b71e9961a964e60b587287 Mon Sep 17 00:00:00 2001 From: Brandon Krieger Date: Fri, 9 Nov 2018 17:08:23 -0500 Subject: [PATCH 6/8] typo --- .../scala/org/apache/spark/broadcast/TorrentBroadcast.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 9fc5b0b607490..0a2797f1daa54 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -96,7 +96,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) private var checksums: Array[Int] = _ override protected def getValue() = { - val memoized: T = if (value == null) null.asInstanceOf[T] else _value.get + val memoized: T = if (_value == null) null.asInstanceOf[T] else _value.get if (memoized != null) { memoized } else { From 09ae762962098e58be7ba8f777f9dffde2f81d81 Mon Sep 17 00:00:00 2001 From: Brandon Krieger Date: Mon, 26 Nov 2018 14:18:54 -0500 Subject: [PATCH 7/8] code review --- .../org/apache/spark/broadcast/TorrentBroadcast.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 0a2797f1daa54..77696ff161c54 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -18,7 +18,7 @@ package org.apache.spark.broadcast import java.io._ -import java.lang.ref.WeakReference +import java.lang.ref.SoftReference import java.nio.ByteBuffer import java.util.zip.Adler32 @@ -63,10 +63,10 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) * which builds this value by reading blocks from the driver and/or other executors. * * On the driver, if the value is required, it is read lazily from the block manager. We hold - * a weak reference so that it can be garbage collected if required, as we can always reconstruct + * a soft reference so that it can be garbage collected if required, as we can always reconstruct * in the future. */ - @transient private var _value: WeakReference[T] = _ + @transient private var _value: SoftReference[T] = _ /** The compression codec to use, or None if compression is disabled */ @transient private var compressionCodec: Option[CompressionCodec] = _ @@ -95,13 +95,13 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) /** The checksum for all the blocks. */ private var checksums: Array[Int] = _ - override protected def getValue() = { + override protected def getValue() = synchronized { val memoized: T = if (_value == null) null.asInstanceOf[T] else _value.get if (memoized != null) { memoized } else { val newlyRead = readBroadcastBlock() - _value = new WeakReference[T](newlyRead) + _value = new SoftReference[T](newlyRead) newlyRead } } From 4a771cf1a2877f8c0f083fd9326217f6a2653752 Mon Sep 17 00:00:00 2001 From: Brandon Krieger Date: Tue, 27 Nov 2018 12:46:51 -0500 Subject: [PATCH 8/8] change sync --- .../scala/org/apache/spark/broadcast/TorrentBroadcast.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 77696ff161c54..26ead57316e18 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -215,8 +215,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) } private def readBroadcastBlock(): T = Utils.tryOrIOException { - TorrentBroadcast.synchronized { - val broadcastCache = SparkEnv.get.broadcastManager.cachedValues + val broadcastCache = SparkEnv.get.broadcastManager.cachedValues + broadcastCache.synchronized { Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse { setConf(SparkEnv.get.conf)