diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 2cce3db9a692..ba24cc3342e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -363,9 +363,11 @@ private[sql] class ParquetFileFormat
         if (returningBatch) {
           vectorizedReader.enableReturningBatches()
         }
-        vectorizedReader
+
+        val iter = new RecordReaderIterator(vectorizedReader)
+        iter.asInstanceOf[Iterator[InternalRow]]
       } else {
-        logDebug(s"Falling back to parquet-mr")
+        logDebug(s"Use parquet-mr")
         val reader = pushed match {
           case Some(filter) =>
             new ParquetRecordReader[InternalRow](
@@ -375,26 +377,20 @@ private[sql] class ParquetFileFormat
             new ParquetRecordReader[InternalRow](new ParquetReadSupport)
         }
         reader.initialize(split, hadoopAttemptContext)
-        reader
-      }
-
-      val iter = new RecordReaderIterator(parquetReader)
-
-      // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
-      if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&
-          enableVectorizedReader) {
-        iter.asInstanceOf[Iterator[InternalRow]]
-      } else {
+
+        // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
         val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
         val joinedRow = new JoinedRow()
         val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
 
+        val iter = new RecordReaderIterator(reader)
         // This is a horrible erasure hack... if we type the iterator above, then it actually check
         // the type in next() and we get a class cast exception. If we make that function return
         // Object, then we can defer the cast until later!
         iter.asInstanceOf[Iterator[InternalRow]]
-        .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+          .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
       }
+      parquetReader
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4b8916f59c41..09b59c5314b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -215,7 +215,8 @@ object SQLConf {
     .createWithDefault("snappy")
 
   val PARQUET_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.parquet.filterPushdown")
-    .doc("Enables Parquet filter push-down optimization when set to true.")
+    .doc("Enables Parquet filter push-down optimization when set to true and vectorized parquet " +
+      "decoding is not being used.")
     .booleanConf
     .createWithDefault(true)
 
@@ -236,7 +237,8 @@ object SQLConf {
 
   val PARQUET_VECTORIZED_READER_ENABLED =
     SQLConfigBuilder("spark.sql.parquet.enableVectorizedReader")
-      .doc("Enables vectorized parquet decoding.")
+      .doc("Enables vectorized parquet decoding when set to true and all the data types of " +
+        "the table schemas are atomic types.")
       .booleanConf
       .createWithDefault(true)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 45fd6a5d80de..28e2a6ee76bd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -452,8 +452,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       val df = spark.read.parquet(path).filter("a = 2")
 
       // The result should be single row.
-      // When a filter is pushed to Parquet, Parquet can apply it to every row.
-      // So, we can check the number of rows returned from the Parquet
+      // When a filter is pushed to Parquet, Parquet can apply it to every row group and
+      // then every row. So, we can check the number of rows returned from the Parquet
       // to make sure our filter pushdown work.
       assert(stripSparkFilter(df).count == 1)
     }
@@ -524,8 +524,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       val path = s"${dir.getCanonicalPath}/table1"
       (1 to 5).map(i => (i.toFloat, i%3)).toDF("a", "b").write.parquet(path)
 
-      // When a filter is pushed to Parquet, Parquet can apply it to every row.
-      // So, we can check the number of rows returned from the Parquet
+      // When a filter is pushed to Parquet, Parquet can apply it to every row group and
+      // then every row. So, we can check the number of rows returned from the Parquet
       // to make sure our filter pushdown work.
       val df = spark.read.parquet(path).where("b in (0,2)")
       assert(stripSparkFilter(df).count == 3)
@@ -545,4 +545,28 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("Verify SQLConf PARQUET_FILTER_PUSHDOWN_ENABLED") {
+    import testImplicits._
+
+    Seq("true", "false").foreach { pushDown =>
+      // When SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key is set to true and all the data types
+      // of the table schema are AtomicType, the parquet reader uses vectorizedReader.
+      // In this mode, filters will not be pushed down, no matter whether
+      // SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key is true or not.
+      withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> pushDown,
+        SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+        withTempPath { dir =>
+          val path = s"${dir.getCanonicalPath}/table1"
+          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
+          // When a filter is pushed to Parquet, Parquet can apply it to every row group and
+          // then every row. So, we can check the number of rows returned from the Parquet
+          // to make sure our filter pushdown work.
+          val df = spark.read.parquet(path).where("a > 2")
+          val numExpectedRows = if (pushDown == "true") 1 else 3
+          assert(stripSparkFilter(df).count == numExpectedRows)
+        }
+      }
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index fc9ce6bb3041..0154d16e9d16 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -451,7 +451,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
   test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") {
     val extraOptions = Map(
       SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[ParquetOutputCommitter].getCanonicalName,
-      "spark.sql.parquet.output.committer.class" ->
+      SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
         classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName
     )
     withTempPath { dir =>
@@ -462,23 +462,25 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-6330 regression test") {
+  test("SPARK-6330 non-existent file or host") {
     // In 1.3.0, save to fs other than file: without configuring core-site.xml would get:
     // IllegalArgumentException: Wrong FS: hdfs://..., expected: file:///
-    intercept[Throwable] {
+    var e = intercept[AnalysisException] {
       spark.read.parquet("file:///nonexistent")
-    }
-    val errorMessage = intercept[Throwable] {
+    }.getMessage
+    assert(e.contains("Path does not exist: file:/nonexistent"))
+
+    e = intercept[IllegalArgumentException] {
       spark.read.parquet("hdfs://nonexistent")
-    }.toString
-    assert(errorMessage.contains("UnknownHostException"))
+    }.getMessage
+    assert(e.contains("java.net.UnknownHostException: nonexistent"))
   }
 
   test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
     // Using a output committer that always fail when committing a task, so that both
     // `commitTask()` and `abortTask()` are invoked.
     val extraOptions = Map[String, String](
-      "spark.sql.parquet.output.committer.class" ->
+      SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
         classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
     )
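Note (illustrative sketch, not part of the patch): the snippet below shows how the two configurations documented above interact when reading Parquet outside the test harness. The object name ParquetPushdownDemo and the /tmp scratch path are made up for the example; the config keys and DataFrame calls are the standard public API.

import org.apache.spark.sql.SparkSession

object ParquetPushdownDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("parquet-pushdown-demo")
      .getOrCreate()
    import spark.implicits._

    // Write a tiny table to scratch space.
    val path = "/tmp/parquet-pushdown-demo"
    (1 to 3).map(i => (i, i.toString)).toDF("a", "b")
      .write.mode("overwrite").parquet(path)

    // Disable the vectorized reader so the parquet-mr path is taken; with
    // spark.sql.parquet.filterPushdown=true the predicate below should then be
    // evaluated by Parquet itself rather than only by Spark's Filter operator.
    spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
    spark.conf.set("spark.sql.parquet.filterPushdown", "true")

    val df = spark.read.parquet(path).where("a > 2")
    df.explain()  // the scan node should list the predicate under PushedFilters
    df.show()

    spark.stop()
  }
}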