From d3378a2b48dc86f9d0e5950b15237a12d15ed6cc Mon Sep 17 00:00:00 2001 From: "joey.ljy" Date: Sat, 11 May 2024 15:47:06 +0800 Subject: [PATCH 1/2] SPARK-48241: CSV parsing failure with char/varchar type columns --- .../catalyst/plans/logical/LogicalPlan.scala | 4 +++- .../src/test/resources/test-data/char.csv | 4 ++++ .../execution/datasources/csv/CSVSuite.scala | 24 +++++++++++++++++++ 3 files changed, 31 insertions(+), 1 deletion(-) create mode 100644 sql/core/src/test/resources/test-data/char.csv diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index b989233da674..7c373738d8ab 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -118,7 +118,9 @@ abstract class LogicalPlan def resolve(schema: StructType, resolver: Resolver): Seq[Attribute] = { schema.map { field => resolve(field.name :: Nil, resolver).map { - case a: AttributeReference => a + case a: AttributeReference => + // Keep the metadata in given schema. + a.copy(metadata = field.metadata)(exprId = a.exprId, qualifier = a.qualifier) case _ => throw QueryExecutionErrors.resolveCannotHandleNestedSchema(this) }.getOrElse { throw QueryCompilationErrors.cannotResolveAttributeError( diff --git a/sql/core/src/test/resources/test-data/char.csv b/sql/core/src/test/resources/test-data/char.csv new file mode 100644 index 000000000000..d2be68a15fc1 --- /dev/null +++ b/sql/core/src/test/resources/test-data/char.csv @@ -0,0 +1,4 @@ +color,name +pink,Bob +blue,Mike +grey,Tom diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 22ea133ee19a..0e58b96531da 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -80,6 +80,7 @@ abstract class CSVSuite private val valueMalformedFile = "test-data/value-malformed.csv" private val badAfterGoodFile = "test-data/bad_after_good.csv" private val malformedRowFile = "test-data/malformedRow.csv" + private val charFile = "test-data/char.csv" /** Verifies data and schema. */ private def verifyCars( @@ -3342,6 +3343,29 @@ abstract class CSVSuite expected) } } + + test("SPARK-48241: CSV parsing failure with char/varchar type columns") { + withTable("charVarcharTable") { + spark.sql( + s""" + |CREATE TABLE charVarcharTable( + | color char(4), + | name varchar(10)) + |USING csv + |OPTIONS ( + | header "true", + | path "${testFile(charFile)}" + |) + """.stripMargin) + val expected = Seq( + Row("pink", "Bob"), + Row("blue", "Mike"), + Row("grey", "Tom")) + checkAnswer( + sql("SELECT * FROM charVarcharTable"), + expected) + } + } } class CSVv1Suite extends CSVSuite { From 204a4ab9374e192baa80c90d704bd6aaba202891 Mon Sep 17 00:00:00 2001 From: "joey.ljy" Date: Sat, 11 May 2024 18:47:29 +0800 Subject: [PATCH 2/2] Address comments --- .../apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 7c373738d8ab..98e91585c2a0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -120,7 +120,7 @@ abstract class LogicalPlan resolve(field.name :: Nil, resolver).map { case a: AttributeReference => // Keep the metadata in given schema. - a.copy(metadata = field.metadata)(exprId = a.exprId, qualifier = a.qualifier) + a.withMetadata(field.metadata) case _ => throw QueryExecutionErrors.resolveCannotHandleNestedSchema(this) }.getOrElse { throw QueryCompilationErrors.cannotResolveAttributeError(