[SPARK-21274][SQL] Add a new generator function replicate_rows to support EXCEPT ALL and INTERSECT ALL #21240
…EXCEPT ALL and INTERSECT ALL
|
Why do we need this? I think it's OK to add a new rewriting rule for |
|
Also, ISTM this function is less useful for end users. |
|
https://issues.apache.org/jira/browse/HIVE-14768 did the same thing. |
|
Ah, ok. |
|
Test build #90219 has finished for PR 21240 at commit
|
| Row(1, null) :: Row(2, null) :: Nil) | ||
| } | ||
|
|
||
| test("ReplicateRows generator") { |
duplicate tests? I feel udtf_replicate_rows.sql is enough for tests.
| ) | ||
| } | ||
|
|
||
| test("type coercion for ReplicateRows") { |
Can we move these tests into sql-tests/inputs/typeCoercion/native?
| * if necessary. | ||
| */ | ||
| object ReplicateRowsCoercion extends TypeCoercionRule { | ||
| private val acceptedTypes = Seq(LongType, IntegerType, ShortType, ByteType) |
nit: LongType does not seem necessary here. Leaving it out avoids re-entering the following pattern match when the type is already long.
| import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} | ||
| import org.apache.spark.sql.types._ | ||
|
|
||
|
|
No need to introduce this line break.
| } | ||
|
|
||
| /** | ||
| * Replicate the row based N times. N is specified as the first argument to the function. |
nit: Replicate N times the row.?
Btw, how about using n to match the expression description that follows?
| Row(3, "row1") :: Row(3, "row1") :: Row(3, "row1") :: Nil) | ||
| checkAnswer(df.selectExpr("replicate_rows(-1, 2.5)"), Nil) | ||
|
|
||
| // The data for the same column should have the same type. |
This copied comment can be removed.
| * }}} | ||
| */ | ||
| @ExpressionDescription( | ||
| usage = "_FUNC_(n, expr1, ..., exprk) - Replicates `expr1`, ..., `exprk` into `n` rows.", |
Replicates `n`, `expr1`, ..., `exprk` into `n` rows.?
| private val acceptedTypes = Seq(IntegerType, ShortType, ByteType) | ||
| override def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { | ||
| case s @ ReplicateRows(children) if s.childrenResolved && | ||
| s.children.head.dataType != LongType && acceptedTypes.contains(s.children.head.dataType) => |
We should check if s.children isn't empty.
ok
| AS tab1(c1, c2, c3); | ||
|
|
||
| -- Requires 2 arguments at minimum. | ||
| SELECT replicate_rows(c1) FROM tab1; |
Add one case SELECT replicate_rows()?
Sure.
| * }}} | ||
| */ | ||
| @ExpressionDescription( | ||
| usage = "_FUNC_(n, expr1, ..., exprk) - Replicates `n`, `expr1`, ..., `exprk` into `n` rows.", |
I checked the design doc for INTERSECT ALL and EXCEPT ALL. It looks like n is always stripped and is useless after the Generate operation. So why do we need to keep n in the ReplicateRows output? Can we do it like this:
> SELECT _FUNC_(2, "val1", "val2");
val1 val2
val1 val2
@viirya I did think about it, Simon. But then I decided to match the output with Hive.
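For readers following the thread, here is a minimal sketch of the Hive-compatible output shape being discussed, written in plain Scala without Spark. The object name `ReplicateRowsSketch` and the `Row` alias are hypothetical; the behavior for a negative count follows the `replicate_rows(-1, 2.5)` test in this PR, and treating a count of zero the same way is an assumption.

```scala
object ReplicateRowsSketch {
  // Hypothetical model: a row is just a sequence of values.
  type Row = Seq[Any]

  // Emit n copies of (n, values...), keeping n as the first output column
  // to match the Hive output. The range 1L to n is empty for n <= 0, so
  // non-positive counts yield no rows (zero is an assumption here).
  def replicateRows(n: Long, values: Seq[Any]): Seq[Row] =
    (1L to n).map(_ => n +: values)
}
```

Calling `ReplicateRowsSketch.replicateRows(2L, Seq("val1", "val2"))` yields two rows, each holding `2`, `val1`, `val2`. Dropping `n` from the output, as suggested above, would amount to replacing `n +: values` with `values`.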
|
Test build #90262 has finished for PR 21240 at commit
|
| private val acceptedTypes = Seq(IntegerType, ShortType, ByteType) | ||
| override def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { | ||
| case s @ ReplicateRows(children) if s.children.nonEmpty && s.childrenResolved && | ||
| s.children.head.dataType != LongType && acceptedTypes.contains(s.children.head.dataType) => |
nit: s.children.head.dataType != LongType is redundant because we have acceptedTypes.contains(...).
@viirya Thanks. I will fix.
| 2 val1 val2 | ||
| 2 val1 val2 | ||
| """) | ||
| case class ReplicateRows(children: Seq[Expression]) extends Generator with CodegenFallback { |
This can be easily implemented in codegen so we don't need CodegenFallback. We can deal with it in follow-up if you want.
@viirya If you don't mind, I would like to do it in a follow-up.
|
This generator function implementation itself LGTM. I have other thoughts regarding the rewrite rule but it's better to discuss on JIRA. cc @cloud-fan @kiszk |
|
Test build #90266 has finished for PR 21240 at commit
|
|
Test build #90265 has finished for PR 21240 at commit
|
|
retest this please |
|
Test build #90267 has finished for PR 21240 at commit
|
|
|
||
| override def eval(input: InternalRow): TraversableOnce[InternalRow] = { | ||
| val numRows = children.head.eval(input).asInstanceOf[Long] | ||
| val values = children.map(_.eval(input)).toArray |
children.head seems to be evaluated twice here; can we avoid it?
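One way to avoid the double evaluation is to evaluate every child once and read the row count from the first result. The sketch below shows the idea in plain Scala without Spark; `EvalOnceSketch` and `CountingExpr` are hypothetical stand-ins for the generator and for Catalyst expressions, used only so the evaluation count can be observed.

```scala
object EvalOnceSketch {
  // Hypothetical stand-in for an expression that counts its evaluations.
  final class CountingExpr(value: Any) {
    var evalCount = 0
    def eval(): Any = { evalCount += 1; value }
  }

  // Evaluate every child exactly once, then reuse the first result as the
  // row count instead of calling eval() on the head a second time.
  def replicate(children: Seq[CountingExpr]): Seq[Seq[Any]] = {
    val values = children.map(_.eval()).toVector
    val numRows = values.head.asInstanceOf[Long]
    (1L to numRows).map(_ => values)
  }
}
```

With this shape, the first child's `evalCount` stays at 1 however many rows are produced.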
|
Test build #90287 has finished for PR 21240 at commit
|
|
retest this please. |
|
Test build #90295 has finished for PR 21240 at commit
|
|
retest this please. |
|
Test build #90303 has finished for PR 21240 at commit
|
|
retest this please. |
|
Test build #90305 has finished for PR 21240 at commit
|
| object ReplicateRowsCoercion extends TypeCoercionRule { | ||
| private val acceptedTypes = Seq(IntegerType, ShortType, ByteType) | ||
| override def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { | ||
| case s @ ReplicateRows(children) if s.children.nonEmpty && s.childrenResolved && |
children is not used. How about this?
case s @ ReplicateRows(children) if children.nonEmpty && s.childrenResolved &&
acceptedTypes.contains(children.head.dataType) =>
ReplicateRows(Cast(children.head, LongType) +: children.tail)
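The suggested pattern can be tried outside Spark with a toy model. The following is a sketch only: the `DataType`, `Expr`, `Literal`, and `Cast` definitions are hypothetical miniatures, not the Catalyst classes, and `coerce` stands in for the body of the rule.

```scala
object CoercionSketch {
  // Hypothetical miniature of a type hierarchy (not Catalyst's).
  sealed trait DataType
  case object ByteType extends DataType
  case object ShortType extends DataType
  case object IntegerType extends DataType
  case object LongType extends DataType
  case object StringType extends DataType

  // Hypothetical miniature of an expression tree.
  sealed trait Expr { def dataType: DataType }
  final case class Literal(value: Any, dataType: DataType) extends Expr
  final case class Cast(child: Expr, dataType: DataType) extends Expr

  // LongType is deliberately absent: a long head needs no cast, so the
  // rule does not match it again on a later pass.
  private val acceptedTypes: Set[DataType] = Set(IntegerType, ShortType, ByteType)

  // Cast the first argument to long when it is a narrower integral type.
  // An empty argument list falls through to the second case unchanged.
  def coerce(children: Seq[Expr]): Seq[Expr] = children match {
    case head +: tail if acceptedTypes.contains(head.dataType) =>
      Cast(head, LongType) +: tail
    case other => other
  }
}
```

Matching on `head +: tail` covers the non-empty check and the single `acceptedTypes.contains` guard in one place, which is the simplification the comments above converge on.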
| if (numColumns < 2) { | ||
| TypeCheckResult.TypeCheckFailure(s"$prettyName requires at least 2 arguments.") | ||
| } else if (children.head.dataType != LongType) { | ||
| TypeCheckResult.TypeCheckFailure("The number of rows must be a positive long value.") |
How about this message? The first argument type must be byte, short, int, or long, but ${children.head.dataType} found. BTW, it seems we don't reject negative values? (The current message says the number must be positive though...?)
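A rough sketch of the check with the proposed message, in plain Scala without Spark. Type names are modeled as plain strings and `TypeCheckSketch.checkInputTypes` is a hypothetical helper; unlike the code under review, which runs after coercion and compares against LongType only, this sketch accepts all four integral types directly.

```scala
object TypeCheckSketch {
  // Hypothetical model: type names as plain strings.
  private val integralTypes = Set("byte", "short", "int", "long")

  // Returns Right(()) when the arguments are acceptable, Left(message) otherwise.
  // Only argument count and the first argument's type are checked; the value
  // itself (for example a negative count) is not inspected here.
  def checkInputTypes(funcName: String, argTypes: Seq[String]): Either[String, Unit] =
    if (argTypes.length < 2) {
      Left(s"$funcName requires at least 2 arguments.")
    } else if (!integralTypes.contains(argTypes.head)) {
      Left(s"The first argument type must be byte, short, int, or long, but ${argTypes.head} found.")
    } else {
      Right(())
    }
}
```

Because the message describes only the accepted types, it no longer claims that the value must be positive, which matches the fact that negative values are not rejected.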
| -- Requires 2 arguments at minimum. | ||
| SELECT replicate_rows(c1) FROM tab1; | ||
|
|
||
| -- First argument should be a numeric type. |
nit: I think numeric generally includes float and double, too. How about "integral type"?
| (1, 'row1', 1.1), | ||
| (2, 'row2', 2.2), | ||
| (0, 'row3', 3.3), | ||
| (-1,'row4', 4.4), |
Is the current behaviour for negative values the same as Hive's?
|
Like what @maropu commented at the beginning, |
|
ping @dilipbiswal for an update. |
What changes were proposed in this pull request?
Add a new UDTF replicate_rows. This function replicates the values based on the first argument to the function. It will be used in the EXCEPT ALL and INTERSECT ALL transformation (future PR), mainly to preserve "retain duplicates" semantics. Please refer to Link for the design. The transformation code changes are in Code.
Example
Result
Returns 3 rows based on the first parameter value.
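To illustrate the "retain duplicates" semantics mentioned in the description, here is a small sketch of EXCEPT ALL as a multiset difference in plain Scala. It is not the rewrite rule from the design document, which is not reproduced on this page; `ExceptAllSketch.exceptAll` is a hypothetical helper that only shows why a "repeat this row n times" step is useful once per-row counts are known.

```scala
object ExceptAllSketch {
  // Multiset difference: each distinct row of `left` survives
  // max(0, count in left - count in right) times.
  def exceptAll[A](left: Seq[A], right: Seq[A]): Seq[A] = {
    val rightCounts = right.groupBy(identity).map { case (row, dups) => row -> dups.size }
    left.distinct.flatMap { row =>
      val n = left.count(_ == row) - rightCounts.getOrElse(row, 0)
      // The replicate step: emit the row n times, or not at all when n <= 0.
      Seq.fill(math.max(n, 0))(row)
    }
  }
}
```

For example, `ExceptAllSketch.exceptAll(Seq(1, 1, 1, 2, 3), Seq(1, 2, 2))` keeps two copies of `1` and one copy of `3`.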
How was this patch tested?
Added tests in GeneratorFunctionSuite, TypeCoercionSuite, SQLQueryTestSuite