-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-25084][SQL] "distribute by" on multiple columns (wrap in brackets) may lead to codegen issue #22066
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-25084][SQL] "distribute by" on multiple columns (wrap in brackets) may lead to codegen issue #22066
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -404,21 +404,26 @@ abstract class HashExpression[E] extends Expression { | |
| input: String, | ||
| result: String, | ||
| fields: Array[StructField]): String = { | ||
| val tmpInput = ctx.freshName("input") | ||
| val fieldsHash = fields.zipWithIndex.map { case (field, index) => | ||
| nullSafeElementHash(input, index.toString, field.nullable, field.dataType, result, ctx) | ||
| nullSafeElementHash(tmpInput, index.toString, field.nullable, field.dataType, result, ctx) | ||
| } | ||
| val hashResultType = CodeGenerator.javaType(dataType) | ||
| ctx.splitExpressions( | ||
| val code = ctx.splitExpressions( | ||
| expressions = fieldsHash, | ||
| funcName = "computeHashForStruct", | ||
| arguments = Seq("InternalRow" -> input, hashResultType -> result), | ||
| arguments = Seq("InternalRow" -> tmpInput, hashResultType -> result), | ||
| returnType = hashResultType, | ||
| makeSplitFunction = body => | ||
| s""" | ||
| |$body | ||
| |return $result; | ||
| """.stripMargin, | ||
| foldFunctions = _.map(funcCall => s"$result = $funcCall;").mkString("\n")) | ||
| s""" | ||
| |final InternalRow $tmpInput = $input; | ||
| |$code | ||
| """.stripMargin | ||
| } | ||
|
|
||
| @tailrec | ||
|
|
@@ -778,21 +783,22 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] { | |
| input: String, | ||
| result: String, | ||
| fields: Array[StructField]): String = { | ||
| val tmpInput = ctx.freshName("input") | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like to Then run tests: |
||
| val childResult = ctx.freshName("childResult") | ||
| val fieldsHash = fields.zipWithIndex.map { case (field, index) => | ||
| val computeFieldHash = nullSafeElementHash( | ||
| input, index.toString, field.nullable, field.dataType, childResult, ctx) | ||
| tmpInput, index.toString, field.nullable, field.dataType, childResult, ctx) | ||
| s""" | ||
| |$childResult = 0; | ||
| |$computeFieldHash | ||
| |$result = (31 * $result) + $childResult; | ||
| """.stripMargin | ||
| } | ||
|
|
||
| s"${CodeGenerator.JAVA_INT} $childResult = 0;\n" + ctx.splitExpressions( | ||
| val code = ctx.splitExpressions( | ||
| expressions = fieldsHash, | ||
| funcName = "computeHashForStruct", | ||
| arguments = Seq("InternalRow" -> input, CodeGenerator.JAVA_INT -> result), | ||
| arguments = Seq("InternalRow" -> tmpInput, CodeGenerator.JAVA_INT -> result), | ||
| returnType = CodeGenerator.JAVA_INT, | ||
| makeSplitFunction = body => | ||
| s""" | ||
|
|
@@ -801,6 +807,11 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] { | |
| |return $result; | ||
| """.stripMargin, | ||
| foldFunctions = _.map(funcCall => s"$result = $funcCall;").mkString("\n")) | ||
| s""" | ||
| |final InternalRow $tmpInput = $input; | ||
| |${CodeGenerator.JAVA_INT} $childResult = 0; | ||
| |$code | ||
| """.stripMargin | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2831,4 +2831,16 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { | |
| } | ||
| } | ||
| } | ||
|
|
||
| test("SPARK-25084: 'distribute by' on multiple columns may lead to codegen issue") { | ||
| withView("spark_25084") { | ||
| val count = 1000 | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: we can probably inline in the next line
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How to inline? We still use it in the assert.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry, disregard the comment, thanks |
||
| val df = spark.range(count) | ||
| val columns = (0 until 400).map{ i => s"id as id$i" } | ||
| val distributeExprs = (0 until 100).map(c => s"id$c").mkString(",") | ||
| df.selectExpr(columns : _*).createTempView("spark_25084") | ||
| assert( | ||
| spark.sql(s"select * from spark_25084 distribute by ($distributeExprs)").count === count) | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: we can avoid creating a new variable if the
inputis already a variable. I think this can be done after we fully adopt the new codegen infra from @viiryaThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, very agree, we can improve this in the future.