From c6ab1928659c2c4fc9253d6bf81b9f25c13186ec Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 24 Jan 2019 21:46:02 +0800
Subject: [PATCH 1/4] refactor

---
 .../sql/catalyst/json/JacksonGenerator.scala  |   2 +-
 .../datasources/DataSourceUtils.scala         |   2 +-
 .../execution/datasources/FileFormat.scala    |   2 +-
 .../datasources/csv/CSVFileFormat.scala       |   4 +-
 .../datasources/json/JsonFileFormat.scala     |  10 +-
 .../datasources/orc/OrcFileFormat.scala       |  12 +-
 .../parquet/ParquetFileFormat.scala           |  10 +-
 .../datasources/text/TextFileFormat.scala     |   2 +-
 .../spark/sql/FileBasedDataSourceSuite.scala  | 110 ++++++++----------
 .../spark/sql/hive/orc/OrcFileFormat.scala    |  12 +-
 10 files changed, 73 insertions(+), 93 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index 951f5190cd50..9d82f94292cd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -178,7 +178,7 @@ private[sql] class JacksonGenerator(
     var i = 0
     while (i < row.numFields) {
       val field = schema(i)
-      if (!row.isNullAt(i)) {
+      if (!row.isNullAt(i) || field.dataType == NullType) {
         gen.writeFieldName(field.name)
         fieldWriters(i).apply(row, i)
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index 90cec5e72c1a..3334d3fc3e77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -45,7 +45,7 @@ object DataSourceUtils {
    */
   private def verifySchema(format: FileFormat, schema: StructType, isReadPath: Boolean): Unit = {
     schema.foreach { field =>
-      if (!format.supportDataType(field.dataType, isReadPath)) {
+      if (!format.supportsDataType(field.dataType)) {
         throw new AnalysisException(
           s"$format data source does not support ${field.dataType.catalogString} data type.")
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index 2c162e23644e..1615580ab154 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -156,7 +156,7 @@ trait FileFormat {
    * Returns whether this format supports the given [[DataType]] in read/write path.
    * By default all data types are supported.
    */
-  def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = true
+  def supportsDataType(dataType: DataType): Boolean = true
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index f4f139d18005..fa599a9185c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -153,10 +153,10 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
 
   override def equals(other: Any): Boolean = other.isInstanceOf[CSVFileFormat]
 
-  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
+  override def supportsDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true
 
-    case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
+    case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
 
     case _ => false
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 40f55e706801..8fa624634f26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -140,17 +140,17 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
 
   override def equals(other: Any): Boolean = other.isInstanceOf[JsonFileFormat]
 
-  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
+  override def supportsDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true
 
-    case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) }
+    case st: StructType => st.forall { f => supportsDataType(f.dataType) }
 
-    case ArrayType(elementType, _) => supportDataType(elementType, isReadPath)
+    case ArrayType(elementType, _) => supportsDataType(elementType)
 
     case MapType(keyType, valueType, _) =>
-      supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath)
+      supportsDataType(keyType) && supportsDataType(valueType)
 
-    case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
+    case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
 
     case _: NullType => true
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 14779cdba417..ee68e407e942 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -235,19 +235,17 @@ class OrcFileFormat
       }
     }
 
-  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
+  override def supportsDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true
 
-    case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) }
+    case st: StructType => st.forall { f => supportsDataType(f.dataType) }
 
-    case ArrayType(elementType, _) => supportDataType(elementType, isReadPath)
+    case ArrayType(elementType, _) => supportsDataType(elementType)
 
     case MapType(keyType, valueType, _) =>
-      supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath)
+      supportsDataType(keyType) && supportsDataType(valueType)
 
-    case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
-
-    case _: NullType => isReadPath
+    case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
 
     case _ => false
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index f04502d113ac..6f824755de2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -453,17 +453,17 @@ class ParquetFileFormat
       }
     }
 
-  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
+  override def supportsDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true
 
-    case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) }
+    case st: StructType => st.forall { f => supportsDataType(f.dataType) }
 
-    case ArrayType(elementType, _) => supportDataType(elementType, isReadPath)
+    case ArrayType(elementType, _) => supportsDataType(elementType)
 
     case MapType(keyType, valueType, _) =>
-      supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath)
+      supportsDataType(keyType) && supportsDataType(valueType)
 
