Conversation

@infoankitp (Contributor) commented Dec 1, 2022

What changes were proposed in this pull request?

[SPARK-41232] Adds an array_append function to Spark SQL and PySpark.
Syntax: array_append(arr, element)

Arguments:

arr: an array, of any element type, to which the element is to be appended.
element: the value to append to arr. Its type must match the type of the elements the array holds.

select array_append(array(1, 2, 3), 4);

array_append(array(1, 2, 3), 4)
[1, 2, 3, 4]
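
From the DataFrame API, the same call can be made through a SQL expression. A minimal spark-shell sketch (the column names arr and elem are illustrative; the expected result in the comment mirrors the SQL example above):

import org.apache.spark.sql.functions.expr

val df = Seq((Array(1, 2, 3), 4)).toDF("arr", "elem")
df.select(expr("array_append(arr, elem)").as("appended")).show(false)
// expected, per the SQL example above: [1, 2, 3, 4]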

Mainstream databases that support array_append are shown below:

Snowflake
https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/api/snowflake.snowpark.functions.array_append.html

PostgreSQL
https://www.postgresql.org/docs/9.1/functions-array.html

MySQL
https://dev.mysql.com/doc/refman/5.7/en/json-modification-functions.html#function_json-array-append

Why are the changes needed?

New API: array_append appends an element (a value or another column) to the end of an array column.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests have been added.

@HyukjinKwon changed the title from "[SPARK-41232] Adding array_append function in spark sql, Pyspark" to "[SPARK-41232][SQL][PYTHON Adding array_append function" on Dec 2, 2022
@HyukjinKwon changed the title from "[SPARK-41232][SQL][PYTHON Adding array_append function" to "[SPARK-41232][SQL][PYTHON] Adding array_append function" on Dec 2, 2022
@HyukjinKwon (Member) commented:

cc @zhengruifeng FYI

@AmplabJenkins commented:

Can one of the admins verify this patch?

@LuciferYang (Contributor) commented:

https://docs.snowflake.com/en/sql-reference/functions/array_append.html

Referring to Snowflake's documentation: the new element's data type does not need to match the data type(s) of the existing elements in the array.

So ARRAY_ELEMENT_DIFF_TYPES should not be returned

@infoankitp (Contributor, author) commented Dec 2, 2022

@LuciferYang Thanks for the review! Arrays are more strongly typed in Spark. When the element to append has a different type than the array's elements, should we change the type of the array itself?
Also, we cannot create a column holding an array of mixed types; as shown below, we eventually get an error:

scala> val df3 = Seq((Array("a", "b", 2, 5d), 3)).toDF("a", "b")
org.apache.spark.SparkUnsupportedOperationException: No Encoder found for Any
- array element class: "java.lang.Object"
- field (class: "scala.Array", name: "_1")
- root class: "scala.Tuple2"

That is why I thought it's better to analyze the types up front and raise an error if the types do not match.
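
For contrast, a column with a homogeneous array type encodes without any issue (spark-shell sketch):

scala> val df = Seq((Array("a", "b", "c"), "d")).toDF("arr", "elem")
df: org.apache.spark.sql.DataFrame = [arr: array<string>, elem: string]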

A contributor replied:

This is the Snowflake example:

select array_append(array_construct(1, 2, 3), 'HELLO');
+-------------------------------------------------+
| ARRAY_APPEND(ARRAY_CONSTRUCT(1, 2, 3), 'HELLO') |
|-------------------------------------------------|
| [                                               |
|   1,                                            |
|   2,                                            |
|   3,                                            |
|   "HELLO"                                       |
| ]                                               |
+-------------------------------------------------+

I think we should keep consistent with it. WDYT @zhengruifeng @HyukjinKwon?

@infoankitp (Contributor, author) replied:

I think Snowflake's arrays are not strongly typed, while Spark's arrays are strongly typed, since Spark does not allow mixing multiple element types in one array. But yes, if we want to make it similar to Snowflake, we would have to make Spark arrays loosely typed as well. Let's wait for everyone else's suggestions. I think it will be a similar question for array_insert and array_prepend as well. :)

@infoankitp (Contributor, author) added:

Documenting Snowflake's behaviour for this function in different scenarios:

select array_append(array_construct(1, 2, 3), 'HELLO');
-- [ 1, 2, 3, "HELLO" ]


select array_append(array_construct(1, 2, 3), NULL);
-- [1,2,3,undefined]


select array_append(NULL, 'a');
-- null

select array_append(NULL, NULL);
-- null 

So, if the array is null, Snowflake does not create an array and returns null directly, while if the element is null it is simply appended without a null check.

As of now I think the function is NullIntolerant: with any null input it will return null. We need to override this behaviour.
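
A minimal spark-shell sketch of the intended semantics (the expected results in the comments mirror the Snowflake behaviour documented above and are not verified Spark output):

spark.sql("SELECT array_append(array(1, 2, 3), CAST(NULL AS INT))").show()
// intended: [1, 2, 3, null] -- a null element is appended as-is
spark.sql("SELECT array_append(CAST(NULL AS ARRAY<INT>), 1)").show()
// intended: null -- a null array returns null without building an array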

A contributor commented:

For the case

select array_append(array_construct(1, 2, 3), 'HELLO'); -- [ 1, 2, 3, "HELLO" ]

it seems that Spark cannot give the same semantics. WDYT @HyukjinKwon?

A contributor commented:

For this case, I think we can refer to ArrayUnion:

scala> spark.sql(""" SELECT array_union(a,b) FROM VALUES (ARRAY(1,2,3), ARRAY(8,9), ARRAY('HELLO')) AS tab(a,b,c) """)
res7: org.apache.spark.sql.DataFrame = [array_union(a, b): array<int>]

scala> spark.sql(""" SELECT array_union(a,c) FROM VALUES (ARRAY(1,2,3), ARRAY(8,9), ARRAY('HELLO')) AS tab(a,b,c) """)
org.apache.spark.sql.AnalysisException: [DATATYPE_MISMATCH.BINARY_ARRAY_DIFF_TYPES] Cannot resolve "array_union(a, c)" due to data type mismatch: Input to function `array_union` should have been two "ARRAY" with same element type, but it's ["ARRAY<INT>", "ARRAY<STRING>"].; line 1 pos 8;
'Project [unresolvedalias(array_union(a#13, c#15), None)]
+- SubqueryAlias tab
   +- LocalRelation [a#13, b#14, c#15]

I think we should apply the same data type validation as ArrayUnion.


@beliefer (Contributor) commented Dec 14, 2022

Yes. This is a difficult call. cc @cloud-fan @gengliangwang

A contributor commented:

We have no other choice; the Spark array type only allows one element type. We can find the wider type between the array element type and the data type of the to-be-added value.
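
As an illustration of Spark's existing wider-type coercion (spark-shell sketch; the array constructor already resolves INT and DOUBLE operands to a single DOUBLE element type):

spark.sql("SELECT array(1, CAST(2.0 AS DOUBLE)) AS a").schema
// the element type resolves to DoubleType: both inputs are widened to the common wider type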

@HyukjinKwon (Member) commented:

@LuciferYang let me know when you think it's ready to go ahead.

A contributor commented:

I want to know: if the array is Array[Double](3.0, 2.0) and the element is the Int 1, what is the result? Will it cast and return Array[Double](3.0, 2.0, 1.0), or throw an exception?

@infoankitp (Contributor, author) commented Dec 6, 2022

It will throw an exception, since the input types require an exact match between the element type and the array's element type. Let me add a unit test for this as well.

@infoankitp (Contributor, author) added:

BTW, I added another test in the DataFrameSuite, and it promotes the element to double and then appends it as a double element of the array.
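
A spark-shell sketch of that check (the expected result in the comment restates the observation above, not an independently verified output):

val df = Seq((Array(1.0, 2.0, 3.0), 4)).toDF("a", "b")
df.selectExpr("array_append(a, b)").collect()
// per the observation above: Array([WrappedArray(1.0, 2.0, 3.0, 4.0)])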

@LuciferYang (Contributor) commented Dec 7, 2022

But for an existing function, such as ArrayUnion:

val df = Seq((Array(1.0, 2.0, 3.0), Array(4, 2))).toDF("a", "b")
val rows = df.select(array_union($"a", $"b")).collect()

the rows will be [WrappedArray(1.0, 2.0, 3.0, 4.0)]

So, I think array_append can support similar cast behavior?

A contributor commented:

Good question. I tried other built-in functions, and I think we should follow array_remove in this case:

scala> spark.sql("""SELECT array_remove(a,b) FROM VALUES (ARRAY(1, 2, 3), 3) AS tab(a, b)""").show
+------------------+
|array_remove(a, b)|
+------------------+
|            [1, 2]|
+------------------+


scala> spark.sql("""SELECT array_remove(a,b) FROM VALUES (ARRAY(1.0, 2.0, 3.0), 3) AS tab(a, b)""").show
org.apache.spark.sql.AnalysisException: [DATATYPE_MISMATCH.ARRAY_FUNCTION_DIFF_TYPES] Cannot resolve "array_remove(a, b)" due to data type mismatch: Input to `array_remove` should have been "ARRAY" followed by a value with same element type, but it's ["ARRAY<DECIMAL(2,1)>", "INT"].; line 1 pos 7;
'Project [unresolvedalias(array_remove(a#122, b#123), None)]
+- SubqueryAlias tab
   +- LocalRelation [a#122, b#123]

  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.dataTypeMismatch(package.scala:73)
...

scala> spark.sql("""SELECT array_remove(a,b) FROM VALUES (ARRAY(1, 2, 3), 3.0) AS tab(a, b)""").show
org.apache.spark.sql.AnalysisException: [DATATYPE_MISMATCH.ARRAY_FUNCTION_DIFF_TYPES] Cannot resolve "array_remove(a, b)" due to data type mismatch: Input to `array_remove` should have been "ARRAY" followed by a value with same element type, but it's ["ARRAY<INT>", "DECIMAL(2,1)"].; line 1 pos 7;
'Project [unresolvedalias(array_remove(a#124, b#125), None)]
+- SubqueryAlias tab
   +- LocalRelation [a#124, b#125]

  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.dataTypeMismatch(package.scala:73)
...

scala> spark.sql("""SELECT array_position(a,b),a,b FROM VALUES (ARRAY(1, NULL, 3), 1.0) AS tab(a, b)""")
org.apache.spark.sql.AnalysisException: [DATATYPE_MISMATCH.ARRAY_FUNCTION_DIFF_TYPES] Cannot resolve "array_position(a, b)" due to data type mismatch: Input to `array_position` should have been "ARRAY" followed by a value with same element type, but it's ["ARRAY<INT>", "DECIMAL(2,1)"].; line 1 pos 7;
'Project [unresolvedalias(array_position(a#564, b#565), None), a#564, b#565]
+- SubqueryAlias tab
   +- LocalRelation [a#564, b#565]

  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.dataTypeMismatch(package.scala:73)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5(CheckAnalysis.scala:249)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5$adapted(CheckAnalysis.scala:236)
...

scala> spark.sql("""SELECT array_position(a,b),a,b FROM VALUES (ARRAY(1.0, NULL, 3.0), 1) AS tab(a, b)""")
org.apache.spark.sql.AnalysisException: [DATATYPE_MISMATCH.ARRAY_FUNCTION_DIFF_TYPES] Cannot resolve "array_position(a, b)" due to data type mismatch: Input to `array_position` should have been "ARRAY" followed by a value with same element type, but it's ["ARRAY<DECIMAL(2,1)>", "INT"].; line 1 pos 7;
'Project [unresolvedalias(array_position(a#566, b#567), None), a#566, b#567]
+- SubqueryAlias tab
   +- LocalRelation [a#566, b#567]

  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.dataTypeMismatch(package.scala:73)
...

scala> spark.sql("""SELECT array_contains(a,b),a,b FROM VALUES (ARRAY(1.0, NULL, 3.0), 1) AS tab(a, b)""")
res50: org.apache.spark.sql.DataFrame = [array_contains(a, b): boolean, a: array<decimal(2,1)> ... 1 more field]

scala> spark.sql("""SELECT array_contains(a,b),a,b FROM VALUES (ARRAY(1, NULL, 3), 1.0) AS tab(a, b)""")
res51: org.apache.spark.sql.DataFrame = [array_contains(a, b): boolean, a: array<int> ... 1 more field]

So the current behavior (exact type matching) looks reasonable to me.

A contributor added:

For the new function checkInputDataTypes(), also cc @MaxGekk FYI

@LuciferYang (Contributor) commented:

@infoankitp

2022-12-03T13:00:36.5812875Z [info] ExpressionsSchemaSuite:
2022-12-03T13:00:37.5439485Z [info] - Check schemas for expression examples *** FAILED *** (838 milliseconds)
2022-12-03T13:00:37.5454687Z [info]   417 did not equal 418 Expected 417 blocks in result file but got 418. Try regenerating the result files. (ExpressionsSchemaSuite.scala:164)

The failed test seems related to this PR. I think we need to run

SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly *ExpressionsSchemaSuite"

to re-generate golden files for ExpressionsSchemaSuite

@LuciferYang (Contributor) commented:

@infoankitp Would you mind adding some SQL tests to sql-tests/inputs/array.sql?

@infoankitp (Contributor, author) commented:

Ran the above command and got the output below:
SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly *ExpressionsSchemaSuite"

[info] ExpressionsSchemaSuite:
17:00:33.694 WARN org.apache.spark.util.Utils: Your hostname, ankit-home resolves to a loopback address: 127.0.1.1; using 10.10.0.45 instead (on interface wlp91s0)
17:00:33.700 WARN org.apache.spark.util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17:00:34.104 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

## Summary
  - Number of queries: 414
  - Number of expressions that missing example: 12
  - Expressions missing examples: bigint,binary,boolean,date,decimal,double,float,int,smallint,string,timestamp,tinyint

[info] - Check schemas for expression examples (5 seconds, 938 milliseconds)
17:00:41.385 WARN org.apache.spark.sql.ExpressionsSchemaSuite: 

===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.ExpressionsSchemaSuite, threads: rpc-boss-3-1 (daemon=true), shuffle-boss-6-1 (daemon=true) =====
[info] Run completed in 9 seconds, 513 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 311 s (05:11), completed 6 Dec, 2022 5:00:41 PM



@infoankitp (Contributor, author) commented:

@beliefer @LuciferYang Rebased the changes again! Please help review and advise if anything else needs to be done. :)

@beliefer (Contributor) left a comment

LGTM except for some comments.

Examples:
> SELECT _FUNC_(array('b', 'd', 'c', 'a'), 'd');
["b","d","c","a","d"]
A contributor replied:

+1

# Conflicts:
#	python/pyspark/sql/functions.py
#	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
#	sql/core/src/test/resources/sql-functions/sql-expression-schema.md
#	sql/core/src/test/resources/sql-tests/inputs/array.sql
#	sql/core/src/test/resources/sql-tests/results/ansi/array.sql.out
#	sql/core/src/test/resources/sql-tests/results/array.sql.out
#	sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@github-actions bot removed the CONNECT label Dec 30, 2022
@infoankitp (Contributor, author) commented:

@LuciferYang @zhengruifeng @beliefer @HyukjinKwon Made the changes. Please review when you get a chance. Thanks!

@infoankitp (Contributor, author) commented:

@LuciferYang @zhengruifeng @beliefer @cloud-fan Friendly reminder to review! Thanks!

@zhengruifeng (Contributor) commented:

@cloud-fan @ueshin @HyukjinKwon would you mind taking a look when you find some time?

@HyukjinKwon (Member) commented:

Merged to master.
