10 changes: 8 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -31,6 +31,7 @@ private[spark] object SQLConf {
val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
val CODEGEN_ENABLED = "spark.sql.codegen"
val DIALECT = "spark.sql.dialect"
+  val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"

object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
@@ -87,8 +88,7 @@ trait SQLConf {
*
* Defaults to false as this feature is currently experimental.
*/
-  private[spark] def codegenEnabled: Boolean =
-    if (getConf(CODEGEN_ENABLED, "false") == "true") true else false
+  private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, "false").toBoolean

/**
* Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
@@ -108,6 +108,12 @@ trait SQLConf {
private[spark] def defaultSizeInBytes: Long =
getConf(DEFAULT_SIZE_IN_BYTES, (autoBroadcastJoinThreshold + 1).toString).toLong

+  /**
+   * When set to true, we always treat byte arrays in Parquet files as strings.
+   */
+  private[spark] def isParquetBinaryAsString: Boolean =
+    getConf(PARQUET_BINARY_AS_STRING, "false").toBoolean

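For context, a minimal usage sketch of the new flag (hypothetical session; assumes a SQLContext named sqlContext is in scope):

sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
// or, equivalently, through SQL:
sqlContext.sql("SET spark.sql.parquet.binaryAsString=true")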
/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
@@ -60,7 +60,11 @@ private[sql] case class ParquetRelation(
.getSchema

/** Attributes */
-  override val output = ParquetTypesConverter.readSchemaFromFile(new Path(path), conf)
+  override val output =
+    ParquetTypesConverter.readSchemaFromFile(
+      new Path(path),
+      conf,
+      sqlContext.isParquetBinaryAsString)

override def newInstance = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]

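The net effect on reads, as a rough sketch (hypothetical path; assumes the file stores strings as un-annotated BINARY, the way Impala writes them):

// Default (false): the binary column surfaces as Array[Byte].
val asBinary = sqlContext.parquetFile("/tmp/impala_table").collect()
// With the flag on, a freshly created relation reads the same column back as String.
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
val asString = sqlContext.parquetFile("/tmp/impala_table").collect()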
@@ -80,9 +80,10 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
}
}
// if both unavailable, fall back to deducing the schema from the given Parquet schema
+    // TODO: Why can it be null?
if (schema == null) {
log.debug("falling back to Parquet read schema")
-      schema = ParquetTypesConverter.convertToAttributes(parquetSchema)
+      schema = ParquetTypesConverter.convertToAttributes(parquetSchema, false)
}
log.debug(s"list of attributes that will be read: $schema")
new RowRecordMaterializer(parquetSchema, schema)
@@ -43,10 +43,13 @@ private[parquet] object ParquetTypesConverter extends Logging {
def isPrimitiveType(ctype: DataType): Boolean =
classOf[PrimitiveType] isAssignableFrom ctype.getClass

-  def toPrimitiveDataType(parquetType: ParquetPrimitiveType): DataType =
+  def toPrimitiveDataType(
+      parquetType: ParquetPrimitiveType,
+      binaryAsString: Boolean): DataType =
parquetType.getPrimitiveTypeName match {
case ParquetPrimitiveTypeName.BINARY
-      if parquetType.getOriginalType == ParquetOriginalType.UTF8 => StringType
+      if (parquetType.getOriginalType == ParquetOriginalType.UTF8 ||
+        binaryAsString) => StringType
case ParquetPrimitiveTypeName.BINARY => BinaryType
case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
case ParquetPrimitiveTypeName.DOUBLE => DoubleType
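To make the new branch concrete, a hypothetical REPL-style sketch (ignores the private[parquet] visibility and assumes parquet-mr's schema classes on the classpath):

import parquet.schema.{PrimitiveType, Type}
import parquet.schema.PrimitiveType.PrimitiveTypeName

// A BINARY column with no UTF8 annotation, the layout Impala uses for strings.
val c2 = new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveTypeName.BINARY, "c2")

ParquetTypesConverter.toPrimitiveDataType(c2, binaryAsString = false) // => BinaryType
ParquetTypesConverter.toPrimitiveDataType(c2, binaryAsString = true)  // => StringType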
@@ -85,7 +88,7 @@
* @param parquetType The type to convert.
* @return The corresponding Catalyst type.
*/
-  def toDataType(parquetType: ParquetType): DataType = {
+  def toDataType(parquetType: ParquetType, isBinaryAsString: Boolean): DataType = {
def correspondsToMap(groupType: ParquetGroupType): Boolean = {
if (groupType.getFieldCount != 1 || groupType.getFields.apply(0).isPrimitive) {
false
@@ -107,7 +110,7 @@
}

if (parquetType.isPrimitive) {
-      toPrimitiveDataType(parquetType.asPrimitiveType)
+      toPrimitiveDataType(parquetType.asPrimitiveType, isBinaryAsString)
} else {
val groupType = parquetType.asGroupType()
parquetType.getOriginalType match {
@@ -116,7 +119,7 @@
case ParquetOriginalType.LIST => { // TODO: check enums!
assert(groupType.getFieldCount == 1)
val field = groupType.getFields.apply(0)
-          ArrayType(toDataType(field), containsNull = false)
+          ArrayType(toDataType(field, isBinaryAsString), containsNull = false)
}
case ParquetOriginalType.MAP => {
assert(
@@ -126,9 +129,9 @@
assert(
keyValueGroup.getFieldCount == 2,
"Parquet Map type malformatted: nested group should have 2 (key, value) fields!")
-          val keyType = toDataType(keyValueGroup.getFields.apply(0))
+          val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
-          val valueType = toDataType(keyValueGroup.getFields.apply(1))
+          val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString)
assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
// TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true
// at here.
@@ -138,22 +141,22 @@
// Note: the order of these checks is important!
if (correspondsToMap(groupType)) { // MapType
val keyValueGroup = groupType.getFields.apply(0).asGroupType()
-        val keyType = toDataType(keyValueGroup.getFields.apply(0))
+        val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
-        val valueType = toDataType(keyValueGroup.getFields.apply(1))
+        val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString)
assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
// TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true
// at here.
MapType(keyType, valueType)
} else if (correspondsToArray(groupType)) { // ArrayType
-        val elementType = toDataType(groupType.getFields.apply(0))
+        val elementType = toDataType(groupType.getFields.apply(0), isBinaryAsString)
ArrayType(elementType, containsNull = false)
} else { // everything else: StructType
val fields = groupType
.getFields
.map(ptype => new StructField(
ptype.getName,
-            toDataType(ptype),
+            toDataType(ptype, isBinaryAsString),
ptype.getRepetition != Repetition.REQUIRED))
StructType(fields)
}
@@ -276,15 +279,15 @@
}
}

-  def convertToAttributes(parquetSchema: ParquetType): Seq[Attribute] = {
+  def convertToAttributes(parquetSchema: ParquetType, isBinaryAsString: Boolean): Seq[Attribute] = {
parquetSchema
.asGroupType()
.getFields
.map(
field =>
new AttributeReference(
field.getName,
-          toDataType(field),
+          toDataType(field, isBinaryAsString),
field.getRepetition != Repetition.REQUIRED)())
}

@@ -403,7 +406,10 @@
* @param conf The Hadoop configuration to use.
* @return A list of attributes that make up the schema.
*/
-  def readSchemaFromFile(origPath: Path, conf: Option[Configuration]): Seq[Attribute] = {
+  def readSchemaFromFile(
+      origPath: Path,
+      conf: Option[Configuration],
+      isBinaryAsString: Boolean): Seq[Attribute] = {
val keyValueMetadata: java.util.Map[String, String] =
readMetaData(origPath, conf)
.getFileMetaData
Contributor:
This patch will be great for Impala users like us :) Thanks. Moreover, there is a getCreatedBy method on readMetaData(origPath, conf).getFileMetaData, and Impala always writes Parquet files with its own CreatedBy information (the string always contains "impala"), so maybe we can do some auto-detection (https://github.com/apache/spark/pull/1599/files) like:

if (fileMetaData.getCreatedBy.contains("impala")) {
  isBinaryAsString = true
  log.info(s"Impala parquet file found, blabla...")
}

Does this auto-detection make sense?

Contributor:
My only concern with auto-detection like this is: what happens when Impala starts adding the correct annotation and supporting byte arrays?

Contributor:
Good question. Such auto-detection brings confusion, and this is a problem of Impala, not Spark SQL; we are not going to build an Impala file-format corrector :)

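For reference, a hypothetical sketch of the suggested auto-detection (not adopted in this patch; getCreatedBy is the parquet-mr FileMetaData accessor mentioned above):

val fileMetaData = readMetaData(origPath, conf).getFileMetaData
// Impala stamps its createdBy string into the file footer, so the conversion could be forced:
val effectiveBinaryAsString =
  isBinaryAsString || Option(fileMetaData.getCreatedBy).exists(_.toLowerCase.contains("impala"))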
@@ -412,7 +418,7 @@
convertFromString(keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY))
} else {
val attributes = convertToAttributes(
-        readMetaData(origPath, conf).getFileMetaData.getSchema)
+        readMetaData(origPath, conf).getFileMetaData.getSchema, isBinaryAsString)
log.info(s"Falling back to schema conversion from Parquet types; result: $attributes")
attributes
}
@@ -21,8 +21,6 @@ import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}

import parquet.hadoop.ParquetFileWriter
import parquet.hadoop.util.ContextUtil
-import parquet.schema.MessageTypeParser
-
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.Job

@@ -33,7 +31,6 @@ import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.{BooleanType, IntegerType}
import org.apache.spark.sql.catalyst.util.getTempFilePath
-import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext._
import org.apache.spark.util.Utils
@@ -138,6 +135,57 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll
}
}

test("Treat binary as string") {
val oldIsParquetBinaryAsString = TestSQLContext.isParquetBinaryAsString

// Create the test file.
val file = getTempFilePath("parquet")
val path = file.toString
val range = (0 to 255)
val rowRDD = TestSQLContext.sparkContext.parallelize(range)
.map(i => org.apache.spark.sql.Row(i, s"val_$i".getBytes))
// We need to ask Parquet to store the String column as a Binary column.
val schema = StructType(
StructField("c1", IntegerType, false) ::
StructField("c2", BinaryType, false) :: Nil)
val schemaRDD1 = applySchema(rowRDD, schema)
schemaRDD1.saveAsParquetFile(path)
val resultWithBinary = parquetFile(path).collect
range.foreach {
i =>
assert(resultWithBinary(i).getInt(0) === i)
assert(resultWithBinary(i)(1) === s"val_$i".getBytes)
}

TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, "true")
// This ParquetRelation always use Parquet types to derive output.
val parquetRelation = new ParquetRelation(
path.toString,
Some(TestSQLContext.sparkContext.hadoopConfiguration),
TestSQLContext) {
override val output =
ParquetTypesConverter.convertToAttributes(
ParquetTypesConverter.readMetaData(new Path(path), conf).getFileMetaData.getSchema,
TestSQLContext.isParquetBinaryAsString)
}
val schemaRDD = new SchemaRDD(TestSQLContext, parquetRelation)
val resultWithString = schemaRDD.collect
range.foreach {
i =>
assert(resultWithString(i).getInt(0) === i)
assert(resultWithString(i)(1) === s"val_$i")
}

schemaRDD.registerTempTable("tmp")
checkAnswer(
sql("SELECT c1, c2 FROM tmp WHERE c2 = 'val_5' OR c2 = 'val_7'"),
(5, "val_5") ::
(7, "val_7") :: Nil)

// Set it back.
TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, oldIsParquetBinaryAsString.toString)
}

test("Read/Write All Types with non-primitive type") {
val tempDir = getTempFilePath("parquetTest").getCanonicalPath
val range = (0 to 255)