Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,8 @@ object BlockFetcherIterator {
// any memory that might exceed our maxBytesInFlight
for (id <- localBlocksToFetch) {
try {
// getLocalFromDisk never returns None; it throws a BlockException instead
val iter = getLocalFromDisk(id, serializer).get
// Pass 0 as size since it's not in flight
readMetrics.localBlocksFetched += 1
results.put(new FetchResult(id, 0, () => iter))
results.put(new FetchResult(id, 0, () => getLocalFromDisk(id, serializer).get))
logDebug("Got local block " + id)
} catch {
case e: Exception => {
Expand Down
22 changes: 2 additions & 20 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1039,26 +1039,8 @@ private[spark] class BlockManager(
bytes: ByteBuffer,
serializer: Serializer = defaultSerializer): Iterator[Any] = {
bytes.rewind()

def getIterator: Iterator[Any] = {
val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
serializer.newInstance().deserializeStream(stream).asIterator
}

if (blockId.isShuffle) {
/* Reducer may need to read many local shuffle blocks and will wrap them into Iterators
* at the beginning. The wrapping will cost some memory (compression instance
* initialization, etc.). Reducer reads shuffle blocks one by one so we could do the
* wrapping lazily to save memory. */
class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
lazy val proxy = f
override def hasNext: Boolean = proxy.hasNext
override def next(): Any = proxy.next()
}
new LazyProxyIterator(getIterator)
} else {
getIterator
}
val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
serializer.newInstance().deserializeStream(stream).asIterator
}

def stop(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,24 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {

iterator.initialize()

// The 3rd getLocalFromDisk invocation should fail
verify(blockManager, times(3)).getLocalFromDisk(any(), any())
// Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk.
verify(blockManager, times(0)).getLocalFromDisk(any(), any())

assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements")
// the 2nd element of the tuple returned by iterator.next should be defined when fetching successfully
assert(iterator.next._2.isDefined, "1st element should be defined but is not actually defined")
assert(iterator.next()._2.isDefined, "1st element should be defined but is not actually defined")
verify(blockManager, times(1)).getLocalFromDisk(any(), any())

assert(iterator.hasNext, "iterator should have 5 elements but actually has 1 element")
assert(iterator.next._2.isDefined, "2nd element should be defined but is not actually defined")
assert(iterator.next()._2.isDefined, "2nd element should be defined but is not actually defined")
verify(blockManager, times(2)).getLocalFromDisk(any(), any())

assert(iterator.hasNext, "iterator should have 5 elements but actually has 2 elements")
// The 3rd fetch should fail
assert(!iterator.next._2.isDefined, "3rd element should not be defined but is actually defined")
assert(iterator.hasNext, "iterator should have 5 elements but actually has 3 elements")
// Don't call next() after fetching a non-defined element, even if there are remaining elements in the iterator.
// Otherwise, BasicBlockFetcherIterator hangs.
intercept[Exception] {
iterator.next()
}
verify(blockManager, times(3)).getLocalFromDisk(any(), any())
}


Expand Down Expand Up @@ -127,8 +131,8 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {

iterator.initialize()

// getLocalFromDisk should be invoked for all 5 blocks
verify(blockManager, times(5)).getLocalFromDisk(any(), any())
// Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk.
verify(blockManager, times(0)).getLocalFromDisk(any(), any())

assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements")
assert(iterator.next._2.isDefined, "All elements should be defined but 1st element is not actually defined")
Expand All @@ -139,7 +143,9 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
assert(iterator.hasNext, "iterator should have 5 elements but actually has 3 elements")
assert(iterator.next._2.isDefined, "All elements should be defined but 4th element is not actually defined")
assert(iterator.hasNext, "iterator should have 5 elements but actually has 4 elements")
assert(iterator.next._2.isDefined, "All elements should be defined but 5th element is not actually defined")
assert(iterator.next._2.isDefined, "All elements should be defined but 5th element is not actually defined")

verify(blockManager, times(5)).getLocalFromDisk(any(), any())
}

test("block fetch from remote fails using BasicBlockFetcherIterator") {
Expand Down