From fb4d67d80407c612998caf8df7ac47bdf2f08506 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 28 Aug 2015 02:21:20 +0800 Subject: [PATCH 1/5] Clips Parquet requested schema for better compatibility --- .../parquet/CatalystReadSupport.scala | 192 ++++++++----- .../parquet/CatalystSchemaConverter.scala | 14 +- .../parquet/ParquetQuerySuite.scala | 20 ++ .../parquet/ParquetSchemaSuite.scala | 253 ++++++++++++++++++ .../spark/sql/test/SharedSQLContext.scala | 2 +- 5 files changed, 406 insertions(+), 75 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala index 0a6bb44445f6..c166e679b78b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala @@ -25,11 +25,12 @@ import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.api.ReadSupport.ReadContext import org.apache.parquet.hadoop.api.{InitContext, ReadSupport} import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.Type.Repetition +import org.apache.parquet.schema._ import org.apache.spark.Logging import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{MapType, ArrayType, DataType, StructType} private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging { // Called after `init()` when initializing Parquet record reader. @@ -81,70 +82,10 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with // `StructType` containing all requested columns. val maybeRequestedSchema = Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)) - // Below we construct a Parquet schema containing all requested columns. This schema tells - // Parquet which columns to read. - // - // If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet schema. Otherwise, - // we have to fallback to the full file schema which contains all columns in the file. - // Obviously this may waste IO bandwidth since it may read more columns than requested. - // - // Two things to note: - // - // 1. It's possible that some requested columns don't exist in the target Parquet file. For - // example, in the case of schema merging, the globally merged schema may contain extra - // columns gathered from other Parquet files. These columns will be simply filled with nulls - // when actually reading the target Parquet file. - // - // 2. When `maybeRequestedSchema` is available, we can't simply convert the Catalyst schema to - // Parquet schema using `CatalystSchemaConverter`, because the mapping is not unique due to - // non-standard behaviors of some Parquet libraries/tools. 
For example, a Parquet file - // containing a single integer array field `f1` may have the following legacy 2-level - // structure: - // - // message root { - // optional group f1 (LIST) { - // required INT32 element; - // } - // } - // - // while `CatalystSchemaConverter` may generate a standard 3-level structure: - // - // message root { - // optional group f1 (LIST) { - // repeated group list { - // required INT32 element; - // } - // } - // } - // - // Apparently, we can't use the 2nd schema to read the target Parquet file as they have - // different physical structures. val parquetRequestedSchema = maybeRequestedSchema.fold(context.getFileSchema) { schemaString => - val toParquet = new CatalystSchemaConverter(conf) - val fileSchema = context.getFileSchema.asGroupType() - val fileFieldNames = fileSchema.getFields.asScala.map(_.getName).toSet - - StructType - // Deserializes the Catalyst schema of requested columns - .fromString(schemaString) - .map { field => - if (fileFieldNames.contains(field.name)) { - // If the field exists in the target Parquet file, extracts the field type from the - // full file schema and makes a single-field Parquet schema - new MessageType("root", fileSchema.getType(field.name)) - } else { - // Otherwise, just resorts to `CatalystSchemaConverter` - toParquet.convert(StructType(Array(field))) - } - } - // Merges all single-field Parquet schemas to form a complete schema for all requested - // columns. Note that it's possible that no columns are requested at all (e.g., count - // some partition column of a partitioned Parquet table). That's why `fold` is used here - // and always fallback to an empty Parquet schema. - .fold(new MessageType("root")) { - _ union _ - } + val catalystRequestedSchema = StructType.fromString(schemaString) + CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) } val metadata = @@ -160,4 +101,127 @@ private[parquet] object CatalystReadSupport { val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema" val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata" + + def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = { + val clippedGroup = clipParquetType(parquetSchema.asGroupType(), catalystSchema).asGroupType() + Types.buildMessage().addFields(clippedGroup.getFields.asScala: _*).named("root") + } + + private def clipParquetType(parquetType: Type, catalystType: DataType): Type = { + catalystType match { + case t: ArrayType if !isPrimitiveCatalystType(t.elementType) => + clipParquetListType(parquetType.asGroupType(), t.elementType) + + case t: MapType if !isPrimitiveCatalystType(t.valueType) => + clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType) + + case t: StructType => + clipParquetRecord(parquetType.asGroupType(), t) + + case _ => + parquetType + } + } + + private def isPrimitiveCatalystType(dataType: DataType): Boolean = { + dataType match { + case _: ArrayType | _: MapType | _: StructType => false + case _ => true + } + } + + private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = { + // Unannotated repeated group, list element type is just the group itself. Clip it. 
+ if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) { + clipParquetType(parquetList, elementType) + } else { + assert(parquetList.getOriginalType == OriginalType.LIST) + assert(parquetList.getFieldCount == 1) + assert(parquetList.getType(0).isRepetition(Repetition.REPEATED)) + assert(!parquetList.getType(0).isPrimitive) + + val repeatedGroup = parquetList.getType(0).asGroupType() + + // If the repeated field is a group with multiple fields, or the repeated field is a group + // with one field and is named either "array" or uses the LIST-annotated group's name with + // "_tuple" appended then the repeated type is the element type and elements are required. + // Build a new LIST-annotated group with clipped `repeatedGroup` as element type and the + // only field. + if ( + repeatedGroup.getFieldCount > 1 || + repeatedGroup.getName == "array" || + repeatedGroup.getName == parquetList.getName + "_tuple" + ) { + Types + .buildGroup(parquetList.getRepetition) + .as(OriginalType.LIST) + .addField(clipParquetType(repeatedGroup, elementType)) + .named(parquetList.getName) + } else { + // Otherwise, the repeated field's type is the element type with the repeated field's + // repetition. + Types + .buildGroup(parquetList.getRepetition) + .as(OriginalType.LIST) + .addField( + Types + .repeatedGroup() + .addField(clipParquetType(repeatedGroup.getType(0), elementType)) + .named(repeatedGroup.getName)) + .named(parquetList.getName) + } + } + } + + private def clipParquetMapType( + parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = { + val repeatedGroup = parquetMap.getType(0).asGroupType() + val parquetKeyType = repeatedGroup.getType(0) + val parquetValueType = repeatedGroup.getType(1) + + val clippedRepeatedGrouop = + Types + .repeatedGroup() + .as(repeatedGroup.getOriginalType) + .addField(parquetKeyType) + .addField(clipParquetType(parquetValueType, valueType)) + .named(repeatedGroup.getName) + + Types + .buildGroup(parquetMap.getRepetition) + .as(parquetMap.getOriginalType) + .addField(clippedRepeatedGrouop) + .named(parquetMap.getName) + } + + private def clipParquetRecord(parquetRecord: GroupType, structType: StructType): GroupType = { + val resultFields = { + val parquetFields = parquetRecord.getFields.asScala + + val clippedFields = { + val catalystFieldNames = structType.fieldNames.toSet + parquetFields.collect { + case f if catalystFieldNames.contains(f.getName) => + clipParquetType(f, structType.apply(f.getName).dataType) + } + } + + val paddedFields = { + val parquetFieldNames = parquetFields.map(_.getName).toSet + val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true) + structType.collect { + case f if !parquetFieldNames.contains(f.name) => + toParquet.convertField(f) + } + } + + (clippedFields ++ paddedFields).sortBy(_.getName) + } + + Types + .buildGroup(parquetRecord.getRepetition) + .as(parquetRecord.getOriginalType) + .addFields(resultFields: _*) + .named(parquetRecord.getName) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala index be6c0545f5a0..a21ab1dbb25d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala @@ -55,16 +55,10 @@ import 
org.apache.spark.sql.{AnalysisException, SQLConf} * to old style non-standard behaviors. */ private[parquet] class CatalystSchemaConverter( - private val assumeBinaryIsString: Boolean, - private val assumeInt96IsTimestamp: Boolean, - private val followParquetFormatSpec: Boolean) { - - // Only used when constructing converter for converting Spark SQL schema to Parquet schema, in - // which case `assumeInt96IsTimestamp` and `assumeBinaryIsString` are irrelevant. - def this() = this( - assumeBinaryIsString = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get, - assumeInt96IsTimestamp = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get, - followParquetFormatSpec = SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get) + assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get, + assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get, + followParquetFormatSpec: Boolean = SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get +) { def this(conf: SQLConf) = this( assumeBinaryIsString = conf.isParquetBinaryAsString, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index b7b70c2bbbd5..0dde78db89ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -229,4 +229,24 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } } } + + test("SPARK-10301") { + withTempPath { dir => + val path = dir.getCanonicalPath + val df = sqlContext + .range(1) + .selectExpr("NAMED_STRUCT('a', id, 'b', id) AS s") + .coalesce(1) + + df.write.mode("append").parquet(path) + + val userDefinedSchema = new StructType() + .add("s", + new StructType() + .add("a", LongType, nullable = true), + nullable = true) + + checkAnswer(sqlContext.read.schema(userDefinedSchema).parquet(path), Row(Row(0))) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 9dcbc1a047be..c195ab0e4af8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -22,6 +22,7 @@ import scala.reflect.runtime.universe.TypeTag import org.apache.parquet.schema.MessageTypeParser +import org.apache.spark.sql.SQLConf import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -941,4 +942,256 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | optional fixed_len_byte_array(8) f1 (DECIMAL(18, 3)); |} """.stripMargin) + + private def testSchemaClipping( + testName: String, + parquetSchema: String, + catalystSchema: StructType, + expectedSchema: String): Unit = { + test(s"Clipping - $testName") { + val expected = MessageTypeParser.parseMessageType(expectedSchema) + val actual = CatalystReadSupport.clipParquetSchema( + MessageTypeParser.parseMessageType(parquetSchema), catalystSchema) + + try { + expected.checkContains(actual) + actual.checkContains(expected) + } catch { case cause: Throwable => + fail( + s"""Expected clipped schema: 
+ |$expected + |Actual clipped schema: + |$actual + """.stripMargin, + cause) + } + } + } + + testSchemaClipping( + "simple nested struct", + + parquetSchema = + s"""message root { + | required group f0 { + | optional int32 f00; + | optional int32 f01; + | } + |} + """.stripMargin, + + catalystSchema = { + val f0Type = new StructType().add("f00", IntegerType, nullable = true) + new StructType() + .add("f0", f0Type, nullable = false) + .add("f1", IntegerType, nullable = true) + }, + + expectedSchema = + s"""message root { + | required group f0 { + | optional int32 f00; + | } + | optional int32 f1; + |} + """.stripMargin) + + testSchemaClipping( + "parquet-protobuf style array", + + parquetSchema = + s"""message root { + | required group f0 { + | repeated binary f00 (UTF8); + | repeated group f01 { + | optional int32 f010; + | optional double f011; + | } + | } + |} + """.stripMargin, + + catalystSchema = { + val f11Type = new StructType().add("f011", DoubleType, nullable = true) + val f01Type = ArrayType(StringType, containsNull = false) + val f0Type = new StructType() + .add("f00", f01Type, nullable = false) + .add("f01", f11Type, nullable = false) + val f1Type = ArrayType(IntegerType, containsNull = true) + new StructType() + .add("f0", f0Type, nullable = false) + .add("f1", f1Type, nullable = true) + }, + + expectedSchema = + s"""message root { + | required group f0 { + | repeated binary f00 (UTF8); + | repeated group f01 { + | optional double f011; + | } + | } + | + | optional group f1 (LIST) { + | repeated group list { + | optional int32 element; + | } + | } + |} + """.stripMargin) + + testSchemaClipping( + "parquet-thrift style array", + + parquetSchema = + s"""message root { + | required group f0 { + | optional group f00 { + | repeated binary f00_tuple (UTF8); + | } + | + | optional group f01 (LIST) { + | repeated group f01_tuple { + | optional int32 f010; + | optional double f011; + | } + | } + | } + |} + """.stripMargin, + + catalystSchema = { + val f11ElementType = new StructType() + .add("f011", DoubleType, nullable = true) + .add("f012", LongType, nullable = true) + + val f0Type = new StructType() + .add("f00", ArrayType(StringType, containsNull = false), nullable = false) + .add("f01", ArrayType(f11ElementType, containsNull = false), nullable = false) + + new StructType().add("f0", f0Type, nullable = false) + }, + + expectedSchema = + s"""message root { + | required group f0 { + | optional group f00 { + | repeated binary f00_tuple (UTF8); + | } + | + | optional group f01 (LIST) { + | repeated group f01_tuple { + | optional double f011; + | optional int64 f012; + | } + | } + | } + |} + """.stripMargin) + + testSchemaClipping( + "parquet-avro style array", + + parquetSchema = + s"""message root { + | required group f0 { + | optional group f00 { + | repeated binary array (UTF8); + | } + | + | optional group f01 (LIST) { + | repeated group array { + | optional int32 f010; + | optional double f011; + | } + | } + | } + |} + """.stripMargin, + + catalystSchema = { + val f11ElementType = new StructType() + .add("f011", DoubleType, nullable = true) + .add("f012", LongType, nullable = true) + + val f0Type = new StructType() + .add("f00", ArrayType(StringType, containsNull = false), nullable = false) + .add("f01", ArrayType(f11ElementType, containsNull = false), nullable = false) + + new StructType().add("f0", f0Type, nullable = false) + }, + + expectedSchema = + s"""message root { + | required group f0 { + | optional group f00 { + | repeated binary array (UTF8); + | } + | + | optional 
group f01 (LIST) { + | repeated group array { + | optional double f011; + | optional int64 f012; + | } + | } + | } + |} + """.stripMargin) + + testSchemaClipping( + "parquet-hive style array", + + parquetSchema = + s"""message root { + | optional group f0 { + | optional group f00 (LIST) { + | repeated group bag { + | optional binary array_element; + | } + | } + | + | optional group f01 (LIST) { + | repeated group bag { + | optional group array_element { + | optional int32 f010; + | optional double f011; + | } + | } + | } + | } + |} + """.stripMargin, + + catalystSchema = { + val f01ElementType = new StructType() + .add("f011", DoubleType, nullable = true) + .add("f012", LongType, nullable = true) + + val f0Type = new StructType() + .add("f00", ArrayType(StringType, containsNull = true), nullable = true) + .add("f01", ArrayType(f01ElementType, containsNull = true), nullable = true) + + new StructType().add("f0", f0Type, nullable = true) + }, + + expectedSchema = + s"""message root { + | optional group f0 { + | optional group f00 (LIST) { + | repeated group bag { + | optional binary array_element; + | } + | } + | + | optional group f01 (LIST) { + | repeated group bag { + | optional group array_element { + | optional double f011; + | optional int64 f012; + | } + | } + | } + | } + |} + """.stripMargin) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala index d23c6a073266..bccaa66a15a3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.test -import org.apache.spark.sql.{ColumnName, SQLContext} +import org.apache.spark.sql.{Column, ColumnName, SQLContext} /** From b6f4526ceddbfaecf79813b6a48464184e51aaa2 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 28 Aug 2015 21:53:20 +0800 Subject: [PATCH 2/5] Fixes test failures --- .../parquet/CatalystReadSupport.scala | 40 +-- .../parquet/CatalystRowConverter.scala | 51 +-- .../parquet/ParquetSchemaSuite.scala | 290 +++++++++--------- 3 files changed, 168 insertions(+), 213 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala index c166e679b78b..f69ae3ec267e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala @@ -30,7 +30,7 @@ import org.apache.parquet.schema._ import org.apache.spark.Logging import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.types.{MapType, ArrayType, DataType, StructType} +import org.apache.spark.sql.types._ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging { // Called after `init()` when initializing Parquet record reader. 
@@ -195,33 +195,21 @@ private[parquet] object CatalystReadSupport { } private def clipParquetRecord(parquetRecord: GroupType, structType: StructType): GroupType = { - val resultFields = { - val parquetFields = parquetRecord.getFields.asScala - - val clippedFields = { - val catalystFieldNames = structType.fieldNames.toSet - parquetFields.collect { - case f if catalystFieldNames.contains(f.getName) => - clipParquetType(f, structType.apply(f.getName).dataType) - } + val tailoredFields = { + val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap + val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true) + structType.map { f => + parquetFieldMap + .get(f.name) + .map(clipParquetType(_, f.dataType)) + .getOrElse(toParquet.convertField(f)) } - - val paddedFields = { - val parquetFieldNames = parquetFields.map(_.getName).toSet - val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true) - structType.collect { - case f if !parquetFieldNames.contains(f.name) => - toParquet.convertField(f) - } - } - - (clippedFields ++ paddedFields).sortBy(_.getName) } - Types - .buildGroup(parquetRecord.getRepetition) - .as(parquetRecord.getOriginalType) - .addFields(resultFields: _*) - .named(parquetRecord.getName) + new GroupType( + parquetRecord.getRepetition, + parquetRecord.getName, + parquetRecord.getOriginalType, + tailoredFields.asJava) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala index fe13dfbbed38..f17e794b7665 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala @@ -113,31 +113,6 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have * any "parent" container. * - * @note Constructor argument [[parquetType]] refers to requested fields of the actual schema of the - * Parquet file being read, while constructor argument [[catalystType]] refers to requested - * fields of the global schema. The key difference is that, in case of schema merging, - * [[parquetType]] can be a subset of [[catalystType]]. For example, it's possible to have - * the following [[catalystType]]: - * {{{ - * new StructType() - * .add("f1", IntegerType, nullable = false) - * .add("f2", StringType, nullable = true) - * .add("f3", new StructType() - * .add("f31", DoubleType, nullable = false) - * .add("f32", IntegerType, nullable = true) - * .add("f33", StringType, nullable = true), nullable = false) - * }}} - * and the following [[parquetType]] (`f2` and `f32` are missing): - * {{{ - * message root { - * required int32 f1; - * required group f3 { - * required double f31; - * optional binary f33 (utf8); - * } - * } - * }}} - * * @param parquetType Parquet schema of Parquet records * @param catalystType Spark SQL schema that corresponds to the Parquet record type * @param updater An updater which propagates converted field values to the parent container @@ -179,31 +154,7 @@ private[parquet] class CatalystRowConverter( // Converters for each field. private val fieldConverters: Array[Converter with HasParentContainerUpdater] = { - // In case of schema merging, `parquetType` can be a subset of `catalystType`. 
We need to pad - // those missing fields and create converters for them, although values of these fields are - // always null. - val paddedParquetFields = { - val parquetFields = parquetType.getFields.asScala - val parquetFieldNames = parquetFields.map(_.getName).toSet - val missingFields = catalystType.filterNot(f => parquetFieldNames.contains(f.name)) - - // We don't need to worry about feature flag arguments like `assumeBinaryIsString` when - // creating the schema converter here, since values of missing fields are always null. - val toParquet = new CatalystSchemaConverter() - - (parquetFields ++ missingFields.map(toParquet.convertField)).sortBy { f => - catalystType.indexWhere(_.name == f.getName) - } - } - - if (paddedParquetFields.length != catalystType.length) { - throw new UnsupportedOperationException( - "A Parquet file's schema has different number of fields with the table schema. " + - "Please enable schema merging by setting \"mergeSchema\" to true when load " + - "a Parquet dataset or set spark.sql.parquet.mergeSchema to true in SQLConf.") - } - - paddedParquetFields.zip(catalystType).zipWithIndex.map { + parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map { case ((parquetFieldType, catalystField), ordinal) => // Converted field value should be set to the `ordinal`-th cell of `currentRow` newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index c195ab0e4af8..d2e72a1b2495 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -972,13 +972,13 @@ class ParquetSchemaSuite extends ParquetSchemaTest { "simple nested struct", parquetSchema = - s"""message root { - | required group f0 { - | optional int32 f00; - | optional int32 f01; - | } - |} - """.stripMargin, + """message root { + | required group f0 { + | optional int32 f00; + | optional int32 f01; + | } + |} + """.stripMargin, catalystSchema = { val f0Type = new StructType().add("f00", IntegerType, nullable = true) @@ -988,28 +988,28 @@ class ParquetSchemaSuite extends ParquetSchemaTest { }, expectedSchema = - s"""message root { - | required group f0 { - | optional int32 f00; - | } - | optional int32 f1; - |} - """.stripMargin) + """message root { + | required group f0 { + | optional int32 f00; + | } + | optional int32 f1; + |} + """.stripMargin) testSchemaClipping( "parquet-protobuf style array", parquetSchema = - s"""message root { - | required group f0 { - | repeated binary f00 (UTF8); - | repeated group f01 { - | optional int32 f010; - | optional double f011; - | } - | } - |} - """.stripMargin, + """message root { + | required group f0 { + | repeated binary f00 (UTF8); + | repeated group f01 { + | optional int32 f010; + | optional double f011; + | } + | } + |} + """.stripMargin, catalystSchema = { val f11Type = new StructType().add("f011", DoubleType, nullable = true) @@ -1024,41 +1024,41 @@ class ParquetSchemaSuite extends ParquetSchemaTest { }, expectedSchema = - s"""message root { - | required group f0 { - | repeated binary f00 (UTF8); - | repeated group f01 { - | optional double f011; - | } - | } - | - | optional group f1 (LIST) { - | repeated group list { - | optional int32 element; - | } - | 
} - |} - """.stripMargin) + """message root { + | required group f0 { + | repeated binary f00 (UTF8); + | repeated group f01 { + | optional double f011; + | } + | } + | + | optional group f1 (LIST) { + | repeated group list { + | optional int32 element; + | } + | } + |} + """.stripMargin) testSchemaClipping( "parquet-thrift style array", parquetSchema = - s"""message root { - | required group f0 { - | optional group f00 { - | repeated binary f00_tuple (UTF8); - | } - | - | optional group f01 (LIST) { - | repeated group f01_tuple { - | optional int32 f010; - | optional double f011; - | } - | } - | } - |} - """.stripMargin, + """message root { + | required group f0 { + | optional group f00 { + | repeated binary f00_tuple (UTF8); + | } + | + | optional group f01 (LIST) { + | repeated group f01_tuple { + | optional int32 f010; + | optional double f011; + | } + | } + | } + |} + """.stripMargin, catalystSchema = { val f11ElementType = new StructType() @@ -1073,41 +1073,41 @@ class ParquetSchemaSuite extends ParquetSchemaTest { }, expectedSchema = - s"""message root { - | required group f0 { - | optional group f00 { - | repeated binary f00_tuple (UTF8); - | } - | - | optional group f01 (LIST) { - | repeated group f01_tuple { - | optional double f011; - | optional int64 f012; - | } - | } - | } - |} - """.stripMargin) + """message root { + | required group f0 { + | optional group f00 { + | repeated binary f00_tuple (UTF8); + | } + | + | optional group f01 (LIST) { + | repeated group f01_tuple { + | optional double f011; + | optional int64 f012; + | } + | } + | } + |} + """.stripMargin) testSchemaClipping( "parquet-avro style array", parquetSchema = - s"""message root { - | required group f0 { - | optional group f00 { - | repeated binary array (UTF8); - | } - | - | optional group f01 (LIST) { - | repeated group array { - | optional int32 f010; - | optional double f011; - | } - | } - | } - |} - """.stripMargin, + """message root { + | required group f0 { + | optional group f00 { + | repeated binary array (UTF8); + | } + | + | optional group f01 (LIST) { + | repeated group array { + | optional int32 f010; + | optional double f011; + | } + | } + | } + |} + """.stripMargin, catalystSchema = { val f11ElementType = new StructType() @@ -1122,45 +1122,45 @@ class ParquetSchemaSuite extends ParquetSchemaTest { }, expectedSchema = - s"""message root { - | required group f0 { - | optional group f00 { - | repeated binary array (UTF8); - | } - | - | optional group f01 (LIST) { - | repeated group array { - | optional double f011; - | optional int64 f012; - | } - | } - | } - |} - """.stripMargin) + """message root { + | required group f0 { + | optional group f00 { + | repeated binary array (UTF8); + | } + | + | optional group f01 (LIST) { + | repeated group array { + | optional double f011; + | optional int64 f012; + | } + | } + | } + |} + """.stripMargin) testSchemaClipping( "parquet-hive style array", parquetSchema = - s"""message root { - | optional group f0 { - | optional group f00 (LIST) { - | repeated group bag { - | optional binary array_element; - | } - | } - | - | optional group f01 (LIST) { - | repeated group bag { - | optional group array_element { - | optional int32 f010; - | optional double f011; - | } - | } - | } - | } - |} - """.stripMargin, + """message root { + | optional group f0 { + | optional group f00 (LIST) { + | repeated group bag { + | optional binary array_element; + | } + | } + | + | optional group f01 (LIST) { + | repeated group bag { + | optional group array_element { + | optional int32 
f010; + | optional double f011; + | } + | } + | } + | } + |} + """.stripMargin, catalystSchema = { val f01ElementType = new StructType() @@ -1175,23 +1175,39 @@ class ParquetSchemaSuite extends ParquetSchemaTest { }, expectedSchema = - s"""message root { - | optional group f0 { - | optional group f00 (LIST) { - | repeated group bag { - | optional binary array_element; - | } - | } - | - | optional group f01 (LIST) { - | repeated group bag { - | optional group array_element { - | optional double f011; - | optional int64 f012; - | } - | } - | } - | } - |} - """.stripMargin) + """message root { + | optional group f0 { + | optional group f00 (LIST) { + | repeated group bag { + | optional binary array_element; + | } + | } + | + | optional group f01 (LIST) { + | repeated group bag { + | optional group array_element { + | optional double f011; + | optional int64 f012; + | } + | } + | } + | } + |} + """.stripMargin) + + testSchemaClipping( + "empty requested schema", + + parquetSchema = + """message root { + | required group f0 { + | required int32 f00; + | required int64 f01; + | } + |} + """.stripMargin, + + catalystSchema = new StructType(), + + expectedSchema = "message root {}") } From 6f009a24edb341bb149b60facac55e459af3ed7e Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sat, 29 Aug 2015 00:48:08 +0800 Subject: [PATCH 3/5] More tests and comments --- .../parquet/CatalystReadSupport.scala | 17 ++++- .../parquet/ParquetQuerySuite.scala | 63 ++++++++++++++++++- .../spark/sql/test/SharedSQLContext.scala | 2 +- 3 files changed, 76 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala index f69ae3ec267e..cbdd3d11daf2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala @@ -102,6 +102,10 @@ private[parquet] object CatalystReadSupport { val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata" + /** + * Tailors `parquetSchema` according to `catalystSchema` by removing column paths don't exist + * in `catalystSchema`, and adding those only exist in `catalystSchema`. + */ def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = { val clippedGroup = clipParquetType(parquetSchema.asGroupType(), catalystSchema).asGroupType() Types.buildMessage().addFields(clippedGroup.getFields.asScala: _*).named("root") @@ -131,6 +135,8 @@ private[parquet] object CatalystReadSupport { } private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = { + assert(!isPrimitiveCatalystType(elementType)) + // Unannotated repeated group, list element type is just the group itself. Clip it. 
if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) { clipParquetType(parquetList, elementType) @@ -175,11 +181,13 @@ private[parquet] object CatalystReadSupport { private def clipParquetMapType( parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = { + assert(!isPrimitiveCatalystType(valueType)) + val repeatedGroup = parquetMap.getType(0).asGroupType() val parquetKeyType = repeatedGroup.getType(0) val parquetValueType = repeatedGroup.getType(1) - val clippedRepeatedGrouop = + val clippedRepeatedGroup = Types .repeatedGroup() .as(repeatedGroup.getOriginalType) @@ -190,7 +198,7 @@ private[parquet] object CatalystReadSupport { Types .buildGroup(parquetMap.getRepetition) .as(parquetMap.getOriginalType) - .addField(clippedRepeatedGrouop) + .addField(clippedRepeatedGroup) .named(parquetMap.getName) } @@ -206,6 +214,11 @@ private[parquet] object CatalystReadSupport { } } + // Here we can't use builder methods defined in `Types` to construct the `GroupType` and have to + // resort to this deprecated constructor. The reason is that, `tailoredFields` can be empty, + // and `Types` builder methods don't allow constructing empty group types. For example, query + // `SELECT COUNT(1) FROM t` requests for zero columns. + // TODO Refactor method signature to return a list of fields instead of a `GroupType` new GroupType( parquetRecord.getRepetition, parquetRecord.getName, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 0dde78db89ce..a379523d67f8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -230,7 +230,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } } - test("SPARK-10301") { + test("SPARK-10301 Clipping nested structs in requested schema") { withTempPath { dir => val path = dir.getCanonicalPath val df = sqlContext @@ -240,13 +240,70 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext df.write.mode("append").parquet(path) + val userDefinedSchema = new StructType() + .add("s", new StructType().add("a", LongType, nullable = true), nullable = true) + + checkAnswer( + sqlContext.read.schema(userDefinedSchema).parquet(path), + Row(Row(0))) + } + + withTempPath { dir => + val path = dir.getCanonicalPath + + val df1 = sqlContext + .range(1) + .selectExpr("NAMED_STRUCT('a', id, 'b', id) AS s") + .coalesce(1) + + val df2 = sqlContext + .range(1, 2) + .selectExpr("NAMED_STRUCT('b', id, 'c', id) AS s") + .coalesce(1) + + df1.write.parquet(path) + df2.write.mode(SaveMode.Append).parquet(path) + + val userDefinedSchema = new StructType() + .add("s", + new StructType() + .add("a", LongType, nullable = true) + .add("c", LongType, nullable = true), + nullable = true) + + checkAnswer( + sqlContext.read.schema(userDefinedSchema).parquet(path), + Seq( + Row(Row(0, null)), + Row(Row(null, 1)))) + } + + withTempPath { dir => + val path = dir.getCanonicalPath + + val df = sqlContext + .range(1) + .selectExpr("NAMED_STRUCT('a', ARRAY(NAMED_STRUCT('b', id, 'c', id))) AS s") + .coalesce(1) + + df.write.parquet(path) + val userDefinedSchema = new StructType() .add("s", new StructType() - .add("a", LongType, nullable = true), + .add( + "a", + ArrayType( + new 
StructType() + .add("b", LongType, nullable = true) + .add("d", StringType, nullable = true), + containsNull = true), + nullable = true), nullable = true) - checkAnswer(sqlContext.read.schema(userDefinedSchema).parquet(path), Row(Row(0))) + checkAnswer( + sqlContext.read.schema(userDefinedSchema).parquet(path), + Row(Row(Seq(Row(0, null))))) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala index bccaa66a15a3..d23c6a073266 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.test -import org.apache.spark.sql.{Column, ColumnName, SQLContext} +import org.apache.spark.sql.{ColumnName, SQLContext} /** From f21d88e436806a27e2895625fd61a4213e386507 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 30 Aug 2015 16:19:46 +0800 Subject: [PATCH 4/5] More comments and test cases --- .../parquet/CatalystReadSupport.scala | 98 +++++++++++++------ .../parquet/ParquetSchemaSuite.scala | 41 ++++++++ 2 files changed, 110 insertions(+), 29 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala index cbdd3d11daf2..dc4ff06df6f2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.util.{Map => JMap} -import scala.collection.JavaConverters._ +import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter, mapAsScalaMapConverter} import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.api.ReadSupport.ReadContext @@ -107,26 +107,33 @@ private[parquet] object CatalystReadSupport { * in `catalystSchema`, and adding those only exist in `catalystSchema`. */ def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = { - val clippedGroup = clipParquetType(parquetSchema.asGroupType(), catalystSchema).asGroupType() - Types.buildMessage().addFields(clippedGroup.getFields.asScala: _*).named("root") + val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema) + Types.buildMessage().addFields(clippedParquetFields: _*).named("root") } private def clipParquetType(parquetType: Type, catalystType: DataType): Type = { catalystType match { case t: ArrayType if !isPrimitiveCatalystType(t.elementType) => + // Only clips array types with nested type as element type. clipParquetListType(parquetType.asGroupType(), t.elementType) case t: MapType if !isPrimitiveCatalystType(t.valueType) => + // Only clips map types with nested type as value type. clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType) case t: StructType => - clipParquetRecord(parquetType.asGroupType(), t) + clipParquetGroup(parquetType.asGroupType(), t) case _ => parquetType } } + /** + * Whether a Catalyst [[DataType]] is primitive. Primitive [[DataType]] is not equivalent to + * [[AtomicType]]. For example, [[CalendarIntervalType]] is primitive, but it's not an + * [[AtomicType]]. 
+ */ private def isPrimitiveCatalystType(dataType: DataType): Boolean = { dataType match { case _: ArrayType | _: MapType | _: StructType => false @@ -134,16 +141,33 @@ private[parquet] object CatalystReadSupport { } } + /** + * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[ArrayType]]. The element type + * of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a + * [[StructType]]. + */ private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = { + // Precondition of this method, should only be called for lists with nested element types. assert(!isPrimitiveCatalystType(elementType)) - // Unannotated repeated group, list element type is just the group itself. Clip it. + // Unannotated repeated group should be interpreted as required list of required element, so + // list element type is just the group itself. Clip it. if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) { clipParquetType(parquetList, elementType) } else { - assert(parquetList.getOriginalType == OriginalType.LIST) - assert(parquetList.getFieldCount == 1) - assert(parquetList.getType(0).isRepetition(Repetition.REPEATED)) + assert( + parquetList.getOriginalType == OriginalType.LIST, + "Invalid Parquet schema. " + + "Original type of annotated Parquet lists must be LIST: " + + parquetList.toString) + + assert( + parquetList.getFieldCount == 1 && parquetList.getType(0).isRepetition(Repetition.REPEATED), + "Invalid Parquet schema. " + + "LIST-annotated group should only have exactly one repeated field: " + + parquetList) + + // Precondition of this method, should only be called for lists with nested element types. assert(!parquetList.getType(0).isPrimitive) val repeatedGroup = parquetList.getType(0).asGroupType() @@ -179,8 +203,14 @@ private[parquet] object CatalystReadSupport { } } + /** + * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[MapType]]. The value type + * of the [[MapType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a + * [[StructType]]. Note that key type of any [[MapType]] is always a primitive type. + */ private def clipParquetMapType( parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = { + // Precondition of this method, should only be called for maps with nested value types. assert(!isPrimitiveCatalystType(valueType)) val repeatedGroup = parquetMap.getType(0).asGroupType() @@ -202,27 +232,37 @@ private[parquet] object CatalystReadSupport { .named(parquetMap.getName) } - private def clipParquetRecord(parquetRecord: GroupType, structType: StructType): GroupType = { - val tailoredFields = { - val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap - val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true) - structType.map { f => - parquetFieldMap - .get(f.name) - .map(clipParquetType(_, f.dataType)) - .getOrElse(toParquet.convertField(f)) - } - } + /** + * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]]. + * + * @return A clipped [[GroupType]], which has at least one field. + * @note Parquet doesn't allow creating empty [[GroupType]] instances except for empty + * [[MessageType]]. Because it's legal to construct an empty requested schema for column + * pruning. 
+ */ + private def clipParquetGroup(parquetRecord: GroupType, structType: StructType): GroupType = { + val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType) + Types + .buildGroup(parquetRecord.getRepetition) + .as(parquetRecord.getOriginalType) + .addFields(clippedParquetFields: _*) + .named(parquetRecord.getName) + } - // Here we can't use builder methods defined in `Types` to construct the `GroupType` and have to - // resort to this deprecated constructor. The reason is that, `tailoredFields` can be empty, - // and `Types` builder methods don't allow constructing empty group types. For example, query - // `SELECT COUNT(1) FROM t` requests for zero columns. - // TODO Refactor method signature to return a list of fields instead of a `GroupType` - new GroupType( - parquetRecord.getRepetition, - parquetRecord.getName, - parquetRecord.getOriginalType, - tailoredFields.asJava) + /** + * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]]. + * + * @return A list of clipped [[GroupType]] fields, which can be empty. + */ + private def clipParquetGroupFields( + parquetRecord: GroupType, structType: StructType): Seq[Type] = { + val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap + val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true) + structType.map { f => + parquetFieldMap + .get(f.name) + .map(clipParquetType(_, f.dataType)) + .getOrElse(toParquet.convertField(f)) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index d2e72a1b2495..28c59a4abdd7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -1195,6 +1195,47 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin) + testSchemaClipping( + "2-level list of required struct", + + parquetSchema = + s"""message root { + | required group f0 { + | required group f00 (LIST) { + | repeated group element { + | required int32 f000; + | optional int64 f001; + | } + | } + | } + |} + """.stripMargin, + + catalystSchema = { + val f00ElementType = + new StructType() + .add("f001", LongType, nullable = true) + .add("f002", DoubleType, nullable = false) + + val f00Type = ArrayType(f00ElementType, containsNull = false) + val f0Type = new StructType().add("f00", f00Type, nullable = false) + + new StructType().add("f0", f0Type, nullable = false) + }, + + expectedSchema = + s"""message root { + | required group f0 { + | required group f00 (LIST) { + | repeated group element { + | optional int64 f001; + | required double f002; + | } + | } + | } + |} + """.stripMargin) + testSchemaClipping( "empty requested schema", From 38644d8a45175cbdf20d2ace021c2c2544a50ab3 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 30 Aug 2015 18:48:10 +0800 Subject: [PATCH 5/5] Adds test case for parquet files with different physical schemas but share the same logical schema --- .../ParquetAvroCompatibilitySuite.scala | 1 + .../ParquetInteroperabilitySuite.scala | 90 +++++++++++++++++++ 2 files changed, 91 insertions(+) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala index bd7cf8c10abe..36b929ee1f40 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources.parquet +import java.io.File import java.nio.ByteBuffer import java.util.{List => JList, Map => JMap} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala new file mode 100644 index 000000000000..83b65fb419ed --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import java.io.File + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.SharedSQLContext + +class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedSQLContext { + test("parquet files with different physical schemas but share the same logical schema") { + import ParquetCompatibilityTest._ + + // This test case writes two Parquet files, both representing the following Catalyst schema + // + // StructType( + // StructField( + // "f", + // ArrayType(IntegerType, containsNull = false), + // nullable = false)) + // + // The first Parquet file comes with parquet-avro style 2-level LIST-annotated group, while the + // other one comes with parquet-protobuf style 1-level unannotated primitive field. 
+ withTempDir { dir => + val avroStylePath = new File(dir, "avro-style").getCanonicalPath + val protobufStylePath = new File(dir, "protobuf-style").getCanonicalPath + + val avroStyleSchema = + """message avro_style { + | required group f (LIST) { + | repeated int32 array; + | } + |} + """.stripMargin + + writeDirect(avroStylePath, avroStyleSchema, { rc => + rc.message { + rc.field("f", 0) { + rc.group { + rc.field("array", 0) { + rc.addInteger(0) + rc.addInteger(1) + } + } + } + } + }) + + logParquetSchema(avroStylePath) + + val protobufStyleSchema = + """message protobuf_style { + | repeated int32 f; + |} + """.stripMargin + + writeDirect(protobufStylePath, protobufStyleSchema, { rc => + rc.message { + rc.field("f", 0) { + rc.addInteger(2) + rc.addInteger(3) + } + } + }) + + logParquetSchema(protobufStylePath) + + checkAnswer( + sqlContext.read.parquet(dir.getCanonicalPath), + Seq( + Row(Seq(0, 1)), + Row(Seq(2, 3)))) + } + } +}