
Commit 59f1e76

MaxGekk authored and cloud-fan committed
[SPARK-31020][SPARK-31023][SPARK-31025][SPARK-31044][SQL] Support foldable args by from_csv/json and schema_of_csv/json
### What changes were proposed in this pull request?

In the PR, I propose:
1. To replace matching by `Literal` in `ExprUtils.evalSchemaExpr()` with a check of the foldable property of the `schema` expression.
2. To replace matching by `Literal` in `ExprUtils.evalTypeExpr()` with a check of the foldable property of the `schema` expression.
3. To change checking of the input parameter in the `SchemaOfCsv` expression, and allow a foldable `child` expression.
4. To change checking of the input parameter in the `SchemaOfJson` expression, and allow a foldable `child` expression.

### Why are the changes needed?

This should improve Spark SQL UX for `from_csv`/`from_json`. Currently, Spark expects only literals:
```sql
spark-sql> select from_csv('1,Moscow', replace('dpt_org_id INT, dpt_org_city STRING', 'dpt_org_', ''));
Error in query: Schema should be specified in DDL format as a string literal or output of the schema_of_csv function instead of replace('dpt_org_id INT, dpt_org_city STRING', 'dpt_org_', '');; line 1 pos 7
spark-sql> select from_json('{"id":1, "city":"Moscow"}', replace('dpt_org_id INT, dpt_org_city STRING', 'dpt_org_', ''));
Error in query: Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of replace('dpt_org_id INT, dpt_org_city STRING', 'dpt_org_', '');; line 1 pos 7
```
and only string literals are acceptable as input examples by `schema_of_csv`/`schema_of_json`:
```sql
spark-sql> select schema_of_csv(concat_ws(',', 0.1, 1));
Error in query: cannot resolve 'schema_of_csv(concat_ws(',', CAST(0.1BD AS STRING), CAST(1 AS STRING)))' due to data type mismatch: The input csv should be a string literal and not null; however, got concat_ws(',', CAST(0.1BD AS STRING), CAST(1 AS STRING)).; line 1 pos 7;
'Project [unresolvedalias(schema_of_csv(concat_ws(,, cast(0.1 as string), cast(1 as string))), None)]
+- OneRowRelation
spark-sql> select schema_of_json(regexp_replace('{"item_id": 1, "item_price": 0.1}', 'item_', ''));
Error in query: cannot resolve 'schema_of_json(regexp_replace('{"item_id": 1, "item_price": 0.1}', 'item_', ''))' due to data type mismatch: The input json should be a string literal and not null; however, got regexp_replace('{"item_id": 1, "item_price": 0.1}', 'item_', '').; line 1 pos 7;
'Project [unresolvedalias(schema_of_json(regexp_replace({"item_id": 1, "item_price": 0.1}, item_, )), None)]
+- OneRowRelation
```

### Does this PR introduce any user-facing change?

Yes. After the changes, users can pass any foldable string expression as the `schema` parameter to `from_csv()`/`from_json()`. For the examples above:
```sql
spark-sql> select from_csv('1,Moscow', replace('dpt_org_id INT, dpt_org_city STRING', 'dpt_org_', ''));
{"id":1,"city":"Moscow"}
spark-sql> select from_json('{"id":1, "city":"Moscow"}', replace('dpt_org_id INT, dpt_org_city STRING', 'dpt_org_', ''));
{"id":1,"city":"Moscow"}
```
After the change, the `schema_of_csv`/`schema_of_json` functions accept foldable expressions, for example:
```sql
spark-sql> select schema_of_csv(concat_ws(',', 0.1, 1));
struct<_c0:double,_c1:int>
spark-sql> select schema_of_json(regexp_replace('{"item_id": 1, "item_price": 0.1}', 'item_', ''));
struct<id:bigint,price:double>
```

### How was this patch tested?

Added new tests to `CsvFunctionsSuite` and `JsonFunctionsSuite`.

Closes #27804 from MaxGekk/foldable-arg-csv-json-func.

Authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent afb84e9 commit 59f1e76
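
The same user-facing behavior is reachable from the Scala DataFrame API. A minimal sketch (not part of this diff), assuming a running `SparkSession` named `spark` with its implicits imported:

```scala
import org.apache.spark.sql.functions.{from_json, lit, regexp_replace}
import spark.implicits._

// The schema argument may now be any foldable string expression,
// not only a bare string literal:
val schema = regexp_replace(lit("dpt_org_id INT, dpt_org_city STRING"), "dpt_org_", "")

Seq("""{"id":1, "city":"Moscow"}""").toDS()
  .select(from_json($"value", schema))
  .show(false)   // prints a struct row like [1, Moscow]
```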

7 files changed: +83 −35 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala

Lines changed: 13 additions & 20 deletions
```diff
@@ -27,36 +27,29 @@ import org.apache.spark.unsafe.types.UTF8String
 
 object ExprUtils {
 
-  def evalSchemaExpr(exp: Expression): StructType = {
-    // Use `DataType.fromDDL` since the type string can be struct<...>.
-    val dataType = exp match {
-      case Literal(s, StringType) =>
-        DataType.fromDDL(s.toString)
-      case e @ SchemaOfCsv(_: Literal, _) =>
-        val ddlSchema = e.eval(EmptyRow).asInstanceOf[UTF8String]
-        DataType.fromDDL(ddlSchema.toString)
-      case e => throw new AnalysisException(
+  def evalTypeExpr(exp: Expression): DataType = {
+    if (exp.foldable) {
+      exp.eval() match {
+        case s: UTF8String if s != null => DataType.fromDDL(s.toString)
+        case _ => throw new AnalysisException(
+          s"The expression '${exp.sql}' is not a valid schema string.")
+      }
+    } else {
+      throw new AnalysisException(
         "Schema should be specified in DDL format as a string literal or output of " +
-          s"the schema_of_csv function instead of ${e.sql}")
+          s"the schema_of_json/schema_of_csv functions instead of ${exp.sql}")
     }
+  }
 
+  def evalSchemaExpr(exp: Expression): StructType = {
+    val dataType = evalTypeExpr(exp)
     if (!dataType.isInstanceOf[StructType]) {
       throw new AnalysisException(
         s"Schema should be struct type but got ${dataType.sql}.")
     }
     dataType.asInstanceOf[StructType]
   }
 
-  def evalTypeExpr(exp: Expression): DataType = exp match {
-    case Literal(s, StringType) => DataType.fromDDL(s.toString)
-    case e @ SchemaOfJson(_: Literal, _) =>
-      val ddlSchema = e.eval(EmptyRow).asInstanceOf[UTF8String]
-      DataType.fromDDL(ddlSchema.toString)
-    case e => throw new AnalysisException(
-      "Schema should be specified in DDL format as a string literal or output of " +
-      s"the schema_of_json function instead of ${e.sql}")
-  }
-
   def convertToMapData(exp: Expression): Map[String, String] = exp match {
     case m: CreateMap
       if m.dataType.acceptsType(MapType(StringType, StringType, valueContainsNull = false)) =>
```
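
The crux of the change: instead of pattern-matching on `Literal`, the helpers now test `Expression.foldable` and evaluate the expression eagerly. A minimal sketch of the same idea against the public `Column`/`StructType` surface (illustrative only; `schemaCol` and `ddl` are names invented here, not part of the patch):

```scala
import org.apache.spark.sql.functions.{lit, regexp_replace}
import org.apache.spark.sql.types.StructType

// Not a Literal, but constant-foldable: every child is a constant.
val schemaCol = regexp_replace(lit("dpt_org_id INT, dpt_org_city STRING"), "dpt_org_", "")
assert(schemaCol.expr.foldable)

// A foldable expression can be evaluated at analysis time without an input row,
// which is exactly what the new evalTypeExpr relies on.
val ddl = schemaCol.expr.eval().toString   // "id INT, city STRING"
val schema = StructType.fromDDL(ddl)       // struct<id:int,city:string>
```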

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala

Lines changed: 8 additions & 4 deletions
```diff
@@ -165,10 +165,14 @@ case class SchemaOfCsv(
   @transient
   private lazy val csv = child.eval().asInstanceOf[UTF8String]
 
-  override def checkInputDataTypes(): TypeCheckResult = child match {
-    case Literal(s, StringType) if s != null => super.checkInputDataTypes()
-    case _ => TypeCheckResult.TypeCheckFailure(
-      s"The input csv should be a string literal and not null; however, got ${child.sql}.")
+  override def checkInputDataTypes(): TypeCheckResult = {
+    if (child.foldable && csv != null) {
+      super.checkInputDataTypes()
+    } else {
+      TypeCheckResult.TypeCheckFailure(
+        "The input csv should be a foldable string expression and not null; " +
+        s"however, got ${child.sql}.")
+    }
   }
 
   override def eval(v: InternalRow): Any = {
```
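
With the relaxed check, any foldable, non-null string child passes `checkInputDataTypes`. A short sketch of the new behavior (assuming a `SparkSession` named `spark`):

```scala
import org.apache.spark.sql.functions.{concat_ws, lit, schema_of_csv}

// concat_ws over literals is foldable, so it is now accepted
// where previously only a bare string literal would pass:
spark.range(1)
  .select(schema_of_csv(concat_ws(",", lit(0.1), lit(1))))
  .show(false)   // struct<_c0:double,_c1:int>
```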

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala

Lines changed: 8 additions & 4 deletions
```diff
@@ -764,10 +764,14 @@ case class SchemaOfJson(
   @transient
   private lazy val json = child.eval().asInstanceOf[UTF8String]
 
-  override def checkInputDataTypes(): TypeCheckResult = child match {
-    case Literal(s, StringType) if s != null => super.checkInputDataTypes()
-    case _ => TypeCheckResult.TypeCheckFailure(
-      s"The input json should be a string literal and not null; however, got ${child.sql}.")
+  override def checkInputDataTypes(): TypeCheckResult = {
+    if (child.foldable && json != null) {
+      super.checkInputDataTypes()
+    } else {
+      TypeCheckResult.TypeCheckFailure(
+        "The input json should be a foldable string expression and not null; " +
+        s"however, got ${child.sql}.")
+    }
   }
 
   override def eval(v: InternalRow): Any = {
```
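
The JSON side mirrors the CSV change; note that a foldable expression that evaluates to null is still rejected. A sketch (same `spark` assumption as above):

```scala
import org.apache.spark.sql.functions.{lit, regexp_replace, schema_of_json}

// Foldable and non-null: accepted after this change.
spark.range(1)
  .select(schema_of_json(regexp_replace(lit("""{"item_id": 1}"""), "item_", "")))
  .show(false)   // struct<id:bigint>

// Foldable but null: still fails checkInputDataTypes with
// "The input json should be a foldable string expression and not null".
// spark.range(1).select(schema_of_json(lit(null))).show()
```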

sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out

Lines changed: 3 additions & 3 deletions
```diff
@@ -24,7 +24,7 @@ select from_csv('1', 1)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Schema should be specified in DDL format as a string literal or output of the schema_of_csv function instead of 1;; line 1 pos 7
+The expression '1' is not a valid schema string.;; line 1 pos 7
 
 
 -- !query
@@ -91,7 +91,7 @@ select schema_of_csv(null)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'schema_of_csv(NULL)' due to data type mismatch: The input csv should be a string literal and not null; however, got NULL.; line 1 pos 7
+cannot resolve 'schema_of_csv(NULL)' due to data type mismatch: The input csv should be a foldable string expression and not null; however, got NULL.; line 1 pos 7
 
 
 -- !query
@@ -108,7 +108,7 @@ SELECT schema_of_csv(csvField) FROM csvTable
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'schema_of_csv(csvtable.`csvField`)' due to data type mismatch: The input csv should be a string literal and not null; however, got csvtable.`csvField`.; line 1 pos 7
+cannot resolve 'schema_of_csv(csvtable.`csvField`)' due to data type mismatch: The input csv should be a foldable string expression and not null; however, got csvtable.`csvField`.; line 1 pos 7
 
 
 -- !query
```

sql/core/src/test/resources/sql-tests/results/json-functions.sql.out

Lines changed: 3 additions & 3 deletions
```diff
@@ -115,7 +115,7 @@ select from_json('{"a":1}', 1)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of 1;; line 1 pos 7
+The expression '1' is not a valid schema string.;; line 1 pos 7
 
 
 -- !query
@@ -326,7 +326,7 @@ select schema_of_json(null)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'schema_of_json(NULL)' due to data type mismatch: The input json should be a string literal and not null; however, got NULL.; line 1 pos 7
+cannot resolve 'schema_of_json(NULL)' due to data type mismatch: The input json should be a foldable string expression and not null; however, got NULL.; line 1 pos 7
 
 
 -- !query
@@ -343,7 +343,7 @@ SELECT schema_of_json(jsonField) FROM jsonTable
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'schema_of_json(jsontable.`jsonField`)' due to data type mismatch: The input json should be a string literal and not null; however, got jsontable.`jsonField`.; line 1 pos 7
+cannot resolve 'schema_of_json(jsontable.`jsonField`)' due to data type mismatch: The input json should be a foldable string expression and not null; however, got jsontable.`jsonField`.; line 1 pos 7
 
 
 -- !query
```

sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala

Lines changed: 26 additions & 0 deletions
```diff
@@ -212,4 +212,30 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
       assert(readback(0).getAs[Row](0).getAs[Date](0).getTime >= 0)
     }
   }
+
+  test("support foldable schema by from_csv") {
+    val options = Map[String, String]().asJava
+    val schema = concat_ws(",", lit("i int"), lit("s string"))
+    checkAnswer(
+      Seq("""1,"a"""").toDS().select(from_csv($"value", schema, options)),
+      Row(Row(1, "a")))
+
+    val errMsg = intercept[AnalysisException] {
+      Seq(("1", "i int")).toDF("csv", "schema")
+        .select(from_csv($"csv", $"schema", options)).collect()
+    }.getMessage
+    assert(errMsg.contains("Schema should be specified in DDL format as a string literal"))
+
+    val errMsg2 = intercept[AnalysisException] {
+      Seq("1").toDF("csv").select(from_csv($"csv", lit(1), options)).collect()
+    }.getMessage
+    assert(errMsg2.contains("The expression '1' is not a valid schema string"))
+  }
+
+  test("schema_of_csv - infers the schema of foldable CSV string") {
+    val input = concat_ws(",", lit(0.1), lit(1))
+    checkAnswer(
+      spark.range(1).select(schema_of_csv(input)),
+      Seq(Row("struct<_c0:double,_c1:int>")))
+  }
 }
```

sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala

Lines changed: 22 additions & 1 deletion
```diff
@@ -313,7 +313,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
     val errMsg1 = intercept[AnalysisException] {
       df3.selectExpr("from_json(value, 1)")
     }
-    assert(errMsg1.getMessage.startsWith("Schema should be specified in DDL format as a string"))
+    assert(errMsg1.getMessage.startsWith("The expression '1' is not a valid schema string"))
     val errMsg2 = intercept[AnalysisException] {
       df3.selectExpr("""from_json(value, 'time InvalidType')""")
     }
@@ -653,4 +653,25 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
       assert(json_tuple_result === len)
     }
   }
+
+  test("support foldable schema by from_json") {
+    val options = Map[String, String]().asJava
+    val schema = regexp_replace(lit("dpt_org_id INT, dpt_org_city STRING"), "dpt_org_", "")
+    checkAnswer(
+      Seq("""{"id":1,"city":"Moscow"}""").toDS().select(from_json($"value", schema, options)),
+      Row(Row(1, "Moscow")))
+
+    val errMsg = intercept[AnalysisException] {
+      Seq(("""{"i":1}""", "i int")).toDF("json", "schema")
+        .select(from_json($"json", $"schema", options)).collect()
+    }.getMessage
+    assert(errMsg.contains("Schema should be specified in DDL format as a string literal"))
+  }
+
+  test("schema_of_json - infers the schema of foldable JSON string") {
+    val input = regexp_replace(lit("""{"item_id": 1, "item_price": 0.1}"""), "item_", "")
+    checkAnswer(
+      spark.range(1).select(schema_of_json(input)),
+      Seq(Row("struct<id:bigint,price:double>")))
+  }
 }
```
