Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,37 +67,61 @@ trait BinaryArrayExpressionWithImplicitCast extends BinaryExpression


/**
* Given an array or map, returns the total number of elements in it. For null input, returns -1
* when legacySizeOfNull is true and null otherwise.
*/
@ExpressionDescription(
usage = "_FUNC_(expr) - Returns the size of an array or a map. Returns -1 if null.",
usage = """
_FUNC_(expr) - Returns the size of an array or a map.
The function returns -1 if its input is null and spark.sql.legacy.sizeOfNull is set to true.
If spark.sql.legacy.sizeOfNull is set to false, the function returns null for null input.
By default, the spark.sql.legacy.sizeOfNull parameter is set to true.
""",
examples = """
Examples:
> SELECT _FUNC_(array('b', 'd', 'c', 'a'));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the description of this function?

4
> SELECT _FUNC_(map('a', 1, 'b', 2));
2
> SELECT _FUNC_(NULL);
-1
""")
case class Size(child: Expression) extends UnaryExpression with ExpectsInputTypes {
case class Size(
child: Expression,
legacySizeOfNull: Boolean)
extends UnaryExpression with ExpectsInputTypes {

// Auxiliary constructor: takes only the child expression and fills in legacySizeOfNull
// by reading SQLConf.LEGACY_SIZE_OF_NULL from the SQLConf returned by SQLConf.get.
def this(child: Expression) =
this(
child,
legacySizeOfNull = SQLConf.get.getConf(SQLConf.LEGACY_SIZE_OF_NULL))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we can now also access the conf on the executor side, do we need these changes? Can't we just read this value as a val?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it works now, I will try to read the config on the executor side. I was just struggling with an issue in tests for another PR where SQL configs were not propagated to executors. For example:

val serializer = new JavaSerializer(new SparkConf()).newInstance
val resolver = ResolveTimeZone(new SQLConf)
resolver.resolveTimeZones(serializer.deserialize(serializer.serialize(expression)))

Also, I see some other places where configs are read and passed to constructors:

forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it was made possible in #21376


override def dataType: DataType = IntegerType
override def inputTypes: Seq[AbstractDataType] = Seq(TypeCollection(ArrayType, MapType))
// In legacy mode a null input is mapped to -1 (see eval), so the result is never null.
// Otherwise the answer is deferred to super.nullable.
override def nullable: Boolean = if (legacySizeOfNull) false else super.nullable

/**
 * Evaluates the child expression and returns the number of elements of the resulting
 * array or map.
 *
 * When the child evaluates to null, the result is -1 if `legacySizeOfNull` is true and
 * null otherwise.
 *
 * @param input the row the child expression is evaluated against
 * @return the element count, or -1 / null for a null child value as described above
 * @throws UnsupportedOperationException if the child's data type is neither an ArrayType
 *                                       nor a MapType
 */
override def eval(input: InternalRow): Any = {
  val value = child.eval(input)
  if (value == null) {
    // Single result expression for null input; no other statement may precede it,
    // otherwise that statement's value is silently discarded.
    if (legacySizeOfNull) -1 else null
  } else child.dataType match {
    case _: ArrayType => value.asInstanceOf[ArrayData].numElements()
    case _: MapType => value.asInstanceOf[MapData].numElements()
    case other => throw new UnsupportedOperationException(
      s"The size function doesn't support the operand type ${other.getClass.getCanonicalName}")
  }
}

/**
 * Produces the generated-code form of this expression.
 *
 * In legacy mode the emitted code always sets the result's isNull flag to false and
 * yields -1 when the child is null, otherwise the child's numElements(). In non-legacy
 * mode the work is delegated to defineCodeGen with a numElements() call on the child
 * value (NOTE(review): null handling there is presumably the default null-propagating
 * behaviour of defineCodeGen; confirm against UnaryExpression).
 *
 * @param ctx the codegen context used to generate the child's code
 * @param ev  the ExprCode to copy (legacy) or pass to defineCodeGen (non-legacy)
 * @return the ExprCode carrying the generated code for this expression
 */
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
  if (legacySizeOfNull) {
    // The child code is generated only on this branch; defineCodeGen receives ctx and ev
    // and is not given a pre-generated child.
    val childGen = child.genCode(ctx)
    ev.copy(code = code"""
boolean ${ev.isNull} = false;
${childGen.code}
${CodeGenerator.javaType(dataType)} ${ev.value} = ${childGen.isNull} ? -1 :
(${childGen.value}).numElements();""", isNull = FalseLiteral)
  } else {
    defineCodeGen(ctx, ev, c => s"($c).numElements()")
  }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1324,6 +1324,12 @@ object SQLConf {
"Other column values can be ignored during parsing even if they are malformed.")
.booleanConf
.createWithDefault(true)

val LEGACY_SIZE_OF_NULL = buildConf("spark.sql.legacy.sizeOfNull")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would spark.sql.function.sizeOfNull be a better name? By the way, when a major Spark release happens, can we remove these kinds of backward-compatibility options? (Just a question.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds more consistent to have the spark.sql.function prefix.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would like to reserve the namespace spark.sql.legacy.* for all legacy configurations. /cc @marmbrus @rxin

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we plan to fix other things accordingly too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we would like to change/improve external behavior under flags in the spark.sql.legacy.* namespace. All those flags should be removed in the next major release 3.0. Please, share your thoughts about it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing is fine. I am fine with having such a prefix, but I wonder what has changed since #21427 (comment). It sounds basically similar to what I suggested. Where did that discussion happen?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've created https://issues.apache.org/jira/browse/SPARK-24625 to track it.

It's similar to #21427 (comment), but as I replied in that PR, having a version-specific config is overkill, while a legacy prefix is simpler and makes it more explicit that the config will be removed in the future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's basically the same except that the postfix includes a specific version, which was just a rough idea.

.doc("If it is set to true, size of null returns -1. This behavior was inherited from Hive. " +
"The size function returns null for null input if the flag is disabled.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps you should say that this will be changed to false in Spark 3.0?

.booleanConf
.createWithDefault(true)
}

/**
Expand Down Expand Up @@ -1686,6 +1692,8 @@ class SQLConf extends Serializable with Logging {

def csvColumnPruning: Boolean = getConf(SQLConf.CSV_PARSER_COLUMN_PRUNING)

/** Returns the value of the spark.sql.legacy.sizeOfNull setting (SQLConf.LEGACY_SIZE_OF_NULL). */
def legacySizeOfNull: Boolean = getConf(SQLConf.LEGACY_SIZE_OF_NULL)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,37 @@ import org.apache.spark.sql.types._

class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {

test("Array and Map Size") {
/**
 * Checks Size over array and map literals for one setting of the legacy flag.
 *
 * Every Size expression is built with the explicit two-argument form so that the flag
 * under test is the one actually used, independent of the session configuration.
 *
 * @param legacySizeOfNull flag passed to each Size expression
 * @param sizeOfNull       expected result of Size for a null array or null map
 */
def testSize(legacySizeOfNull: Boolean, sizeOfNull: Any): Unit = {
  val a0 = Literal.create(Seq(1, 2, 3), ArrayType(IntegerType))
  val a1 = Literal.create(Seq[Integer](), ArrayType(IntegerType))
  val a2 = Literal.create(Seq(1, 2), ArrayType(IntegerType))

  checkEvaluation(Size(a0, legacySizeOfNull), 3)
  checkEvaluation(Size(a1, legacySizeOfNull), 0)
  checkEvaluation(Size(a2, legacySizeOfNull), 2)

  val m0 = Literal.create(Map("a" -> "a", "b" -> "b"), MapType(StringType, StringType))
  val m1 = Literal.create(Map[String, String](), MapType(StringType, StringType))
  val m2 = Literal.create(Map("a" -> "a"), MapType(StringType, StringType))

  checkEvaluation(Size(m0, legacySizeOfNull), 2)
  checkEvaluation(Size(m1, legacySizeOfNull), 0)
  checkEvaluation(Size(m2, legacySizeOfNull), 1)

  // Null inputs are the only cases whose expected value depends on the flag.
  checkEvaluation(
    Size(Literal.create(null, MapType(StringType, StringType)), legacySizeOfNull),
    expected = sizeOfNull)
  checkEvaluation(
    Size(Literal.create(null, ArrayType(StringType)), legacySizeOfNull),
    expected = sizeOfNull)
}

// Legacy flag on: Size of a null array or map is expected to be -1.
test("Array and Map Size - legacy") {
  val expectedForNull = -1
  testSize(legacySizeOfNull = true, sizeOfNull = expectedForNull)
}

checkEvaluation(Size(Literal.create(null, MapType(StringType, StringType))), -1)
checkEvaluation(Size(Literal.create(null, ArrayType(StringType))), -1)
// Legacy flag off: Size of a null array or map is expected to be null.
test("Array and Map Size") {
  val expectedForNull: Any = null
  testSize(legacySizeOfNull = false, sizeOfNull = expectedForNull)
}

test("MapKeys/MapValues") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3431,7 +3431,7 @@ object functions {
* @group collection_funcs
* @since 1.5.0
*/
def size(e: Column): Column = withExpr {
  // `new` selects Size's one-argument auxiliary constructor, which reads
  // legacySizeOfNull from SQLConf; the case-class factory requires both arguments.
  new Size(e.expr)
}

/**
* Sorts the input array for the given column in ascending order,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,26 +487,29 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
}.getMessage().contains("only supports array input"))
}

test("array size function") {
/**
 * Runs size/cardinality over a small DataFrame with an array column that contains two
 * non-empty arrays, one empty array and one null, and checks the results.
 *
 * The expected value for the null row is supplied by the caller, because it depends on
 * the configuration the caller has set; it must not be hard-coded here.
 *
 * @param sizeOfNull expected result for the row whose array is null
 */
def testSizeOfArray(sizeOfNull: Any): Unit = {
  val df = Seq(
    (Seq[Int](1, 2), "x"),
    (Seq[Int](), "y"),
    (Seq[Int](1, 2, 3), "z"),
    (null, "empty")
  ).toDF("a", "b")

  checkAnswer(df.select(size($"a")), Seq(Row(2), Row(0), Row(3), Row(sizeOfNull)))
  checkAnswer(df.selectExpr("size(a)"), Seq(Row(2), Row(0), Row(3), Row(sizeOfNull)))
  checkAnswer(df.selectExpr("cardinality(a)"), Seq(Row(2L), Row(0L), Row(3L), Row(sizeOfNull)))
}

// With spark.sql.legacy.sizeOfNull enabled, a null array is expected to have size -1.
test("array size function - legacy") {
  val legacyEnabled = SQLConf.LEGACY_SIZE_OF_NULL.key -> "true"
  withSQLConf(legacyEnabled) {
    testSizeOfArray(sizeOfNull = -1)
  }
}

// With spark.sql.legacy.sizeOfNull disabled, a null array is expected to have size null.
test("array size function") {
  val legacyDisabled = SQLConf.LEGACY_SIZE_OF_NULL.key -> "false"
  withSQLConf(legacyDisabled) {
    testSizeOfArray(sizeOfNull = null)
  }
}

test("dataframe arrays_zip function") {
Expand Down Expand Up @@ -567,21 +570,28 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
}
}

test("map size function") {
/**
 * Runs size over a small DataFrame with a map column that contains two non-empty maps,
 * one empty map and one null, and checks the results.
 *
 * The expected value for the null row is supplied by the caller, because it depends on
 * the configuration the caller has set; it must not be hard-coded here.
 *
 * @param sizeOfNull expected result for the row whose map is null
 */
def testSizeOfMap(sizeOfNull: Any): Unit = {
  val df = Seq(
    (Map[Int, Int](1 -> 1, 2 -> 2), "x"),
    (Map[Int, Int](), "y"),
    (Map[Int, Int](1 -> 1, 2 -> 2, 3 -> 3), "z"),
    (null, "empty")
  ).toDF("a", "b")

  checkAnswer(df.select(size($"a")), Seq(Row(2), Row(0), Row(3), Row(sizeOfNull)))
  checkAnswer(df.selectExpr("size(a)"), Seq(Row(2), Row(0), Row(3), Row(sizeOfNull)))
}

// With spark.sql.legacy.sizeOfNull enabled, a null map is expected to have size -1.
test("map size function - legacy") {
  val legacyEnabled = SQLConf.LEGACY_SIZE_OF_NULL.key -> "true"
  val expectedForNull: Int = -1
  withSQLConf(legacyEnabled) {
    testSizeOfMap(sizeOfNull = expectedForNull)
  }
}

// With spark.sql.legacy.sizeOfNull disabled, a null map is expected to have size null.
test("map size function") {
  val legacyDisabled = SQLConf.LEGACY_SIZE_OF_NULL.key -> "false"
  withSQLConf(legacyDisabled) {
    testSizeOfMap(sizeOfNull = null)
  }
}

test("map_keys/map_values function") {
Expand Down