-    case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
+    case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
 
     case _ => false
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index 0607f7b3c0d4..83b238f878c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -139,7 +139,7 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
     }
   }
 
-  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean =
+  override def supportsDataType(dataType: DataType): Boolean =
     dataType == StringType
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 54299e9808bf..7003508aa5ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -366,70 +366,54 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
     }
   }
 
-  test("SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc") {
-    withTempDir { dir =>
-      val tempDir = new File(dir, "files").getCanonicalPath
-
-      Seq("orc").foreach { format =>
-        // write path
-        var msg = intercept[AnalysisException] {
-          sql("select null").write.format(format).mode("overwrite").save(tempDir)
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
-
-        msg = intercept[AnalysisException] {
-          spark.udf.register("testType", () => new NullData())
-          sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
-
-        // read path
-        // We expect the types below should be passed for backward-compatibility
-
-        // Null type
-        var schema = StructType(StructField("a", NullType, true) :: Nil)
-        spark.range(1).write.format(format).mode("overwrite").save(tempDir)
-        spark.read.schema(schema).format(format).load(tempDir).collect()
-
-        // UDT having null data
-        schema = StructType(StructField("a", new NullUDT(), true) :: Nil)
-        spark.range(1).write.format(format).mode("overwrite").save(tempDir)
-        spark.read.schema(schema).format(format).load(tempDir).collect()
-      }
-
-      Seq("parquet", "csv").foreach { format =>
-        // write path
-        var msg = intercept[AnalysisException] {
-          sql("select null").write.format(format).mode("overwrite").save(tempDir)
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
-
-        msg = intercept[AnalysisException] {
-          spark.udf.register("testType", () => new NullData())
-          sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
-
-        // read path
-        msg = intercept[AnalysisException] {
-          val schema = StructType(StructField("a", NullType, true) :: Nil)
-          spark.range(1).write.format(format).mode("overwrite").save(tempDir)
-          spark.read.schema(schema).format(format).load(tempDir).collect()
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
+  test("SPARK-26716 supports writing and reading Null data type - json") {
+    withTempPath { dir =>
+      val df = spark.range(10).map(id => (id, null)).toDF("c1", "c2")
+      df.write.json(dir.getCanonicalPath)
+      checkAnswer(spark.read.json(dir.getCanonicalPath), df)
+      val schema =
+        StructType(Seq(StructField("c1", LongType, true), StructField("c2", NullType, true)))
+      checkAnswer(spark.read.schema(schema).json(dir.getCanonicalPath), df)
+    }
+  }
 
-        msg = intercept[AnalysisException] {
-          val schema = StructType(StructField("a", new NullUDT(), true) :: Nil)
-          spark.range(1).write.format(format).mode("overwrite").save(tempDir)
-          spark.read.schema(schema).format(format).load(tempDir).collect()
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
+  test("SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "orc") {
+      withTempDir { dir =>
+        val tempDir = new File(dir, "files").getCanonicalPath
+
+        Seq("parquet", "csv", "orc").foreach { format =>
+          // write path
+          var msg = intercept[AnalysisException] {
+            sql("select null").write.format(format).mode("overwrite").save(tempDir)
+          }.getMessage
+          assert(msg.toLowerCase(Locale.ROOT)
+            .contains(s"$format data source does not support null data type."))
+
+          msg = intercept[AnalysisException] {
+            spark.udf.register("testType", () => new NullData())
+            sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
+          }.getMessage
+          assert(msg.toLowerCase(Locale.ROOT)
+            .contains(s"$format data source does not support null data type."))
+
+          // read path
+          msg = intercept[AnalysisException] {
+            val schema = StructType(StructField("a", NullType, true) :: Nil)
+            spark.range(1).write.format(format).mode("overwrite").save(tempDir)
+            spark.read.schema(schema).format(format).load(tempDir).collect()
+          }.getMessage
+          assert(msg.toLowerCase(Locale.ROOT)
+            .contains(s"$format data source does not support null data type."))
+
+          msg = intercept[AnalysisException] {
+            val schema = StructType(StructField("a", new NullUDT(), true) :: Nil)
+            spark.range(1).write.format(format).mode("overwrite").save(tempDir)
+            spark.read.schema(schema).format(format).load(tempDir).collect()
+          }.getMessage
+          assert(msg.toLowerCase(Locale.ROOT)
+            .contains(s"$format data source does not support null data type."))
+        }
       }
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 4e641e34c18d..b4817d29b69f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -181,19 +181,17 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       }
     }
 
-  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
+  override def supportsDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true
 
-    case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) }
+    case st: StructType => st.forall { f => supportsDataType(f.dataType) }
 
-    case ArrayType(elementType, _) => supportDataType(elementType, isReadPath)
+    case ArrayType(elementType, _) => supportsDataType(elementType)
 
     case MapType(keyType, valueType, _) =>
-      supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath)
+      supportsDataType(keyType) && supportsDataType(valueType)
 
-    case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
-
-    case _: NullType => isReadPath
+    case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
 
     case _ => false
   }

From 1cc2b3465b422074b55155d9796043d286fe79ec Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Sat, 26 Jan 2019 01:17:24 +0800
Subject: [PATCH 2/4] address comments

---
 .../spark/sql/catalyst/json/JacksonGenerator.scala    |  2 +-
 .../sql/execution/datasources/DataSourceUtils.scala   |  2 +-
 .../spark/sql/execution/datasources/FileFormat.scala  |  2 +-
 .../sql/execution/datasources/csv/CSVFileFormat.scala |  4 ++--
 .../execution/datasources/json/JsonFileFormat.scala   | 10 +++++-----
 .../sql/execution/datasources/orc/OrcFileFormat.scala | 10 +++++-----
 .../datasources/parquet/ParquetFileFormat.scala       | 10 +++++-----
 .../execution/datasources/text/TextFileFormat.scala   |  2 +-
 .../apache/spark/sql/FileBasedDataSourceSuite.scala   | 11 -----------
 .../org/apache/spark/sql/hive/orc/OrcFileFormat.scala | 10 +++++-----
 10 files changed, 26 insertions(+), 37 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index 9d82f94292cd..951f5190cd50 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -178,7 +178,7 @@ private[sql] class JacksonGenerator(
     var i = 0
     while (i < row.numFields) {
       val field = schema(i)
-      if (!row.isNullAt(i) || field.dataType == NullType) {
+      if (!row.isNullAt(i)) {
         gen.writeFieldName(field.name)
         fieldWriters(i).apply(row, i)
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index 3334d3fc3e77..a32a9405676f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -45,7 +45,7 @@ object DataSourceUtils {
    */
   private def verifySchema(format: FileFormat, schema: StructType, isReadPath: Boolean): Unit = {
     schema.foreach { field =>
-      if (!format.supportsDataType(field.dataType)) {
+      if (!format.supportDataType(field.dataType)) {
         throw new AnalysisException(
           s"$format data source does not support ${field.dataType.catalogString} data type.")
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index 1615580ab154..f0b49715c8da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -156,7 +156,7 @@ trait FileFormat {
    * Returns whether this format supports the given [[DataType]] in read/write path.
    * By default all data types are supported.
    */
-  def supportsDataType(dataType: DataType): Boolean = true
+  def supportDataType(dataType: DataType): Boolean = true
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index fa599a9185c6..d08a54cc9b1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -153,10 +153,10 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
 
   override def equals(other: Any): Boolean = other.isInstanceOf[CSVFileFormat]
 
-  override def supportsDataType(dataType: DataType): Boolean = dataType match {
+  override def supportDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true
 
-    case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
+    case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
 
     case _ => false
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 8fa624634f26..d3f04145b83d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -140,17 +140,17 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
 
   override def equals(other: Any): Boolean = other.isInstanceOf[JsonFileFormat]
 
-  override def supportsDataType(dataType: DataType): Boolean = dataType match {
+  override def supportDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true
 
-    case st: StructType => st.forall { f => supportsDataType(f.dataType) }
+    case st: StructType => st.forall { f => supportDataType(f.dataType) }
 
-    case ArrayType(elementType, _) => supportsDataType(elementType)
+    case ArrayType(elementType, _) => supportDataType(elementType)
 
     case MapType(keyType, valueType, _) =>
-      supportsDataType(keyType) && supportsDataType(valueType)
+      supportDataType(keyType) && supportDataType(valueType)
 
-    case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
+    case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
 
     case _: NullType => true
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index ee68e407e942..2a764957be11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -235,17 +235,17 @@ class OrcFileFormat
       }
     }
 
-  override def supportsDataType(dataType: DataType): Boolean = dataType match {
+  override def supportDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true
 
-    case st: StructType => st.forall { f => supportsDataType(f.dataType) }
+    case st: StructType => st.forall { f => supportDataType(f.dataType) }
 
-    case ArrayType(elementType, _) => supportsDataType(elementType)
+    case ArrayType(elementType, _) => supportDataType(elementType)
 
     case MapType(keyType, valueType, _) =>
-      supportsDataType(keyType) && supportsDataType(valueType)
+      supportDataType(keyType) && supportDataType(valueType)
 
-    case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
+    case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
 
     case _ => false
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 6f824755de2e..efa4f3f166d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -453,17 +453,17 @@ class ParquetFileFormat
       }
     }
 
-  override def supportsDataType(dataType: DataType): Boolean = dataType match {
+  override def supportDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true
 
-    case st: StructType => st.forall { f => supportsDataType(f.dataType) }
+    case st: StructType => st.forall { f => supportDataType(f.dataType) }
 
-    case ArrayType(elementType, _) => supportsDataType(elementType)
+    case ArrayType(elementType, _) => supportDataType(elementType)
 
     case MapType(keyType, valueType, _) =>
-      supportsDataType(keyType) && supportsDataType(valueType)
+      supportDataType(keyType) && supportDataType(valueType)
 
-    case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
+    case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
 
     case _ => false
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index 83b238f878c4..f8a24eb08029 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -139,7 +139,7 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
     }
   }
 
-  override def supportsDataType(dataType: DataType): Boolean =
+  override def supportDataType(dataType: DataType): Boolean =
     dataType == StringType
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 7003508aa5ca..f52e82bc7fbf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -366,17 +366,6 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
     }
   }
 
-  test("SPARK-26716 supports writing and reading Null data type - json") {
-    withTempPath { dir =>
-      val df = spark.range(10).map(id => (id, null)).toDF("c1", "c2")
-      df.write.json(dir.getCanonicalPath)
-      checkAnswer(spark.read.json(dir.getCanonicalPath), df)
-      val schema =
-        StructType(Seq(StructField("c1", LongType, true), StructField("c2", NullType, true)))
-      checkAnswer(spark.read.schema(schema).json(dir.getCanonicalPath), df)
-    }
-  }
-
   test("SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc") {
     withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "orc") {
       withTempDir { dir =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index b4817d29b69f..bfb0a95d4e70 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -181,17 +181,17 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       }
     }
 
-  override def supportsDataType(dataType: DataType): Boolean = dataType match {
+  override def supportDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true
 
-    case st: StructType => st.forall { f => supportsDataType(f.dataType) }
+    case st: StructType => st.forall { f => supportDataType(f.dataType) }
 
-    case ArrayType(elementType, _) => supportsDataType(elementType)
+    case ArrayType(elementType, _) => supportDataType(elementType)
 
     case MapType(keyType, valueType, _) =>
-      supportsDataType(keyType) && supportsDataType(valueType)
+      supportDataType(keyType) && supportDataType(valueType)
 
-    case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
+    case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
 
     case _ => false
   }

From 2552ba6ac1850f17092f17fd6d28cba1101a8b3a Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Sat, 26 Jan 2019 12:27:49 +0800
Subject: [PATCH 3/4] add TODO comment

---
 .../scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index f52e82bc7fbf..fe9b39403e4a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -367,6 +367,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
   }
 
   test("SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc") {
+    // TODO(SPARK-26716): support data type validating in V2 data source, and test V2 as well.
     withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "orc") {
       withTempDir { dir =>
         val tempDir = new File(dir, "files").getCanonicalPath

From 442bb2b34a4ab21b6e0cfa22a21138f0851992b8 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Sun, 27 Jan 2019 16:33:47 +0800
Subject: [PATCH 4/4] update jira id

---
 .../scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index fe9b39403e4a..5e6705094e60 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -367,7 +367,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
   }
 
   test("SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc") {
-    // TODO(SPARK-26716): support data type validating in V2 data source, and test V2 as well.
+    // TODO(SPARK-26744): support data type validating in V2 data source, and test V2 as well.
     withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "orc") {
       withTempDir { dir =>
         val tempDir = new File(dir, "files").getCanonicalPath