-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-22590][SQL] Copy sparkContext.localproperties to child thread in BroadcastExchangeExec.executionContext #27266
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thank you for pinging me, @ajithme . |
|
ok to test |
|
Closely related to #27267 ? |
|
Test build #116962 has finished for PR 27266 at commit
|
8c9fbe6 to
f850797
Compare
|
Test build #117305 has finished for PR 27266 at commit
|
|
Test build #117310 has finished for PR 27266 at commit
|
|
this failure seems unrelated. Please retest this |
|
gentle ping cc @dongjoon-hyun @cloud-fan @hvanhovell |
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about we make this also produce a (java) Future? If you do this then we can unify this method with the other withThreadLocalCaptured method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see previously it was using a Future which was moved to Callable in #24595
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It needed to be cancellable. So we are using java futures instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand, will update to use java Futures instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hvanhovell org.apache.spark.sql.execution.SubqueryExec#executeCollect needs now to await over java Future instead of scala future which would mean we have to have a new method under org.apache.spark.util.ThreadUtils#awaitResult, is this acceptable.?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's fine to have another awaitResult
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
|
Test build #117438 has finished for PR 27266 at commit
|
|
Test build #117439 has finished for PR 27266 at commit
|
|
Test build #117441 has finished for PR 27266 at commit
|
sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I usually don't like such changes. Wonder if we can keep the indentation same and make it easier to track the history of commits.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@HyukjinKwon I have tried to adjust the indent so that diff seems to show only lines i have modified. Does it seem ok now.?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The indentation is wrong now although the diff is smaller. how about
private[sql] lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
SQLExecution.withThreadLocalCaptured ... {
doBroadcast(..)
}
}
private def doBroadcast ... {
// original code
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan updated as per your suggestion, but still it changes indent than original. Is it ok.?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah sorry I misread the code. Seems we can't avoid changing the indentation as it was so nested before. I'm OK with your original code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reverted
|
@ajithme are you still working on it? I'd like to get this in before 3.0 release and take over this one if you don't have time to complete it. |
Yes. I will update the PR shortly. Sorry for delay |
5eb5a96 to
0f39043
Compare
|
@cloud-fan @hvanhovell @HyukjinKwon Updated as per all the comments, please review |
|
Test build #118583 has finished for PR 27266 at commit
|
|
Test build #118581 has finished for PR 27266 at commit
|
|
Test build #118585 has finished for PR 27266 at commit
|
hvanhovell
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - pending jenkins
|
Test build #118586 has finished for PR 27266 at commit
|
|
thanks, merging to master/3.0! |
…in BroadcastExchangeExec.executionContext ### What changes were proposed in this pull request? In `org.apache.spark.sql.execution.exchange.BroadcastExchangeExec#relationFuture` make a copy of `org.apache.spark.SparkContext#localProperties` and pass it to the broadcast execution thread in `org.apache.spark.sql.execution.exchange.BroadcastExchangeExec#executionContext` ### Why are the changes needed? When executing `BroadcastExchangeExec`, the relationFuture is evaluated via a separate thread. The threads inherit the `localProperties` from `sparkContext` as they are the child threads. These threads are created in the executionContext (thread pools). Each Thread pool has a default `keepAliveSeconds` of 60 seconds for idle threads. Scenarios where the thread pool has threads which are idle and reused for a subsequent new query, the thread local properties will not be inherited from spark context (thread properties are inherited only on thread creation) hence end up having old or no properties set. This will cause taskset properties to be missing when properties are transferred by child thread via `sparkContext.runJob/submitJob` ### Does this PR introduce any user-facing change? No ### How was this patch tested? Added UT Closes #27266 from ajithme/broadcastlocalprop. Authored-by: Ajith <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit 2854091) Signed-off-by: Wenchen Fan <[email protected]>
|
Test build #118582 has finished for PR 27266 at commit
|
|
+1 from me too. Thanks @ajithme. |
…in BroadcastExchangeExec.executionContext ### What changes were proposed in this pull request? In `org.apache.spark.sql.execution.exchange.BroadcastExchangeExec#relationFuture` make a copy of `org.apache.spark.SparkContext#localProperties` and pass it to the broadcast execution thread in `org.apache.spark.sql.execution.exchange.BroadcastExchangeExec#executionContext` ### Why are the changes needed? When executing `BroadcastExchangeExec`, the relationFuture is evaluated via a separate thread. The threads inherit the `localProperties` from `sparkContext` as they are the child threads. These threads are created in the executionContext (thread pools). Each Thread pool has a default `keepAliveSeconds` of 60 seconds for idle threads. Scenarios where the thread pool has threads which are idle and reused for a subsequent new query, the thread local properties will not be inherited from spark context (thread properties are inherited only on thread creation) hence end up having old or no properties set. This will cause taskset properties to be missing when properties are transferred by child thread via `sparkContext.runJob/submitJob` ### Does this PR introduce any user-facing change? No ### How was this patch tested? Added UT Closes apache#27266 from ajithme/broadcastlocalprop. Authored-by: Ajith <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
In
org.apache.spark.sql.execution.exchange.BroadcastExchangeExec#relationFuturemake a copy oforg.apache.spark.SparkContext#localPropertiesand pass it to the broadcast execution thread inorg.apache.spark.sql.execution.exchange.BroadcastExchangeExec#executionContextWhy are the changes needed?
When executing
BroadcastExchangeExec, the relationFuture is evaluated via a separate thread. The threads inherit thelocalPropertiesfromsparkContextas they are the child threads.These threads are created in the executionContext (thread pools). Each Thread pool has a default
keepAliveSecondsof 60 seconds for idle threads.Scenarios where the thread pool has threads which are idle and reused for a subsequent new query, the thread local properties will not be inherited from spark context (thread properties are inherited only on thread creation) hence end up having old or no properties set. This will cause taskset properties to be missing when properties are transferred by child thread via
sparkContext.runJob/submitJobDoes this PR introduce any user-facing change?
No
How was this patch tested?
Added UT