From 1d52cfb1363c2fb1d6675bb3673cc0f9073553c8 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Wed, 1 Jun 2016 12:59:16 -0700
Subject: [PATCH] Revert "[SPARK-9876][SQL] Update Parquet to 1.8.1."

This reverts commit dc6e94157ce08df91aa1a31db8e5ec733a1ab0c5.
---
 dev/deps/spark-deps-hadoop-2.2                        | 11 +--
 dev/deps/spark-deps-hadoop-2.3                        | 11 +--
 dev/deps/spark-deps-hadoop-2.4                        | 11 +--
 dev/deps/spark-deps-hadoop-2.6                        | 11 +--
 dev/deps/spark-deps-hadoop-2.7                        | 11 +--
 pom.xml                                               |  2 +-
 .../SpecificParquetRecordReaderBase.java              | 20 ++--
 .../parquet/CatalystReadSupport.scala                 | 12 +--
 .../parquet/CatalystSchemaConverter.scala             | 16 ----
 .../datasources/parquet/ParquetFilters.scala          | 83 +++++++++++++++----
 .../parquet/ParquetSchemaSuite.scala                  | 20 ++---
 11 files changed, 117 insertions(+), 91 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index b5c38a6c056e..96001eade028 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -129,13 +129,14 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.3.jar
-parquet-column-1.8.1.jar
-parquet-common-1.8.1.jar
-parquet-encoding-1.8.1.jar
+parquet-column-1.7.0.jar
+parquet-common-1.7.0.jar
+parquet-encoding-1.7.0.jar
 parquet-format-2.3.0-incubating.jar
-parquet-hadoop-1.8.1.jar
+parquet-generator-1.7.0.jar
+parquet-hadoop-1.7.0.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.1.jar
+parquet-jackson-1.7.0.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index 969df0495d4c..9f3d9ad97a9f 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -136,13 +136,14 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.3.jar
-parquet-column-1.8.1.jar
-parquet-common-1.8.1.jar
-parquet-encoding-1.8.1.jar
+parquet-column-1.7.0.jar
+parquet-common-1.7.0.jar
+parquet-encoding-1.7.0.jar
 parquet-format-2.3.0-incubating.jar
-parquet-hadoop-1.8.1.jar
+parquet-generator-1.7.0.jar
+parquet-hadoop-1.7.0.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.1.jar
+parquet-jackson-1.7.0.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index f0491ece7c2b..77d52666ba69 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -136,13 +136,14 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.3.jar
-parquet-column-1.8.1.jar
-parquet-common-1.8.1.jar
-parquet-encoding-1.8.1.jar
+parquet-column-1.7.0.jar
+parquet-common-1.7.0.jar
+parquet-encoding-1.7.0.jar
 parquet-format-2.3.0-incubating.jar
-parquet-hadoop-1.8.1.jar
+parquet-generator-1.7.0.jar
+parquet-hadoop-1.7.0.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.1.jar
+parquet-jackson-1.7.0.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index b3dced63b9e7..9afe50f765d3 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -144,13 +144,14 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.3.jar
-parquet-column-1.8.1.jar
-parquet-common-1.8.1.jar
-parquet-encoding-1.8.1.jar
+parquet-column-1.7.0.jar
+parquet-common-1.7.0.jar
+parquet-encoding-1.7.0.jar
 parquet-format-2.3.0-incubating.jar
-parquet-hadoop-1.8.1.jar
+parquet-generator-1.7.0.jar
+parquet-hadoop-1.7.0.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.1.jar
+parquet-jackson-1.7.0.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 16f60f29ffbb..879157a6dce7 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -145,13 +145,14 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.3.jar
-parquet-column-1.8.1.jar
-parquet-common-1.8.1.jar
-parquet-encoding-1.8.1.jar
+parquet-column-1.7.0.jar
+parquet-common-1.7.0.jar
+parquet-encoding-1.7.0.jar
 parquet-format-2.3.0-incubating.jar
-parquet-hadoop-1.8.1.jar
+parquet-generator-1.7.0.jar
+parquet-hadoop-1.7.0.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.1.jar
+parquet-jackson-1.7.0.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar
diff --git a/pom.xml b/pom.xml
index 60c8c8dc7a72..79ee7876f466 100644
--- a/pom.xml
+++ b/pom.xml
@@ -133,7 +133,7 @@
     1.2.1
     10.11.1.1
-    1.8.1
+    1.7.0
     1.6.0
     9.2.16.v20160414
     3.1.0
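
Note: once the build picks up the 1.7.0 artifacts above, the version string baked into parquet-common is an easy cross-check of what actually landed on the classpath. A minimal sketch, run in spark-shell or any JVM with the Spark assembly on the classpath (`Version` is parquet-mr's own class; the exact format of `FULL_VERSION` varies by release):

    // Prints something like "parquet-mr version 1.7.0 (build <sha>)".
    println(org.apache.parquet.Version.FULL_VERSION)
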
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 3f7a872ff635..cbe8f78164ae 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -58,8 +58,6 @@
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.ConfigurationUtil;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Types;
 
 import org.apache.spark.sql.types.StructType;
@@ -188,19 +186,15 @@ protected void initialize(String path, List<String> columns) throws IOException
     if (columns == null) {
       this.requestedSchema = fileSchema;
     } else {
-      if (columns.size() > 0) {
-        Types.MessageTypeBuilder builder = Types.buildMessage();
-        for (String s: columns) {
-          if (!fileSchema.containsField(s)) {
-            throw new IOException("Can only project existing columns. Unknown field: " + s +
-              " File schema:\n" + fileSchema);
-          }
-          builder.addFields(fileSchema.getType(s));
+      Types.MessageTypeBuilder builder = Types.buildMessage();
+      for (String s: columns) {
+        if (!fileSchema.containsField(s)) {
+          throw new IOException("Can only project existing columns. Unknown field: " + s +
+            " File schema:\n" + fileSchema);
         }
-        this.requestedSchema = builder.named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME());
-      } else {
-        this.requestedSchema = CatalystSchemaConverter.EMPTY_MESSAGE();
+        builder.addFields(fileSchema.getType(s));
       }
+      this.requestedSchema = builder.named("spark_schema");
     }
     this.sparkSchema = new CatalystSchemaConverter(config).convert(requestedSchema);
     this.reader = new ParquetFileReader(config, file, blocks, requestedSchema.getColumns());
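
For context on the branch removed above: parquet-mr 1.7.0 accepts an empty message type, so the reverted code no longer needs the `columns.size() > 0` guard or the `EMPTY_MESSAGE` fallback (1.8.1 rejects empty group types; see the PARQUET-363/PARQUET-278 note in the next file). A rough Scala sketch of the two projection cases, assuming the 1.7.0 `Types` builder API (`fileSchema` stands in for a real Parquet file schema, and the column name is hypothetical):

    import org.apache.parquet.schema.Types

    // Non-empty projection: copy each requested field from the file schema.
    // val requested = Types.buildMessage()
    //   .addFields(fileSchema.getType("someColumn"))  // hypothetical column
    //   .named("spark_schema")

    // Empty projection, e.g. for SELECT COUNT(*): legal in 1.7.0, but
    // rejected by 1.8.1 until the fix planned for 1.8.2.
    val empty = Types.buildMessage().named("spark_schema")
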
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 9c885b252f01..850e807b8677 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -109,14 +109,10 @@
    */
   def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
     val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
-    if (clippedParquetFields.isEmpty) {
-      CatalystSchemaConverter.EMPTY_MESSAGE
-    } else {
-      Types
-        .buildMessage()
-        .addFields(clippedParquetFields: _*)
-        .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
-    }
+    Types
+      .buildMessage()
+      .addFields(clippedParquetFields: _*)
+      .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
   }
 
   private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 3688c3e2b57e..6f6340f541ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -538,22 +538,6 @@ private[parquet] class CatalystSchemaConverter(
 private[parquet] object CatalystSchemaConverter {
   val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"
 
-  // !! HACK ALERT !!
-  //
-  // PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing an empty GroupType,
-  // which means we cannot request an empty schema for queries like `SELECT COUNT(*) FROM t`.
-  // This issue has been fixed in parquet-mr 1.8.2-SNAPSHOT.
-  //
-  // To work around this problem, here we first construct a `MessageType` with a single dummy
-  // field, and then remove the field to obtain an empty `MessageType`.
-  //
-  // TODO Revert this change after upgrading parquet-mr to 1.8.2+
-  val EMPTY_MESSAGE = Types
-    .buildMessage()
-    .required(PrimitiveType.PrimitiveTypeName.INT32).named("dummy")
-    .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
-  EMPTY_MESSAGE.getFields.clear()
-
   def checkFieldName(name: String): Unit = {
     // ,;{}()\n\t= and space are special characters in Parquet schema
     checkConversionRequirement(
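
The clipping entry point above is easy to exercise on its own. A hedged sketch (the schema strings are invented for illustration; `CatalystReadSupport` is `private[parquet]`, so this only compiles from inside that package, as the test suite at the end of this patch does):

    import org.apache.parquet.schema.MessageTypeParser
    import org.apache.spark.sql.types._

    val fileSchema = MessageTypeParser.parseMessageType(
      "message spark_schema { required int32 a; required int32 b; }")
    val catalystSchema = new StructType().add("a", IntegerType)

    // Drops `b`, keeping only the fields the Catalyst schema asks for:
    //   message spark_schema { required int32 a; }
    val clipped = CatalystReadSupport.clipParquetSchema(fileSchema, catalystSchema)
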
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 624081250113..95afdc789f32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -22,6 +22,8 @@ import java.io.Serializable
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
+import org.apache.parquet.schema.OriginalType
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
@@ -51,15 +53,18 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     // Binary.fromString and Binary.fromByteArray don't accept null values
     case StringType =>
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
-        Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
+        Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))).orNull)
     case BinaryType =>
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
-        Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+        Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+    */
   }
 
   private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -74,14 +79,17 @@
     case DoubleType =>
       (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
-        Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
+        Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))).orNull)
     case BinaryType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
-        Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+        Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+    */
   }
 
   private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -94,13 +102,16 @@
     case DoubleType =>
      (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Any) => FilterApi.lt(binaryColumn(n),
-        Binary.fromString(v.asInstanceOf[String]))
+        Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
 
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+        FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -113,13 +124,16 @@
     case DoubleType =>
       (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Any) => FilterApi.ltEq(binaryColumn(n),
-        Binary.fromString(v.asInstanceOf[String]))
+        Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
 
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+        FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -133,13 +147,15 @@
       (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
     // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Any) => FilterApi.gt(binaryColumn(n),
-        Binary.fromString(v.asInstanceOf[String]))
+        Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
 
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+        FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -152,13 +168,16 @@
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Any) => FilterApi.gtEq(binaryColumn(n),
-        Binary.fromString(v.asInstanceOf[String]))
+        Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
 
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+        FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
@@ -175,14 +194,17 @@
       (n: String, v: Set[Any]) => FilterApi.userDefined(doubleColumn(n),
         SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
 
+    // See https://issues.apache.org/jira/browse/SPARK-11153
+    /*
     case StringType =>
       (n: String, v: Set[Any]) => FilterApi.userDefined(binaryColumn(n),
-        SetInFilter(v.map(s => Binary.fromString(s.asInstanceOf[String]))))
+        SetInFilter(v.map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8")))))
 
     case BinaryType =>
       (n: String, v: Set[Any]) => FilterApi.userDefined(binaryColumn(n),
-        SetInFilter(v.map(e => Binary.fromReusedByteArray(e.asInstanceOf[Array[Byte]]))))
+        SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]]))))
+    */
   }
 
   /**
@@ -206,6 +228,8 @@
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
     val dataTypeOf = getFieldMap(schema).toMap
 
+    relaxParquetValidTypeMap
+
     // NOTE:
     //
     // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
@@ -275,4 +299,35 @@
       case _ => None
     }
   }
+
+  // !! HACK ALERT !!
+  //
+  // This lazy val is a workaround for PARQUET-201, and should be removed once we upgrade to
+  // parquet-mr 1.8.1 or higher versions.
+  //
+  // In Parquet, not all types of columns can be used for filter push-down optimization. The set
+  // of valid column types is controlled by `ValidTypeMap`. Unfortunately, in parquet-mr 1.7.0 and
+  // prior versions, the limitation is too strict, and doesn't allow `BINARY (ENUM)` columns to be
+  // pushed down.
+  //
+  // This restriction is problematic for Spark SQL, because Spark SQL doesn't have a type that maps
+  // to Parquet original type `ENUM` directly, and always converts `ENUM` to `StringType`. Thus,
+  // a predicate involving an `ENUM` field can be pushed down as a string column, which is perfectly
+  // legal except that it fails the `ValidTypeMap` check.
+  //
+  // Here we add `BINARY (ENUM)` into `ValidTypeMap` lazily via reflection to work around this issue.
+  private lazy val relaxParquetValidTypeMap: Unit = {
+    val constructor = Class
+      .forName(classOf[ValidTypeMap].getCanonicalName + "$FullTypeDescriptor")
+      .getDeclaredConstructor(classOf[PrimitiveTypeName], classOf[OriginalType])
+
+    constructor.setAccessible(true)
+    val enumTypeDescriptor = constructor
+      .newInstance(PrimitiveTypeName.BINARY, OriginalType.ENUM)
+      .asInstanceOf[AnyRef]
+
+    val addMethod = classOf[ValidTypeMap].getDeclaredMethods.find(_.getName == "add").get
+    addMethod.setAccessible(true)
+    addMethod.invoke(null, classOf[Binary], enumTypeDescriptor)
+  }
 }
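
The re-added reflection workaround matters whenever a Parquet `BINARY (ENUM)` column is read as a Spark string and a filter on it is pushed down. A sketch of such a predicate (column name and value invented; with parquet-mr 1.7.0, validating it against the file schema would fail the `ValidTypeMap` check unless `relaxParquetValidTypeMap` has run first):

    import org.apache.parquet.filter2.predicate.FilterApi
    import org.apache.parquet.io.api.Binary

    // Equality push-down over a binary column; for an ENUM-typed column this
    // predicate is only accepted once BINARY (ENUM) is in the ValidTypeMap.
    val pred = FilterApi.eq(
      FilterApi.binaryColumn("event_type"), // hypothetical ENUM-backed column
      Binary.fromByteArray("click".getBytes("utf-8")))
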
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 0b5038cb8280..6db649228210 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 
-import org.apache.parquet.schema.{MessageType, MessageTypeParser}
+import org.apache.parquet.schema.MessageTypeParser
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
@@ -1065,26 +1065,18 @@
       parquetSchema: String,
       catalystSchema: StructType,
       expectedSchema: String): Unit = {
-    testSchemaClipping(testName, parquetSchema, catalystSchema,
-      MessageTypeParser.parseMessageType(expectedSchema))
-  }
-
-  private def testSchemaClipping(
-      testName: String,
-      parquetSchema: String,
-      catalystSchema: StructType,
-      expectedSchema: MessageType): Unit = {
     test(s"Clipping - $testName") {
+      val expected = MessageTypeParser.parseMessageType(expectedSchema)
       val actual = CatalystReadSupport.clipParquetSchema(
         MessageTypeParser.parseMessageType(parquetSchema), catalystSchema)
 
       try {
-        expectedSchema.checkContains(actual)
-        actual.checkContains(expectedSchema)
+        expected.checkContains(actual)
+        actual.checkContains(expected)
       } catch { case cause: Throwable =>
         fail(
           s"""Expected clipped schema:
-             |$expectedSchema
+             |$expected
              |Actual clipped schema:
              |$actual
            """.stripMargin,
@@ -1437,7 +1429,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
 
     catalystSchema = new StructType(),
 
-    expectedSchema = CatalystSchemaConverter.EMPTY_MESSAGE)
+    expectedSchema = "message root {}")
 
   testSchemaClipping(
     "disjoint field sets",