From a7b89bd414874fc62576f7fb54e9f5e2ffe5f397 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Thu, 16 Jun 2016 19:59:00 -0700
Subject: [PATCH 1/5] fix

---
 .../parquet/ParquetFileFormat.scala           | 20 ++++++++------------
 .../parquet/ParquetFilterSuite.scala          | 20 ++++++++++++++++++++
 .../datasources/parquet/ParquetIOSuite.scala  | 18 ++++++++++--------
 3 files changed, 38 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 2cce3db9a692..ba24cc3342e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -363,9 +363,11 @@ private[sql] class ParquetFileFormat
         if (returningBatch) {
           vectorizedReader.enableReturningBatches()
         }
-        vectorizedReader
+
+        val iter = new RecordReaderIterator(vectorizedReader)
+        iter.asInstanceOf[Iterator[InternalRow]]
       } else {
-        logDebug(s"Falling back to parquet-mr")
+        logDebug(s"Use parquet-mr")
         val reader = pushed match {
           case Some(filter) =>
             new ParquetRecordReader[InternalRow](
@@ -375,26 +377,20 @@ private[sql] class ParquetFileFormat
             new ParquetRecordReader[InternalRow](new ParquetReadSupport)
         }
         reader.initialize(split, hadoopAttemptContext)
-        reader
-      }
-
-      val iter = new RecordReaderIterator(parquetReader)
-
-      // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
-      if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&
-          enableVectorizedReader) {
-        iter.asInstanceOf[Iterator[InternalRow]]
-      } else {
+
+        // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
         val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
         val joinedRow = new JoinedRow()
         val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
 
+        val iter = new RecordReaderIterator(reader)
         // This is a horrible erasure hack... if we type the iterator above, then it actually check
         // the type in next() and we get a class cast exception. If we make that function return
         // Object, then we can defer the cast until later!
         iter.asInstanceOf[Iterator[InternalRow]]
-          .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+            .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
       }
