From 760edad510ad255af3bff5ddfa76c6079f3dc1bf Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 8 Sep 2017 07:04:45 +0000
Subject: [PATCH 1/4] Add test for JacksonUtils.

---
 .../sql/catalyst/json/JacksonUtils.scala      |  4 +-
 .../sql/catalyst/json/JacksonUtilsSuite.scala | 81 +++++++++++++++++++
 2 files changed, 84 insertions(+), 1 deletion(-)
 create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonUtilsSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
index 3b23c6cd2816..134d16e981a1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -44,7 +44,9 @@ object JacksonUtils {
 
     case at: ArrayType => verifyType(name, at.elementType)
 
-    case mt: MapType => verifyType(name, mt.keyType)
+    // For MapType, its keys are treated as strings (i.e. rendered via `toString`) when
+    // generating JSON, so we only need to verify that its values are valid for JSON.
+    case mt: MapType => verifyType(name, mt.valueType)
 
     case udt: UserDefinedType[_] => verifyType(name, udt.sqlType)
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonUtilsSuite.scala
new file mode 100644
index 000000000000..5d2ebad7d609
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonUtilsSuite.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.json
+
+import java.io.CharArrayWriter
+import java.util.TimeZone
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.RandomDataGenerator
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.types._
+
+class JacksonUtilsSuite extends SparkFunSuite {
+
+  test("verifySchema") {
+    def verifyJSONGenerate(schema: StructType): Unit = {
+      val convertToInternalRow = CatalystTypeConverters.createToCatalystConverter(schema)
+      val maybeDataGen = RandomDataGenerator.forType(schema, nullable = false)
+      val dataGen = maybeDataGen.getOrElse(
+        fail(s"Failed to create data generator for type $schema"))
+      val row = convertToInternalRow(dataGen.apply()).asInstanceOf[InternalRow]
+      val gen = new JacksonGenerator(
+        schema, new CharArrayWriter(), new JSONOptions(Map.empty, TimeZone.getDefault.getID))
+      gen.write(row)
+    }
+
+    // The valid schemas
+    val atomicTypes = DataTypeTestUtils.atomicTypes
+    val atomicArrayTypes = atomicTypes.map(ArrayType(_, containsNull = false))
+    val atomicMapTypes = for (keyType <- atomicTypes;
+      valueType <- atomicTypes) yield MapType(keyType, valueType, false)
+
+    (atomicTypes ++ atomicArrayTypes ++ atomicMapTypes).foreach { dataType =>
+      val schema = StructType(StructField("a", dataType, nullable = false) :: Nil)
+      JacksonUtils.verifySchema(schema)
+      verifyJSONGenerate(schema)
+    }
+
+    val invalidTypes = Seq(CalendarIntervalType)
+
+    // For MapType, its keys are treated as strings when generating JSON, so we only need to
+    // verify that the values are valid for JSON.
+    val alsoValidMapTypes = for (keyType <- atomicTypes ++ invalidTypes;
+      valueType <- atomicTypes) yield MapType(keyType, valueType, true)
+    alsoValidMapTypes.foreach { dataType =>
+      val schema = StructType(StructField("a", dataType, nullable = false) :: Nil)
+      JacksonUtils.verifySchema(schema)
+      verifyJSONGenerate(schema)
+    }
+
+    // The invalid schemas
+    val invalidArrayTypes = invalidTypes.map(ArrayType(_, containsNull = false))
+    val invalidMapTypes = for (keyType <- atomicTypes ++ invalidTypes;
+      valueType <- invalidTypes) yield MapType(keyType, valueType, false)
+
+    (invalidTypes ++ invalidArrayTypes ++ invalidMapTypes).foreach { dataType =>
+      val schema = StructType(StructField("a", dataType, nullable = false) :: Nil)
+      intercept[UnsupportedOperationException] {
+        JacksonUtils.verifySchema(schema)
+      }
+      intercept[RuntimeException] {
+        verifyJSONGenerate(schema)
+      }
+    }
+  }
+}

From 884c53305bad3d9a543d0feaccc2606dc3fe5928 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sat, 9 Sep 2017 04:33:48 +0000
Subject: [PATCH 2/4] Address comments.
---
 .../expressions/JsonExpressionsSuite.scala    | 23 ++++++
 .../sql/catalyst/json/JacksonUtilsSuite.scala | 81 ------------------
 .../apache/spark/sql/JsonFunctionsSuite.scala | 12 +++
 3 files changed, 35 insertions(+), 81 deletions(-)
 delete mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonUtilsSuite.scala

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 9991bda165a0..96e585de909e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -21,6 +21,7 @@ import java.util.Calendar
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.UnresolvedException
 import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, GenericArrayData, PermissiveMode}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -610,4 +611,26 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       """{"t":"2015-12-31T16:00:00"}"""
     )
   }
+
+  test("to_json: verify MapType's value type instead of key type") {
+    // Keys in a map are treated as strings when converting to JSON; the key type doesn't matter.
+    val mapType1 = MapType(CalendarIntervalType, IntegerType)
+    val schema1 = StructType(StructField("a", mapType1) :: Nil)
+    val struct1 = Literal.create(null, schema1)
+    checkEvaluation(
+      StructsToJson(Map.empty, struct1, gmtId),
+      null
+    )
+
+    // The value type must be valid for converting to JSON.
+    val mapType2 = MapType(IntegerType, CalendarIntervalType)
+    val schema2 = StructType(StructField("a", mapType2) :: Nil)
+    val struct2 = Literal.create(null, schema2)
+    intercept[UnresolvedException[_]] {
+      checkEvaluation(
+        StructsToJson(Map.empty, struct2, gmtId),
+        null
+      )
+    }
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonUtilsSuite.scala
deleted file mode 100644
index 5d2ebad7d609..000000000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonUtilsSuite.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.json
-
-import java.io.CharArrayWriter
-import java.util.TimeZone
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.RandomDataGenerator
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.types._
-
-class JacksonUtilsSuite extends SparkFunSuite {
-
-  test("verifySchema") {
-    def verifyJSONGenerate(schema: StructType): Unit = {
-      val convertToInternalRow = CatalystTypeConverters.createToCatalystConverter(schema)
-      val maybeDataGen = RandomDataGenerator.forType(schema, nullable = false)
-      val dataGen = maybeDataGen.getOrElse(
-        fail(s"Failed to create data generator for type $schema"))
-      val row = convertToInternalRow(dataGen.apply()).asInstanceOf[InternalRow]
-      val gen = new JacksonGenerator(
-        schema, new CharArrayWriter(), new JSONOptions(Map.empty, TimeZone.getDefault.getID))
-      gen.write(row)
-    }
-
-    // The valid schemas
-    val atomicTypes = DataTypeTestUtils.atomicTypes
-    val atomicArrayTypes = atomicTypes.map(ArrayType(_, containsNull = false))
-    val atomicMapTypes = for (keyType <- atomicTypes;
-      valueType <- atomicTypes) yield MapType(keyType, valueType, false)
-
-    (atomicTypes ++ atomicArrayTypes ++ atomicMapTypes).foreach { dataType =>
-      val schema = StructType(StructField("a", dataType, nullable = false) :: Nil)
-      JacksonUtils.verifySchema(schema)
-      verifyJSONGenerate(schema)
-    }
-
-    val invalidTypes = Seq(CalendarIntervalType)
-
-    // For MapType, its keys are treated as strings when generating JSON, so we only need to
-    // verify that the values are valid for JSON.
-    val alsoValidMapTypes = for (keyType <- atomicTypes ++ invalidTypes;
-      valueType <- atomicTypes) yield MapType(keyType, valueType, true)
-    alsoValidMapTypes.foreach { dataType =>
-      val schema = StructType(StructField("a", dataType, nullable = false) :: Nil)
-      JacksonUtils.verifySchema(schema)
-      verifyJSONGenerate(schema)
-    }
-
-    // The invalid schemas
-    val invalidArrayTypes = invalidTypes.map(ArrayType(_, containsNull = false))
-    val invalidMapTypes = for (keyType <- atomicTypes ++ invalidTypes;
-      valueType <- invalidTypes) yield MapType(keyType, valueType, false)
-
-    (invalidTypes ++ invalidArrayTypes ++ invalidMapTypes).foreach { dataType =>
-      val schema = StructType(StructField("a", dataType, nullable = false) :: Nil)
-      intercept[UnsupportedOperationException] {
-        JacksonUtils.verifySchema(schema)
-      }
-      intercept[RuntimeException] {
-        verifyJSONGenerate(schema)
-      }
-    }
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index cf2d00fc9442..b33edfc1305f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -257,6 +257,18 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
       "A type of keys and values in map() must be string, but got"))
   }
 
+  test("SPARK-21954: JacksonUtils should verify MapType's value type instead of key type") {
+    // interval type is invalid for converting to JSON. However, the keys of a map are treated
+    // as strings, so their type doesn't matter.
+    checkAnswer(
+      sql("SELECT to_json(struct(map(interval 1 second, 'a')))"),
+      Row("""{"col1":{"interval 1 seconds":"a"}}""") :: Nil)
+    val e = intercept[AnalysisException] {
+      sql("SELECT to_json(struct(map('a', interval 1 second)))")
+    }
+    assert(e.getMessage.contains("Unable to convert column col1 of type calendarinterval to JSON"))
+  }
+
   test("SPARK-19967 Support from_json in SQL") {
     val df1 = Seq("""{"a": 1}""").toDS()
     checkAnswer(

From 2cf6bbba922e72e1482e8cb158b08063816e45b8 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sat, 9 Sep 2017 06:05:09 +0000
Subject: [PATCH 3/4] Address comment.

---
 .../apache/spark/sql/JsonFunctionsSuite.scala | 32 ++++++++++---------
 1 file changed, 17 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index b33edfc1305f..d00abf1bc36d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.functions.{from_json, struct, to_json}
+import org.apache.spark.sql.functions.{from_json, lit, map, struct, to_json}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
@@ -196,14 +196,28 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
   }
 
   test("to_json unsupported type") {
-    val df = Seq(Tuple1(Tuple1("interval -3 month 7 hours"))).toDF("a")
-      .select(struct($"a._1".cast(CalendarIntervalType).as("a")).as("c"))
+    val baseDf = Seq(Tuple1(Tuple1("interval -3 month 7 hours"))).toDF("a")
+    val df = baseDf.select(struct($"a._1".cast(CalendarIntervalType).as("a")).as("c"))
     val e = intercept[AnalysisException]{
       // Unsupported type throws an exception
       df.select(to_json($"c")).collect()
     }
     assert(e.getMessage.contains(
       "Unable to convert column a of type calendarinterval to JSON."))
+
+    // interval type is invalid for converting to JSON. However, the keys of a map are treated
+    // as strings, so their type doesn't matter.
+    val df2 = baseDf
+      .select(struct(map($"a._1".cast(CalendarIntervalType), lit("a")).as("col1")).as("c"))
+    val df3 = baseDf
+      .select(struct(map(lit("a"), $"a._1".cast(CalendarIntervalType)).as("col1")).as("c"))
+    checkAnswer(
+      df2.select(to_json($"c")),
+      Row("""{"col1":{"interval -3 months 7 hours":"a"}}""") :: Nil)
+    val e2 = intercept[AnalysisException] {
+      df3.select(to_json($"c")).collect()
+    }
+    assert(e2.getMessage.contains("Unable to convert column col1 of type calendarinterval to JSON"))
   }
 
   test("roundtrip in to_json and from_json - struct") {
@@ -257,18 +271,6 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
       "A type of keys and values in map() must be string, but got"))
   }
 
-  test("SPARK-21954: JacksonUtils should verify MapType's value type instead of key type") {
-    // interval type is invalid for converting to JSON. However, the keys of a map are treated
-    // as strings, so their type doesn't matter.
-    checkAnswer(
-      sql("SELECT to_json(struct(map(interval 1 second, 'a')))"),
-      Row("""{"col1":{"interval 1 seconds":"a"}}""") :: Nil)
-    val e = intercept[AnalysisException] {
-      sql("SELECT to_json(struct(map('a', interval 1 second)))")
-    }
-    assert(e.getMessage.contains("Unable to convert column col1 of type calendarinterval to JSON"))
-  }
-
   test("SPARK-19967 Support from_json in SQL") {
     val df1 = Seq("""{"a": 1}""").toDS()
     checkAnswer(

From fbf8a88ba76e03602d82060b982038ddb53b43df Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sat, 9 Sep 2017 06:53:50 +0000
Subject: [PATCH 4/4] Tweak the test.

---
 .../expressions/JsonExpressionsSuite.scala    |  4 ++--
 .../apache/spark/sql/JsonFunctionsSuite.scala | 20 +++++++++++--------
 2 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 96e585de909e..5de11433b0c0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -21,7 +21,7 @@ import java.util.Calendar
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.UnresolvedException
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, GenericArrayData, PermissiveMode}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -626,7 +626,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     val mapType2 = MapType(IntegerType, CalendarIntervalType)
     val schema2 = StructType(StructField("a", mapType2) :: Nil)
     val struct2 = Literal.create(null, schema2)
-    intercept[UnresolvedException[_]] {
+    intercept[TreeNodeException[_]] {
       checkEvaluation(
         StructsToJson(Map.empty, struct2, gmtId),
         null
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index d00abf1bc36d..119af213047f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -195,6 +195,16 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
       Row("""{"_1":"26/08/2015 18:00"}""") :: Nil)
   }
 
+  test("to_json - key types of map don't matter") {
+    // interval type is invalid for converting to JSON. However, the keys of a map are treated
+    // as strings, so their type doesn't matter.
+    val df = Seq(Tuple1(Tuple1("interval -3 month 7 hours"))).toDF("a")
+      .select(struct(map($"a._1".cast(CalendarIntervalType), lit("a")).as("col1")).as("c"))
+    checkAnswer(
+      df.select(to_json($"c")),
+      Row("""{"col1":{"interval -3 months 7 hours":"a"}}""") :: Nil)
+  }
+
   test("to_json unsupported type") {
     val baseDf = Seq(Tuple1(Tuple1("interval -3 month 7 hours"))).toDF("a")
     val df = baseDf.select(struct($"a._1".cast(CalendarIntervalType).as("a")).as("c"))
@@ -205,17 +215,11 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
     assert(e.getMessage.contains(
       "Unable to convert column a of type calendarinterval to JSON."))
 
-    // interval type is invalid for converting to JSON. However, the keys of a map are treated
-    // as strings, so their type doesn't matter.
+    // interval type is invalid for converting to JSON. We can't use it as the value type of a map.
     val df2 = baseDf
-      .select(struct(map($"a._1".cast(CalendarIntervalType), lit("a")).as("col1")).as("c"))
-    val df3 = baseDf
       .select(struct(map(lit("a"), $"a._1".cast(CalendarIntervalType)).as("col1")).as("c"))
-    checkAnswer(
-      df2.select(to_json($"c")),
-      Row("""{"col1":{"interval -3 months 7 hours":"a"}}""") :: Nil)
     val e2 = intercept[AnalysisException] {
-      df3.select(to_json($"c")).collect()
+      df2.select(to_json($"c")).collect()
     }
     assert(e2.getMessage.contains("Unable to convert column col1 of type calendarinterval to JSON"))
   }
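
A quick spark-shell sketch of the behavior the final patch settles on. This is
illustrative only, assuming a build with these patches applied; the expected
outputs are copied from the test assertions above, and `toDF`/`$` rely on the
implicits spark-shell imports by default (`baseDf` is just a local name here).

    import org.apache.spark.sql.functions.{lit, map, struct, to_json}
    import org.apache.spark.sql.types.CalendarIntervalType

    val baseDf = Seq(Tuple1("interval -3 month 7 hours")).toDF("a")

    // Interval keys are fine: map keys are rendered via toString when
    // generating JSON, so verifySchema never inspects the key type.
    baseDf
      .select(to_json(struct(map($"a".cast(CalendarIntervalType), lit("a")).as("col1"))))
      .show(false)
    // {"col1":{"interval -3 months 7 hours":"a"}}

    // Interval values are rejected when the query is analyzed:
    baseDf
      .select(to_json(struct(map(lit("a"), $"a".cast(CalendarIntervalType)).as("col1"))))
      .collect()
    // org.apache.spark.sql.AnalysisException:
    //   Unable to convert column col1 of type calendarinterval to JSON.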