[SPARK-30556][SQL] Copy sparkContext.localProperties to child thread in SubqueryExec.executionContext #27267
Conversation
Force-pushed from a229ecb to 5042156
ok to test
dongjoon-hyun left a comment:
For a WIP PR, please add [WIP] to the PR title.
How was this patch tested?
WIP
Closely related to #27266?

Currently,
Test build #116960 has finished for PR 27267 at commit
Review thread on sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala (outdated, resolved)
IMO the thread pools should not be a big issue. The subquery is guaranteed to be executed on a different thread (you can even add an assert for this). You just set a unique property on the local properties (the value should also be unique), construct a query that contains a broadcast join, and use an accumulator that you modify using either a UDF (easy) or a dataset operation.

Agree. But with a pool size of 16, I will have to ensure all 16 threads are used at least once and are alive to reproduce this issue without making the test flaky. It's a lot easier if I can set the pool size to 1 in the test to reproduce it.
Review thread on sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala (outdated, resolved)
Force-pushed from fea9160 to 577904f
@ajithme why do we need to make sure all 16 threads are used?
I think this is a common problem when we run code in a thread pool. I just realized that we hit a similar issue before. Shall we add a util function to easily capture the necessary thread locals and propagate them to the thread pool? For example, we can add a
Sure @cloud-fan, I think a similar approach was proposed by @hvanhovell. I will update my PR to introduce a utility rather than having a local fix.
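The utility being floated here can be sketched outside Spark with plain JVM primitives. This is a hedged sketch: `withThreadLocalCaptured` is a hypothetical stand-in for the helper discussed above, `PROP` stands in for `SparkContext#localProperties`, and the property values are made up.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
    // Stand-in for SparkContext#localProperties (an InheritableThreadLocal).
    static final InheritableThreadLocal<String> PROP = new InheritableThreadLocal<>();

    // Capture the caller's thread-local eagerly, then re-install it on the
    // (possibly reused) pool thread before running the body.
    static <T> Future<T> withThreadLocalCaptured(Callable<T> body, ExecutorService pool) {
        String captured = PROP.get();
        return pool.submit(() -> {
            PROP.set(captured);
            return body.call();
        });
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);

        PROP.set("first-query");
        pool.submit(() -> PROP.get()).get(); // warm up: the single pool thread is created here

        PROP.set("second-query");
        // A plain submit on the reused thread would still see "first-query";
        // the wrapper re-installs the caller's current value.
        System.out.println(withThreadLocalCaptured(() -> PROP.get(), pool).get());

        pool.shutdown();
    }
}
```

The key point is that the capture happens on the caller's thread at submission time, not on the pool thread at execution time.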
If not, the thread executing the subquery may be a new one (created from the pool), thus inheriting the
cc @cloud-fan @hvanhovell I have updated the PR as per the suggestion. Please review.
This seems to be the problem I am trying to fix in

so should I try to fix all of them in one PR, or should I have a separate PR for each? I previously raised them separately so that I can complete them with individual UTs. Please suggest @cloud-fan @srowen @dongjoon-hyun @hvanhovell; I am kinda neutral on whether to make separate PRs or a single PR.
I think we can fix one place in this PR, and send more PRs to fix more places later, with tests. |
Test build #117171 has finished for PR 27267 at commit

Test build #117173 has finished for PR 27267 at commit
Three review threads on sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala (outdated, resolved)
@dongjoon-hyun Updated with fix
Test build #117252 has finished for PR 27267 at commit
dongjoon-hyun left a comment:
+1, LGTM. Merged to master.
Could you make a backporting PR against branch-2.4, @ajithme?
Thanks @dongjoon-hyun @cloud-fan @hvanhovell. Sure, I will make a backport PR for this against branch-2.4.
.checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in (0,128].")
.createWithDefault(128)

val SUBQUERY_MAX_THREAD_THRESHOLD =
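The checkValue line quoted above enforces a simple half-open range on the threshold. A minimal sketch of the equivalent guard (the method name is hypothetical, not a Spark API):

```java
public class Main {
    // Mirrors the (0, 128] bound enforced by the config's checkValue.
    static int checkThreshold(int thres) {
        if (thres <= 0 || thres > 128) {
            throw new IllegalArgumentException("The threshold must be in (0,128].");
        }
        return thres;
    }

    public static void main(String[] args) {
        System.out.println(checkThreshold(128)); // upper bound is inclusive, so it passes
        try {
            checkThreshold(0); // 0 is excluded by the open lower bound
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```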
This is actually a different change right?
It should also be a static configuration since we can only change it at startup.
- This is just opening it as a configuration to make the change testable. Do you want me to raise a separate PR just to make this configuration change separate?
- This is part of StaticSQLConf, which is defined at startup; is there any other mechanism to define a static conf?
If it's a static conf, then why isn't your unit test failing? Moreover, if it's static then setting it in your test probably does not have any effect; because we use the same JVM/SparkContext to run most tests, the chances are pretty high that it has been set before.
I see. As per the documentation in
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
Line 26 in 705fc6a:
* Static SQL configuration is a cross-session, immutable Spark configuration. External users can

BroadcastExchangeExec creates the executionContext. My initial guess is that the executionContext is not created till the first subquery, hence it works for the UT. I will further investigate and get back with the analysis.
}

// set local configuration and assert
val confValue1 = "e"
IMO it is better to use something unique here to avoid a fluke. How about using UUID.randomUUID().toString()?
I am neutral about both approaches. I just wanted to have a fixed input value to get a predictable output value from the testcase. I can update this and raise a follow-up if you insist.
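The reviewer's suggestion amounts to generating a collision-proof value per test run; a minimal sketch of why a random UUID string is a safe choice:

```java
import java.util.UUID;

public class Main {
    public static void main(String[] args) {
        // A random UUID string is effectively unique per run, so a stale value
        // left behind by an earlier test cannot accidentally match it.
        String confValue1 = UUID.randomUUID().toString();
        String confValue2 = UUID.randomUUID().toString();

        System.out.println(confValue1.length()); // canonical form: 32 hex digits + 4 hyphens
        System.out.println(confValue1.equals(confValue2));
    }
}
```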
@ajithme can you address my comments in a follow up?

@ajithme Can you submit a PR to address the comments?

@xuanyuanking Could you submit a PR to address the comments?
https://github.com/apache/spark/pull/27267/files#r370089158 is the major comment we need to address. |
.checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in (0,128].")
.createWithDefault(128)

val SUBQUERY_MAX_THREAD_THRESHOLD =
I think we should submit a separate PR for this change.
I think we could keep this in StaticSQLConf.
- I locally tested that the static config takes effect; it can only be set at startup.
- The UT can pass because it is used in the lazy val SubqueryExec.relationFuture on the executor side, so the withSQLConf in the UT could set the config before the executor starts.

cc @hvanhovell
@gatorsmile @xuanyuanking Sure, I will submit a follow-up PR shortly.
@ajithme Sorry, I just submitted a follow-up PR; could you help review it? Thanks.
….withThreadLocalCaptured

### What changes were proposed in this pull request?
Follow up for #27267, reset the status changed in SQLExecution.withThreadLocalCaptured.

### Why are the changes needed?
For code safety.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing UT.

Closes #27516 from xuanyuanking/SPARK-30556-follow.
Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: herman <[email protected]>
val localProps = Utils.cloneProperties(sc.getLocalProperties)
Future {
  SparkSession.setActiveSession(activeSession)
  sc.setLocalProperties(localProps)
@hvanhovell two questions:
- Shouldn't we clone localProps here? In the sense that, what if a concurrent thread modifies them?
- Does the order of setting localProps and activeSession matter?
the localProps is already a clone: https://github.com/apache/spark/pull/27267/files#diff-ab49028253e599e6e74cc4f4dcb2e3a8R178
And I think the order doesn't matter.
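The cloning question above can be illustrated with java.util.Properties directly; this is a sketch with a made-up key, not Spark's actual property name:

```java
import java.util.Properties;

public class Main {
    public static void main(String[] args) {
        Properties parent = new Properties();
        parent.setProperty("example.job.group", "g1");

        // Snapshot taken on the caller's thread before handing off to the Future;
        // later mutations of `parent` are not visible through the clone.
        Properties snapshot = (Properties) parent.clone();
        parent.setProperty("example.job.group", "g2");

        System.out.println(snapshot.getProperty("example.job.group"));
    }
}
```

This is why cloning once before submission is enough: the Future only ever reads the snapshot, so concurrent changes to the parent properties cannot race with it.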
…tion withThreadLocalCaptured

### What changes were proposed in this pull request?
Follow up for #27267, reset the status changed in SQLExecution.withThreadLocalCaptured.

### Why are the changes needed?
For code safety.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing UT.

(cherry picked from commit a6b91d2)
Closes #27633 from xuanyuanking/SPARK-30556-backport.
Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
In org.apache.spark.sql.execution.SubqueryExec#relationFuture, make a copy of org.apache.spark.SparkContext#localProperties and pass it to the sub-execution thread in org.apache.spark.sql.execution.SubqueryExec#executionContext.

Why are the changes needed?
Local properties set via sparkContext are not available as TaskContext properties when executing jobs while thread pools have idle threads which are reused.

Explanation:

When SubqueryExec is evaluated, the relationFuture is computed via a separate thread. The threads inherit the localProperties from sparkContext, as they are child threads.

These threads are created in the executionContext (thread pools). Each thread pool has a default keepAliveSeconds of 60 seconds for idle threads.

In scenarios where the thread pool has threads which are idle and are reused for a subsequent new query, the thread-local properties will not be inherited from the Spark context (thread properties are inherited only on thread creation), hence they end up having old or no properties set. This will cause task-set properties to be missing when properties are transferred by the child thread via sparkContext.runJob/submitJob.

Does this PR introduce any user-facing change?
No
How was this patch tested?
Added UT
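The stale-inheritance behavior described in the Explanation above can be reproduced outside Spark with a plain JVM thread pool. This is a hedged sketch: `PROP` stands in for `SparkContext#localProperties` (an InheritableThreadLocal), and the property values are illustrative.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    // Stand-in for SparkContext#localProperties, which is an InheritableThreadLocal.
    static final InheritableThreadLocal<String> PROP = new InheritableThreadLocal<>();

    public static void main(String[] args) throws Exception {
        // Pool size 1 makes thread reuse deterministic, like the test setup debated above.
        ExecutorService pool = Executors.newFixedThreadPool(1);

        PROP.set("first-query");
        // The worker thread is created lazily on the first submit, so it inherits "first-query".
        System.out.println(pool.submit(() -> PROP.get()).get());

        PROP.set("second-query");
        // The idle worker is reused; inheritance happens only at thread creation,
        // so the stale value is still visible on the pool thread.
        System.out.println(pool.submit(() -> PROP.get()).get());

        pool.shutdown();
    }
}
```

Both submissions print "first-query": the second task never sees "second-query", which is exactly why the PR copies the local properties explicitly into the subquery's execution thread.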