@mn-mikke
Contributor

@mn-mikke mn-mikke commented Mar 29, 2018

What changes were proposed in this pull request?

This PR adds a new collection function that transforms an array of arrays into a single array. The PR comprises:

  • An expression for flattening array structure
  • Flatten function
  • A wrapper for PySpark
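
As a rough illustration of the intended behaviour (a plain-Python model, not Spark code), the sketch below removes exactly one level of nesting and keeps null elements that sit inside the inner arrays, mirroring the DataFrameFunctionsSuite case quoted further down in this thread. The name `flatten_one_level` is made up for this example.

```python
from typing import Any, List


def flatten_one_level(arrays: List[List[Any]]) -> List[Any]:
    """Concatenate the inner lists in order; None elements inside them are kept."""
    result: List[Any] = []
    for inner in arrays:
        result.extend(inner)
    return result


print(flatten_one_level([[1, 2, 3], [None, 5], [6, None]]))  # [1, 2, 3, None, 5, 6, None]
print(flatten_one_level([[[1], [2]], [[3]]]))                # [[1], [2], [3]]
```

The second call shows that only the outermost level of nesting is removed in this model.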

How was this patch tested?

New tests added into:

  • CollectionExpressionsSuite
  • DataFrameFunctionsSuite

Codegen examples

Primitive type

val df = Seq(
  Seq(Seq(1, 2), Seq(4, 5)),
  Seq(null, Seq(1))
).toDF("i")
df.filter($"i".isNotNull || $"i".isNull).select(flatten($"i")).debugCodegen

Result:

boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
ArrayData inputadapter_value = inputadapter_isNull ?
null : (inputadapter_row.getArray(0));

boolean filter_value = true;

if (!(!inputadapter_isNull)) {
  filter_value = inputadapter_isNull;
}
if (!filter_value) continue;

((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);

boolean project_isNull = inputadapter_isNull;
ArrayData project_value = null;

if (!inputadapter_isNull) {
  for (int z = 0; !project_isNull && z < inputadapter_value.numElements(); z++) {
    project_isNull |= inputadapter_value.isNullAt(z);
  }
  if (!project_isNull) {
    long project_numElements = 0;
    for (int z = 0; z < inputadapter_value.numElements(); z++) {
      project_numElements += inputadapter_value.getArray(z).numElements();
    }
    if (project_numElements > 2147483632) {
      throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
        project_numElements + " elements due to exceeding the array size limit 2147483632.");
    }

    long project_size = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(
      project_numElements,
      4);
    if (project_size > 2147483632) {
      throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
        project_size + " bytes of data due to exceeding the limit 2147483632" +
        " bytes for UnsafeArrayData.");
    }

    byte[] project_array = new byte[(int)project_size];
    UnsafeArrayData project_tempArrayData = new UnsafeArrayData();
    Platform.putLong(project_array, 16, project_numElements);
    project_tempArrayData.pointTo(project_array, 16, (int)project_size);
    int project_counter = 0;
    for (int k = 0; k < inputadapter_value.numElements(); k++) {
      ArrayData arr = inputadapter_value.getArray(k);
      for (int l = 0; l < arr.numElements(); l++) {
        if (arr.isNullAt(l)) {
          project_tempArrayData.setNullAt(project_counter);
        } else {
          project_tempArrayData.setInt(
            project_counter,
            arr.getInt(l)
          );
        }
        project_counter++;
      }
    }
    project_value = project_tempArrayData;

  }

}

Non-primitive type

val df = Seq(
  Seq(Seq("a", "b"), Seq(null, "d")),
  Seq(null, Seq("a"))
).toDF("s")
df.filter($"s".isNotNull || $"s".isNull).select(flatten($"s")).debugCodegen

Result:

boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
ArrayData inputadapter_value = inputadapter_isNull ?
null : (inputadapter_row.getArray(0));

boolean filter_value = true;

if (!(!inputadapter_isNull)) {
  filter_value = inputadapter_isNull;
}
if (!filter_value) continue;

((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);

boolean project_isNull = inputadapter_isNull;
ArrayData project_value = null;

if (!inputadapter_isNull) {
  for (int z = 0; !project_isNull && z < inputadapter_value.numElements(); z++) {
    project_isNull |= inputadapter_value.isNullAt(z);
  }
  if (!project_isNull) {
    long project_numElements = 0;
    for (int z = 0; z < inputadapter_value.numElements(); z++) {
      project_numElements += inputadapter_value.getArray(z).numElements();
    }
    if (project_numElements > 2147483632) {
      throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
        project_numElements + " elements due to exceeding the array size limit 2147483632.");
    }

    Object[] project_arrayObject = new Object[(int)project_numElements];
    int project_counter = 0;
    for (int k = 0; k < inputadapter_value.numElements(); k++) {
      ArrayData arr = inputadapter_value.getArray(k);
      for (int l = 0; l < arr.numElements(); l++) {
        project_arrayObject[project_counter] = arr.getUTF8String(l);
        project_counter++;
      }
    }
    project_value = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_arrayObject);

  }

}

:param col: name of column or expression
>>> df = spark.createDataFrame([([[1, 2, 3], [4, 5], [6]],),([None, [4, 5]],)], ['data'])
Member

Quick note: ,),( -> ,), (

@gatorsmile
Member

Thanks for your contribution! Try to improve your test cases by reading the other open source code (e.g., this)?

@gatorsmile
Member

ok to test

@SparkQA

SparkQA commented Mar 30, 2018

Test build #88758 has finished for PR 20938 at commit 755d6db.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class Flatten(child: Expression) extends UnaryExpression

@mn-mikke
Contributor Author

mn-mikke commented Apr 2, 2018

Rewrote test cases. @gatorsmile Please let me know if it's OK.

@SparkQA

SparkQA commented Apr 3, 2018

Test build #88836 has finished for PR 20938 at commit ad46962.

  • This patch fails from timeout after a configured wait of `300m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mn-mikke
Contributor Author

mn-mikke commented Apr 3, 2018

retest this please

@SparkQA

SparkQA commented Apr 3, 2018

Test build #88842 has finished for PR 20938 at commit eeab727.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mn-mikke
Contributor Author

mn-mikke commented Apr 8, 2018

Any other comments?

if (
  ArrayType.acceptsType(child.dataType) &&
  ArrayType.acceptsType(child.dataType.asInstanceOf[ArrayType].elementType)
) {
Member

How about this?

child.dataType match {
  case ArrayType(_: ArrayType, _) =>
    TypeCheckResult.TypeCheckSuccess
  case _ =>
    TypeCheckResult.TypeCheckFailure(
      "The argument should be an array of arrays, " +
      s"but '${child.sql}' is of ${child.dataType.simpleString} type.")
}

Contributor Author

nice one!

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
nullSafeCodeGen(ctx, ev, c => {
val code =
if (CodeGenerator.isPrimitiveType(dataType.elementType)) {
Member

very tiny nit: shall we move this line up?

val code = if (Code...

|if(!${ev.isNull}) {
| $coreLogic
|}
""".stripMargin
Member

indentation

* @group collection_funcs
* @since 2.4.0
*/
def flatten(e: Column): Column = withExpr{ Flatten(e.expr) }
Member

r{ -> r {

checkAnswer(
  oneRowDF.selectExpr("flatten(array(arr, array(null, 5), array(6, null)))"),
  Seq(Row(Seq(1, 2, 3, null, 5, 6, null)))
)
Member

Let's move it up.

Examples:
> SELECT _FUNC_(array(array(1, 2), array(3, 4)));
[1,2,3,4]
""")
Member

  """,
  since = "2.4.0")

val elements = array.asInstanceOf[ArrayData].toObjectArray(dataType)

if (elements.contains(null)) {
  null
Member

Does this mean that, when we are not in codegen, an input array containing a null element makes the function return null and ignore the other elements?

Contributor Author

Yes, you are right. The function also behaves the same way when codegen is applied. See test cases with a null array in CollectionExpressionsSuite.

We can discuss whether the function should behave the same way as in Presto and just ignore null elements... But I think that the current approach fits more into the semantics of Spark functions.

concat("a",null,"c") => null
1 + null => null
...
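
To make the two options concrete, here is a small plain-Python model (not Spark or Presto code). `flatten_null_propagating` follows the behaviour described in this PR, where any null sub-array makes the whole result null; `flatten_skip_nulls` models the alternative mentioned above, where null sub-arrays are simply ignored. Both function names are invented for this sketch.

```python
from typing import Any, List, Optional

NestedArray = Optional[List[Optional[List[Any]]]]


def flatten_null_propagating(arrays: NestedArray) -> Optional[List[Any]]:
    """Any null sub-array (or a null input) yields a null result."""
    if arrays is None or any(inner is None for inner in arrays):
        return None
    return [x for inner in arrays for x in inner]


def flatten_skip_nulls(arrays: NestedArray) -> Optional[List[Any]]:
    """Null sub-arrays are skipped; only a null input yields a null result."""
    if arrays is None:
        return None
    return [x for inner in arrays if inner is not None for x in inner]


data = [None, [4, 5]]
print(flatten_null_propagating(data))  # None
print(flatten_skip_nulls(data))        # [4, 5]
```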

@SparkQA

SparkQA commented Apr 9, 2018

Test build #89055 has finished for PR 20938 at commit a50d42e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mn-mikke
Contributor Author

mn-mikke commented Apr 9, 2018

Can't reproduce it locally, and it seems to be unrelated...

@mn-mikke
Contributor Author

mn-mikke commented Apr 9, 2018

retest this please

@SparkQA

SparkQA commented Apr 10, 2018

Test build #89093 has finished for PR 20938 at commit e213341.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Apr 10, 2018

Test build #89104 has finished for PR 20938 at commit e213341.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mn-mikke
Contributor Author

Any idea why those tests are failing?

@gatorsmile
Member

cc @ueshin

@SparkQA

SparkQA commented Apr 10, 2018

Test build #89119 has finished for PR 20938 at commit b9d99f7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • sealed trait Node extends Serializable
  • sealed trait ClassificationNode extends Node
  • sealed trait RegressionNode extends Node
  • sealed trait LeafNode extends Node
  • sealed trait InternalNode extends Node
  • case class ExprCode(var code: String, var isNull: ExprValue, var value: ExprValue)
  • case class SubExprEliminationState(isNull: ExprValue, value: ExprValue)
  • abstract class ExprValue
  • class LiteralValue(val value: String, val javaType: String) extends ExprValue
  • case class VariableValue(
  • case class StatementValue(
  • case class GlobalValue(val value: String, val javaType: String) extends ExprValue

@SparkQA

SparkQA commented Apr 13, 2018

Test build #89303 has finished for PR 20938 at commit 0e0def4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

coreLogic: String): String = {
s"""
|for(int z=0; z < $childVariableName.numElements(); z++) {
| ${ev.isNull} |= $childVariableName.isNullAt(z);
Member

How about breaking when null is found?
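
A minimal plain-Python sketch of the early exit being suggested: the scan stops at the first null sub-array instead of visiting every position. The helper name `contains_null_subarray` and the returned inspection count are hypothetical and exist only to show how much work is skipped.

```python
from typing import Any, List, Optional, Tuple


def contains_null_subarray(arrays: List[Optional[List[Any]]]) -> Tuple[bool, int]:
    """Return (found_null, number_of_positions_inspected)."""
    inspected = 0
    for inner in arrays:
        inspected += 1
        if inner is None:
            # The answer is already known, so stop scanning here.
            return True, inspected
    return False, inspected


print(contains_null_subarray([[1], None, [2], [3]]))  # (True, 2)
```

The generated code in the PR description gets the same effect by testing the null flag in the loop condition.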

since = "2.4.0")
case class Flatten(child: Expression) extends UnaryExpression {

override def nullable: Boolean = child.nullable || dataType.containsNull
Member

child.nullable || child.dataType.asInstanceOf[ArrayType].containsNull?

Contributor Author

Good spot!
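
The difference between the two formulas can be sketched with a toy type model in plain Python. The `ArrayType` dataclass below is a stand-in invented for this example, not Spark's class. The expression's own `dataType` is the inner array type, so its `containsNull` flag says whether flattened elements may be null, whereas the result itself becomes null when a sub-array is null, which is what the outer type's flag describes.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class ArrayType:
    """Toy stand-in for an array type: element type plus a contains-null flag."""
    element_type: Union["ArrayType", str]
    contains_null: bool


def flatten_result_type(child_type: ArrayType) -> ArrayType:
    # For array<array<T>> the flattened result has the inner type array<T>.
    assert isinstance(child_type.element_type, ArrayType)
    return child_type.element_type


def nullable_original(child_nullable: bool, child_type: ArrayType) -> bool:
    # Formula in the reviewed line: consults the result type's flag.
    return child_nullable or flatten_result_type(child_type).contains_null


def nullable_suggested(child_nullable: bool, child_type: ArrayType) -> bool:
    # Suggested formula: consults the outer type's flag (can a sub-array be null?).
    return child_nullable or child_type.contains_null


# Sub-arrays may be null, but the integers inside them may not.
outer = ArrayType(ArrayType("int", contains_null=False), contains_null=True)
print(nullable_original(False, outer))   # False
print(nullable_suggested(False, outer))  # True
```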

val code = if (CodeGenerator.isPrimitiveType(dataType.elementType)) {
  genCodeForConcatOfPrimitiveElements(ctx, c, ev.value)
} else {
  genCodeForConcatOfComplexElements(ctx, c, ev.value)
Member

I'm wondering if we say "complex" for non-primitive types?

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
nullSafeCodeGen(ctx, ev, c => {
val code = if (CodeGenerator.isPrimitiveType(dataType.elementType)) {
genCodeForConcatOfPrimitiveElements(ctx, c, ev.value)
Member

nit: indent.

if (elements.contains(null)) {
  null
} else {
  val flattened = elements.flatMap(
Member

@kiszk kiszk Apr 13, 2018

Do we need a size check to ensure that the total number of array elements does not exceed the maximum possible array size?

Contributor

@wajda wajda Apr 13, 2018

I agree, especially in combination with this comment, provided that the resulting array length is known in advance. flatMap can then be replaced with a simple loop copying chunks of data into a preallocated array.

Contributor Author

I've been searching for a well-defined constant indicating the VM limit for array size. It seems that the limit is platform-dependent... Any idea how to get the limit for a given platform?
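
One way the two suggestions could fit together, sketched in plain Python rather than Scala: count the elements first, fail early if the total exceeds a limit, then copy each sub-array into a preallocated result. The limit below is an assumption borrowed from the value 2147483632 that appears in the generated code in the PR description; `flatten_preallocated` and its `max_length` parameter are hypothetical names.

```python
from typing import Any, List, Optional

# Assumption: reuse the limit printed in the generated code quoted in this PR.
MAX_ARRAY_LENGTH = 2147483632


def flatten_preallocated(
    arrays: List[Optional[List[Any]]],
    max_length: int = MAX_ARRAY_LENGTH,
) -> Optional[List[Any]]:
    """Two-pass flatten: size check first, then chunked copy into one buffer."""
    if any(inner is None for inner in arrays):
        return None
    total = sum(len(inner) for inner in arrays)
    if total > max_length:
        raise RuntimeError(
            f"Cannot flatten {total} elements: limit is {max_length}."
        )
    result: List[Any] = [None] * total  # preallocated once
    position = 0
    for inner in arrays:
        result[position:position + len(inner)] = inner  # copy one chunk
        position += len(inner)
    return result


print(flatten_preallocated([[1, 2], [3], []]))  # [1, 2, 3]
```

Passing a small `max_length` makes the failure path easy to exercise without allocating a huge array.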

s"""
|$numElemCode
|$unsafeArraySizeInBytes
|byte[] $arrayName = new byte[$arraySizeName];
Member

@kiszk kiszk Apr 13, 2018

Do we need a size check to ensure that the total number of array elements does not exceed the maximum possible array size?
If we could use long[], we could accept more array elements.

Contributor Author

@mn-mikke mn-mikke Apr 13, 2018

You made a really good point about checking the total number of array elements!

Re: long[] - It seems that UnsafeArrayData is not currently ready for that. It would require a bigger refactoring... In theory, we could push the limits even further. If UnsafeArrayData were implemented similarly to Scala Vectors, but with leaves represented as byte[MAX_SIZE], the only limits would be the heap size and computing power. But is there any real-world scenario where we would need to store more than 2 GB in one record?

Member

@kiszk kiszk Apr 18, 2018

Sorry for the late comment. I think it is fine to use byte[] for now. Using long[] is just a possible alternative.
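
For a feel of how a byte[]-backed buffer constrains the element count, here is a rough plain-Python estimate. It assumes a layout of an 8-byte element count, a null bitmap of one bit per element padded to 8-byte words, and element data rounded up to a multiple of 8 bytes. That layout is my reading of UnsafeArrayData, so treat the formula as an assumption rather than the library's definition. The limit is the value printed in the generated code in the PR description; all function names are invented for this sketch.

```python
# Assumption: the limit printed in the generated code quoted in this PR.
SIZE_LIMIT_BYTES = 2147483632


def header_bytes(num_elements: int) -> int:
    # 8 bytes for the element count plus the null bitmap, padded to 8-byte words.
    return 8 + ((num_elements + 63) // 64) * 8


def round_up_to_word(num_bytes: int) -> int:
    return (num_bytes + 7) // 8 * 8


def estimated_size(num_elements: int, element_size: int) -> int:
    return header_bytes(num_elements) + round_up_to_word(num_elements * element_size)


def max_elements(element_size: int) -> int:
    """Largest element count whose estimated size fits the limit (binary search)."""
    low, high = 0, SIZE_LIMIT_BYTES
    while low < high:
        mid = (low + high + 1) // 2
        if estimated_size(mid, element_size) <= SIZE_LIMIT_BYTES:
            low = mid
        else:
            high = mid - 1
    return low


print(estimated_size(4, 4))  # 32
print(max_elements(4))       # element budget for 4-byte values under this estimate
```

Under this estimate the budget shrinks as the element width grows, which is the trade-off behind the byte[] versus long[] question.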

@SparkQA

SparkQA commented Apr 18, 2018

Test build #89518 has finished for PR 20938 at commit 9081291.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mn-mikke
Contributor Author

retest this please

@SparkQA

SparkQA commented Apr 19, 2018

Test build #89559 has finished for PR 20938 at commit f11aa7b.

  • This patch fails from timeout after a configured wait of `300m`.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ArrayPosition(left: Expression, right: Expression)

@ueshin
Member

ueshin commented Apr 19, 2018

Jenkins, retest this please.

@SparkQA

SparkQA commented Apr 19, 2018

Test build #89574 has finished for PR 20938 at commit 88c4971.

  • This patch fails from timeout after a configured wait of `300m`.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ElementAt(left: Expression, right: Expression) extends GetMapValueUtil
  • abstract class GetMapValueUtil extends BinaryExpression with ImplicitCastInputTypes
  • case class GetMapValue(child: Expression, key: Expression)

@SparkQA

SparkQA commented Apr 19, 2018

Test build #89575 has finished for PR 20938 at commit 37b68cd.

  • This patch fails from timeout after a configured wait of `300m`.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ElementAt(left: Expression, right: Expression) extends GetMapValueUtil

@SparkQA

SparkQA commented Apr 19, 2018

Test build #89582 has finished for PR 20938 at commit 37b68cd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ElementAt(left: Expression, right: Expression) extends GetMapValueUtil

@ueshin
Member

ueshin commented Apr 20, 2018

LGTM pending Jenkins.

@SparkQA

SparkQA commented Apr 20, 2018

Test build #89620 has finished for PR 20938 at commit 508fee0.

  • This patch fails from timeout after a configured wait of `300m`.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class Concat(children: Seq[Expression]) extends Expression

@ueshin
Member

ueshin commented Apr 20, 2018

Jenkins, retest this please.

@SparkQA

SparkQA commented Apr 20, 2018

Test build #89631 has finished for PR 20938 at commit 939fc23.

  • This patch fails from timeout after a configured wait of `300m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member

kiszk commented Apr 20, 2018

retest this please

@SparkQA

SparkQA commented Apr 20, 2018

Test build #89658 has finished for PR 20938 at commit 939fc23.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Apr 24, 2018

@ueshin did you forget to merge this? ;)

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Apr 24, 2018

Test build #89775 has finished for PR 20938 at commit 939fc23.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Apr 24, 2018

retest this please

@SparkQA

SparkQA commented Apr 24, 2018

Test build #89783 has finished for PR 20938 at commit 939fc23.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Apr 24, 2018

Test build #89788 has finished for PR 20938 at commit 939fc23.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member

ueshin commented Apr 25, 2018

I'm sorry for the delay.
Thanks! merging to master.

@asfgit asfgit closed this in 5fea17b Apr 25, 2018