Commit 1de001d

Replace special characters '(' and ')' of Parquet schema.
1 parent 4bdfb7b commit 1de001d

File tree

2 files changed (+20, -4 lines)

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala

Lines changed: 8 additions & 4 deletions
@@ -390,10 +390,11 @@ private[parquet] object ParquetTypesConverter extends Logging {
 
   def convertFromAttributes(attributes: Seq[Attribute],
                             toThriftSchemaNames: Boolean = false): MessageType = {
-    val fields = attributes.map(
-      attribute =>
+    val fields = attributes.map { old_attribute =>
+      val attribute = old_attribute.withName(old_attribute.name.replaceAll("\\((.*)\\)", "[$1]"))
       fromDataType(attribute.dataType, attribute.name, attribute.nullable,
-        toThriftSchemaNames = toThriftSchemaNames))
+        toThriftSchemaNames = toThriftSchemaNames)
+    }
     new MessageType("root", fields)
   }

@@ -405,7 +406,10 @@ private[parquet] object ParquetTypesConverter extends Logging {
   }

   def convertToString(schema: Seq[Attribute]): String = {
-    StructType.fromAttributes(schema).json
+    val replaced_schema = schema.map { attribute =>
+      attribute.withName(attribute.name.replaceAll("\\((.*)\\)", "[$1]"))
+    }
+    StructType.fromAttributes(replaced_schema).json
   }

   def writeMetaData(attributes: Seq[Attribute], origPath: Path, conf: Configuration): Unit = {
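For illustration only, a self-contained Scala sketch of the regex rewrite applied in both convertFromAttributes and convertToString above (the object and method names here are hypothetical, not part of the commit). It shows how an aggregation attribute name such as MAX(int) becomes MAX[int], which is what the new test below asserts.

object NameRewriteSketch {
  // Same regex as in the diff: capture everything between '(' and ')'
  // and re-emit it wrapped in square brackets instead.
  def sanitize(name: String): String = name.replaceAll("\\((.*)\\)", "[$1]")

  def main(args: Array[String]): Unit = {
    println(sanitize("MAX(int)")) // prints MAX[int]
    println(sanitize("str"))      // names without parentheses are unchanged: str
  }
}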

sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala

Lines changed: 12 additions & 0 deletions
@@ -578,6 +578,18 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
 
     sql("DROP TABLE alwaysNullable")
   }
+
+  test("Aggregation attribute names including special chars '(' and ')' should be replaced") {
+    val tempDir = Utils.createTempDir()
+    val filePath = new File(tempDir, "testParquet").getCanonicalPath
+
+    val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
+    val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").max("y.int")
+    df2.saveAsParquetFile(filePath)
+    val df3 = parquetFile(filePath)
+    checkAnswer(df3, Row("1", 1) :: Row("2", 2) :: Row("3", 3) :: Nil)
+    assert(df3.columns === Array("str", "MAX[int]"))
+  }
 }

 class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase {
