From 020daf1c97a21bd9e70e0d264b4c5b46cb07cb1d Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Thu, 22 Oct 2015 21:58:27 +0800
Subject: [PATCH 1/2] Bumps parquet-mr to 1.8.1

---
 pom.xml                                       |  2 +-
 .../parquet/CatalystReadSupport.scala         |  1 -
 .../datasources/parquet/ParquetFilters.scala  | 96 ++++---------------
 .../datasources/parquet/ParquetRelation.scala | 15 ++-
 .../ParquetAvroCompatibilitySuite.scala       | 58 ++++++++++-
 .../parquet/ParquetSchemaSuite.scala          | 34 ++++---
 6 files changed, 103 insertions(+), 103 deletions(-)

diff --git a/pom.xml b/pom.xml
index 445e65c0459b..f41109753f78 100644
--- a/pom.xml
+++ b/pom.xml
@@ -140,7 +140,7 @@
     <hive.version.short>1.2.1</hive.version.short>
     <derby.version>10.10.1.1</derby.version>
-    <parquet.version>1.7.0</parquet.version>
+    <parquet.version>1.8.1</parquet.version>
     <hive.parquet.version>1.6.0</hive.parquet.version>
     <jblas.version>1.2.4</jblas.version>
     <jetty.version>8.1.14.v20131031</jetty.version>

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index a958373eb769..adc671222bcb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -29,7 +29,6 @@ import org.apache.parquet.schema.Type.Repetition
 import org.apache.parquet.schema._
 
 import org.apache.spark.Logging
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types._

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 07714329370a..72c786d669d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -22,8 +22,6 @@ import java.io.Serializable
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.OriginalType
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
@@ -53,18 +51,15 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
-    // Binary.fromString and Binary.fromByteArray don't accept null values
+    // Binary.fromString and Binary.fromConstantByteArray don't accept null values
     case StringType =>
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
-        Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))).orNull)
+        Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
     case BinaryType =>
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
-        Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-    */
+        Option(v).map(b => Binary.fromConstantByteArray(v.asInstanceOf[Array[Byte]])).orNull)
   }
 
   private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -79,17 +74,15 @@
     case DoubleType =>
       (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
+    // Binary.fromString and Binary.fromConstantByteArray don't accept null values
     case StringType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
-        Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))).orNull)
+        Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
     case BinaryType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
-        Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-    */
+        Option(v).map(b => Binary.fromConstantByteArray(v.asInstanceOf[Array[Byte]])).orNull)
   }
 
   private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -101,17 +94,12 @@
     case FloatType =>
       (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Any) =>
-        FilterApi.lt(binaryColumn(n),
-          Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+        FilterApi.lt(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-    */
+        FilterApi.lt(binaryColumn(n), Binary.fromConstantByteArray(v.asInstanceOf[Array[Byte]]))
   }
 
   private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -123,17 +111,12 @@
     case FloatType =>
       (n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Any) =>
-        FilterApi.ltEq(binaryColumn(n),
-          Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+        FilterApi.ltEq(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-    */
+        FilterApi.ltEq(binaryColumn(n), Binary.fromConstantByteArray(v.asInstanceOf[Array[Byte]]))
   }
 
   private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -145,17 +128,12 @@
     case FloatType =>
       (n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Any) =>
-        FilterApi.gt(binaryColumn(n),
-          Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+        FilterApi.gt(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-    */
+        FilterApi.gt(binaryColumn(n), Binary.fromConstantByteArray(v.asInstanceOf[Array[Byte]]))
   }
 
   private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -167,17 +145,12 @@
     case FloatType =>
       (n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Any) =>
-        FilterApi.gtEq(binaryColumn(n),
-          Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+        FilterApi.gtEq(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-    */
+        FilterApi.gtEq(binaryColumn(n), Binary.fromConstantByteArray(v.asInstanceOf[Array[Byte]]))
   }
 
   private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
@@ -193,18 +166,14 @@
     case DoubleType =>
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
-
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(binaryColumn(n),
-          SetInFilter(v.map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8")))))
+          SetInFilter(v.map(s => Binary.fromString(s.asInstanceOf[String]))))
     case BinaryType =>
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(binaryColumn(n),
-          SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]]))))
-    */
+          SetInFilter(v.map(e => Binary.fromConstantByteArray(e.asInstanceOf[Array[Byte]]))))
   }
 
   /**
    * Converts data sources filters to Parquet filter predicates.
    */
@@ -213,8 +182,6 @@
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
     val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
 
-    relaxParquetValidTypeMap
-
     // NOTE:
     //
     // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
@@ -271,35 +238,4 @@
       case _ => None
     }
   }
-
-  // !! HACK ALERT !!
-  //
-  // This lazy val is a workaround for PARQUET-201, and should be removed once we upgrade to
-  // parquet-mr 1.8.1 or higher versions.
-  //
-  // In Parquet, not all types of columns can be used for filter push-down optimization. The set
-  // of valid column types is controlled by `ValidTypeMap`. Unfortunately, in parquet-mr 1.7.0 and
-  // prior versions, the limitation is too strict, and doesn't allow `BINARY (ENUM)` columns to be
-  // pushed down.
-  //
-  // This restriction is problematic for Spark SQL, because Spark SQL doesn't have a type that maps
-  // to Parquet original type `ENUM` directly, and always converts `ENUM` to `StringType`. Thus,
-  // a predicate involving a `ENUM` field can be pushed-down as a string column, which is perfectly
-  // legal except that it fails the `ValidTypeMap` check.
-  //
-  // Here we add `BINARY (ENUM)` into `ValidTypeMap` lazily via reflection to workaround this issue.
-  private lazy val relaxParquetValidTypeMap: Unit = {
-    val constructor = Class
-      .forName(classOf[ValidTypeMap].getCanonicalName + "$FullTypeDescriptor")
-      .getDeclaredConstructor(classOf[PrimitiveTypeName], classOf[OriginalType])
-
-    constructor.setAccessible(true)
-    val enumTypeDescriptor = constructor
-      .newInstance(PrimitiveTypeName.BINARY, OriginalType.ENUM)
-      .asInstanceOf[AnyRef]
-
-    val addMethod = classOf[ValidTypeMap].getDeclaredMethods.find(_.getName == "add").get
-    addMethod.setAccessible(true)
-    addMethod.invoke(null, classOf[Binary], enumTypeDescriptor)
-  }
 }
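The ParquetFilters changes above re-enable push-down for StringType and BinaryType columns (previously commented out under SPARK-11153) and drop the PARQUET-201 reflection hack, both made possible by parquet-mr 1.8.1. For reference, here is a minimal sketch of the kind of predicate the restored cases build, written directly against the parquet-mr 1.8.x filter2 API; the column name and literal are illustrative, not taken from this patch:

    import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
    import org.apache.parquet.io.api.Binary

    // Push-down for `name = 'foo'` on a BINARY (UTF8) column:
    val eqPred: FilterPredicate =
      FilterApi.eq(FilterApi.binaryColumn("name"), Binary.fromString("foo"))

    // Binary.fromString rejects null, which is why makeEq above goes through
    // Option(v).map(...).orNull: a null literal (IS NULL semantics) must be
    // passed to FilterApi.eq directly as a null Binary reference.
    val isNullPred: FilterPredicate =
      FilterApi.eq(FilterApi.binaryColumn("name"), null.asInstanceOf[Binary])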
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 77d851ca486b..3840665bb49d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -548,7 +548,20 @@ private[sql] object ParquetRelation extends Logging {
     }
 
     conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
-      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
+      // PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing an empty GroupType,
+      // so we cannot request an empty schema (i.e. select no columns at all) for queries like
+      // `SELECT COUNT(*) FROM t`. Here we select the "narrowest" column if `requiredColumns` is
+      // empty.
+      //
+      // This issue has been fixed in parquet-mr 1.8.2-SNAPSHOT. We should revert this change after
+      // upgrading to 1.8.2 or some later version.
+      val requestedSchema = {
+        val requiredDataColumns = requiredColumns.map(dataSchema(_))
+        if (requiredDataColumns.isEmpty) {
+          StructType(dataSchema.minBy(_.dataType.defaultSize) :: Nil)
+        } else {
+          StructType(requiredDataColumns)
+        }
+      }
       CatalystSchemaConverter.checkFieldNames(requestedSchema).json
     })
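The fallback above guarantees that at least one physical column is always requested, picking the one that is cheapest to materialize. A self-contained illustration of the selection logic, with a made-up schema (BooleanType has the smallest defaultSize, so its field wins):

    import org.apache.spark.sql.types._

    val dataSchema = StructType(Seq(
      StructField("name", StringType),
      StructField("id", LongType),
      StructField("flag", BooleanType)))

    // With no required columns (e.g. SELECT COUNT(*)), fall back to the field
    // whose data type has the smallest defaultSize, here the boolean column:
    val requiredDataColumns = Seq.empty[StructField]
    val requestedSchema =
      if (requiredDataColumns.isEmpty) {
        StructType(dataSchema.minBy(_.dataType.defaultSize) :: Nil)
      } else {
        StructType(requiredDataColumns)
      }
    // requestedSchema contains only StructField("flag", BooleanType)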
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
index 36b929ee1f40..e90a56aa316e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.io.File
 import java.nio.ByteBuffer
 import java.util.{List => JList, Map => JMap}
 
@@ -27,6 +26,7 @@ import org.apache.avro.Schema
 import org.apache.avro.generic.IndexedRecord
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.datasources.parquet.test.avro._
@@ -35,14 +35,14 @@ import org.apache.spark.sql.test.SharedSQLContext
 class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with SharedSQLContext {
   private def withWriter[T <: IndexedRecord]
       (path: String, schema: Schema)
-      (f: AvroParquetWriter[T] => Unit): Unit = {
+      (f: ParquetWriter[T] => Unit): Unit = {
     logInfo(
       s"""Writing Avro records with the following Avro schema into Parquet file:
          |
          |${schema.toString(true)}
        """.stripMargin)
 
-    val writer = new AvroParquetWriter[T](new Path(path), schema)
+    val writer = AvroParquetWriter.builder[T](new Path(path)).withSchema(schema).build()
     try f(writer) finally writer.close()
   }
@@ -163,8 +163,56 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared
     }
   }
 
-  ignore("nullable arrays (parquet-avro 1.7.0 does not properly support this)") {
-    // TODO Complete this test case after upgrading to parquet-mr 1.8+
+  test("nullable arrays") {
+    withTempPath { dir =>
+      import ParquetCompatibilityTest._
+
+      // This Parquet schema is translated from the following Avro schema, with the Hadoop
+      // configuration `parquet.avro.write-old-list-structure` set to `false`:
+      //
+      //   record AvroArrayOfOptionalInts {
+      //     array<union { null, int }> f;
+      //   }
+      val schema =
+        """message AvroArrayOfOptionalInts {
+          |  required group f (LIST) {
+          |    repeated group list {
+          |      optional int32 element;
+          |    }
+          |  }
+          |}
+        """.stripMargin
+
+      writeDirect(dir.getCanonicalPath, schema, { rc =>
+        rc.message {
+          rc.field("f", 0) {
+            rc.group {
+              rc.field("list", 0) {
+                rc.group {
+                  rc.field("element", 0) {
+                    rc.addInteger(0)
+                  }
+                }
+
+                rc.group { /* null */ }
+
+                rc.group {
+                  rc.field("element", 0) {
+                    rc.addInteger(1)
+                  }
+                }
+
+                rc.group { /* null */ }
+              }
+            }
+          }
+        }
+      })
+
+      checkAnswer(
+        sqlContext.read.parquet(dir.getCanonicalPath),
+        Row(Array(0: Integer, null, 1: Integer, null)))
+    }
   }
 
   test("SPARK-10136 array of primitive array") {
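withWriter now goes through the builder API, since parquet-mr 1.8.x deprecates the public AvroParquetWriter constructors. The builder also accepts a Hadoop Configuration, which is where parquet.avro.write-old-list-structure would be set to false to produce the standard three-level LIST layout shown in the new test's Parquet schema. A sketch under those assumptions (the output path is illustrative):

    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecord
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.avro.AvroParquetWriter
    import org.apache.parquet.hadoop.ParquetWriter

    // Avro schema matching the test's record: an array of optional ints.
    val avroSchema: Schema = new Schema.Parser().parse(
      """{"type": "record", "name": "AvroArrayOfOptionalInts", "fields": [
        |  {"name": "f", "type": {"type": "array", "items": ["null", "int"]}}
        |]}""".stripMargin)

    val conf = new Configuration()
    // Write LISTs using the standard 3-level structure (which supports optional
    // elements) instead of the legacy 2-level layout parquet-avro defaults to:
    conf.setBoolean("parquet.avro.write-old-list-structure", false)

    val writer: ParquetWriter[GenericRecord] =
      AvroParquetWriter.builder[GenericRecord](new Path("/tmp/nullable-ints.parquet"))
        .withSchema(avroSchema)
        .withConf(conf)
        .build()
    // ... write records, then writer.close()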
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 60fa81b1ab81..056b9e27ee1a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -1385,21 +1385,25 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |}
     """.stripMargin)
 
-  testSchemaClipping(
-    "empty requested schema",
-
-    parquetSchema =
-      """message root {
-        |  required group f0 {
-        |    required int32 f00;
-        |    required int64 f01;
-        |  }
-        |}
-      """.stripMargin,
-
-    catalystSchema = new StructType(),
-
-    expectedSchema = "message root {}")
+  // PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing empty GroupType. Should
+  // re-enable this test case after upgrading to parquet-mr 1.8.2 or some later version.
+  ignore("empty requested schema") {
+    testSchemaClipping(
+      "empty requested schema",
+
+      parquetSchema =
+        """message root {
+          |  required group f0 {
+          |    required int32 f00;
+          |    required int64 f01;
+          |  }
+          |}
+        """.stripMargin,
+
+      catalystSchema = new StructType(),
+
+      expectedSchema = "message root {}")
+  }
 
   testSchemaClipping(
     "disjoint field sets",
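The second patch below relocates the PARQUET-363 workaround from ParquetRelation into clipParquetSchema itself. The failure being worked around is easy to reproduce against parquet-mr 1.8.1, as in this sketch (the exception message is paraphrased from the PARQUET-278 validation and may differ slightly):

    import org.apache.parquet.schema.Types

    // parquet-mr 1.8.1 validates groups eagerly, so an empty message type
    // cannot be built directly:
    val empty = Types.buildMessage().named("spark_schema")
    // => org.apache.parquet.schema.InvalidSchemaException:
    //    a group type can not be empty; Parquet does not support empty groups
    //    without leaves (empty group: spark_schema)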
From e48c83a9802acea9853ed27a22e5e6ed7a32f427 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Tue, 27 Oct 2015 19:34:37 +0800
Subject: [PATCH 2/2] Work around PARQUET-363

---
 .../parquet/CatalystReadSupport.scala         | 29 ++++++++++++++++---
 .../datasources/parquet/ParquetRelation.scala | 15 +---------
 2 files changed, 26 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index adc671222bcb..430964c2f5b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
 import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
 import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
 import org.apache.parquet.schema.Type.Repetition
 import org.apache.parquet.schema._
 
@@ -111,10 +112,30 @@ private[parquet] object CatalystReadSupport {
    */
   def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
     val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
-    Types
-      .buildMessage()
-      .addFields(clippedParquetFields: _*)
-      .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+
+    if (clippedParquetFields.isEmpty) {
+      // !! HACK ALERT !!
+      //
+      // PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing an empty
+      // GroupType, so we cannot build an empty requested schema for queries like
+      // `SELECT COUNT(*) FROM t`. This issue has been fixed in parquet-mr 1.8.2-SNAPSHOT.
+      //
+      // To work around this problem, here we first construct a `MessageType` with a single dummy
+      // field, and then remove the field to obtain an empty `MessageType`.
+      //
+      // TODO Revert this change after upgrading parquet-mr to 1.8.2+
+      val messageType = Types
+        .buildMessage()
+        .addField(Types.required(INT32).named("dummy"))
+        .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+      messageType.getFields.clear()
+      messageType
+    } else {
+      Types
+        .buildMessage()
+        .addFields(clippedParquetFields: _*)
+        .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+    }
   }
 
   private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 3840665bb49d..77d851ca486b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -548,20 +548,7 @@ private[sql] object ParquetRelation extends Logging {
     }
 
     conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
-      // PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing an empty GroupType,
-      // so we cannot request an empty schema (i.e. select no columns at all) for queries like
-      // `SELECT COUNT(*) FROM t`. Here we select the "narrowest" column if `requiredColumns` is
-      // empty.
-      //
-      // This issue has been fixed in parquet-mr 1.8.2-SNAPSHOT. We should revert this change after
-      // upgrading to 1.8.2 or some later version.
-      val requestedSchema = {
-        val requiredDataColumns = requiredColumns.map(dataSchema(_))
-        if (requiredDataColumns.isEmpty) {
-          StructType(dataSchema.minBy(_.dataType.defaultSize) :: Nil)
-        } else {
-          StructType(requiredDataColumns)
-        }
-      }
+      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
       CatalystSchemaConverter.checkFieldNames(requestedSchema).json
     })
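With the dummy-field hack applied, clipping against an empty Catalyst schema now returns an empty MessageType instead of throwing, which is exactly what a column-less scan such as `SELECT COUNT(*) FROM t` requires. A rough sketch of the resulting behavior, written as if from test code in the same package (CatalystReadSupport is private[parquet], and the Parquet schema is illustrative):

    import org.apache.parquet.schema.MessageTypeParser
    import org.apache.spark.sql.types.StructType

    val parquetSchema = MessageTypeParser.parseMessageType(
      """message root {
        |  required group f0 {
        |    required int32 f00;
        |    required int64 f01;
        |  }
        |}
      """.stripMargin)

    // No Catalyst fields requested, so every Parquet field is clipped away and
    // the workaround yields an empty "spark_schema" message type:
    val clipped = CatalystReadSupport.clipParquetSchema(parquetSchema, new StructType())
    assert(clipped.getFieldCount == 0)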