diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 9287bd47cf113..889e0b89432b8 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -146,13 +146,13 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.6.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
+parquet-column-1.8.1.jar
+parquet-common-1.8.1.jar
+parquet-encoding-1.8.1.jar
+parquet-format-2.3.0-incubating.jar
+parquet-hadoop-1.8.1.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
+parquet-jackson-1.8.1.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index ab1de3d3dd8ad..47c422c63a59f 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -147,13 +147,13 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.6.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
+parquet-column-1.8.1.jar
+parquet-common-1.8.1.jar
+parquet-encoding-1.8.1.jar
+parquet-format-2.3.0-incubating.jar
+parquet-hadoop-1.8.1.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
+parquet-jackson-1.8.1.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar
diff --git a/pom.xml b/pom.xml
index ccd8546a269c1..7c58026fd480d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,7 +130,7 @@
     1.2.1
     10.12.1.1
-    <parquet.version>1.8.2</parquet.version>
+    <parquet.version>1.8.1</parquet.version>
     1.6.0
     9.3.11.v20160721
     3.1.0
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 997964238ca98..d6c0bb98d8cd1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -582,9 +582,9 @@ class Analyzer(
       //   |- view2 (defaultDatabase = db2)
       //      |- view3 (defaultDatabase = db3)
       //   |- view4 (defaultDatabase = db4)
-      // In this case, the view `view1` is a nested view, it directly references `table2`, `view2`
+      // In this case, the view `view1` is a nested view, it directly references `table2`、`view2`
       // and `view4`, the view `view2` references `view3`. On resolving the table, we look up the
-      // relations `table2`, `view2`, `view4` using the default database `db1`, and look up the
+      // relations `table2`、`view2`、`view4` using the default database `db1`, and look up the
       // relation `view3` using the default database `db2`.
       //
       // Note this is compatible with the views defined by older versions of Spark(before 2.2), which
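The nested-view resolution described in the Analyzer comment above can be illustrated with a short, hypothetical session (the database, table, and view names below are illustrative only and are not part of this patch). Each view's relations are resolved against the default database that was current when the view was created:

    // Sketch only: assumes a running SparkSession named `spark`.
    spark.sql("CREATE DATABASE IF NOT EXISTS db1")
    spark.sql("CREATE DATABASE IF NOT EXISTS db2")
    spark.sql("CREATE TABLE db1.table2 (id INT) USING parquet")
    spark.sql("CREATE TABLE db2.table3 (id INT) USING parquet")

    spark.sql("USE db2")
    // view3 lives in db2; view2 lives in db1 but records defaultDatabase = db2,
    // because db2 is the current database when it is created.
    spark.sql("CREATE VIEW db2.view3 AS SELECT id FROM table3")
    spark.sql("CREATE VIEW db1.view2 AS SELECT id FROM view3")

    spark.sql("USE db1")
    // view1 records defaultDatabase = db1: `table2` and `view2` are looked up under
    // db1, while `view3` inside view2's definition is looked up under db2.
    spark.sql("CREATE VIEW view1 AS SELECT id FROM table2 UNION ALL SELECT id FROM view2")
    spark.table("view1").show()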
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 54006e20a3eb6..616d342ecc165 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -402,6 +402,13 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {
 @InterfaceStability.Stable
 object StructType extends AbstractDataType {
 
+  /**
+   * A key used in field metadata to indicate that the field comes from the result of merging
+   * two different StructTypes that do not always contain the field. That is to say, the field
+   * might be missing (optional) from one of the StructTypes.
+   */
+  private[sql] val metadataKeyForOptionalField = "_OPTIONAL_"
+
   override private[sql] def defaultConcreteType: DataType = new StructType
 
   override private[sql] def acceptsType(other: DataType): Boolean = {
@@ -462,6 +469,8 @@ object StructType extends AbstractDataType {
 
     case (StructType(leftFields), StructType(rightFields)) =>
       val newFields = ArrayBuffer.empty[StructField]
+      // This metadata will record the fields that only exist in one of two StructTypes
+      val optionalMeta = new MetadataBuilder()
 
       val rightMapped = fieldsMap(rightFields)
       leftFields.foreach {
@@ -473,7 +482,8 @@ object StructType extends AbstractDataType {
               nullable = leftNullable || rightNullable)
           }
           .orElse {
-            Some(leftField)
+            optionalMeta.putBoolean(metadataKeyForOptionalField, value = true)
+            Some(leftField.copy(metadata = optionalMeta.build()))
           }
           .foreach(newFields += _)
       }
@@ -482,7 +492,8 @@ object StructType extends AbstractDataType {
       rightFields
         .filterNot(f => leftMapped.get(f.name).nonEmpty)
         .foreach { f =>
-          newFields += f
+          optionalMeta.putBoolean(metadataKeyForOptionalField, value = true)
+          newFields += f.copy(metadata = optionalMeta.build())
         }
 
       StructType(newFields)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
index c4635c8f126af..8038e413f7838 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
@@ -134,6 +134,55 @@ class DataTypeSuite extends SparkFunSuite {
     assert(mapped === expected)
   }
 
+  test("merge where right is empty") {
+    val left = StructType(
+      StructField("a", LongType) ::
+      StructField("b", FloatType) :: Nil)
+
+    val right = StructType(List())
+    val merged = left.merge(right)
+
+    assert(DataType.equalsIgnoreCompatibleNullability(merged, left))
+    assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+    assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+  }
+
+  test("merge where left is empty") {
+
+    val left = StructType(List())
+
+    val right = StructType(
+      StructField("a", LongType) ::
+      StructField("b", FloatType) :: Nil)
+
+    val merged = left.merge(right)
+
+    assert(DataType.equalsIgnoreCompatibleNullability(merged, right))
+    assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+    assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+  }
+
+  test("merge where both are non-empty") {
+    val left = StructType(
+      StructField("a", LongType) ::
+      StructField("b", FloatType) :: Nil)
+
+    val right = StructType(
+      StructField("c", LongType) :: Nil)
+
+    val expected = StructType(
+      StructField("a", LongType) ::
+      StructField("b", FloatType) ::
+      StructField("c", LongType) :: Nil)
+
+    val merged = left.merge(right)
+
+    assert(DataType.equalsIgnoreCompatibleNullability(merged, expected))
+    assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+    assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+    assert(merged("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+  }
+
   test("merge where right contains type conflict") {
     val left = StructType(
       StructField("a", LongType) ::
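Outside of the test suite, the tagging behaviour that these new tests exercise looks roughly like the sketch below. It assumes the code is compiled inside the org.apache.spark.sql.types package, since both merge and metadataKeyForOptionalField are private[sql]; it is an illustration, not part of the patch.

    package org.apache.spark.sql.types

    // Minimal sketch: merge two schemas and inspect which fields were tagged as optional.
    object MergeOptionalFieldSketch {
      def main(args: Array[String]): Unit = {
        val left  = StructType(StructField("a", LongType) :: StructField("b", StringType) :: Nil)
        val right = StructType(StructField("a", LongType) :: StructField("c", IntegerType) :: Nil)

        // "a" exists on both sides and stays untagged; "b" and "c" exist on only one
        // side and receive the temporary _OPTIONAL_ marker.
        val merged = left.merge(right)
        merged.fields.foreach { f =>
          val optional = f.metadata.contains(StructType.metadataKeyForOptionalField) &&
            f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
          println(s"${f.name}: optional = $optional")
        }
      }
    }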
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index c9ac366ed6e62..9b67c0318faba 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -133,19 +133,6 @@
       <artifactId>parquet-avro</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro</artifactId>
-      <version>1.8.1</version>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 87fbf8b1bc9c4..03dc43704bf75 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -109,7 +109,9 @@ class ParquetFileFormat
 
     // We want to clear this temporary metadata from saving into Parquet file.
     // This metadata is only useful for detecting optional columns when pushdowning filters.
-    ParquetWriteSupport.setSchema(dataSchema, conf)
+    val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
+      dataSchema).asInstanceOf[StructType]
+    ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)
 
     // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
     // and `CatalystWriteSupport` (writing actual rows to Parquet files).
@@ -295,7 +297,11 @@ class ParquetFileFormat
       ParquetWriteSupport.SPARK_ROW_SCHEMA,
       ParquetSchemaConverter.checkFieldNames(requiredSchema).json)
 
-    ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
+    // We want to clear this temporary metadata from saving into Parquet file.
+    // This metadata is only useful for detecting optional columns when pushdowning filters.
+    val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
+      requiredSchema).asInstanceOf[StructType]
+    ParquetWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)
 
     // Sets flags for `CatalystSchemaConverter`
     hadoopConf.setBoolean(
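Both call sites above do the same thing before handing the schema to ParquetWriteSupport: strip the temporary _OPTIONAL_ marker so it is never persisted into the Parquet footer. A standalone sketch of that step (again assuming private[sql] access; illustrative only):

    package org.apache.spark.sql.types

    // Minimal sketch: tag a field, then strip the marker the way ParquetFileFormat does.
    object StripOptionalMetadataSketch {
      def main(args: Array[String]): Unit = {
        val tagged = new MetadataBuilder()
          .putBoolean(StructType.metadataKeyForOptionalField, value = true)
          .build()
        val schema = StructType(
          StructField("a", LongType, nullable = true, tagged) ::
          StructField("b", StringType) :: Nil)

        // removeMetadata rebuilds the schema without the given metadata key.
        val cleaned = StructType
          .removeMetadata(StructType.metadataKeyForOptionalField, schema)
          .asInstanceOf[StructType]

        assert(cleaned.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
      }
    }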
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 763841efbd9f3..de28026758ef8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -41,6 +41,8 @@ private[parquet] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     // Binary.fromString and Binary.fromByteArray don't accept null values
     case StringType =>
       (n: String, v: Any) => FilterApi.eq(
@@ -50,6 +52,7 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+    */
   }
 
   private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -64,6 +67,8 @@ private[parquet] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     case StringType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
@@ -72,6 +77,7 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+    */
   }
 
   private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -84,6 +90,8 @@ private[parquet] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n),
@@ -91,6 +99,7 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -103,6 +112,8 @@ private[parquet] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n),
@@ -110,6 +121,7 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -122,6 +134,8 @@ private[parquet] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     case StringType =>
      (n: String, v: Any) =>
        FilterApi.gt(binaryColumn(n),
@@ -129,6 +143,7 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -141,6 +156,8 @@ private[parquet] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n),
@@ -148,17 +165,27 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+    */
   }
 
   /**
-   * Returns a map from name of the column to the data type, if predicate push down applies.
+   * Returns a map from name of the column to the data type, if predicate push down applies
+   * (i.e. not an optional field).
+   *
+   * SPARK-11955: The optional fields will have metadata StructType.metadataKeyForOptionalField.
+   * These fields only exist in one side of merged schemas. Due to that, we can't push down filters
+   * using such fields, otherwise Parquet library will throw exception (PARQUET-389).
+   * Here we filter out such fields.
    */
   private def getFieldMap(dataType: DataType): Map[String, DataType] = dataType match {
     case StructType(fields) =>
       // Here we don't flatten the fields in the nested schema but just look up through
       // root fields. Currently, accessing to nested fields does not push down filters
       // and it does not support to create filters for them.
-      fields.map(f => f.name -> f.dataType).toMap
+      fields.filter { f =>
+        !f.metadata.contains(StructType.metadataKeyForOptionalField) ||
+          !f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
+      }.map(f => f.name -> f.dataType).toMap
     case _ => Map.empty[String, DataType]
   }
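The effect of the getFieldMap change is that predicates on columns carrying the _OPTIONAL_ marker are silently dropped instead of being handed to parquet-mr, which would hit PARQUET-389 on row groups that lack the column. A small sketch of that selection logic outside ParquetFilters (illustrative only, assuming private[sql] access):

    package org.apache.spark.sql.types

    // Minimal sketch of the pushdown eligibility check added to getFieldMap.
    object PushdownEligibleFieldsSketch {
      def main(args: Array[String]): Unit = {
        val optional = new MetadataBuilder()
          .putBoolean(StructType.metadataKeyForOptionalField, value = true)
          .build()
        val mergedSchema = StructType(
          StructField("a", IntegerType) ::
          StructField("c", IntegerType, nullable = true, optional) :: Nil)

        // Same filter as in getFieldMap: keep only fields without the optional marker.
        val pushable: Map[String, DataType] = mergedSchema.fields.filter { f =>
          !f.metadata.contains(StructType.metadataKeyForOptionalField) ||
            !f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
        }.map(f => f.name -> f.dataType).toMap

        // "a" is safe to push down; "c" is not, because some files may not contain it.
        println(pushable.contains("a")) // true
        println(pushable.contains("c")) // false
      }
    }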
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 0b805e4362883..a670de9d05877 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -559,8 +559,21 @@ private[parquet] class ParquetSchemaConverter(
 private[parquet] object ParquetSchemaConverter {
   val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"
 
-  val EMPTY_MESSAGE: MessageType =
-    Types.buildMessage().named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+  // !! HACK ALERT !!
+  //
+  // PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing empty GroupType,
+  // which prevents us to avoid selecting any columns for queries like `SELECT COUNT(*) FROM t`.
+  // This issue has been fixed in parquet-mr 1.8.2-SNAPSHOT.
+  //
+  // To workaround this problem, here we first construct a `MessageType` with a single dummy
+  // field, and then remove the field to obtain an empty `MessageType`.
+  //
+  // TODO Reverts this change after upgrading parquet-mr to 1.8.2+
+  val EMPTY_MESSAGE = Types
+    .buildMessage()
+    .required(PrimitiveType.PrimitiveTypeName.INT32).named("dummy")
+    .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+  EMPTY_MESSAGE.getFields.clear()
 
   def checkFieldName(name: String): Unit = {
     // ,;{}()\n\t= and space are special characters in Parquet schema
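For context on why an empty MessageType is needed at all: a query that reads no columns, such as a plain count over a Parquet table, asks the Parquet reader for an empty requested schema, and parquet-mr 1.8.1 refuses to build one directly. A hedged, user-level sketch of such a query (the local session and path below are illustrative, not part of the patch):

    // Illustrative only: a count over Parquet selects no columns, so the requested
    // Parquet schema is empty and EMPTY_MESSAGE above is what reaches the reader.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").appName("empty-schema-demo").getOrCreate()
    spark.range(10).write.mode("overwrite").parquet("/tmp/empty_schema_demo")
    println(spark.read.parquet("/tmp/empty_schema_demo").count())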
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 98427cfe3031c..7683a098fb2bd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -229,7 +229,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
-  test("filter pushdown - string") {
+  // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+  ignore("filter pushdown - string") {
     withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df =>
       checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
       checkFilterPredicate(
@@ -257,7 +258,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
-  test("filter pushdown - binary") {
+  // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+  ignore("filter pushdown - binary") {
     implicit class IntToBinary(int: Int) {
       def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8)
     }
@@ -366,36 +368,76 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
-  test("Filter applied on merged Parquet schema with new column should work") {
+  test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
     import testImplicits._
     Seq("true", "false").map { vectorized =>
       withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
         SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
         SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
         withTempPath { dir =>
-          val path1 = s"${dir.getCanonicalPath}/table1"
-          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path1)
-          val path2 = s"${dir.getCanonicalPath}/table2"
-          (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(path2)
-
-          // No matter "c = 1" gets pushed down or not, this query should work without exception.
-          val df = spark.read.parquet(path1, path2).filter("c = 1").selectExpr("c", "b", "a")
+          val pathOne = s"${dir.getCanonicalPath}/table1"
+          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
+          val pathTwo = s"${dir.getCanonicalPath}/table2"
+          (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
+
+          // If the "c = 1" filter gets pushed down, this query will throw an exception which
+          // Parquet emits. This is a Parquet issue (PARQUET-389).
+          val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a")
           checkAnswer(
             df,
             Row(1, "1", null))
 
-          val path3 = s"${dir.getCanonicalPath}/table3"
+          // The fields "a" and "c" only exist in one Parquet file.
+          assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+          assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+
+          val pathThree = s"${dir.getCanonicalPath}/table3"
+          df.write.parquet(pathThree)
+
+          // We will remove the temporary metadata when writing Parquet file.
+          val schema = spark.read.parquet(pathThree).schema
+          assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
+
+          val pathFour = s"${dir.getCanonicalPath}/table4"
           val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
-          dfStruct.select(struct("a").as("s")).write.parquet(path3)
+          dfStruct.select(struct("a").as("s")).write.parquet(pathFour)
 
-          val path4 = s"${dir.getCanonicalPath}/table4"
+          val pathFive = s"${dir.getCanonicalPath}/table5"
           val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
-          dfStruct2.select(struct("c").as("s")).write.parquet(path4)
+          dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)
 
-          // No matter "s.c = 1" gets pushed down or not, this query should work without exception.
-          val dfStruct3 = spark.read.parquet(path3, path4).filter("s.c = 1")
+          // If the "s.c = 1" filter gets pushed down, this query will throw an exception which
+          // Parquet emits.
+          val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1")
             .selectExpr("s")
           checkAnswer(dfStruct3, Row(Row(null, 1)))
+
+          // The fields "s.a" and "s.c" only exist in one Parquet file.
+          val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
+          assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+          assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+
+          val pathSix = s"${dir.getCanonicalPath}/table6"
+          dfStruct3.write.parquet(pathSix)
+
+          // We will remove the temporary metadata when writing Parquet file.
+          val forPathSix = spark.read.parquet(pathSix).schema
+          assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
+
+          // sanity test: make sure optional metadata field is not wrongly set.
+          val pathSeven = s"${dir.getCanonicalPath}/table7"
+          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
+          val pathEight = s"${dir.getCanonicalPath}/table8"
+          (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)
+
+          val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
+          checkAnswer(
+            df2,
+            Row(1, "1"))
+
+          // The fields "a" and "b" exist in both two Parquet files. No metadata is set.
+          assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
+          assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
         }
       }
     }
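Taken together, the behaviour the reinstated test pins down looks like this from the user's side (the local session and paths below are illustrative, not part of the test suite):

    import org.apache.spark.sql.SparkSession

    // Illustrative only: merged Parquet schema where column "c" exists in just one file.
    val spark = SparkSession.builder().master("local[*]").appName("merged-schema-demo").getOrCreate()
    import spark.implicits._

    (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.mode("overwrite").parquet("/tmp/demo/table1")
    (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.mode("overwrite").parquet("/tmp/demo/table2")

    spark.conf.set("spark.sql.parquet.mergeSchema", "true")
    spark.conf.set("spark.sql.parquet.filterPushdown", "true")

    // With this patch, "c" carries the temporary optional marker, so the "c = 1" predicate
    // is not pushed to parquet-mr and the query no longer risks hitting PARQUET-389.
    spark.read.parquet("/tmp/demo/table1", "/tmp/demo/table2")
      .filter("c = 1")
      .selectExpr("c", "b", "a")
      .show()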