[SPARK-23736][SQL] Extending the concat function to support array columns #20858
Conversation
…tenating multiple array columns into one.
| expression[MapValues]("map_values"), | ||
| expression[Size]("size"), | ||
| expression[SortArray]("sort_array"), | ||
| expression[ConcatArrays]("concat_arrays"), |
Why not reuse concat?
concat(array1, array2, ..., arrayN) -> array ?
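The proposed semantics can be sketched in plain Python (illustrative only, not Spark code; the function name mirrors the SQL signature above): flatten the inputs left to right, and return NULL if any input array is NULL.

```python
def concat_arrays(*arrays):
    """Sketch of concat(array1, ..., arrayN) on arrays: NULL (None) in
    -> NULL out, otherwise left-to-right flattening."""
    if any(a is None for a in arrays):
        return None  # null-intolerant at the array level
    result = []
    for a in arrays:
        result.extend(a)
    return result

print(concat_arrays([1, 2, 3], [4, 5], [6]))  # mirrors the SQL example in this PR
```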
I've already played with this option in my mind, but I'm not sure how concat would be categorized.
Currently, concat is defined as a pure string operation:
/**
Whereas the functionality in this PR belongs rather to the collection_funcs group.
Having just one function for both expressions would be elegant, but can you advise what group should be assigned to concat?
How about moving it to collection functions?
Ok, will merge the functions into one. Do you find it ok to have one expression class per concatenation type?
I'm afraid that if I incorporate all the logic into one expression class, the code will become messy, since each codegen and evaluation has a different nature.
|
Thanks for this work! One question: why do you think we need to support this API natively in Spark? Do other libraries support this as first-class? |
|
@maropu What other libraries do you mean? I'm not aware of any library providing this functionality on top of Spark SQL. When using Spark SQL as an ETL tool for structured and nested data, people are forced to use UDFs for transforming arrays since the current API for array columns is lacking. This approach brings several drawbacks:
So my colleagues and I decided to extend the current Spark SQL API with well-known collection functions like concat, flatten, zipWithIndex, etc. We don't want to keep this functionality just in our fork of Spark, but would like to share it with others. |
| * @param f a function that accepts a sequence of non-null evaluation result names of children | ||
| * and returns Java code to compute the output. | ||
| */ | ||
| protected def nullSafeCodeGen( |
This method looks almost the same as the one in BinaryExpression. Can you avoid the code duplication?
We will combine it with concat
@WeichenXu123 I do agree that there are strong similarities in the code.
If you take a look at UnaryExpression, BinaryExpression, and TernaryExpression, you will see that the methods responsible for null-safe evaluation and code generation are the same except for the number of parameters. My intention has been to generalize these methods into the NullSafeEvaluation trait and remove the original methods in a different PR once the trait is in. I didn't want to create a big-bang PR because of one additional function in the API.
I feel it's ok to discuss this in follow-up activities since it's less related to this PR. So, can you make this PR as minimal as possible?
Ok, will try.
|
ok, I'll check later! |
| trait UserDefinedExpression | ||
|
|
||
| /** | ||
| * The trait covers logic for performing null save evaluation and code generation. |
typo: null safe.
| * override this. | ||
| */ | ||
| override def eval(input: InternalRow): Any = | ||
| { |
Spark usually uses a style like:
override def eval(input: InternalRow): Any = {
val values = children.map(_.eval(input))
if (values.contains(null)) {
null
} else {
nullSafeEval(values)
}
}
You could follow the style of the other code.
There are other places where the brace {} style doesn't follow the rest of the Spark codebase. We should keep the code style consistent.
I think I fixed all the style differences.
It seems the style fix was missed here.
| */ | ||
| override def eval(input: InternalRow): Any = | ||
| { | ||
| val values = children.map(_.eval(input)) |
We probably don't need to evaluate all children. Once any child expression is null, we can just return null.
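The reviewer's suggestion can be sketched in Python (a hypothetical model where each child expression is a callable over the input row; this is not the Spark implementation itself): bail out as soon as any child yields null, instead of evaluating every child and then scanning for null.

```python
def eval_short_circuit(children, row):
    """Stop as soon as any child evaluates to None instead of
    evaluating all children first and checking afterwards."""
    values = []
    for child in children:
        v = child(row)
        if v is None:
            return None  # short-circuit: remaining children never run
        values.append(v)
    return values
```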
|
|
||
| override def checkInputDataTypes(): TypeCheckResult = { | ||
| val arrayCheck = checkInputDataTypesAreArrays | ||
| if(arrayCheck.isFailure) arrayCheck |
Style issue:
if (...) {
...
} else {
...
}
| /** | ||
| * The trait covers logic for performing null save evaluation and code generation. | ||
| */ | ||
| trait NullSafeEvaluation extends Expression |
Do we need to bring in NullSafeEvaluation? If only ConcatArray uses it, we may not need to add this.
| $resultCode | ||
| """ /: children.zip(gens)) { | ||
| case (acc, (child, gen)) => | ||
| gen.code + ctx.nullSafeExec(child.nullable, gen.isNull)(acc) |
For example, for a binary expression, doesn't this generate code like:
rightGen.code + ctx.nullSafeExec(right.nullable, rightGen.isNull) {
leftGen.code + ctx.nullSafeExec(left.nullable, leftGen.isNull) {
${ev.isNull} = false; // resultCode could change nullability.
$resultCode
}
}
The evaluation order doesn't matter for deterministic expressions, but for non-deterministic ones I'm a little concerned that it may cause an unexpected change.
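The guard-nesting question can be made concrete with a rough Python sketch (my own simplified naming, not Spark's codegen API): each child contributes one null guard, and whichever child is folded last ends up outermost, i.e. runs first. Folding over the children in reverse therefore keeps left-to-right evaluation order.

```python
def gen_null_safe(child_names, result_code):
    """Nest per-child null guards by folding over the children.
    Folding in reverse puts the FIRST child's code outermost, so the
    generated code still evaluates children left to right."""
    code = result_code
    for name in reversed(child_names):
        code = f"{name}_code;\nif (!{name}_isNull) {{\n{code}\n}}"
    return code

snippet = gen_null_safe(["left", "right"], "resultCode")
# left_code is emitted before (outside) right_code
```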
| * Concatenates multiple arrays into one. | ||
| */ | ||
| @ExpressionDescription( | ||
| usage = "_FUNC_(expr, ...) - Concatenates multiple arrays into one.", |
The usage should state that the element types of the arrays must be the same.
| val primitiveValueTypeName = CodeGenerator.primitiveTypeName(elementType) | ||
| val assignments = elements.map { el => | ||
| s""" | ||
| |for(int z = 0; z < $el.numElements(); z++) { |
Style: for (
| val assignments = elements.map { el => | ||
| s""" | ||
| |for(int z = 0; z < $el.numElements(); z++) { | ||
| | if($el.isNullAt(z)) { |
Style: if ().
| |Object[] $arrayName = new Object[$numElemName]; | ||
| |int $counter = 0; | ||
| |$assignments | ||
| |$arrayDataName = new $genericArrayClass($arrayName); |
Can't we concatenate complex elements into UnsafeArrayData?
+1, can we reuse the UnsafeArrayWriter logic for this case?
Really like this idea! I think it would require moving the complex-type insertion logic from InterpretedUnsafeProjection directly into UnsafeDataWriter and thereby introducing write methods for complex-type fields. I'm not sure whether this big refactoring task is still in the scope of this PR.
Also, I see that we could improve the codegen of CreateArray in the same way.
You couldn't use UnsafeArrayData in the complex case?
Yeah, currently there are no write methods on UnsafeArrayWriter or set methods on UnsafeArrayData that we could leverage for complex types. In theory, we could follow the same approach as in InterpretedUnsafeProjection and write each complex type to a byte array, then insert the produced byte array into the target UnsafeArrayData. Since this logic could be utilized from more places (e.g. CreateArray), it should be encapsulated into UnsafeArrayWriter or UnsafeArrayData first. What do you think?
| * @group collection_funcs | ||
| * @since 2.4.0 | ||
| */ | ||
| def concat_arrays(columns: Column*): Column = withExpr { ConcatArrays(columns.map(_.expr)) } |
Do we need to add this function in sql/functions here? It seems we might recommend users to use these kinds of functions via selectExpr, so is it okay to add this only in FunctionRegistry in terms of code simplicity and maintainability? Thoughts? @viirya @gatorsmile
|
Should we handle differently (but compatibly) typed arrays in this function? Also, could you add more tests for this case in |
| > SELECT _FUNC_(array(1, 2, 3), array(4, 5), array(6)); | ||
| [1,2,3,4,5,6] | ||
| """) | ||
| case class ConcatArrays(children: Seq[Expression]) extends Expression with NullSafeEvaluation { |
Can we add a common base class (e.g., ConcatLike) for handling nested ConcatArrays in the optimizer (CombineConcats)?
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
Line 649 in e4bec7c
| object CombineConcats extends Rule[LogicalPlan] { |
|
Also, |
| Examples: | ||
| > SELECT _FUNC_(array(1, 2, 3), array(4, 5), array(6)); | ||
| [1,2,3,4,5,6] | ||
| """) |
Shall we add since too?
...
[1,2,3,4,5,6]
""",
since = "2.4.0")
| else TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), s"function $prettyName") | ||
| } | ||
|
|
||
| private def checkInputDataTypesAreArrays(): TypeCheckResult = |
Can we just put this in checkInputDataTypes?
| override def dataType: ArrayType = | ||
| children | ||
| .headOption.map(_.dataType.asInstanceOf[ArrayType]) | ||
| .getOrElse(ArrayType.defaultConcreteType.asInstanceOf[ArrayType]) |
Should we allow empty children? I can't think of a use case for now and we should better disallow it first.
Definitely share your opinion, but I think we should be consistent across the whole Spark SQL API. Functions like concat and concat_ws accept empty children as well.
Hm .. but then this is array<null> when the children are empty. Seems CreateArray's type is array<string> in this case.
Ok, changing the return type to array<string> when no children are provided. Also, I've created the JIRA ticket SPARK-23798 since I don't see any reason why it couldn't return a default concrete type in this case. Hope I didn't miss anything.
python/pyspark/sql/functions.py
| Collection function: Concatenates multiple arrays into one. | ||
| :param cols: list of column names (string) or list of :class:`Column` expressions that have | ||
| the same data type. |
Shall we note cols are expected to be array type?
|
Merged concat and concat_arrays functions into one via an unresolved expression and subsequent resolution. Do you have any objections to this approach? |
|
ok to test |
|
@mn-mikke Could you update the PR title? |
|
Test build #88596 has finished for PR 20858 at commit
|
|
retest please |
|
Test build #88598 has finished for PR 20858 at commit
|
|
retest please |
|
Test build #88605 has finished for PR 20858 at commit
|
| [Row(arr=[1, 2, 3, 4, 5]), Row(arr=None)] | ||
| """ | ||
| sc = SparkContext._active_spark_context | ||
| return Column(sc._jvm.functions.concat(_to_seq(sc, cols, _to_java_column))) |
Why did we move this down?
The whole file is divided into sections according to groups of functions. Based on @gatorsmile's suggestion, the concat function should be categorized as a collection function, so I moved it to comply with the file structure.
|
It seems that we experienced the same problem with failing "RateSourceV2Suite.basic microbatch execution" test reported here |
|
Test build #89286 has finished for PR 20858 at commit
|
|
Test build #89302 has finished for PR 20858 at commit
|
|
Test build #89323 has finished for PR 20858 at commit
|
| ("UnsafeArrayData", arrayData), | ||
| ("int[]", counter))) | ||
|
|
||
| s"""new Object() { |
nit:
s"""
|new Object() {
...
| if (inputs.contains(null)) { | ||
| null | ||
| } else { | ||
| val elements = inputs.flatMap(_.asInstanceOf[ArrayData].toObjectArray(elementType)) |
Can we always allocate a concatenated array? I think the total array element count may overflow in some cases.
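The overflow concern can be made concrete with a small sketch: on the JVM the concatenated array length must fit in a signed 32-bit int, so the total should be checked before allocation. This is illustrative Python only; the bound used here is an assumption, not the exact constant Spark uses.

```python
# Illustrative bound: JVM array lengths must fit in a signed 32-bit int,
# minus a small header allowance (the exact Spark limit may differ).
MAX_JVM_ARRAY_LENGTH = 2**31 - 1 - 8

def checked_total_elements(arrays):
    """Sum element counts with unbounded Python ints and fail fast
    before an allocation that would overflow a JVM int."""
    total = sum(len(a) for a in arrays)
    if total > MAX_JVM_ARRAY_LENGTH:
        raise ValueError(f"cannot allocate a concatenated array of {total} elements")
    return total
```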
| arguments = Seq( | ||
| (s"${javaType}[]", "args"), | ||
| ("UnsafeArrayData", arrayData), | ||
| ("int[]", counter))) |
I guess we can simply use for-loop here?
for (int $idx = 0; $idx < ${children.length}; $idx++) {
for (int z = 0; z < args[$idx].numElements(); z++) {
...
}
}
| |int[] $tempVariableName = new int[]{0}; | ||
| |$assignmentSection | ||
| |final int $numElementsConstant = $tempVariableName[0]; | ||
| """.stripMargin, |
I guess we can simply use for-loop here?
int $tempVariableName = 0;
for (int $idx = 0; $idx < ${children.length}; $idx++) {
$tempVariableName += args[$idx].numElements();
}
final int $numElementsConstant = $tempVariableName;
| |boolean[] $isNullVariable = new boolean[]{false}; | ||
| |$assignmentSection; | ||
| |if ($isNullVariable[0]) return null; | ||
| """.stripMargin |
I guess we can simply use for-loop here?
for (int $idx = 0; $idx < ${children.length}; $idx++) {
if (args[$idx] == null) {
return null;
}
}
We can return as soon as we find null in this case.
| val assignmentSection = ctx.splitExpressions( | ||
| expressions = assignments, | ||
| funcName = "complexArrayConcat", | ||
| arguments = Seq((s"${javaType}[]", "args"), ("Object[]", arrayData), ("int[]", counter))) |
I guess we can simply use for-loop here?
for (int $idx = 0; $idx < ${children.length}; $idx++) {
for (int z = 0; z < args[$idx].numElements(); z++) {
...
}
}
| val assignments = (0 until children.length).map { idx => | ||
| s""" | ||
| |for (int z = 0; z < args[$idx].numElements(); z++) { | ||
| | $arrayData[$counter[0]] = ${CodeGenerator.getValue(s"args[$idx]", elementType, "z")}; |
Don't we need to check for null here?
Here we operate only with non-primitive types, where null is treated as a regular value, so the null check shouldn't be necessary.
The added tests should cover this scenario.
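The distinction being made here — a null element versus a null array — can be sketched in plain Python (purely illustrative, names are mine):

```python
def concat_object_arrays(arrays):
    """For non-primitive element types a None ELEMENT is copied through
    as a regular value; only a None ARRAY nulls out the whole result."""
    if any(a is None for a in arrays):
        return None
    out = []
    for a in arrays:
        out.extend(a)  # None elements pass through untouched
    return out
```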
|
Test build #89402 has finished for PR 20858 at commit
|
|
Test build #89456 has finished for PR 20858 at commit
|
|
|
||
| override def dataType: DataType = children.map(_.dataType).headOption.getOrElse(StringType) | ||
|
|
||
| lazy val javaType: String = CodeGenerator.javaType(dataType) |
We can move this into doGenCode() method.
Good point! But I think it would be better to reuse javaType also in genCodeForPrimitiveArrays and genCodeForNonPrimitiveArrays.
| ev.copy(s""" | ||
| $initCode | ||
| $codes | ||
| ${javaType} ${ev.value} = $concatenator.concat($args); |
nit: $javaType
|
LGTM except for nits. |
|
Test build #89495 has finished for PR 20858 at commit
|
|
Test build #89504 has finished for PR 20858 at commit
|
|
Test build #89560 has finished for PR 20858 at commit
|
|
Test build #89573 has finished for PR 20858 at commit
|
|
Thanks! merging to master. |
|
|
||
| override def foldable: Boolean = children.forall(_.foldable) | ||
|
|
||
| override def eval(input: InternalRow): Any = dataType match { |
So this pattern match will probably cause a significant regression in the interpreted (non-codegen) mode, due to the way Scala pattern matching is implemented.
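The concern can be sketched as: resolve the type dispatch once when the evaluator is built, so the per-row path is a plain call with no match. Illustrative Python with made-up stand-in types, not Spark's actual dataType values:

```python
def make_concat_eval(data_type):
    """Hoist the type dispatch out of the per-row path: the branch on
    data_type runs once here, and the returned closure is what gets
    called for every input row."""
    if data_type == "string":
        return lambda values: "".join(values)
    # array case: flatten one level
    return lambda values: [x for v in values for x in v]

concat_strings = make_concat_eval("string")  # dispatch resolved once
concat_lists = make_concat_eval("array")
```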
What changes were proposed in this pull request?
The PR adds logic for the easy concatenation of multiple array columns and covers:
How was this patch tested?
New tests added into:
Codegen examples
Primitive-type elements
Result:
Non-primitive-type elements
Result: