@@ -207,11 +207,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
  TorrentBroadcast.synchronized {
    setConf(SparkEnv.get.conf)
    val blockManager = SparkEnv.get.blockManager
-   blockManager.getLocalValues(broadcastId).map(_.data.next()) match {
-     case Some(x) =>
-       releaseLock(broadcastId)
-       x.asInstanceOf[T]
+   blockManager.getLocalValues(broadcastId) match {
+     case Some(blockResult) =>
+       if (blockResult.data.hasNext) {
+         val x = blockResult.data.next().asInstanceOf[T]
+         releaseLock(broadcastId)
+         x
+       } else {
+         throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
+       }
      case None =>
        logInfo("Started reading broadcast variable " + id)
        val startTimeMs = System.currentTimeMillis()
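The old pattern mapped _.data.next() over the Option, so an empty block iterator threw NoSuchElementException deep inside the read path. As a toy illustration (hypothetical names and types, not Spark's API) of the safer shape the patch adopts:

// Toy sketch with hypothetical names, not Spark's API: check hasNext before
// consuming the optional iterator of locally cached values.
object LocalReadSketch {
  def readSafely(local: Option[Iterator[Int]]): Option[Int] = local match {
    case Some(data) if data.hasNext => Some(data.next())
    case Some(_) =>
      // A locally stored block with no data becomes a descriptive error.
      throw new IllegalStateException("locally stored block had no data")
    case None => None // the caller would fall back to fetching remote blocks
  }

  def main(args: Array[String]): Unit = {
    println(readSafely(Some(Iterator(42)))) // Some(42)
    println(readSafely(None))               // None, i.e. the remote-fetch path
    // readSafely(Some(Iterator.empty))        // IllegalStateException
    // Some(Iterator.empty).map(_.next())      // old shape: NoSuchElementException
  }
}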
@@ -694,7 +694,7 @@ private[storage] class PartiallyUnrolledIterator[T](
  }

  override def next(): T = {
-   if (unrolled == null) {
+   if (unrolled == null || !unrolled.hasNext) {
Contributor:
Ideally, only the null check should be there, with unrolled set to null once hasNext is false. This is part of a tight loop, and it would be better to keep the footprint as small as possible.
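For context, a minimal sketch of the alternative being suggested; this is a hypothetical simplification, not Spark's actual PartiallyUnrolledIterator:

// Hypothetical simplification, not the actual Spark code: hasNext discards
// `unrolled` the moment it is exhausted, so next() in the tight loop only
// pays for a null check. This relies on callers invoking hasNext before
// next, which is exactly what the rest of this thread debates.
class FallbackIterator[T](private var unrolled: Iterator[T],
                          rest: Iterator[T]) extends Iterator[T] {

  override def hasNext: Boolean = {
    if (unrolled != null && !unrolled.hasNext) {
      unrolled = null // invariant: unrolled is either null or non-empty
    }
    if (unrolled != null) true else rest.hasNext
  }

  override def next(): T = {
    if (unrolled == null) rest.next() else unrolled.next()
  }
}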

Member:
Yeah, that's a fair point: it's reasonable to put the burden on the caller to check hasNext before calling next, and TorrentBroadcast now does that. However, are there other call sites that need the same kind of fix? If all callers are well behaved, then I agree we could revert the added hasNext call in next.

Contributor:
You are right: next() without hasNext is a valid code flow, and our code should not break because a caller didn't invoke hasNext (at best, it should throw NoSuchElementException when hasNext == false).
Another option is to add the hasNext check here, but that would be worse, since the normal flow would then check hasNext twice.

If we can't ensure require(unrolled == null || unrolled.hasNext), then the current change is the best we can do, I guess.

Member:
I agree that next() without hasNext is a valid flow. However, a caller that behaves like that should also be aware of the possibility of a no-such-element exception.

TorrentBroadcast is problematic because it doesn't call hasNext and doesn't handle this possibility.

I'd prefer to revert the added hasNext call in next, but that's not a strong opinion.

Member:
Isn't this a correctness issue, though? If unrolled has no more elements, the correct result is to return from the other iterator, not to throw an exception.

Member (Author):
I have tested three times without !unrolled.hasNext on more than 100 billion records, and everything worked well. I will remove !unrolled.hasNext.

Member:
I'm not against the final state here without !unrolled.hasNext, because callers really should check hasNext, and if they don't, it should be considered a bug. Do we think we got all the call sites for this, though?

The thing that concerns me is that next will actually do the wrong thing if hasNext isn't called and unrolled has no elements: it will fail rather than just fall back to rest. Scala says the behavior is undefined in this case; Java does not.

Member (@zsxwing, Dec 14, 2016):
+1. I don't like exposing undefined behavior to the user either; it's better to throw an exception instead. For example, Scala's ConcatIterator is also implemented this way: https://github.com/scala/scala/blob/v2.12.1/src/library/scala/collection/Iterator.scala#L216

IMO, we should not assume every Iterator follows the Java Iterator contract, but if we are implementing an Iterator, it's better to follow it, to avoid spending a lot of time in the future debugging misuse of the Iterator.
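For reference, a minimal sketch of the contract-following pattern described above, loosely modeled on Scala's ConcatIterator; the class name is illustrative, not Spark's or the Scala library's:

// Illustrative sketch, not Spark or Scala library code. hasNext advances
// past the exhausted head iterator, and next() delegates to hasNext, so a
// caller that skips hasNext still falls back correctly and only sees a
// NoSuchElementException once both sources are empty.
class ContractConcatIterator[T](first: Iterator[T], second: Iterator[T])
    extends Iterator[T] {
  private var current: Iterator[T] = first
  private var fellBack = false

  override def hasNext: Boolean = {
    if (!current.hasNext && !fellBack) {
      current = second // fall back exactly once
      fellBack = true
    }
    current.hasNext
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("next on empty iterator")
    current.next()
  }
}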

Member:
OK, it's fair to me to add !unrolled.hasNext to next for more predictable behavior.

Throwing an exception seems a little strange to me, though, since the iterator as a whole still has elements, so a NoSuchElementException doesn't seem correct; the bug is just that it doesn't correctly fall back to rest.

Member (@zsxwing, Dec 15, 2016):
> Throwing an exception seems a little strange to me, though, since the iterator as a whole still has elements, so a NoSuchElementException doesn't seem correct; the bug is just that it doesn't correctly fall back to rest.

To clarify: I meant that when hasNext returns false, next should throw NoSuchElementException.

      rest.next()
    } else {
      unrolled.next()
@@ -137,6 +137,18 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
    sc.stop()
  }

+ test("Cache broadcast to disk") {
+   val conf = new SparkConf()
+     .setMaster("local")
+     .setAppName("test")
+     .set("spark.memory.useLegacyMode", "true")
+     .set("spark.storage.memoryFraction", "0.0")
Member (@viirya, Dec 12, 2016):
Isn't spark.storage.memoryFraction read-only now?
Never mind; I checked the code, and we can still set this config when spark.memory.useLegacyMode is true.

+   sc = new SparkContext(conf)
+   val list = List[Int](1, 2, 3, 4)
+   val broadcast = sc.broadcast(list)
+   assert(broadcast.value.sum === 10)
+ }

  /**
   * Verify the persistence of state associated with a TorrentBroadcast in a local-cluster.
   *