@@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicLong

import scala.reflect.ClassTag

import org.apache.commons.collections.map.{AbstractReferenceMap, ReferenceMap}

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.internal.Logging

@@ -52,6 +54,10 @@ private[spark] class BroadcastManager(

private val nextBroadcastId = new AtomicLong(0)

private[broadcast] val cachedValues = {
new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
Contributor

why is the key not a weak reference?

Contributor Author

Suppose the first thread to request the broadcast variable's value destroys its instance of the broadcast variable (which, I believe, is what happens when that thread finishes processing its partition). If the key were a weak reference in the above cache, it would become eligible for GC at that point, and I'm reasonably certain the associated key/value pair would then be removed from the cache. In other words, with a weak key the key/value pair would be removed as soon as either the key or the value was garbage collected.

Note that I haven't used ReferenceMap extensively, so I could be wrong about the above - feel free to correct me if that's the case.

Contributor

what's the difference between this and "weak key, hard value"?

Contributor Author

This is the state of an executor at some point in time:

Cache: IdInstance1 => ValueInstance1
Thread1: TorrentBroadcastInstance1(broadcastId = IdInstance1, value = ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = ValueInstance1)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

After some time Thread1 finishes processing the partition it's working on and starts on the next. The state becomes:

Cache: IdInstance1 => ValueInstance1
Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = ValueInstance1)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

At some point the GC destroys TorrentBroadcastInstance1. Now, if the key is a weak reference, the state becomes:

Cache: Empty
Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = ValueInstance1)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

The next thread to finish processing a partition then creates a new instance of the broadcast value:

Cache: IdInstance6 => ValueInstance2
Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
Thread3: TorrentBroadcastInstance6(broadcastId = IdInstance6, value = ValueInstance2)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

On the other hand, if the key is a strong reference and the value is weak, the cached entry isn't eligible for GC above, because the other threads still hold strong references to ValueInstance1. As such, when Thread3 finishes processing its partition and starts the next, the state becomes:

Cache: IdInstance1 => ValueInstance1
Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
Thread3: TorrentBroadcastInstance6(broadcastId = IdInstance6, value = ValueInstance1)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)
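
The hard key / weak value arrangement walked through above can be sketched without commons-collections. This is only an illustration under stated assumptions: `HardKeyWeakValueCache`, `BlockId`, and `HardKeyWeakValueDemo` are hypothetical names, and the sketch uses `java.lang.ref.WeakReference` directly rather than `ReferenceMap`, whose internals may differ. It shows that a lookup with a fresh but equal key instance still returns the same value instance for as long as some task holds that value strongly.

```scala
import java.lang.ref.WeakReference
import scala.collection.mutable

// Hypothetical stand-in for ReferenceMap(HARD, WEAK): keys are held strongly,
// values are reachable from the cache only through a WeakReference.
final class HardKeyWeakValueCache[K, V <: AnyRef] {
  private val entries = mutable.HashMap.empty[K, WeakReference[V]]

  def put(key: K, value: V): Unit = synchronized {
    entries.update(key, new WeakReference(value))
  }

  // Returns the cached value if it has not been collected; drops stale entries.
  def get(key: K): Option[V] = synchronized {
    entries.get(key).flatMap { ref =>
      val value = Option(ref.get())
      if (value.isEmpty) entries.remove(key)
      value
    }
  }
}

// Hypothetical stand-in for the broadcast id: equal by id, fresh instance per task.
final case class BlockId(id: Long)

object HardKeyWeakValueDemo {
  def run(): Boolean = {
    val cache = new HardKeyWeakValueCache[BlockId, Array[Int]]
    // Strongly held here, like the value referenced by a running task.
    val value = Array(1, 2, 3, 4)
    // The caller does not keep this key instance; only the cache does.
    cache.put(BlockId(0L), value)
    // A later task looks up with a different but equal key instance
    // and gets back the very same value instance.
    cache.get(BlockId(0L)).exists(_ eq value)
  }
}
```

With a weak key instead, the entry's lifetime would be tied to the key instance that the first task happened to create, which is the failure mode described in the states above.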

Contributor

ah i see, because we never get the key reference outside of the map, makes sense.

Contributor

This is what I found from the doc:

Hash-based Map implementation that allows mappings to be removed by the garbage collector.
When you construct a ReferenceMap, you can specify what kind of references are used to store the map's keys and values. If non-hard references are used, then the garbage collector can remove mappings if a key or value becomes unreachable, or if the JVM's memory is running low. For information on how the different reference types behave, see Reference.

It only mentions that non-hard references can be removed by GC, please correct me if I'm wrong.

Contributor

That should be fine even if the hard references are not removed, since their memory consumption should be quite minor.

Contributor

BTW - welcome back @jerryshao ! long time no see!

Contributor (@cloud-fan, Jan 12, 2018)

"... the garbage collector can remove mappings ..." That's why I say entries can be removed automatically.

Contributor

ohh, I see. Thanks!

}

def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
}
@@ -206,36 +206,50 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)

private def readBroadcastBlock(): T = Utils.tryOrIOException {
TorrentBroadcast.synchronized {
setConf(SparkEnv.get.conf)
val blockManager = SparkEnv.get.blockManager
blockManager.getLocalValues(broadcastId) match {
case Some(blockResult) =>
if (blockResult.data.hasNext) {
val x = blockResult.data.next().asInstanceOf[T]
releaseLock(broadcastId)
x
} else {
throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
}
case None =>
logInfo("Started reading broadcast variable " + id)
val startTimeMs = System.currentTimeMillis()
val blocks = readBlocks()
logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))

try {
val obj = TorrentBroadcast.unBlockifyObject[T](
blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
// Store the merged copy in BlockManager so other tasks on this executor don't
// need to re-fetch it.
val storageLevel = StorageLevel.MEMORY_AND_DISK
if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
throw new SparkException(s"Failed to store $broadcastId in BlockManager")
val broadcastCache = SparkEnv.get.broadcastManager.cachedValues

Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
Contributor

Is this ReferenceMap thread safe?

Contributor Author

No, ReferenceMap is not thread safe. However, all operations on broadcastCache occur within a synchronized block (TorrentBroadcast.scala, lines 208-254).
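
As a rough illustration of that pattern (not the PR's code), the sketch below guards a non-thread-safe map with a single shared lock so that concurrent readers load a value at most once. The names `GuardedCacheDemo`, `lock`, `cache`, `loads`, and `getOrLoad` are hypothetical; `lock` plays the role of the `TorrentBroadcast` companion object and `cache` the role of the `ReferenceMap`.

```scala
import scala.collection.mutable

object GuardedCacheDemo {
  // Hypothetical stand-ins: one lock shared by every reader, and a plain
  // HashMap that is not safe for concurrent use on its own.
  private val lock = new Object
  private val cache = mutable.HashMap.empty[Long, String]
  private var loads = 0

  // Every read and write of `cache` happens while holding `lock`.
  def getOrLoad(id: Long): String = lock.synchronized {
    cache.getOrElseUpdate(id, { loads += 1; s"value-$id" })
  }

  // Starts four threads that all request the same id and reports how many
  // times the value was actually loaded.
  def demo(): Int = {
    val threads = (1 to 4).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit = getOrLoad(0L)
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    lock.synchronized(loads)
  }
}
```

The trade-off is that a single lock serializes all lookups, which is acceptable here only because the guarded section already ran under that lock before the cache was introduced.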

Contributor

I see, thanks for pointing out.

setConf(SparkEnv.get.conf)
val blockManager = SparkEnv.get.blockManager
blockManager.getLocalValues(broadcastId) match {
case Some(blockResult) =>
if (blockResult.data.hasNext) {
val x = blockResult.data.next().asInstanceOf[T]
releaseLock(broadcastId)

if (x != null) {
broadcastCache.put(broadcastId, x)
}

x
} else {
throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
}
obj
} finally {
blocks.foreach(_.dispose())
}
case None =>
logInfo("Started reading broadcast variable " + id)
val startTimeMs = System.currentTimeMillis()
val blocks = readBlocks()
logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))

try {
val obj = TorrentBroadcast.unBlockifyObject[T](
blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
// Store the merged copy in BlockManager so other tasks on this executor don't
// need to re-fetch it.
val storageLevel = StorageLevel.MEMORY_AND_DISK
if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
throw new SparkException(s"Failed to store $broadcastId in BlockManager")
}

if (obj != null) {
broadcastCache.put(broadcastId, obj)
}

obj
} finally {
blocks.foreach(_.dispose())
}
}
}
}
}
@@ -153,6 +153,40 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio
assert(broadcast.value.sum === 10)
}

test("One broadcast value instance per executor") {
val conf = new SparkConf()
.setMaster("local[4]")
.setAppName("test")

sc = new SparkContext(conf)
val list = List[Int](1, 2, 3, 4)
val broadcast = sc.broadcast(list)
val instances = sc.parallelize(1 to 10)
.map(x => System.identityHashCode(broadcast.value))
.collect()
.toSet

assert(instances.size === 1)
}

test("One broadcast value instance per executor when memory is constrained") {
val conf = new SparkConf()
.setMaster("local[4]")
.setAppName("test")
.set("spark.memory.useLegacyMode", "true")
.set("spark.storage.memoryFraction", "0.0")

sc = new SparkContext(conf)
val list = List[Int](1, 2, 3, 4)
val broadcast = sc.broadcast(list)
val instances = sc.parallelize(1 to 10)
.map(x => System.identityHashCode(broadcast.value))
.collect()
.toSet

assert(instances.size === 1)
}

/**
* Verify the persistence of state associated with a TorrentBroadcast in a local-cluster.
*