[SPARK-23581][SQL] Add interpreted unsafe projection #20750
Conversation
```scala
 * @param expressions that produces the resulting fields. These expressions must be bound
 * to a schema.
 */
class InterpretedUnsafeProjection(expressions: Array[Expression]) extends UnsafeProjection {
```
The current implementation takes a two-step approach: first it evaluates the expressions and puts them in an intermediate row, and then it converts this row to an UnsafeRow. We could also just create a converter from InternalRow to UnsafeRow and punt the projection work off to an InterpretedMutableProjection.
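The two-step approach can be sketched in a simplified, hypothetical form (none of these class or method names are Spark's; the row-to-UnsafeRow conversion in step 2 is elided):

```java
import java.util.List;
import java.util.function.Function;

// Simplified model of the two-step approach: step 1 evaluates each bound
// expression against the input row into an intermediate Object[] row;
// step 2 would hand that row to a row-to-UnsafeRow converter. The converter
// is elided here, so we simply return the intermediate row.
class TwoStepProjection {
    private final List<Function<Object[], Object>> expressions;

    TwoStepProjection(List<Function<Object[], Object>> expressions) {
        this.expressions = expressions;
    }

    Object[] apply(Object[] input) {
        // Step 1: evaluate the expressions into an intermediate row.
        Object[] intermediate = new Object[expressions.size()];
        for (int i = 0; i < expressions.size(); i++) {
            intermediate[i] = expressions.get(i).apply(input);
        }
        // Step 2 (elided): convert `intermediate` to the unsafe format.
        return intermediate;
    }
}
```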
```scala
  checkEvalutionWithUnsafeProjection(expression, expected, inputRow, InterpretedUnsafeProjection)
}

protected def checkEvalutionWithUnsafeProjection(
```
nit: typo in Evalu(a)tion
Sorry for the basic question, but I don't see where the switch between the interpreted and the generated version is added. Maybe I am just missing something, since I am not an expert on how Spark switches between execution modes, but I expected it in the …
Test build #88008 has finished for PR 20750 at commit
@mgaido91 you are right. It is on purpose. I do not like to introduce these things in one big bang (this makes it hard to review and might make it hard to merge). In this PR we just introduce the …
Test build #88006 has finished for PR 20750 at commit
# Conflicts: # sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
Test build #88009 has finished for PR 20750 at commit
```scala
/**
 * Returns an [[UnsafeProjection]] for given sequence of bound Expressions.
 */
protected def createProjection(exprs: Seq[Expression]): UnsafeProjection
```
seems no place is calling this?
Yeah, that was pretty stupid. It is fixed now :)
Test build #88017 has finished for PR 20750 at commit
# Conflicts: # sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
```scala
    writer.write(i, null.asInstanceOf[Decimal], precision, scale)
  }
}
case (_, true) if dt.defaultSize == 1 =>
```
it's a little tricky to depend on the default size, can we match ByteType directly here?
Sure
Test build #88064 has finished for PR 20750 at commit
Test build #88062 has finished for PR 20750 at commit
```java
}

@Override
public void setNullByte(int ordinal) {
```
What is the reason these methods have been introduced?
These methods are needed for writing UnsafeArrayData: we fill the slot with 0s if we set it to null. The slot size in UnsafeArrayData depends on the data type we are storing in it.
I wanted to avoid writing a lot of duplicate code in the InterpretedUnsafeProjection, which is why I added these methods to the UnsafeWriter parent class, and this is also why they are in the UnsafeRowWriter.
I see, but I am not sure about having only some of these methods here. I mean, in UnsafeArrayData we also have setNullDouble, setNullFloat, etc. It seems a bit weird to me that some of them are defined at the parent level and some others are not. It's not a big deal, but for consistency I'd prefer to have all of them. What do you think?
We could also name them differently e.g.: setNull1Byte, setNull2Bytes, setNull4Bytes & setNull8Bytes
I am not sure that is a good idea, because then everyone writing code would need to know exactly how many bytes each type is. I prefer the current approach. I would rather either reintroduce the setNullAt method with a match in UnsafeArrayData's implementation or leave it as it is now.
This is pretty low level stuff, so you should know how many bytes things contain at this point. I'd rather leave it as it is. Doing a type match on such a hot code path doesn't seem like a good idea.
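As an illustration of the width-dependent null handling being discussed, here is a toy sketch over a ByteBuffer (not Spark's actual UnsafeArrayWriter): setting null must zero exactly as many bytes as the element type's slot occupies.

```java
import java.nio.ByteBuffer;

// Toy fixed-width slot writer: the slot width in the value region depends on
// the element type, so "set null" must zero exactly that many bytes. This is
// what motivates per-width methods such as setNullByte/Short/Int/Long (or
// the setNull1/2/4/8Bytes naming proposed below).
class SlotWriter {
    private final ByteBuffer buf;
    private final int slotSize; // 1, 2, 4 or 8 bytes, fixed per element type

    SlotWriter(int numSlots, int slotSize) {
        this.buf = ByteBuffer.allocate(numSlots * slotSize);
        this.slotSize = slotSize;
    }

    void setNull(int ordinal) {
        int offset = ordinal * slotSize;
        switch (slotSize) { // a generated caller would pick one branch statically
            case 1: buf.put(offset, (byte) 0); break;
            case 2: buf.putShort(offset, (short) 0); break;
            case 4: buf.putInt(offset, 0); break;
            case 8: buf.putLong(offset, 0L); break;
            default: throw new IllegalArgumentException("bad slot size " + slotSize);
        }
    }

    void setInt(int ordinal, int value) { buf.putInt(ordinal * slotSize, value); }
    int getInt(int ordinal) { return buf.getInt(ordinal * slotSize); }
}
```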
Test build #88084 has finished for PR 20750 at commit
```diff
- public void setOffsetAndSize(int ordinal, long currentCursor, int size) {
+ public void setOffsetAndSize(int ordinal, long currentCursor, long size) {
```
shall we check if size fits in an integer?
I think we can safely change the signature to take two ints.
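For context on why the size must fit in an int: a variable-length field stores offset and size together in a single long word, along the lines of this sketch (the general idea rather than Spark's exact layout):

```java
// Sketch of an offset-and-size word: the high 32 bits hold the field's
// (relative) offset and the low 32 bits its size, which is why both
// values must individually fit in an int.
class OffsetAndSize {
    static long pack(int offset, int size) {
        return (((long) offset) << 32) | (size & 0xFFFFFFFFL);
    }

    static int offset(long word) {
        return (int) (word >>> 32);
    }

    static int size(long word) {
        return (int) word; // truncating cast keeps the low 32 bits
    }
}
```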
```java
 * Base class for writing Unsafe* structures.
 */
public abstract class UnsafeWriter {
  public abstract void setNullByte(int ordinal);
```
This looks pretty weird. At the first glance I'm wondering why we don't have setBoolean/Float/Double, then I realized we don't need to, because we just need a way to set null for 1/2/4/8 bytes.
maybe it's better to name them setNull1/2/4/8Bytes, and ask the UnsafeArrayWriter to follow
See my previous discussion with @mgaido91. I am fine either way; I can also add the missing methods and be done with it, that will just make the interpreted code path a bit messier.
I feel setNull1/2/4/8Bytes is better. It's also easy to codegen, just setNull${dt.defaultSize}Bytes.
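The codegen convenience mentioned here is just string interpolation over the element type's default size; a minimal sketch (the helper name and generated call are illustrative, not Spark's actual codegen):

```java
// Toy sketch of generating a width-specific null-setter call: the generated
// source splices the element type's default size into the method name,
// so no runtime type match is needed.
class SetNullCodegen {
    static String genSetNull(int defaultSize, int ordinal) {
        return "writer.setNull" + defaultSize + "Bytes(" + ordinal + ");";
    }
}
```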
Actually this was my mistake... I thought Platform.setInt(0) is different from Platform.setFloat(0.0f), and that's why I introduced a setNull method for each primitive type.
```scala
override protected def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
  // We need to make sure that we do not reuse stateful non deterministic expressions.
  val cleanedExpressions = exprs.map(_.transform {
    case s: StatefulNondeterministic => s.freshCopy()
```
Why is it not a problem for the codegen version?
In codegen the state is put in the generated class; if you happen to visit the same expression twice, the state is added twice and is not shared during evaluation. In interpreted mode the Expression will be the same object, and the same state will be modified twice during evaluation.
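The hazard can be seen with a toy stateful "expression" (illustrative names, not Spark's): evaluating the same instance in two projection slots couples their state, while a fresh copy per occurrence keeps them independent.

```java
// Toy stateful expression: eval() mutates internal state, so sharing one
// instance across two projection slots leaks state between them.
// freshCopy() returns an uninitialized instance, decoupling the occurrences.
class CounterExpr {
    private long count = 0;

    long eval() {
        return count++;
    }

    CounterExpr freshCopy() {
        return new CounterExpr();
    }
}
```

Sharing one instance yields 0 then 1 across two slots; a fresh copy per slot yields 0 from each, which is what the `freshCopy()` call in `createProjection` above guarantees.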
```scala
 * A stateful nondeterministic expression. These expressions contain state
 * that is not stored in the parameter list.
 */
trait StatefulNondeterministic extends Nondeterministic {
```
In Hive, stateful and deterministic are orthogonal. If we wanna add this new trait, I think it's time to figure out the correct semantics. Shall we have a new trait called Stateful, or add an assumption that stateful functions must be nondeterministic?
From the Hive doc:

> A stateful UDF is considered to be non-deterministic, irrespective of what deterministic() returns.

This is correct.
Maybe we can just call it Stateful while still extending Nondeterministic, and in the doc we say that stateful expressions imply it's nondeterministic.
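The proposed shape can be sketched as an interface hierarchy (Java stand-ins for the Scala traits; names follow the discussion, not necessarily the final code):

```java
// Sketch: Stateful extends Nondeterministic, so the type system itself
// records the assumption that a stateful expression is always
// nondeterministic.
interface Nondeterministic { }

interface Stateful extends Nondeterministic {
    // Return a fresh, uninitialized copy of this stateful expression.
    Stateful freshCopy();
}
```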
Test build #88243 has finished for PR 20750 at commit
cloud-fan left a comment
looks pretty good!
```scala
/**
 * Return a fresh uninitialized copy of the stateful expression.
 */
def freshCopy(): Stateful = this
```
I think it's better to not provide this default implementation, to avoid mistakes in the future.
```scala
 */
private def generateStructWriter(
    bufferHolder: BufferHolder,
    rowWriter: UnsafeRowWriter,
```
We can add UnsafeWriter#getBufferHolder, so that we don't need to pass 2 parameters.
Yeah we could do that. I think we need to refactor the writers a little bit anyway, but I would like to do that in a follow-up.
```scala
if (!v.isNullAt(i)) {
  unsafeWriter(v, i)
} else {
  writer.setNull8Bytes(i)
```
null type will hit this branch, can we add a test to make sure it works?
ah it's consistent with the codegen version. Maybe we should fix it later.
or just leave it, as array of null type doesn't make sense and maybe no one will do this.
There is a test that actually tests this for arrays: https://github.com/apache/spark/blob/master/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala#L80
Test build #88270 has finished for PR 20750 at commit
Test build #88312 has finished for PR 20750 at commit
Thanks, merging to master!
@cloud-fan has some issue with his Mac, so I will be merging :)... Thanks for the reviews!
Author: Herman van Hovell <[email protected]>
Closes apache#20750 from hvanhovell/SPARK-23581.
What changes were proposed in this pull request?

We currently can only create unsafe rows using code generation. This is a problem for situations in which code generation fails: there is no fallback, and as a result we cannot execute the query.

This PR adds an interpreted version of `UnsafeProjection`. The implementation is modeled after `InterpretedMutableProjection`: it stores the expression results in a `GenericInternalRow` and then uses a conversion function to convert the `GenericInternalRow` into an `UnsafeRow`.

This PR does not implement the actual code-generated-to-interpreted fallback logic. This will be done in a follow-up.

How was this patch tested?

I am piggybacking on existing `UnsafeProjection` tests, and I have added an interpreted version for each of these.