[SPARK-26665][Core]Fix a bug that BlockTransferService.fetchBlockSync may hang forever #23590
Test build #101413 has finished for PR 23590 at commit
Retest this please.
Test build #101427 has finished for PR 23590 at commit
Retest this please.
Test build #101437 has finished for PR 23590 at commit
    ret.flip()
    result.success(new NioManagedBuffer(ret))
  } catch {
    case e: Throwable => result.failure(e)
An alternative to catching and ignoring the Throwable (after result.failure) is to use a boolean flag to record whether result.success was signalled and, if not, invoke result.failure in finally.
var success = false
try {
...
result.success(...)
success = true
} finally {
if (! success) result.failure(e)
}
We need the exception `e` to pass to result.failure, so I think the catch is necessary here.
Good point - it is required for the failure arg.
But I don't like the throwable being ignored in the catch once failure is signalled.
For example, interruption, OOM, thread death, etc. - we should rethrow it once the promise is updated.
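The rethrow suggested here could look roughly like the sketch below. It is not what the PR does. It uses Java's `CompletableFuture` as a stand-in for the Scala `Promise`, and the class name, method name, and `size` parameter are hypothetical. The callback completes the future with the error first, so the waiter is unblocked, and then rethrows only `Error` subclasses such as `OutOfMemoryError`.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

final class RethrowAfterFailure {
    // Hypothetical callback body: copy `data` into a fresh buffer of `size`
    // bytes and hand the copy to whoever waits on `result`.
    static void onFetchSuccess(ByteBuffer data, int size, CompletableFuture<ByteBuffer> result) {
        try {
            // allocate may throw OutOfMemoryError, or IllegalArgumentException if size < 0
            ByteBuffer copy = ByteBuffer.allocate(size);
            copy.put(data);
            copy.flip();
            result.complete(copy);
        } catch (Throwable t) {
            // Signal the failure first so the waiting thread never hangs.
            result.completeExceptionally(t);
            // Then let fatal errors keep propagating on the callback thread.
            if (t instanceof Error) {
                throw (Error) t;
            }
        }
    }

    public static void main(String[] args) {
        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
        onFetchSuccess(ByteBuffer.wrap(new byte[] {1, 2, 3}), -1, result);
        System.out.println("failed: " + result.isCompletedExceptionally());
    }
}
```

Whether rethrowing is desirable depends on what the calling thread pool does with an escaped `Error`; the sketch only shows the ordering (complete first, rethrow second).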
It's not ignored: the exception will be rethrown in ThreadUtils.awaitResult at the end of fetchBlockSync. Maybe we should not use Promise, and instead do it manually to be more explicit.
Looks like this is a common anti-pattern in Spark; the issue is basically that thread/pool-specific exceptions start leaking to other threads.
Though my basic concern was regarding things like OOM, and they do get propagated, so I guess we can live with this for now.
Promise is the most convenient way to turn an async listener into a Scala Future in order to call the blocking APIs provided by Future.
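As an illustration of the listener-to-future adaptation described here, a minimal Java sketch follows. `CompletableFuture` plays the role of the Scala `Promise`/`Future` pair, and `FetchListener`, `AsyncFetcher`, and `fetchAsFuture` are hypothetical names, not Spark APIs.

```java
import java.util.concurrent.CompletableFuture;

final class ListenerToFuture {
    // Hypothetical callback interface, loosely modelled on a block-fetch listener.
    interface FetchListener {
        void onSuccess(String blockId, byte[] data);
        void onFailure(String blockId, Throwable error);
    }

    // Hypothetical async API that reports its outcome only through a listener.
    interface AsyncFetcher {
        void fetch(String blockId, FetchListener listener);
    }

    // Adapts the listener-based API to a future the caller can block on.
    static CompletableFuture<byte[]> fetchAsFuture(AsyncFetcher fetcher, String blockId) {
        CompletableFuture<byte[]> result = new CompletableFuture<>();
        fetcher.fetch(blockId, new FetchListener() {
            @Override
            public void onSuccess(String id, byte[] data) {
                result.complete(data);
            }

            @Override
            public void onFailure(String id, Throwable error) {
                result.completeExceptionally(error);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        // A fetcher that succeeds immediately with a one-byte payload.
        byte[] data = fetchAsFuture((id, l) -> l.onSuccess(id, new byte[] {7}), "block-1").join();
        System.out.println(data.length);
    }
}
```

The adapter is only as reliable as the callbacks: if a callback throws before calling `complete` or `completeExceptionally`, the future is never completed, which is the hang this PR addresses.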
Yeah, that's propagated and the code calling fetchBlockSync will handle it.
    listener: BlockFetchingListener,
    tempFileManager: DownloadFileManager): Unit = {
  // Notify BlockFetchingListener with a bad ManagedBuffer asynchronously
  new Thread() {
Why do we create a thread here?
Otherwise, BlockFetchingListener will be called in the same thread and fail at once before ThreadUtils.awaitResult is called. Then the test won't be able to reproduce the issue.
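The difference between the two invocation styles can be sketched as follows. This is a simplified Java model, not the Spark test: `CompletableFuture` stands in for the `Promise`, the class and method names are hypothetical, and a short timeout replaces the unbounded wait so the example terminates. The callback is deliberately the unfixed one, which throws before completing the future.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AsyncStub {
    // Returns "threw" if the callback's exception escaped on the calling thread
    // before the wait started, or "timed out" if the waiter was left hanging.
    static String run(boolean async) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        // Unfixed callback: allocate(-1) throws, so complete() is never reached.
        Runnable brokenCallback = () -> {
            ByteBuffer.allocate(-1);
            result.complete(null);
        };
        try {
            if (async) {
                Thread t = new Thread(brokenCallback);
                t.setUncaughtExceptionHandler((thread, e) -> { }); // keep stderr quiet
                t.start();
            } else {
                brokenCallback.run(); // fails right here, before any waiting happens
            }
            result.get(200, TimeUnit.MILLISECONDS);
            return "completed";
        } catch (IllegalArgumentException e) {
            return "threw";
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("sync:  " + run(false));
        System.out.println("async: " + run(true));
    }
}
```

Only the asynchronous variant reaches the wait with a future that nobody will complete, which is why the test needs the extra thread to reproduce the hang.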
Good catch! LGTM except a comment for the test.
Thanks! Merging to master and 2.4.
…c may hang forever

## What changes were proposed in this pull request?

`ByteBuffer.allocate` may throw `OutOfMemoryError` when the block is large but not enough memory is available. However, when this happens, right now `BlockTransferService.fetchBlockSync` will just hang forever as its `BlockFetchingListener.onBlockFetchSuccess` doesn't complete `Promise`.

This PR catches `Throwable` and uses the error to complete `Promise`.

## How was this patch tested?

Added a unit test. Since I cannot make `ByteBuffer.allocate` throw `OutOfMemoryError`, I passed a negative size to make `ByteBuffer.allocate` fail. Although the error type is different, it should trigger the same code path.

Closes #23590 from zsxwing/SPARK-26665.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
(cherry picked from commit 66450bb)
Signed-off-by: Shixiong Zhu <[email protected]>
Hi, @zsxwing , @cloud-fan , @mridulm .
@dongjoon-hyun I think SPARK-22062 just moved code. This is a long-standing issue.
Oh, thank you for the info. Then, can we have this to
@dongjoon-hyun Sure. This is a pretty safe change. I just pushed to branch-2.3.
Great! Thank you so much, @zsxwing .
…checked exception missed

This is very like #23590. `ByteBuffer.allocate` may throw `OutOfMemoryError` when the response is large but not enough memory is available. However, when this happens, `TransportClient.sendRpcSync` will just hang forever if the timeout is set to unlimited. This PR catches `Throwable` and uses the error to complete `SettableFuture`.

I tested in my IDE by setting the value of size to -1 to verify the result. Without this patch, the call won't finish until the timeout (it may hang forever if the timeout is set to MAX_INT); with it, the expected `IllegalArgumentException` is caught.

```java
@Override
public void onSuccess(ByteBuffer response) {
  try {
    int size = response.remaining();
    ByteBuffer copy = ByteBuffer.allocate(size); // set size to -1 at runtime when debugging
    copy.put(response);
    // flip "copy" to make it readable
    copy.flip();
    result.set(copy);
  } catch (Throwable t) {
    result.setException(t);
  }
}
```

Closes #24964 from LantaoJin/SPARK-28160.

Lead-authored-by: LantaoJin <[email protected]>
Co-authored-by: lajin <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
(cherry picked from commit 0e42100)
Signed-off-by: Sean Owen <[email protected]>
## What changes were proposed in this pull request?

`ByteBuffer.allocate` may throw `OutOfMemoryError` when the block is large but not enough memory is available. However, when this happens, right now `BlockTransferService.fetchBlockSync` will just hang forever as its `BlockFetchingListener.onBlockFetchSuccess` doesn't complete `Promise`.

This PR catches `Throwable` and uses the error to complete `Promise`.

## How was this patch tested?

Added a unit test. Since I cannot make `ByteBuffer.allocate` throw `OutOfMemoryError`, I passed a negative size to make `ByteBuffer.allocate` fail. Although the error type is different, it should trigger the same code path.
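
To make the fix and the negative-size test trick concrete, here is a minimal Java sketch under stated assumptions: `CompletableFuture` stands in for the Scala `Promise`, and the class and method names are hypothetical. The callback runs on its own thread, catches `Throwable`, and completes the future with it, so the waiting thread receives the error instead of blocking.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FixedCallbackDemo {
    // Runs a "fixed" callback on another thread and returns the error observed
    // by the waiting thread, or null if the allocation succeeded.
    static Throwable awaitFixedCallback(int size) {
        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
        Thread callback = new Thread(() -> {
            try {
                // A negative size makes allocate throw IllegalArgumentException,
                // standing in for an OutOfMemoryError on a very large block.
                result.complete(ByteBuffer.allocate(size));
            } catch (Throwable t) {
                // The fix: always complete the future, even on failure.
                result.completeExceptionally(t);
            }
        });
        callback.start();
        try {
            result.get(5, TimeUnit.SECONDS);
            return null;
        } catch (ExecutionException e) {
            return e.getCause(); // the error raised inside the callback
        } catch (InterruptedException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitFixedCallback(-1));
    }
}
```

The error type differs from the production failure, but the path through the `catch` block is the same, which matches the reasoning given for the unit test.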