+      parquetReader
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 45fd6a5d80de..bee94816c123 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -545,4 +545,24 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("Verify SQLConf PARQUET_FILTER_PUSHDOWN_ENABLED") {
+    import testImplicits._
+
+    Seq("true", "false").foreach { pushDown =>
+      withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> pushDown,
+        SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+        withTempPath { dir =>
+          val path = s"${dir.getCanonicalPath}/table1"
+          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
+          // When a filter is pushed to Parquet, Parquet can apply it to every row.
+          // So, we can check the number of rows returned from the Parquet
+          // to make sure our filter pushdown work.
+          val df = spark.read.parquet(path).where("a > 2")
+          val numExpectedRows = if (pushDown == "true") 1 else 3
+          assert(stripSparkFilter(df).count == numExpectedRows)
+        }
+      }
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index fc9ce6bb3041..0154d16e9d16 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -451,7 +451,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
   test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") {
     val extraOptions = Map(
       SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[ParquetOutputCommitter].getCanonicalName,
-      "spark.sql.parquet.output.committer.class" ->
+      SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
         classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName
     )
     withTempPath { dir =>
@@ -462,23 +462,25 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-6330 regression test") {
+  test("SPARK-6330 non-existent file or host") {
     // In 1.3.0, save to fs other than file: without configuring core-site.xml would get:
     // IllegalArgumentException: Wrong FS: hdfs://..., expected: file:///
-    intercept[Throwable] {
+    var e = intercept[AnalysisException] {
       spark.read.parquet("file:///nonexistent")
-    }
-    val errorMessage = intercept[Throwable] {
+    }.getMessage
+    assert(e.contains("Path does not exist: file:/nonexistent"))
+
+    e = intercept[IllegalArgumentException] {
       spark.read.parquet("hdfs://nonexistent")
-    }.toString
-    assert(errorMessage.contains("UnknownHostException"))
+    }.getMessage
+    assert(e.contains("java.net.UnknownHostException: nonexistent"))
   }
 
   test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
     // Using a output committer that always fail when committing a task, so that both
     // `commitTask()` and `abortTask()` are invoked.
     val extraOptions = Map[String, String](
-      "spark.sql.parquet.output.committer.class" ->
+      SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
         classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
     )
 

From a1da7981638723e07753e0aa97686602d3bb38a3 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Thu, 16 Jun 2016 20:19:52 -0700
Subject: [PATCH 2/5] update the comment

---
 .../sql/execution/datasources/parquet/ParquetFilterSuite.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index bee94816c123..a312ff0ef3bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -550,6 +550,9 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     import testImplicits._
 
     Seq("true", "false").foreach { pushDown =>
+      // When SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key is set to true and all the data types
+      // of the table schema are AtomicType, the parquet reader uses vectorizedReader.
+      // In this mode, filters will not be pushed down, no matter whether
       withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> pushDown,
         SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
         withTempPath { dir =>

From ad1f18cf4ebf189581997876cd13614ec940b961 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Thu, 16 Jun 2016 20:33:16 -0700
Subject: [PATCH 3/5] update the document

---
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala  | 6 ++++--
 .../execution/datasources/parquet/ParquetFilterSuite.scala  | 1 +
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4b8916f59c41..09b59c5314b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -215,7 +215,8 @@ object SQLConf {
     .createWithDefault("snappy")
 
   val PARQUET_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.parquet.filterPushdown")
-    .doc("Enables Parquet filter push-down optimization when set to true.")
+    .doc("Enables Parquet filter push-down optimization when set to true and vectorized parquet " +
+      "decoding is not being used.")
     .booleanConf
     .createWithDefault(true)
 
@@ -236,7 +237,8 @@ object SQLConf {
 
   val PARQUET_VECTORIZED_READER_ENABLED =
     SQLConfigBuilder("spark.sql.parquet.enableVectorizedReader")
-      .doc("Enables vectorized parquet decoding.")
+      .doc("Enables vectorized parquet decoding when set to true and all the data types of " +
+        "the table schemas are atomic types.")
       .booleanConf
       .createWithDefault(true)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index a312ff0ef3bc..4405cf86881b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -553,6 +553,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       // When SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key is set to true and all the data types
       // of the table schema are AtomicType, the parquet reader uses vectorizedReader.
       // In this mode, filters will not be pushed down, no matter whether
+      // SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key is true or not.
       withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> pushDown,
         SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
         withTempPath { dir =>

From 9967cc72545324e7a542fcf1b49372d977c0011b Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 17 Jun 2016 00:05:02 -0700
Subject: [PATCH 4/5] update the comments.

---
 .../execution/datasources/parquet/ParquetFilterSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 4405cf86881b..5226a39c8aae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -452,7 +452,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
         val df = spark.read.parquet(path).filter("a = 2")
 
         // The result should be single row.
-        // When a filter is pushed to Parquet, Parquet can apply it to every row.
+        // When a filter is pushed to Parquet, Parquet can apply it to every row group.
         // So, we can check the number of rows returned from the Parquet
         // to make sure our filter pushdown work.
         assert(stripSparkFilter(df).count == 1)
@@ -524,7 +524,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
         val path = s"${dir.getCanonicalPath}/table1"
         (1 to 5).map(i => (i.toFloat, i%3)).toDF("a", "b").write.parquet(path)
 
-        // When a filter is pushed to Parquet, Parquet can apply it to every row.
+        // When a filter is pushed to Parquet, Parquet can apply it to every row group.
         // So, we can check the number of rows returned from the Parquet
         // to make sure our filter pushdown work.
         val df = spark.read.parquet(path).where("b in (0,2)")
@@ -559,7 +559,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
         withTempPath { dir =>
           val path = s"${dir.getCanonicalPath}/table1"
           (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
-          // When a filter is pushed to Parquet, Parquet can apply it to every row.
+          // When a filter is pushed to Parquet, Parquet can apply it to every row group.
           // So, we can check the number of rows returned from the Parquet
           // to make sure our filter pushdown work.
          val df = spark.read.parquet(path).where("a > 2")

From d1b2cbbe73e74ee80dd3afa6a9a1fe5214138b22 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 17 Jun 2016 00:21:15 -0700
Subject: [PATCH 5/5] update the comments.
---
 .../datasources/parquet/ParquetFilterSuite.scala   | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 5226a39c8aae..28e2a6ee76bd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -452,8 +452,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
         val df = spark.read.parquet(path).filter("a = 2")
 
         // The result should be single row.
-        // When a filter is pushed to Parquet, Parquet can apply it to every row group.
-        // So, we can check the number of rows returned from the Parquet
+        // When a filter is pushed to Parquet, Parquet can apply it to every row group and
+        // then every row. So, we can check the number of rows returned from the Parquet
         // to make sure our filter pushdown work.
         assert(stripSparkFilter(df).count == 1)
       }
@@ -524,8 +524,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
         val path = s"${dir.getCanonicalPath}/table1"
         (1 to 5).map(i => (i.toFloat, i%3)).toDF("a", "b").write.parquet(path)
 
-        // When a filter is pushed to Parquet, Parquet can apply it to every row group.
-        // So, we can check the number of rows returned from the Parquet
+        // When a filter is pushed to Parquet, Parquet can apply it to every row group and
+        // then every row. So, we can check the number of rows returned from the Parquet
         // to make sure our filter pushdown work.
         val df = spark.read.parquet(path).where("b in (0,2)")
         assert(stripSparkFilter(df).count == 3)
@@ -559,8 +559,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
        withTempPath { dir =>
          val path = s"${dir.getCanonicalPath}/table1"
          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
-          // When a filter is pushed to Parquet, Parquet can apply it to every row group.
-          // So, we can check the number of rows returned from the Parquet
+          // When a filter is pushed to Parquet, Parquet can apply it to every row group and
+          // then every row. So, we can check the number of rows returned from the Parquet
          // to make sure our filter pushdown work.
          val df = spark.read.parquet(path).where("a > 2")
          val numExpectedRows = if (pushDown == "true") 1 else 3