-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-31387][SQL] Handle unknown operation/session ID in HiveThriftServer2Listener #28544
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-31387][SQL] Handle unknown operation/session ID in HiveThriftServer2Listener #28544
Conversation
…2Listener ### What changes were proposed in this pull request? The update methods in HiveThriftServer2Listener now check if the parameter operation/session ID actually exist in the `sessionList` and `executionList` respectively. This prevents NullPointerExceptions if the operation or session ID is unknown. Instead, a warning is written to the log. Also, in HiveSessionImpl.close(), we catch any exception thrown by `operationManager.closeOperation`. If for any reason this throws an exception, other operations are not prevented from being closed. ### Why are the changes needed? The listener's update methods would throw an exception if the operation or session ID is unknown. In Spark 2, where the listener is called directly, this hampers with the caller's control flow. In Spark 3, the exception is caught by the ListenerBus but results in an uninformative NullPointerException. In HiveSessionImpl.close(), if an exception is thrown when closing an operation, all following operations are not closed. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Unit tests Closes apache#28155 from alismess-db/hive-thriftserver-listener-update-safer. Authored-by: Ali Smesseim <[email protected]> Signed-off-by: gatorsmile <[email protected]> (cherry picked from commit 6994c64) Signed-off-by: Ali Smesseim <[email protected]>
|
ok to test |
|
ok to test |
|
add to whitelist |
|
Test build #122680 has finished for PR 28544 at commit
|
|
Retest this please |
|
Test build #122763 has finished for PR 28544 at commit
|
| } | ||
| private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit = | ||
| Option(sessionList.get(e.sessionId)) match { | ||
| case None => logWarning(s"onSessionClosed called with unknown session id: ${e.sessionId}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you move the None pattern into the place after the Some pattern?
| } | ||
| private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit = | ||
| Option(sessionList.get(e.sessionId)) match { | ||
| case None => logWarning(s"onOperationStart called with unknown session id: ${e.sessionId}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can keep processing queries even in this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sessionList.get(e.sessionId).totalExecution += 1 would result in an NPE if sessionList did not have the sessionId, so it was not possible previously to continue processing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@maropu this code is in a listener that executes on the listener bus. The listener bus can drop events if it becomes to busy.
So it might happen that events are dropped, and id is unavailable. It will not affect query execution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@maropu this code is in a listener that executes on the listener bus. The listener bus can drop events if it becomes to busy.
So it might happen that events are dropped, and id is unavailable. It will not affect query execution.
Does a server get back to a normal state by just ignoring the case? What I a bit worry about is that users can get the same warning message repeatedly if a session id gets lost (I'm not sure the case can happen in event sequences).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@maropu the server does get to a normal state. This is used only for displaying SparkUI, so wouldn't affect the server functioning.
Before this change, a NullPointerException would be thrown on sessionList.get(e.sessionId).totalExecution += 1, so I think a WARN message is strictly better than a NPE in the logs.
But good catch that it could be further improved - don't fail on missing session, just log the query with that sessionId, and just don't do sessionList.get(e.sessionId).totalExecution += 1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see. Thanks for the explanation. Looks okay.
|
@dongjoon-hyun this PR passes "SPARK-29604 external listeners should be initialized with Spark classloader" in https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/122763/testReport that was failing before in #28155. I have no idea how this change could have been related to that failure... |
|
Test build #122800 has finished for PR 28544 at commit
|
|
Retest this please |
|
Test build #122815 has finished for PR 28544 at commit
|
|
It seems to be recovered finally. I'll trigger once more. |
|
Retest this please. |
|
|
||
| test("SPARK-31387 - listener update methods should not throw exception with unknown input") { | ||
| val (statusStore: HiveThriftServer2AppStatusStore, | ||
| listener: HiveThriftServer2Listener) = createAppStatusStore(true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation?
| FileUtils.forceDelete(file); | ||
| } catch (Exception e) { | ||
| LOG.error("Failed to cleanup pipeout file: " + file, e); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is new from the original commit. If you don't mind, could you add some explanation about this into the PR description, @alismess-db ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We found it failing with a NullPointerException in some of the testing environments that we use at. It was related to how HiveSessionImplSuite mocked the session it created. We found it always a non-harmful check to check for NPE when searching for these pipeout files.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the explanation, @juliuszsompolski .
|
Retest this please. |
|
Test build #122861 has finished for PR 28544 at commit
|
|
Test build #122858 has finished for PR 28544 at commit
|
|
Test build #122892 has finished for PR 28544 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM. Thank you, all!
Merged to master/3.0.
…erver2Listener ### What changes were proposed in this pull request? This is a recreation of #28155, which was reverted due to causing test failures. The update methods in HiveThriftServer2Listener now check if the parameter operation/session ID actually exist in the `sessionList` and `executionList` respectively. This prevents NullPointerExceptions if the operation or session ID is unknown. Instead, a warning is written to the log. To improve robustness, we also make the following changes in HiveSessionImpl.close(): - Catch any exception thrown by `operationManager.closeOperation`. If for any reason this throws an exception, other operations are not prevented from being closed. - Handle not being able to access the scratch directory. When closing, all `.pipeout` files are removed from the scratch directory, which would have resulted in an NPE if the directory does not exist. ### Why are the changes needed? The listener's update methods would throw an exception if the operation or session ID is unknown. In Spark 2, where the listener is called directly, this changes the caller's control flow. In Spark 3, the exception is caught by the ListenerBus but results in an uninformative NullPointerException. In HiveSessionImpl.close(), if an exception is thrown when closing an operation, all following operations are not closed. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Unit tests Closes #28544 from alismess-db/hive-thriftserver-listener-update-safer-2. Authored-by: Ali Smesseim <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit d40ecfa) Signed-off-by: Dongjoon Hyun <[email protected]>
|
Ur, @juliuszsompolski . In #28544 (comment), did you mean this PR is still failing in some environment?
|
|
It seems that this causes consistent failures in SBT in branch-3.0 with Hive 2.7. This is the opposite situation. Sorry again. My bad. I had a wrong assumption that this PR has no problem in SBT because it passed SBT before. For now, the new failure is only at 3.0. To stabilize branch-3.0, I'll revert this from |
No, with this PR we didn't see any failures. With the previous PR #28155, we experienced an NPE on these pipeopt filess in some environments specific to Databricks. We added this null checking for pipeout files as an additional safety check there. It runs for me, and in all testing envs at Databricks. If the change causes trouble in branch-3.0, let it stay in master only... |
What changes were proposed in this pull request?
This is a recreation of #28155, which was reverted due to causing test failures.
The update methods in HiveThriftServer2Listener now check if the parameter operation/session ID actually exist in the
sessionListandexecutionListrespectively. This prevents NullPointerExceptions if the operation or session ID is unknown. Instead, a warning is written to the log.To improve robustness, we also make the following changes in HiveSessionImpl.close():
operationManager.closeOperation. If for any reason this throws an exception, other operations are not prevented from being closed..pipeoutfiles are removed from the scratch directory, which would have resulted in an NPE if the directory does not exist.Why are the changes needed?
The listener's update methods would throw an exception if the operation or session ID is unknown. In Spark 2, where the listener is called directly, this changes the caller's control flow. In Spark 3, the exception is caught by the ListenerBus but results in an uninformative NullPointerException.
In HiveSessionImpl.close(), if an exception is thrown when closing an operation, all following operations are not closed.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit tests