Skip to content

Conversation

@wangyum
Copy link
Member

@wangyum wangyum commented Dec 12, 2016

What changes were proposed in this pull request?

NoSuchElementException will throw since #15056 if a broadcast cannot cache in memory. The reason is that that change cannot cover !unrolled.hasNext in next() function.

This change is to cover the !unrolled.hasNext and check hasNext before calling next in blockManager.getLocalValues to make it more robust.

We can cache and read broadcast even it cannot fit in memory from this pull request.

Exception log:

16/12/10 10:10:04 INFO UnifiedMemoryManager: Will not store broadcast_131 as the required space (1048576 bytes) exceeds our memory limit (122764 bytes)
16/12/10 10:10:04 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_131 in memory.
16/12/10 10:10:04 WARN MemoryStore: Not enough space to cache broadcast_131 in memory! (computed 384.0 B so far)
16/12/10 10:10:04 INFO MemoryStore: Memory use = 95.6 KB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 95.6 KB. Storage limit = 119.9 KB.
16/12/10 10:10:04 ERROR Utils: Exception encountered
java.util.NoSuchElementException
	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
16/12/10 10:10:04 ERROR Executor: Exception in task 1.0 in stage 86.0 (TID 134423)
java.io.IOException: java.util.NoSuchElementException
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
	... 12 more

How was this patch tested?

Add unit test

@SparkQA
Copy link

SparkQA commented Dec 12, 2016

Test build #70004 has started for PR 16252 at commit 58acc06.

.setMaster("local")
.setAppName("test")
.set("spark.memory.useLegacyMode", "true")
.set("spark.storage.memoryFraction", "0.0")
Copy link
Member

@viirya viirya Dec 12, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't spark.storage.memoryFraction read-only now?
nvm. I checked the code, we still can set this config if spark.memory.useLegacyMode is true.


