[SPARK-31450][SQL] Make ExpressionEncoder thread-safe #28223
Reviewers please first take a look at the |
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
...talyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayDataIndexedSeqSuite.scala
 * Function that deserializes an [[InternalRow]] into an object of type `T`. Instances of this
 * class are not meant to be thread-safe.
 */
abstract class Deserializer[T] extends (InternalRow => T)
Implementation note. I opted to go with abstract classes so we can get monomorphic call sites in many cases.
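To illustrate the note above, here is a minimal sketch of an abstract class that is also a function. It does not use Spark: `Row`, `FirstFieldAsInt` and `decodeAll` are hypothetical names made up for this example, and only the `abstract class Deserializer[T] extends (... => T)` shape comes from the diff. The idea, as I understand it, is that a call made through the `Deserializer[T]` type only ever sees the few subclasses of that class, whereas a call made through the general `Function1` interface shares its call site with every other function in the program.

```scala
object DeserializerSketch {
  // Hypothetical stand-in for Spark's InternalRow, for illustration only.
  final case class Row(values: Vector[Any])

  // Same shape as in the diff: an abstract class that is also a function.
  abstract class Deserializer[T] extends (Row => T)

  // A single concrete subclass (hypothetical).
  final class FirstFieldAsInt extends Deserializer[Int] {
    override def apply(row: Row): Int = row.values(0).asInstanceOf[Int]
  }

  // The call below is typed against Deserializer[T], not Function1, so this
  // call site only observes Deserializer subclasses at run time.
  def decodeAll[T](rows: Seq[Row], fromRow: Deserializer[T]): Vector[T] = {
    val out = Vector.newBuilder[T]
    val it = rows.iterator
    while (it.hasNext) out += fromRow(it.next())
    out.result()
  }
}
```

Whether the JIT actually inlines such a call depends on the runtime profile, so treat this as a sketch of the intent rather than a guarantee.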
|
Test build #121320 has finished for PR 28223 at commit
|
|
Test build #121319 has finished for PR 28223 at commit
|
|
Test build #121313 has finished for PR 28223 at commit
|
|
Test build #121316 has finished for PR 28223 at commit
|
|
Test build #121327 has finished for PR 28223 at commit
|
} catch {
  case e: Exception =>
    throw new RuntimeException(s"Error while encoding: $e\n" +
def createSerializer(): Serializer[T] = new Serializer[T] {
This currently relies on us serializing the enclosing encoder as well. We technically don't need the entire encoder, only a couple of fields. I could move this class into the companion object and just use the fields I need.
Yeah, that would be better.
I'm also wondering whether we could just pass the original (de)serializer expressions and do the optimization lazily inside Serializer and Deserializer.
There is some cost to the optimization. So I would like to do it only once.
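The two points in this thread (capture only the needed fields, and optimize only once) can be sketched as follows. This is not the PR's code: `Encoder`, `optimize`, `optimizedFields` and `javaSerializedSize` are hypothetical names, and the "optimization" is just `distinct` as a placeholder. The serializer class lives in the companion object, so it has no reference to an enclosing encoder instance, and it receives the already optimized fields through its constructor.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.concurrent.atomic.AtomicInteger

object CompanionSketch {
  // Hypothetical encoder. It is deliberately NOT Serializable, to show that
  // the serializer below does not drag the encoder along.
  final class Encoder(fieldNames: Vector[String]) {
    // The (placeholder) optimization runs once per encoder, at construction.
    val optimizedFields: Vector[String] = Encoder.optimize(fieldNames)

    // Every serializer receives the already optimized result.
    def createSerializer(): Encoder.Serializer = new Encoder.Serializer(optimizedFields)
  }

  object Encoder {
    // Counts optimization runs so the "only once" claim can be checked.
    val optimizeCalls = new AtomicInteger(0)

    def optimize(fields: Vector[String]): Vector[String] = {
      optimizeCalls.incrementAndGet()
      fields.distinct
    }

    // Defined in the companion object: no outer pointer to an Encoder instance.
    final class Serializer(fields: Vector[String])
        extends (Map[String, Any] => Vector[Any]) with Serializable {
      override def apply(record: Map[String, Any]): Vector[Any] = fields.map(record)
    }
  }

  // Java-serializes an object and returns the number of bytes written.
  def javaSerializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.size()
  }
}
```

In this sketch, Java-serializing a `Serializer` succeeds even though `Encoder` is not `Serializable`, which is the property the companion-object move is meant to provide.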
|
Thank you, @hvanhovell . Do we need a benchmark about this change? |
|
@dongjoon-hyun it should be a bit of a lateral move performance-wise. The expensive bit is generating the code and compiling it, and we are definitely not avoiding that. It might be a bit quicker because it does not excessively copy the expression encoder (which is not free because of the work done in the constructor). What would you like to see benchmarked? |
|
Got it. Thank you for confirmation, @hvanhovell . I was just wondering. Your comment is enough for me. |
viirya
left a comment
Looks like it is in good shape already.
} catch {
  case e: Exception =>
    throw new RuntimeException(s"Error while decoding: $e\n" +
      s"${deserializer.simpleString(SQLConf.get.maxToStringFields)}", e)
As already pointed out, some fields such as deserializer live in the enclosing encoder, so it looks like we will currently serialize the entire encoder? Admittedly we already serialize the entire encoder today, but yes, it would be better to get rid of the unnecessary parts.
|
|
||
private def initialize(): Unit = {
  inputRow = new GenericInternalRow(1)
  extractProjection = GenerateUnsafeProjection.generate(optimizedSerializer)
Why can't these two be lazy vals? Performance concerns?
Yeah, lazy vals are not free and the extractProjection is on the hot path.
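For readers less familiar with the trade-off: a `lazy val` is compiled with an initialization guard that has to be safe under concurrent access, and that guard is consulted on every read. Since the function objects are documented as not thread-safe anyway, a plain field with an explicit null check is enough. A minimal sketch of that pattern follows. It assumes the hot-path method checks for `null` before calling `initialize()`, which the snippet above does not show, and `buildProjection` and `initializations` are hypothetical names used only for this example.

```scala
object LazyInitSketch {
  // Hypothetical, single-threaded function object: initialized on first use.
  final class Serializer(buildProjection: () => Int => String) extends (Int => String) {
    // Plain mutable field: no lock and no lazy-val guard on the hot path.
    private var projection: Int => String = null

    // Exposed only so the sketch can show that initialization happens once.
    var initializations = 0

    private def initialize(): Unit = {
      projection = buildProjection()
      initializations += 1
    }

    override def apply(value: Int): String = {
      // Cheap null check instead of a lazy val; NOT safe for concurrent use.
      if (projection == null) initialize()
      projection(value)
    }
  }
}
```

The price of this choice is that one instance must stay confined to a single thread.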
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
|
cc also @JoshRosen and @zsxwing from #26076 |
 * == Implementation ==
 *  - Encoders are not required to be thread-safe and thus they do not need to use locks to guard
 *    against concurrent access if they reuse internal buffers to improve performance.
 *
Does removing this mean Encoders must be thread-safe? Do we need an explicit comment for that?
I think it should be thread-safe.
|
I feel like @JoshRosen looked at this too a while ago |
|
Test build #121361 has finished for PR 28223 at commit
|
|
Test build #121364 has finished for PR 28223 at commit
|
|
Test build #121376 has finished for PR 28223 at commit
|
|
Thank you, @hvanhovell and all. |
### What changes were proposed in this pull request?
This PR moves the `ExpressionEncoder.toRow` and `ExpressionEncoder.fromRow` functions into their own function objects (`ExpressionEncoder.Serializer` & `ExpressionEncoder.Deserializer`). This effectively makes the `ExpressionEncoder` stateless, thread-safe and (more) reusable. The function objects are not thread safe, however they are documented as such and should be used in a more limited scope (making it easier to reason about thread safety).

### Why are the changes needed?
ExpressionEncoders are not thread-safe. We had various (nasty) bugs because of this.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #28223 from hvanhovell/SPARK-31450.

Authored-by: herman <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit fab4ca5)
Signed-off-by: Dongjoon Hyun <[email protected]>
|
Late LGTM. |
What changes were proposed in this pull request?
This PR moves the `ExpressionEncoder.toRow` and `ExpressionEncoder.fromRow` functions into their own function objects (`ExpressionEncoder.Serializer` & `ExpressionEncoder.Deserializer`). This effectively makes the `ExpressionEncoder` stateless, thread-safe and (more) reusable. The function objects are not thread safe, however they are documented as such and should be used in a more limited scope (making it easier to reason about thread safety).
Why are the changes needed?
ExpressionEncoders are not thread-safe. We had various (nasty) bugs because of this.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests.
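The resulting usage pattern can be sketched without Spark as follows: one immutable encoder is shared freely, and every thread creates its own function object. All names here (`Encoder`, `Serializer`, `encodeInParallel`, the `prefix` parameter) are hypothetical stand-ins. Only the split into a stateless encoder and a non-thread-safe, buffer-reusing function object mirrors the description above.

```scala
object ThreadSafetySketch {
  // Stateless and immutable, so it is safe to share between threads.
  final class Encoder(val prefix: String) {
    def createSerializer(): Serializer = new Serializer(prefix)
  }

  // Reuses an internal buffer across calls, so one instance must stay on one thread.
  final class Serializer(prefix: String) extends (Int => String) {
    private val buffer = new java.lang.StringBuilder

    override def apply(value: Int): String = {
      buffer.setLength(0)
      buffer.append(prefix).append(value)
      buffer.toString
    }
  }

  // Shares a single encoder, but gives every thread its own serializer.
  def encodeInParallel(encoder: Encoder, threads: Int, perThread: Int): Seq[Seq[String]] = {
    val results = new Array[Seq[String]](threads)
    val workers = (0 until threads).map { t =>
      new Thread(() => {
        val toRow = encoder.createSerializer() // per-thread function object
        results(t) = (0 until perThread).map(i => toRow(t * perThread + i))
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    results.toSeq
  }
}
```

Sharing one `Serializer` between the threads instead would let them interleave writes to the same buffer, which is the kind of bug the limited scope is meant to make easier to avoid.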