-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-2670] FetchFailedException should be thrown when local fetch has failed #1578
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e310c0b
460dc01
a3a9be1
b7b8250
4fca130
5d05855
03bcb02
d353984
e8713cc
85c8938
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -200,14 +200,17 @@ object BlockFetcherIterator { | |
| // these all at once because they will just memory-map some files, so they won't consume | ||
| // any memory that might exceed our maxBytesInFlight | ||
| for (id <- localBlocksToFetch) { | ||
| getLocalFromDisk(id, serializer) match { | ||
| case Some(iter) => { | ||
| // Pass 0 as size since it's not in flight | ||
| results.put(new FetchResult(id, 0, () => iter)) | ||
| logDebug("Got local block " + id) | ||
| } | ||
| case None => { | ||
| throw new BlockException(id, "Could not get block " + id + " from local machine") | ||
| try { | ||
| // getLocalFromDisk never return None but throws BlockException | ||
| val iter = getLocalFromDisk(id, serializer).get | ||
| // Pass 0 as size since it's not in flight | ||
| results.put(new FetchResult(id, 0, () => iter)) | ||
| logDebug("Got local block " + id) | ||
| } catch { | ||
| case e: Exception => { | ||
| logError(s"Error occurred while fetching local blocks", e) | ||
| results.put(new FetchResult(id, -1, null)) | ||
| return | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we throw an exception above and then immediately catch it, instead of doing results.put above? Is there any other kind of error that can happen beyond getLocalFromDisk returning None? Also, the current code seems to forget the exception: it just puts in a failed result. Is this intentional, i.e. will get a FetchFailedException later? It seems we should return from this method ASAP if there's a problem.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, getLocalFromDisk never return None but can throw BlockException. so I think "case None" block above is useless and we should remove the "case None" block rather than doing results.put.
Yes, BlockException is thrown from getLocalFromDisk, and FileNotFoundException from DiskStore#getBytes when it failed to fetch shuffle____ from local disk.
It's for get FetchFailedException later. If we return from BasicBlockFetchIterator#getLocallocks, we can't know whether rest of blocks can be read successfully or not. |
||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,140 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.storage | ||
|
|
||
| import org.scalatest.{FunSuite, Matchers} | ||
| import org.scalatest.PrivateMethodTester._ | ||
|
|
||
| import org.mockito.Mockito._ | ||
| import org.mockito.Matchers.{any, eq => meq} | ||
| import org.mockito.stubbing.Answer | ||
| import org.mockito.invocation.InvocationOnMock | ||
|
|
||
| import org.apache.spark._ | ||
| import org.apache.spark.storage.BlockFetcherIterator._ | ||
| import org.apache.spark.network.{ConnectionManager, ConnectionManagerId, | ||
| Message} | ||
|
|
||
| class BlockFetcherIteratorSuite extends FunSuite with Matchers { | ||
|
|
||
| test("block fetch from local fails using BasicBlockFetcherIterator") { | ||
| val blockManager = mock(classOf[BlockManager]) | ||
| val connManager = mock(classOf[ConnectionManager]) | ||
| doReturn(connManager).when(blockManager).connectionManager | ||
| doReturn(BlockManagerId("test-client", "test-client", 1, 0)).when(blockManager).blockManagerId | ||
|
|
||
| doReturn((48 * 1024 * 1024).asInstanceOf[Long]).when(blockManager).maxBytesInFlight | ||
|
|
||
| val blIds = Array[BlockId]( | ||
| ShuffleBlockId(0,0,0), | ||
| ShuffleBlockId(0,1,0), | ||
| ShuffleBlockId(0,2,0), | ||
| ShuffleBlockId(0,3,0), | ||
| ShuffleBlockId(0,4,0)) | ||
|
|
||
| val optItr = mock(classOf[Option[Iterator[Any]]]) | ||
| val answer = new Answer[Option[Iterator[Any]]] { | ||
| override def answer(invocation: InvocationOnMock) = Option[Iterator[Any]] { | ||
| throw new Exception | ||
| } | ||
| } | ||
|
|
||
| // 3rd block is going to fail | ||
| doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(0)), any()) | ||
| doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(1)), any()) | ||
| doAnswer(answer).when(blockManager).getLocalFromDisk(meq(blIds(2)), any()) | ||
| doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(3)), any()) | ||
| doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(4)), any()) | ||
|
|
||
| val bmId = BlockManagerId("test-client", "test-client",1 , 0) | ||
| val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])]( | ||
| (bmId, blIds.map(blId => (blId, 1.asInstanceOf[Long])).toSeq) | ||
| ) | ||
|
|
||
| val iterator = new BasicBlockFetcherIterator(blockManager, | ||
| blocksByAddress, null) | ||
|
|
||
| iterator.initialize() | ||
|
|
||
| // 3rd getLocalFromDisk invocation should be failed | ||
| verify(blockManager, times(3)).getLocalFromDisk(any(), any()) | ||
|
|
||
| assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements") | ||
| // the 2nd element of the tuple returned by iterator.next should be defined when fetching successfully | ||
| assert(iterator.next._2.isDefined, "1st element should be defined but is not actually defined") | ||
| assert(iterator.hasNext, "iterator should have 5 elements but actually has 1 element") | ||
| assert(iterator.next._2.isDefined, "2nd element should be defined but is not actually defined") | ||
| assert(iterator.hasNext, "iterator should have 5 elements but actually has 2 elements") | ||
| // 3rd fetch should be failed | ||
| assert(!iterator.next._2.isDefined, "3rd element should not be defined but is actually defined") | ||
| assert(iterator.hasNext, "iterator should have 5 elements but actually has 3 elements") | ||
| // Don't call next() after fetching non-defined element even if thare are rest of elements in the iterator. | ||
| // Otherwise, BasicBlockFetcherIterator hangs up. | ||
| } | ||
|
|
||
|
|
||
| test("block fetch from local succeed using BasicBlockFetcherIterator") { | ||
| val blockManager = mock(classOf[BlockManager]) | ||
| val connManager = mock(classOf[ConnectionManager]) | ||
| doReturn(connManager).when(blockManager).connectionManager | ||
| doReturn(BlockManagerId("test-client", "test-client", 1, 0)).when(blockManager).blockManagerId | ||
|
|
||
| doReturn((48 * 1024 * 1024).asInstanceOf[Long]).when(blockManager).maxBytesInFlight | ||
|
|
||
| val blIds = Array[BlockId]( | ||
| ShuffleBlockId(0,0,0), | ||
| ShuffleBlockId(0,1,0), | ||
| ShuffleBlockId(0,2,0), | ||
| ShuffleBlockId(0,3,0), | ||
| ShuffleBlockId(0,4,0)) | ||
|
|
||
| val optItr = mock(classOf[Option[Iterator[Any]]]) | ||
|
|
||
| // All blocks should be fetched successfully | ||
| doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(0)), any()) | ||
| doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(1)), any()) | ||
| doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(2)), any()) | ||
| doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(3)), any()) | ||
| doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(4)), any()) | ||
|
|
||
| val bmId = BlockManagerId("test-client", "test-client",1 , 0) | ||
| val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])]( | ||
| (bmId, blIds.map(blId => (blId, 1.asInstanceOf[Long])).toSeq) | ||
| ) | ||
|
|
||
| val iterator = new BasicBlockFetcherIterator(blockManager, | ||
| blocksByAddress, null) | ||
|
|
||
| iterator.initialize() | ||
|
|
||
| // getLocalFromDis should be invoked for all of 5 blocks | ||
| verify(blockManager, times(5)).getLocalFromDisk(any(), any()) | ||
|
|
||
| assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements") | ||
| assert(iterator.next._2.isDefined, "All elements should be defined but 1st element is not actually defined") | ||
| assert(iterator.hasNext, "iterator should have 5 elements but actually has 1 element") | ||
| assert(iterator.next._2.isDefined, "All elements should be defined but 2nd element is not actually defined") | ||
| assert(iterator.hasNext, "iterator should have 5 elements but actually has 2 elements") | ||
| assert(iterator.next._2.isDefined, "All elements should be defined but 3rd element is not actually defined") | ||
| assert(iterator.hasNext, "iterator should have 5 elements but actually has 3 elements") | ||
| assert(iterator.next._2.isDefined, "All elements should be defined but 4th element is not actually defined") | ||
| assert(iterator.hasNext, "iterator should have 5 elements but actually has 4 elements") | ||
| assert(iterator.next._2.isDefined, "All elements should be defined but 5th element is not actually defined") | ||
| } | ||
|
|
||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wouldn't do drop and such on a ConcurrentQueue, since it might drop stuff other threads were adding. Just do a results.put on the failed block and don't worry about dropping other ones. You can actually move the try/catch into the for loop and add a "return" at the bottom of the catch after adding this failing FetchResult.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your comment, @mateiz .
But, if it returns from getLocalBlocks immediately rest of FetchResults is not set to results, and we waits on results.take() in next method forever right? results is a instance of LinkedBlockingQueue and take method is blocking method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought next() would return a failure block, and then the caller of BlockFetcherIterator will just stop. Did you see it not doing that? I think all you have to do is put one FetchResult with size = -1 in the queue and return, and everything will be fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought wrong. Exactly, in current usage of BlockFetcherIterator, next() is not invoked after FetchFailedException has been thrown.
I wonder it's a little bit problem that we can invoke next() after FetchFailedException even if there are no such usages in current implementation.
I think it's better to prohibit invoking next() after FetchFailedException to clearly express the correct usage of the method.