@@ -407,10 +407,6 @@ class ParquetFileFormat
 
       case _ => false
     }
-
-  override def supportFieldName(name: String): Boolean = {
-    !name.matches(".*[ ,;{}()\n\t=].*")
-  }
 }
 
 object ParquetFileFormat extends Logging {
@@ -467,7 +467,6 @@ class SparkToParquetSchemaConverter(
   }
 
   private def convertField(field: StructField, repetition: Type.Repetition): Type = {
-    ParquetSchemaConverter.checkFieldName(field.name)
 
     field.dataType match {
       // ===================
@@ -698,23 +697,6 @@ private[sql] object ParquetSchemaConverter {
   val EMPTY_MESSAGE: MessageType =
     Types.buildMessage().named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
 
-  def checkFieldName(name: String): Unit = {
-    // ,;{}()\n\t= and space are special characters in Parquet schema
-    if (name.matches(".*[ ,;{}()\n\t=].*")) {
-      throw QueryCompilationErrors.columnNameContainsInvalidCharactersError(name)
-    }
-  }
-
-  def checkFieldNames(schema: StructType): Unit = {
-    schema.foreach { field =>
-      checkFieldName(field.name)
-      field.dataType match {
-        case s: StructType => checkFieldNames(s)
-        case _ =>
-      }
-    }
-  }
-
   def checkConversionRequirement(f: => Boolean, message: String): Unit = {
     if (!f) {
       throw new AnalysisException(message)
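This deletion is the heart of the PR: Spark-side Parquet field-name validation is dropped entirely. For reference, a minimal standalone sketch of what the removed checkFieldName/checkFieldNames pair used to do; the regex is taken from the deleted code, while the FieldNameCheck object and the main harness are illustrative additions, not Spark code:

    // Hypothetical re-creation of the removed validation logic.
    // The characters ,;{}()= plus space, newline, and tab were rejected.
    object FieldNameCheck {
      private val invalidPattern = ".*[ ,;{}()\n\t=].*"

      def isValid(name: String): Boolean = !name.matches(invalidPattern)

      def main(args: Array[String]): Unit = {
        Seq("plain", "max(t)", "a b", "a.b").foreach { n =>
          println(f"${n}%-8s valid = ${isValid(n)}")
        }
        // Before this PR: "max(t)" and "a b" failed the check and writing
        // them to Parquet threw columnNameContainsInvalidCharactersError.
        // After this PR: such names are passed through to Parquet as-is.
      }
    }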
@@ -72,7 +72,6 @@ case class ParquetWrite(
 
     ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
 
-    ParquetSchemaConverter.checkFieldNames(dataSchema)
     // This metadata is useful for keeping UDTs like Vector/Matrix.
     ParquetWriteSupport.setSchema(dataSchema, conf)
 
@@ -991,28 +991,6 @@ class FileBasedDataSourceSuite extends QueryTest
       checkAnswer(df, Row("v1", "v2"))
     }
   }
-
-  test("SPARK-36271: V1 insert should check schema field name too") {
-    withView("v") {
-      spark.range(1).createTempView("v")
-      withTempDir { dir =>
-        val e = intercept[AnalysisException] {
-          sql("SELECT ID, IF(ID=1,1,0) FROM v").write.mode(SaveMode.Overwrite)
-            .format("parquet").save(dir.getCanonicalPath)
-        }.getMessage
-        assert(e.contains("Column name \"(IF((ID = 1), 1, 0))\" contains invalid character(s)."))
-      }
-
-      withTempDir { dir =>
-        val e = intercept[AnalysisException] {
-          sql("SELECT NAMED_STRUCT('(IF((ID = 1), 1, 0))', IF(ID=1,ID,0)) AS col1 FROM v")
-            .write.mode(SaveMode.Overwrite)
-            .format("parquet").save(dir.getCanonicalPath)
-        }.getMessage
-        assert(e.contains("Column name \"(IF((ID = 1), 1, 0))\" contains invalid character(s)."))
-      }
-    }
-  }
 }
 
 object TestingUDT {
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala (19 additions, 0 deletions)
@@ -4243,6 +4243,25 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       checkAnswer(df3, df4)
     }
   }
+
+  test("SPARK-27442: Spark support read/write parquet file with invalid char in field name") {
+    withTempDir { dir =>
+      Seq((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), (2, 4, 6, 8, 10, 12, 14, 16, 18, 20))
+        .toDF("max(t)", "max(t", "=", "\n", ";", "a b", "{", ".", "a.b", "a")
+        .repartition(1)
+        .write.mode(SaveMode.Overwrite).parquet(dir.getAbsolutePath)
+      val df = spark.read.parquet(dir.getAbsolutePath)
+      checkAnswer(df,
+        Row(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) ::
+          Row(2, 4, 6, 8, 10, 12, 14, 16, 18, 20) :: Nil)
+      assert(df.schema.names.sameElements(
+        Array("max(t)", "max(t", "=", "\n", ";", "a b", "{", ".", "a.b", "a")))
+      checkAnswer(df.select("`max(t)`", "`a b`", "`{`", "`.`", "`a.b`"),
+        Row(1, 6, 7, 8, 9) :: Row(2, 12, 14, 16, 18) :: Nil)
+      checkAnswer(df.where("`a.b` > 10"),
+        Row(2, 4, 6, 8, 10, 12, 14, 16, 18, 20) :: Nil)
+    }
+  }
 }
 
 case class Foo(bar: Option[String])
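This new test pins down the user-visible behavior the PR enables. A rough spark-shell sketch of the same flow, where the output path is hypothetical and `spark` is the session the shell provides:

    // Assumes spark-shell, with `spark` and its implicits in scope.
    import spark.implicits._

    val out = "/tmp/spark-27442-demo"  // hypothetical location
    Seq((1, 2), (3, 4)).toDF("max(t)", "a b")
      .write.mode("overwrite").parquet(out)

    // The names round-trip through Parquet unchanged; backticks are still
    // needed to reference them in SQL expressions.
    spark.read.parquet(out)
      .select("`max(t)`", "`a b`")
      .where("`a b` > 2")
      .show()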
@@ -207,23 +207,6 @@ class HiveParquetSourceSuite extends ParquetPartitioningTest with ParquetTest {
     }
   }
-
-  test("Aggregation attribute names can't contain special chars \" ,;{}()\\n\\t=\"") {
-    withTempDir { tempDir =>
-      val filePath = new File(tempDir, "testParquet").getCanonicalPath
-      val filePath2 = new File(tempDir, "testParquet2").getCanonicalPath
-
-      val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
-      val df2 = df.as("x").join(df.as("y"), $"x.str" === $"y.str").groupBy("y.str").max("y.int")
-      intercept[Throwable](df2.write.parquet(filePath))
-
-      val df3 = df2.toDF("str", "max_int")
-      df3.write.parquet(filePath2)
-      val df4 = read.parquet(filePath2)
-      checkAnswer(df4, Row("1", 1) :: Row("2", 2) :: Row("3", 3) :: Nil)
-      assert(df4.columns === Array("str", "max_int"))
-    }
-  }
 
   test("SPARK-25993 CREATE EXTERNAL TABLE with subdirectories") {
     Seq("true", "false").foreach { parquetConversion =>
       withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> parquetConversion) {
@@ -2930,12 +2930,12 @@ class HiveDDLSuite
     withView("v") {
       spark.range(1).createTempView("v")
       withTempPath { path =>
-        val e = intercept[AnalysisException] {
+        val e = intercept[SparkException] {
           spark.sql(s"INSERT OVERWRITE LOCAL DIRECTORY '${path.getCanonicalPath}' " +
             s"STORED AS PARQUET SELECT ID, if(1=1, 1, 0), abs(id), '^-' FROM v")
-        }.getMessage
-        assert(e.contains("Column name \"(IF((1 = 1), 1, 0))\" contains invalid character(s). " +
-          "Please use alias to rename it."))
+        }.getCause.getCause.getMessage
+        assert(e.contains(
+          "field ended by ';': expected ';' but got 'IF' at line 2: optional int32 (IF"))
       }
     }
   }
@@ -2944,7 +2944,7 @@ class HiveDDLSuite
     withView("v") {
       spark.range(1).createTempView("v")
       withTempPath { path =>
-        val e = intercept[AnalysisException] {
+        val e = intercept[SparkException] {
           spark.sql(
             s"""
               |INSERT OVERWRITE LOCAL DIRECTORY '${path.getCanonicalPath}'
@@ -2953,27 +2953,9 @@ class HiveDDLSuite
               |NAMED_STRUCT('ID', ID, 'IF(ID=1,ID,0)', IF(ID=1,ID,0), 'B', ABS(ID)) AS col1
> Contributor: Does Hive Serde fail for this case as well?
>
> Contributor (author): Yea, updated.

              |FROM v
              """.stripMargin)
-        }.getMessage
-        assert(e.contains("Column name \"IF(ID=1,ID,0)\" contains" +
-          " invalid character(s). Please use alias to rename it."))
+        }.getCause.getCause.getMessage
+        assert(e.contains("expected at the position 19 of " +
+          "'struct<ID:bigint,IF(ID=1,ID,0):bigint,B:bigint>' but '(' is found."))
       }
     }
   }
-
-  test("SPARK-36312: ParquetWriteSupport should check inner field") {
-    withView("v") {
-      spark.range(1).createTempView("v")
-      withTempPath { path =>
-        val e = intercept[AnalysisException] {
-          spark.sql(
-            """
-              |SELECT
-              |NAMED_STRUCT('ID', ID, 'IF(ID=1,ID,0)', IF(ID=1,ID,0), 'B', ABS(ID)) AS col1
-              |FROM v
-              |""".stripMargin).write.mode(SaveMode.Overwrite).parquet(path.toString)
-        }.getMessage
-        assert(e.contains("Column name \"IF(ID=1,ID,0)\" contains" +
-          " invalid character(s). Please use alias to rename it."))
-      }
-    }
-  }

> AngersZhuuuu (Contributor, author), Jan 21, 2022, on the removed SPARK-36312 test: No exception is thrown by the Hive SerDe; it seems the Hive SerDe won't check inner fields, so just remove this test. cc @cloud-fan
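The messages these assertions now expect come from layers below Spark, which is why the tests unwrap getCause.getCause. As a rough illustration, and assuming (as the asserted text suggests) that the first message originates in parquet-mr's schema parser, one could reproduce its shape directly; the wiring through the Hive SerDe itself is not shown in this diff:

    // Illustrative sketch: a field name containing '(' makes parquet-mr's
    // MessageTypeParser fail with an error of the same shape as the one
    // asserted above ("expected ';' but got 'IF'").
    import org.apache.parquet.schema.MessageTypeParser

    object ParquetNameParseDemo {
      def main(args: Array[String]): Unit = {
        val schema =
          """message spark_schema {
            |  optional int32 (IF((1 = 1), 1, 0));
            |}""".stripMargin
        try MessageTypeParser.parseMessageType(schema)
        catch { case e: Exception => println(e.getMessage) }
      }
    }

The second message, about position 19 of 'struct<ID:bigint,IF(ID=1,ID,0):bigint,B:bigint>', appears to be Hive's own type-string parser rejecting the same character; that reading is an inference from the asserted text rather than something this diff shows.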
@@ -2212,41 +2212,6 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
     }
   }
-
-  test("SPARK-21912 Parquet table should not create invalid column names") {
-    Seq(" ", ",", ";", "{", "}", "(", ")", "\n", "\t", "=").foreach { name =>
-      val source = "PARQUET"
-      withTable("t21912") {
-        val m = intercept[AnalysisException] {
-          sql(s"CREATE TABLE t21912(`col$name` INT) USING $source")
-        }.getMessage
-        assert(m.contains(s"contains invalid character(s)"))
-
-        val m1 = intercept[AnalysisException] {
-          sql(s"CREATE TABLE t21912 STORED AS $source AS SELECT 1 `col$name`")
-        }.getMessage
-        assert(m1.contains(s"contains invalid character(s)"))
-
-        val m2 = intercept[AnalysisException] {
-          sql(s"CREATE TABLE t21912 USING $source AS SELECT 1 `col$name`")
-        }.getMessage
-        assert(m2.contains(s"contains invalid character(s)"))
-
-        withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") {
-          val m3 = intercept[AnalysisException] {
-            sql(s"CREATE TABLE t21912(`col$name` INT) USING hive OPTIONS (fileFormat '$source')")
-          }.getMessage
-          assert(m3.contains(s"contains invalid character(s)"))
-        }
-
-        sql(s"CREATE TABLE t21912(`col` INT) USING $source")
-        val m4 = intercept[AnalysisException] {
-          sql(s"ALTER TABLE t21912 ADD COLUMNS(`col$name` INT)")
-        }.getMessage
-        assert(m4.contains(s"contains invalid character(s)"))
-      }
-    }
-  }
 
   test("SPARK-32889: ORC table column name supports special characters") {
     // " " "," is not allowed.
     Seq("$", ";", "{", "}", "(", ")", "\n", "\t", "=").foreach { name =>
@@ -107,21 +107,6 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
     }
   }
-
-  test("SPARK-8079: Avoid NPE thrown from BaseWriterContainer.abortJob") {
-    withTempPath { dir =>
-      intercept[AnalysisException] {
-        // Parquet doesn't allow field names with spaces. Here we are intentionally making an
-        // exception thrown from the `ParquetRelation2.prepareForWriteJob()` method to trigger
-        // the bug. Please refer to spark-8079 for more details.
-        spark.range(1, 10)
-          .withColumnRenamed("id", "a b")
-          .write
-          .format("parquet")
-          .save(dir.getCanonicalPath)
-      }
-    }
-  }
 
   test("SPARK-8604: Parquet data source should write summary file while doing appending") {
     withSQLConf(
       ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL",