Commit add75d4

yhuai authored and marmbrus committed
[SPARK-2927][SQL] Add a conf to configure if we always read Binary columns stored in Parquet as String columns
This PR adds a new conf flag `spark.sql.parquet.binaryAsString`. When it is `true` and no Parquet metadata file is available to provide the schema of the data, we always treat binary fields stored in Parquet as string fields. This conf provides a way to read string fields that were written without the UTF8 decoration.

JIRA: https://issues.apache.org/jira/browse/SPARK-2927

Author: Yin Huai <[email protected]>

Closes #1855 from yhuai/parquetBinaryAsString and squashes the following commits:

689ffa9 [Yin Huai] Add missing "=".
80827de [Yin Huai] Unit test.
1765ca4 [Yin Huai] Use .toBoolean.
9d3f199 [Yin Huai] Merge remote-tracking branch 'upstream/master' into parquetBinaryAsString
5d436a1 [Yin Huai] The initial support of adding a conf to treat binary columns stored in Parquet as string columns.
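For reference, a minimal usage sketch of the new flag (not part of this commit) against a 1.x-era SQLContext; the file path and table name are illustrative, and an existing SparkContext `sc` is assumed:

import org.apache.spark.sql.SQLContext

// Minimal sketch: enable the flag before reading a Parquet file whose string
// columns were written as plain BINARY (no UTF8 annotation).
val sqlContext = new SQLContext(sc)
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")

// BINARY columns in the file are now exposed as StringType.
val records = sqlContext.parquetFile("/path/to/strings-as-binary.parquet")  // illustrative path
records.registerTempTable("records")
sqlContext.sql("SELECT * FROM records").collect()

The flag only affects the fallback path where the schema is derived from the Parquet file types; when Spark SQL's own metadata is present in the file, that schema is used as-is.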
1 parent 078f3fb commit add75d4

File tree

5 files changed: +87 additions, −22 deletions


sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 8 additions & 2 deletions
@@ -31,6 +31,7 @@ private[spark] object SQLConf {
   val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
   val CODEGEN_ENABLED = "spark.sql.codegen"
   val DIALECT = "spark.sql.dialect"
+  val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"

   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
@@ -87,8 +88,7 @@ trait SQLConf {
    *
    * Defaults to false as this feature is currently experimental.
    */
-  private[spark] def codegenEnabled: Boolean =
-    if (getConf(CODEGEN_ENABLED, "false") == "true") true else false
+  private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, "false").toBoolean

   /**
    * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
@@ -108,6 +108,12 @@ trait SQLConf {
   private[spark] def defaultSizeInBytes: Long =
     getConf(DEFAULT_SIZE_IN_BYTES, (autoBroadcastJoinThreshold + 1).toString).toLong

+  /**
+   * When set to true, we always treat byte arrays in Parquet files as strings.
+   */
+  private[spark] def isParquetBinaryAsString: Boolean =
+    getConf(PARQUET_BINARY_AS_STRING, "false").toBoolean
+
   /** ********************** SQLConf functionality methods ************ */

   /** Set Spark SQL configuration properties. */

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala

Lines changed: 5 additions & 1 deletion
@@ -60,7 +60,11 @@ private[sql] case class ParquetRelation(
     .getSchema

   /** Attributes */
-  override val output = ParquetTypesConverter.readSchemaFromFile(new Path(path), conf)
+  override val output =
+    ParquetTypesConverter.readSchemaFromFile(
+      new Path(path),
+      conf,
+      sqlContext.isParquetBinaryAsString)

   override def newInstance = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala

Lines changed: 2 additions & 1 deletion
@@ -80,9 +80,10 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
       }
     }
     // if both unavailable, fall back to deducing the schema from the given Parquet schema
+    // TODO: Why it can be null?
     if (schema == null) {
       log.debug("falling back to Parquet read schema")
-      schema = ParquetTypesConverter.convertToAttributes(parquetSchema)
+      schema = ParquetTypesConverter.convertToAttributes(parquetSchema, false)
     }
     log.debug(s"list of attributes that will be read: $schema")
     new RowRecordMaterializer(parquetSchema, schema)

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala

Lines changed: 21 additions & 15 deletions
@@ -43,10 +43,13 @@ private[parquet] object ParquetTypesConverter extends Logging {
   def isPrimitiveType(ctype: DataType): Boolean =
     classOf[PrimitiveType] isAssignableFrom ctype.getClass

-  def toPrimitiveDataType(parquetType: ParquetPrimitiveType): DataType =
+  def toPrimitiveDataType(
+      parquetType: ParquetPrimitiveType,
+      binayAsString: Boolean): DataType =
     parquetType.getPrimitiveTypeName match {
       case ParquetPrimitiveTypeName.BINARY
-        if parquetType.getOriginalType == ParquetOriginalType.UTF8 => StringType
+        if (parquetType.getOriginalType == ParquetOriginalType.UTF8 ||
+          binayAsString) => StringType
       case ParquetPrimitiveTypeName.BINARY => BinaryType
       case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
       case ParquetPrimitiveTypeName.DOUBLE => DoubleType
@@ -85,7 +88,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
    * @param parquetType The type to convert.
    * @return The corresponding Catalyst type.
    */
-  def toDataType(parquetType: ParquetType): DataType = {
+  def toDataType(parquetType: ParquetType, isBinaryAsString: Boolean): DataType = {
     def correspondsToMap(groupType: ParquetGroupType): Boolean = {
       if (groupType.getFieldCount != 1 || groupType.getFields.apply(0).isPrimitive) {
         false
@@ -107,7 +110,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
     }

     if (parquetType.isPrimitive) {
-      toPrimitiveDataType(parquetType.asPrimitiveType)
+      toPrimitiveDataType(parquetType.asPrimitiveType, isBinaryAsString)
     } else {
       val groupType = parquetType.asGroupType()
       parquetType.getOriginalType match {
@@ -116,7 +119,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
         case ParquetOriginalType.LIST => { // TODO: check enums!
           assert(groupType.getFieldCount == 1)
           val field = groupType.getFields.apply(0)
-          ArrayType(toDataType(field), containsNull = false)
+          ArrayType(toDataType(field, isBinaryAsString), containsNull = false)
         }
         case ParquetOriginalType.MAP => {
           assert(
@@ -126,9 +129,9 @@ private[parquet] object ParquetTypesConverter extends Logging {
           assert(
             keyValueGroup.getFieldCount == 2,
             "Parquet Map type malformatted: nested group should have 2 (key, value) fields!")
-          val keyType = toDataType(keyValueGroup.getFields.apply(0))
+          val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
           assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
-          val valueType = toDataType(keyValueGroup.getFields.apply(1))
+          val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString)
           assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
           // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true
           // at here.
@@ -138,22 +141,22 @@ private[parquet] object ParquetTypesConverter extends Logging {
         // Note: the order of these checks is important!
         if (correspondsToMap(groupType)) { // MapType
           val keyValueGroup = groupType.getFields.apply(0).asGroupType()
-          val keyType = toDataType(keyValueGroup.getFields.apply(0))
+          val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
           assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
-          val valueType = toDataType(keyValueGroup.getFields.apply(1))
+          val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString)
           assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
           // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true
           // at here.
           MapType(keyType, valueType)
         } else if (correspondsToArray(groupType)) { // ArrayType
-          val elementType = toDataType(groupType.getFields.apply(0))
+          val elementType = toDataType(groupType.getFields.apply(0), isBinaryAsString)
           ArrayType(elementType, containsNull = false)
         } else { // everything else: StructType
           val fields = groupType
             .getFields
             .map(ptype => new StructField(
               ptype.getName,
-              toDataType(ptype),
+              toDataType(ptype, isBinaryAsString),
               ptype.getRepetition != Repetition.REQUIRED))
           StructType(fields)
         }
@@ -276,15 +279,15 @@ private[parquet] object ParquetTypesConverter extends Logging {
     }
   }

-  def convertToAttributes(parquetSchema: ParquetType): Seq[Attribute] = {
+  def convertToAttributes(parquetSchema: ParquetType, isBinaryAsString: Boolean): Seq[Attribute] = {
     parquetSchema
       .asGroupType()
       .getFields
       .map(
         field =>
           new AttributeReference(
             field.getName,
-            toDataType(field),
+            toDataType(field, isBinaryAsString),
             field.getRepetition != Repetition.REQUIRED)())
   }

@@ -404,7 +407,10 @@ private[parquet] object ParquetTypesConverter extends Logging {
    * @param conf The Hadoop configuration to use.
    * @return A list of attributes that make up the schema.
    */
-  def readSchemaFromFile(origPath: Path, conf: Option[Configuration]): Seq[Attribute] = {
+  def readSchemaFromFile(
+      origPath: Path,
+      conf: Option[Configuration],
+      isBinaryAsString: Boolean): Seq[Attribute] = {
     val keyValueMetadata: java.util.Map[String, String] =
       readMetaData(origPath, conf)
         .getFileMetaData
@@ -413,7 +419,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
       convertFromString(keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY))
     } else {
       val attributes = convertToAttributes(
-        readMetaData(origPath, conf).getFileMetaData.getSchema)
+        readMetaData(origPath, conf).getFileMetaData.getSchema, isBinaryAsString)
       log.info(s"Falling back to schema conversion from Parquet types; result: $attributes")
       attributes
     }
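To make the new conversion rule concrete, here is a small, self-contained Scala sketch of the BINARY handling that the modified toPrimitiveDataType applies. The enum-like types below are simplified stand-ins for parquet-mr's primitive/original types and Catalyst's DataType, not the real Spark or Parquet classes:

// Simplified, self-contained sketch of the mapping rule this patch introduces.
// The types here are illustrative stand-ins, not the real Spark/Parquet classes.
object BinaryAsStringSketch {
  sealed trait PrimitiveTypeName
  case object BINARY extends PrimitiveTypeName
  case object INT32 extends PrimitiveTypeName

  sealed trait OriginalType
  case object UTF8 extends OriginalType

  sealed trait DataType
  case object StringType extends DataType
  case object BinaryType extends DataType
  case object IntegerType extends DataType

  // A BINARY column becomes StringType when it carries the UTF8 annotation,
  // or unconditionally when the binaryAsString flag is on.
  def toPrimitiveDataType(
      typeName: PrimitiveTypeName,
      originalType: Option[OriginalType],
      binaryAsString: Boolean): DataType = typeName match {
    case BINARY if originalType.contains(UTF8) || binaryAsString => StringType
    case BINARY => BinaryType
    case INT32 => IntegerType
  }

  def main(args: Array[String]): Unit = {
    // Without the flag, an un-annotated BINARY column stays binary...
    assert(toPrimitiveDataType(BINARY, None, binaryAsString = false) == BinaryType)
    // ...with the flag, it is read as a string.
    assert(toPrimitiveDataType(BINARY, None, binaryAsString = true) == StringType)
  }
}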

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala

Lines changed: 51 additions & 3 deletions
@@ -21,8 +21,6 @@ import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}

 import parquet.hadoop.ParquetFileWriter
 import parquet.hadoop.util.ContextUtil
-import parquet.schema.MessageTypeParser
-
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapreduce.Job

@@ -33,7 +31,6 @@ import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types.{BooleanType, IntegerType}
 import org.apache.spark.sql.catalyst.util.getTempFilePath
-import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.util.Utils
@@ -138,6 +135,57 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
     }
   }

+  test("Treat binary as string") {
+    val oldIsParquetBinaryAsString = TestSQLContext.isParquetBinaryAsString
+
+    // Create the test file.
+    val file = getTempFilePath("parquet")
+    val path = file.toString
+    val range = (0 to 255)
+    val rowRDD = TestSQLContext.sparkContext.parallelize(range)
+      .map(i => org.apache.spark.sql.Row(i, s"val_$i".getBytes))
+    // We need to ask Parquet to store the String column as a Binary column.
+    val schema = StructType(
+      StructField("c1", IntegerType, false) ::
+      StructField("c2", BinaryType, false) :: Nil)
+    val schemaRDD1 = applySchema(rowRDD, schema)
+    schemaRDD1.saveAsParquetFile(path)
+    val resultWithBinary = parquetFile(path).collect
+    range.foreach {
+      i =>
+        assert(resultWithBinary(i).getInt(0) === i)
+        assert(resultWithBinary(i)(1) === s"val_$i".getBytes)
+    }
+
+    TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, "true")
+    // This ParquetRelation always use Parquet types to derive output.
+    val parquetRelation = new ParquetRelation(
+      path.toString,
+      Some(TestSQLContext.sparkContext.hadoopConfiguration),
+      TestSQLContext) {
+      override val output =
+        ParquetTypesConverter.convertToAttributes(
+          ParquetTypesConverter.readMetaData(new Path(path), conf).getFileMetaData.getSchema,
+          TestSQLContext.isParquetBinaryAsString)
+    }
+    val schemaRDD = new SchemaRDD(TestSQLContext, parquetRelation)
+    val resultWithString = schemaRDD.collect
+    range.foreach {
+      i =>
+        assert(resultWithString(i).getInt(0) === i)
+        assert(resultWithString(i)(1) === s"val_$i")
+    }
+
+    schemaRDD.registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT c1, c2 FROM tmp WHERE c2 = 'val_5' OR c2 = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    // Set it back.
+    TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, oldIsParquetBinaryAsString.toString)
+  }
+
   test("Read/Write All Types with non-primitive type") {
     val tempDir = getTempFilePath("parquetTest").getCanonicalPath
     val range = (0 to 255)