override def next(): T = {
if (unrolled == null) {
if (unrolled == null || unrolled.isEmpty) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency with hasNext, should this be !unrolled.hasNext? Although I don't know this code, this change seems correct. I suppose you could also null out unrolled if you find it doesn't have more elements in hasNext.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we already did this null-out in hasNext.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wangyum This bug happens because in TorrentBroadcast.readBroadcastBlock, we directly call next() without calling hasNext() first which can null out unrolled.

You can check out

blockManager.getLocalValues(broadcastId).map(_.data.next()) match {

Calling hasNext() before next() can fix this too. But this fixing is simpler and I think it should be ok.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah, it's in releaseUnrollMemory(). Does this not throw an exception right now if next() is called before hasNext()? We kinda should also fix TorrentBroadcast to either handle the exception or check hasNext()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No exception thrown now if next() is called before hasNext(). Formally I think we should call hasNext() before next().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've fixed it with the change like:

   private def readBroadcastBlock(): T = Utils.tryOrIOException {
     TorrentBroadcast.synchronized {
       setConf(SparkEnv.get.conf)
       val blockManager = SparkEnv.get.blockManager
-      blockManager.getLocalValues(broadcastId).map(_.data.next()) match {
-        case Some(x) =>
-          releaseLock(broadcastId)
-          x.asInstanceOf[T]
+      blockManager.getLocalValues(broadcastId) match {
+        case Some(blockResult) =>
+          if (blockResult.data.hasNext)  {
+            val x = blockResult.data.next().asInstanceOf[T]
+            releaseLock(broadcastId)
+            x
+          } else {
+            throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
+          }

@viirya
Copy link
Member

viirya commented Dec 12, 2016

retest this please.

@viirya
Copy link
Member

viirya commented Dec 12, 2016

cc @JoshRosen

@SparkQA
Copy link

SparkQA commented Dec 12, 2016

Test build #70019 has finished for PR 16252 at commit 58acc06.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Dec 12, 2016

Test build #70022 has finished for PR 16252 at commit d964c54.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Copy link
Member Author

wangyum commented Dec 13, 2016

org.apache.spark.sql.kafka010.KafkaSourceStressForDontFailOnDataLossSuite.stress test for failOnDataLoss=false has succeeded on my local test.

@wangyum
Copy link
Member Author

wangyum commented Dec 13, 2016

retest this please.

@SparkQA
Copy link

SparkQA commented Dec 13, 2016

Test build #70055 has finished for PR 16252 at commit d964c54.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Copy link
Member

srowen commented Dec 13, 2016

@wangyum how about adding @viirya 's suggested change to TorrentBroadcast? it would be even more robust.

@viirya
Copy link
Member

viirya commented Dec 13, 2016

+1

@wangyum how about adding @viirya 's suggested change to TorrentBroadcast? it would be even more robust.

@SparkQA
Copy link

SparkQA commented Dec 13, 2016

Test build #70080 has finished for PR 16252 at commit f004740.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Copy link
Member Author

wangyum commented Dec 13, 2016

@srowen @viirya I have added it.

Copy link
Member

@srowen srowen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's pretty good then. I'd like to hear another opinion if possible.

@viirya
Copy link
Member

viirya commented Dec 13, 2016

@wangyum Thanks! LGTM


override def next(): T = {
if (unrolled == null) {
if (unrolled == null || !unrolled.hasNext) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, only the null check should be there - with the !hasNext enforced as unrolled = null if false.
This is part of a tight loop, and would be better if the footprint is kept as small as possible.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's a fair point because it should be fair to put the burden on the caller to check hasNext before calling next and now TorrentBroadcast does that. However, are there other call sites that need that type of fix too? if all callers are well behaved then I agree we could revert the added hasNext call in next.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, next() without hasNext is a valid code flow, and our code should not break due to caller not invoking hasNext (at best throw NoSuchElementException if hasNext == false).
Another option is to add hasNext check here - but that would be worse (since normal flow will then check hasNext twice).

If we cant ensure "require(unrolled == null || unrolled.hasNext)", then current change is best we can do I guess.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that next() without hasNext is a valid flow. However, the caller which behaves like that should also aware of the possibility of no element exception.

TorrentBroadcast is problematic because it doesn't call hasNext and doesn't handle this possibility.

I'd prefer to revert the added hasNext call in next. But not strong option.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isnt this a correctness issue though? If unrolled has no more elements the correct result is to return from the other iterator not throw an exception.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have tested thrice without !unrolled.hasNext on more than 100 billion data. They all work very well. I will remove !unrolled.hasNext

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not against the final state here without !unrolled.hasNext, because indeed callers should really check hasNext and if they don't it should be considered a bug. Do we think we got all the call sites for this though?

The thing that concerns me is that next will actually do the wrong thing if hasNext isn't called and unrolled has no elements. It will fail rather than just fall back to rest. Scala says it's undefined in this case; Java does not.

Copy link
Member

@zsxwing zsxwing Dec 14, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. I don't like to expose undefined behavior to the user, either. It's better to throw an exception instead. E.g., Scala's ConcatIterator is also implemented in this way: https://github.com/scala/scala/blob/v2.12.1/src/library/scala/collection/Iterator.scala#L216

IMO, we should not assume an Iterator follows Java Iterator's contract, but if we are implementing an Iterator, it's better to follow it to avoiding spending a lot of time on debugging misusing Iterator in future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. It is fair to me to add !unrolled.hasNext to next for more predicable behavior.

To throw a exception might be a little strange to me, as it still has elements so a no such element exception seems not correct. It just doesn't correctly fall back to rest.

Copy link
Member

@zsxwing zsxwing Dec 15, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To throw a exception might be a little strange to me, as it still has elements so a no such element exception seems not correct. It just doesn't correctly fall back to rest.

I meant when hasNext returns false, next should throw NoSuchElementException.

@rxin
Copy link
Contributor

rxin commented Dec 14, 2016

Thanks for submitting this. We need a much better description for both the JIRA ticket and the pull request. If I understand it correctly, you are fixing an issue to support broadcasting blocks that are stored on-disk, rather than in-memory. Can you put that in the description? Also we should put the current failure mode in the description. It's great that you gave a test case, but it's unclear how the current behavior breaks.

@rxin
Copy link
Contributor

rxin commented Dec 14, 2016

cc @zsxwing also

@SparkQA
Copy link

SparkQA commented Dec 14, 2016

Test build #70137 has finished for PR 16252 at commit 0903da8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Copy link
Member

srowen commented Dec 15, 2016

OK, do we have consensus to put the hasNext check back? before we ask @wangyum to change again

@viirya
Copy link
Member

viirya commented Dec 15, 2016

@srowen I am fine for that.

@srowen
Copy link
Member

srowen commented Dec 17, 2016

Thanks, so @wangyum I think you can restore that !unrolled.hasNext check.

@SparkQA
Copy link

SparkQA commented Dec 17, 2016

Test build #70308 has finished for PR 16252 at commit c13edce.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Copy link
Member Author

wangyum commented Dec 18, 2016

@srowen I have restored it.

asfgit pushed a commit that referenced this pull request Dec 18, 2016
## What changes were proposed in this pull request?
`NoSuchElementException` will throw since #15056 if a broadcast cannot cache in memory. The reason is that that change cannot cover `!unrolled.hasNext` in `next()` function.

This change is to cover the `!unrolled.hasNext` and check `hasNext` before calling `next` in `blockManager.getLocalValues` to make it  more robust.

We can cache and read broadcast even it cannot fit in memory from this pull request.

Exception log:
```
16/12/10 10:10:04 INFO UnifiedMemoryManager: Will not store broadcast_131 as the required space (1048576 bytes) exceeds our memory limit (122764 bytes)
16/12/10 10:10:04 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_131 in memory.
16/12/10 10:10:04 WARN MemoryStore: Not enough space to cache broadcast_131 in memory! (computed 384.0 B so far)
16/12/10 10:10:04 INFO MemoryStore: Memory use = 95.6 KB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 95.6 KB. Storage limit = 119.9 KB.
16/12/10 10:10:04 ERROR Utils: Exception encountered
java.util.NoSuchElementException
	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
16/12/10 10:10:04 ERROR Executor: Exception in task 1.0 in stage 86.0 (TID 134423)
java.io.IOException: java.util.NoSuchElementException
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
	... 12 more
```

## How was this patch tested?

Add unit test

Author: Yuming Wang <[email protected]>

Closes #16252 from wangyum/SPARK-18827.

(cherry picked from commit 1e5c51f)
Signed-off-by: Sean Owen <[email protected]>
asfgit pushed a commit that referenced this pull request Dec 18, 2016
## What changes were proposed in this pull request?
`NoSuchElementException` will throw since #15056 if a broadcast cannot cache in memory. The reason is that that change cannot cover `!unrolled.hasNext` in `next()` function.

This change is to cover the `!unrolled.hasNext` and check `hasNext` before calling `next` in `blockManager.getLocalValues` to make it  more robust.

We can cache and read broadcast even it cannot fit in memory from this pull request.

Exception log:
```
16/12/10 10:10:04 INFO UnifiedMemoryManager: Will not store broadcast_131 as the required space (1048576 bytes) exceeds our memory limit (122764 bytes)
16/12/10 10:10:04 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_131 in memory.
16/12/10 10:10:04 WARN MemoryStore: Not enough space to cache broadcast_131 in memory! (computed 384.0 B so far)
16/12/10 10:10:04 INFO MemoryStore: Memory use = 95.6 KB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 95.6 KB. Storage limit = 119.9 KB.
16/12/10 10:10:04 ERROR Utils: Exception encountered
java.util.NoSuchElementException
	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
16/12/10 10:10:04 ERROR Executor: Exception in task 1.0 in stage 86.0 (TID 134423)
java.io.IOException: java.util.NoSuchElementException
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
	... 12 more
```

## How was this patch tested?

Add unit test

Author: Yuming Wang <[email protected]>

Closes #16252 from wangyum/SPARK-18827.

(cherry picked from commit 1e5c51f)
Signed-off-by: Sean Owen <[email protected]>
@srowen
Copy link
Member

srowen commented Dec 18, 2016

merged to master/2.1/2.0

@asfgit asfgit closed this in 1e5c51f Dec 18, 2016
blendmaster pushed a commit to fullcontact/spark that referenced this pull request Jan 18, 2017
## What changes were proposed in this pull request?
`NoSuchElementException` will throw since apache#15056 if a broadcast cannot cache in memory. The reason is that that change cannot cover `!unrolled.hasNext` in `next()` function.

This change is to cover the `!unrolled.hasNext` and check `hasNext` before calling `next` in `blockManager.getLocalValues` to make it  more robust.

We can cache and read broadcast even it cannot fit in memory from this pull request.

Exception log:
```
16/12/10 10:10:04 INFO UnifiedMemoryManager: Will not store broadcast_131 as the required space (1048576 bytes) exceeds our memory limit (122764 bytes)
16/12/10 10:10:04 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_131 in memory.
16/12/10 10:10:04 WARN MemoryStore: Not enough space to cache broadcast_131 in memory! (computed 384.0 B so far)
16/12/10 10:10:04 INFO MemoryStore: Memory use = 95.6 KB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 95.6 KB. Storage limit = 119.9 KB.
16/12/10 10:10:04 ERROR Utils: Exception encountered
java.util.NoSuchElementException
	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
16/12/10 10:10:04 ERROR Executor: Exception in task 1.0 in stage 86.0 (TID 134423)
java.io.IOException: java.util.NoSuchElementException
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
	... 12 more
```

## How was this patch tested?

Add unit test

Author: Yuming Wang <[email protected]>

Closes apache#16252 from wangyum/SPARK-18827.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
## What changes were proposed in this pull request?
`NoSuchElementException` will throw since apache#15056 if a broadcast cannot cache in memory. The reason is that that change cannot cover `!unrolled.hasNext` in `next()` function.

This change is to cover the `!unrolled.hasNext` and check `hasNext` before calling `next` in `blockManager.getLocalValues` to make it  more robust.

We can cache and read broadcast even it cannot fit in memory from this pull request.

Exception log:
```
16/12/10 10:10:04 INFO UnifiedMemoryManager: Will not store broadcast_131 as the required space (1048576 bytes) exceeds our memory limit (122764 bytes)
16/12/10 10:10:04 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_131 in memory.
16/12/10 10:10:04 WARN MemoryStore: Not enough space to cache broadcast_131 in memory! (computed 384.0 B so far)
16/12/10 10:10:04 INFO MemoryStore: Memory use = 95.6 KB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 95.6 KB. Storage limit = 119.9 KB.
16/12/10 10:10:04 ERROR Utils: Exception encountered
java.util.NoSuchElementException
	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
16/12/10 10:10:04 ERROR Executor: Exception in task 1.0 in stage 86.0 (TID 134423)
java.io.IOException: java.util.NoSuchElementException
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
	... 12 more
```

## How was this patch tested?

Add unit test

Author: Yuming Wang <[email protected]>

Closes apache#16252 from wangyum/SPARK-18827.
@wangyum wangyum deleted the SPARK-18827 branch June 28, 2020 08:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants