From 2d56ac02eaff10972e5bc46f3b57cff993d60e24 Mon Sep 17 00:00:00 2001
From: Kevin Yu
Date: Fri, 18 Dec 2015 15:31:05 -0800
Subject: [PATCH 1/6] another approach to fix this problem

---
 .../datasources/DataSourceStrategy.scala      | 19 +++++++--
 .../spark/sql/DataFrameNaFunctionsSuite.scala | 41 +++++++++++++++++++
 2 files changed, 56 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 8a15a51d825e..716fa4614638 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -88,16 +88,27 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
       }
 
+      // need to add projections from combineFilters in
+      val combineFilter = combineFilters.reduceLeftOption(expressions.And)
+      val combinedProjects = combineFilter.map(_.references.toSet.union(projects.toSet).toSeq)
+        .getOrElse(projects)
       val scan = buildPartitionedTableScan(
         l,
-        projects,
+        combinedProjects,
         pushedFilters,
         t.partitionSpec.partitionColumns,
         selectedPartitions)
 
-      combineFilters
-        .reduceLeftOption(expressions.And)
-        .map(execution.Filter(_, scan)).getOrElse(scan) :: Nil
+      // Add a Projection to guarantee the original projection:
+      // this is because "projs" may be different from the original "projects",
+      // in elements or their ordering
+      combineFilter.map(cf => if (projects.isEmpty || combinedProjects.size < 2) {
+        // If the original projection is empty or the scan's projection
+        // has 0 or one element, then no need for an extra Project
+        execution.Filter(cf, scan)
+      } else {
+        execution.Project(projects, execution.Filter(cf, scan))
+      }).getOrElse(scan) :: Nil
 
     // Scanning non-partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala
index e34875471f09..da19c531fabf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala
@@ -194,4 +194,45 @@ class DataFrameNaFunctionsSuite extends QueryTest with SharedSQLContext {
     assert(out1(4) === Row("Amy", null, null))
     assert(out1(5) === Row(null, null, null))
   }
+
+  test("Spark-12231: dropna with partitionBy and groupBy") {
+    withTempPath { dir =>
+      val df = sqlContext.range(10)
+      val df1 = df.withColumn("a", $"id".cast("int"))
+      df1.write.partitionBy("id").parquet(dir.getCanonicalPath)
+      val df2 = sqlContext.read.parquet(dir.getCanonicalPath)
+      val group = df2.na.drop().groupBy().count().collect()
+      assert(group(0).get(0) == 10)
+    }
+  }
+
+  test("Spark-12231: dropna with partitionBy") {
+    withTempPath { dir =>
+      val df = sqlContext.range(10)
+      val df1 = df.withColumn("a", $"id".cast("int"))
+      df1.write.partitionBy("id").parquet(dir.getCanonicalPath)
+      val df2 = sqlContext.read.parquet(dir.getCanonicalPath)
+      val group = df2.na.drop().count()
+      assert(group === 10L)
+    }
+  }
+
+  test("Spark-12231: dropna with sizable projection") {
+    // use a large projection to (almost) ensure that the projection ordering is respected
+    withTempPath { dir =>
+      val df = sqlContext.range(10)
+      val df1 = df.withColumn("a", $"id".cast("int"))
+        .withColumn("b", $"id".cast("int") + 1)
+        .withColumn("c", $"id".cast("int") + 2)
+        .withColumn("d", $"id".cast("int") + 3)
+
+      df1.write.partitionBy("id").parquet(dir.getCanonicalPath)
+
+      val df2 = sqlContext.read.parquet(dir.getCanonicalPath)
+
+      val group = df2.na.drop().orderBy("a").select("a", "b", "c", "d")
+      val result = group.collect()
+      assert(result(0) === Row(0, 1, 2, 3))
+    }
+  }
 }

From 305739f872ba90ba9ef4f3ef6c4f812b4024d8e9 Mon Sep 17 00:00:00 2001
From: Kevin Yu
Date: Fri, 18 Dec 2015 15:46:37 -0800
Subject: [PATCH 2/6] update comments

---
 .../spark/sql/execution/datasources/DataSourceStrategy.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 716fa4614638..417d8ab488e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -100,8 +100,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         selectedPartitions)
 
       // Add a Projection to guarantee the original projection:
-      // this is because "projs" may be different from the original "projects",
-      // in elements or their ordering
+      // this is because "combinedProjects" may be different from the
+      // original "projects", in elements or their ordering
       combineFilter.map(cf => if (projects.isEmpty || combinedProjects.size < 2) {
         // If the original projection is empty or the scan's projection
         // has 0 or one element, then no need for an extra Project
         execution.Filter(cf, scan)

From dbd311b48ffb2870170fbd571a5edd05e9f9fb5f Mon Sep 17 00:00:00 2001
From: Kevin Yu
Date: Mon, 21 Dec 2015 18:24:39 -0800
Subject: [PATCH 3/6] fixing the code review comments

---
 .../datasources/DataSourceStrategy.scala      | 34 +++++++++++--------
 1 file changed, 20 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 417d8ab488e0..1cbce785bcf7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -77,7 +77,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val pushedFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty)
 
       // Predicates with both partition keys and attributes
-      val combineFilters = filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet
+      val partitionAndNormalColumnFilters = filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet
 
      val selectedPartitions = prunePartitions(partitionFilters, t.partitionSpec).toArray
 
@@ -89,26 +89,32 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       }
 
       // need to add projections from combineFilters in
-      val combineFilter = combineFilters.reduceLeftOption(expressions.And)
-      val combinedProjects = combineFilter.map(_.references.toSet.union(projects.toSet).toSeq)
-        .getOrElse(projects)
+      val partitionAndNormalColumnAttrs = AttributeSet(partitionAndNormalColumnFilters)
+      val partitionAndNormalColumnProjs = if (partitionAndNormalColumnAttrs.isEmpty) {
+        projects
+      } else {
+        (partitionAndNormalColumnAttrs ++ projects).toSeq
+      }
+
       val scan = buildPartitionedTableScan(
         l,
-        combinedProjects,
+        partitionAndNormalColumnProjs,
         pushedFilters,
         t.partitionSpec.partitionColumns,
         selectedPartitions)
 
       // Add a Projection to guarantee the original projection:
-      // this is because "combinedProjects" may be different from the
-      // original "projects", in elements or their ordering
-      combineFilter.map(cf => if (projects.isEmpty || combinedProjects.size < 2) {
-        // If the original projection is empty or the scan's projection
-        // has 0 or one element, then no need for an extra Project
-        execution.Filter(cf, scan)
-      } else {
-        execution.Project(projects, execution.Filter(cf, scan))
-      }).getOrElse(scan) :: Nil
+      // this is because "partitionAndNormalColumnAttrs" may be different from the original "projects",
+      // in elements or their ordering
+
+      partitionAndNormalColumnFilters.reduceLeftOption(expressions.And).map(cf =>
+        if (projects.isEmpty || projects == partitionAndNormalColumnProjs) {
+          // if the original projection is empty, no need for the additional Project either
+          execution.Filter(cf, scan)
+        } else {
+          execution.Project(projects, execution.Filter(cf, scan))
+        }
+      ).getOrElse(scan) :: Nil
 
     // Scanning non-partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) =>

From 357033998f9292202e9ba23732f82885a1a9c529 Mon Sep 17 00:00:00 2001
From: Kevin Yu
Date: Wed, 23 Dec 2015 22:28:50 -0800
Subject: [PATCH 4/6] adding test case in the datasources

---
 .../parquet/ParquetFilterSuite.scala          | 20 +++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 045425f282ad..52340f921a3a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -323,6 +323,26 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
+  test("SPARK-112231: Filter combine partition key and projects doesn't work in DataSource scan") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/part=1"
+        (1 to 3).map(i => (i, i + 1, i + 2, i + 3)).toDF("a", "b", "c", "d").write.parquet(path)
+
+        // If the "part = 1" filter gets pushed down, this query will throw an exception since
+        // "part" is not a valid column in the actual Parquet file
+        checkAnswer(
+          sqlContext.read.parquet(dir.getCanonicalPath).filter("a > 1").orderBy("a")
+            selectExpr("a", "b", "c", "d"),
+          (2 to 3).map(i => Row(i, i + 1, i + 2, i + 3, 1)))
+      }
+    }
+  }
+
+
   test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
     import testImplicits._

From 1e82c453b3978b59053591ae823976ae56c7e7d2 Mon Sep 17 00:00:00 2001
From: Kevin Yu
Date: Thu, 24 Dec 2015 11:27:14 -0800
Subject: [PATCH 5/6] fixing the code review comments

---
 .../datasources/DataSourceStrategy.scala      |  9 +++--
 .../parquet/ParquetFilterSuite.scala          | 39 ++++++++++++++-----
 2 files changed, 35 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 1cbce785bcf7..3741a9cb32fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -77,7 +77,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val pushedFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty)
 
       // Predicates with both partition keys and attributes
-      val partitionAndNormalColumnFilters = filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet
+      val partitionAndNormalColumnFilters =
+        filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet
 
       val selectedPartitions = prunePartitions(partitionFilters, t.partitionSpec).toArray
 
@@ -88,7 +89,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
       }
 
-      // need to add projections from combineFilters in
+      // need to add projections from "partitionAndNormalColumnAttrs" in if it is not empty
       val partitionAndNormalColumnAttrs = AttributeSet(partitionAndNormalColumnFilters)
       val partitionAndNormalColumnProjs = if (partitionAndNormalColumnAttrs.isEmpty) {
         projects
@@ -104,8 +105,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         selectedPartitions)
 
       // Add a Projection to guarantee the original projection:
-      // this is because "partitionAndNormalColumnAttrs" may be different from the original "projects",
-      // in elements or their ordering
+      // this is because "partitionAndNormalColumnAttrs" may be different
+      // from the original "projects", in elements or their ordering
 
       partitionAndNormalColumnFilters.reduceLeftOption(expressions.And).map(cf =>
         if (projects.isEmpty || projects == partitionAndNormalColumnProjs) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 52340f921a3a..ad02e272a652 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -323,24 +323,45 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
-  test("SPARK-112231: Filter combine partition key and projects doesn't work in DataSource scan") {
+  test("SPARK-12231: test the filter and empty project in partitioned DataSource scan") {
     import testImplicits._
 
    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
      withTempPath { dir =>
-        val path = s"${dir.getCanonicalPath}/part=1"
-        (1 to 3).map(i => (i, i + 1, i + 2, i + 3)).toDF("a", "b", "c", "d").write.parquet(path)
+        val path = s"${dir.getCanonicalPath}"
+        (1 to 3).map(i => (i, i + 1, i + 2, i + 3)).toDF("a", "b", "c", "d").
+          write.partitionBy("a").parquet(path)
 
-        // If the "part = 1" filter gets pushed down, this query will throw an exception since
-        // "part" is not a valid column in the actual Parquet file
-        checkAnswer(
-          sqlContext.read.parquet(dir.getCanonicalPath).filter("a > 1").orderBy("a")
-            selectExpr("a", "b", "c", "d"),
-          (2 to 3).map(i => Row(i, i + 1, i + 2, i + 3, 1)))
+        // The filter "a > 1 or b < 2" will not get pushed down, and the projection is empty.
+        // Without this fix, this query would throw an exception, because the filter references
+        // both the partition column "a" and the normal column "b", which the scan did not project.
+        val df1 = sqlContext.read.parquet(dir.getCanonicalPath)
+
+        assert(df1.filter("a > 1 or b < 2").count() == 2)
       }
     }
   }
 
+  test("SPARK-12231: test the new projection in partitioned DataSource scan") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}"
+        (1 to 3).map(i => (i, i + 1, i + 2, i + 3)).toDF("a", "b", "c", "d").
+          write.partitionBy("a").parquet(path)
+
+        // test the case where a new projection is generated,
+        // i.e. projects != partitionAndNormalColumnProjs
+
+        val df1 = sqlContext.read.parquet(dir.getCanonicalPath)
+
+        checkAnswer(
+          df1.filter("a > 1 or b > 2").orderBy("a").selectExpr("a", "b", "c", "d"),
+          (2 to 3).map(i => Row(i, i + 1, i + 2, i + 3)))
+      }
+    }
+  }
 
   test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
     import testImplicits._

From 7f4a08554969c5c9ea94bf53da0fa5328fbae651 Mon Sep 17 00:00:00 2001
From: Kevin Yu
Date: Thu, 24 Dec 2015 18:21:31 -0800
Subject: [PATCH 6/6] delete the test cases from DataFrameNaFunctionsSuite

---
 .../spark/sql/DataFrameNaFunctionsSuite.scala | 41 -------------------
 1 file changed, 41 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala
index da19c531fabf..e34875471f09 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala
@@ -194,45 +194,4 @@ class DataFrameNaFunctionsSuite extends QueryTest with SharedSQLContext {
     assert(out1(4) === Row("Amy", null, null))
     assert(out1(5) === Row(null, null, null))
   }
-
-  test("Spark-12231: dropna with partitionBy and groupBy") {
-    withTempPath { dir =>
-      val df = sqlContext.range(10)
-      val df1 = df.withColumn("a", $"id".cast("int"))
-      df1.write.partitionBy("id").parquet(dir.getCanonicalPath)
-      val df2 = sqlContext.read.parquet(dir.getCanonicalPath)
-      val group = df2.na.drop().groupBy().count().collect()
-      assert(group(0).get(0) == 10)
-    }
-  }
-
-  test("Spark-12231: dropna with partitionBy") {
-    withTempPath { dir =>
-      val df = sqlContext.range(10)
-      val df1 = df.withColumn("a", $"id".cast("int"))
-      df1.write.partitionBy("id").parquet(dir.getCanonicalPath)
-      val df2 = sqlContext.read.parquet(dir.getCanonicalPath)
-      val group = df2.na.drop().count()
-      assert(group === 10L)
-    }
-  }
-
-  test("Spark-12231: dropna with sizable projection") {
-    // use a large projection to (almost) ensure that the projection ordering is respected
-    withTempPath { dir =>
-      val df = sqlContext.range(10)
-      val df1 = df.withColumn("a", $"id".cast("int"))
-        .withColumn("b", $"id".cast("int") + 1)
-        .withColumn("c", $"id".cast("int") + 2)
-        .withColumn("d", $"id".cast("int") + 3)
-
-      df1.write.partitionBy("id").parquet(dir.getCanonicalPath)
-
-      val df2 = sqlContext.read.parquet(dir.getCanonicalPath)
-
-      val group = df2.na.drop().orderBy("a").select("a", "b", "c", "d")
-      val result = group.collect()
-      assert(result(0) === Row(0, 1, 2, 3))
-    }
-  }
 }
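
For reference, the plan shape this series produces can be illustrated with a small standalone sketch. The Scan, Filter, Project case classes and the plan() helper below are hypothetical stand-ins for Spark's physical operators, not part of the patch; they only mirror the decision made in DataSourceStrategy after patch 5: scan the union of the requested projection and the attributes referenced by filters that mix partition and normal columns, apply the combined filter, and restore the original projection when it differs from what was scanned.

// Illustrative sketch only (not part of the patch); simplified stand-ins for the physical operators.
object PartitionedScanPlanSketch {

  sealed trait Plan
  case class Scan(columns: Seq[String]) extends Plan
  case class Filter(condition: String, child: Plan) extends Plan
  case class Project(projectList: Seq[String], child: Plan) extends Plan

  // Mirrors the fix: widen the scanned columns to cover the mixed-filter references,
  // apply the combined filter, then re-project to the original column list if needed.
  def plan(
      projects: Seq[String],
      mixedFilterRefs: Set[String],
      combinedFilter: Option[String]): Plan = {
    val scanColumns =
      if (mixedFilterRefs.isEmpty) projects
      else (mixedFilterRefs ++ projects).toSeq.distinct
    val scan = Scan(scanColumns)
    combinedFilter.map { cond =>
      if (projects.isEmpty || projects == scanColumns) Filter(cond, scan)
      else Project(projects, Filter(cond, scan))
    }.getOrElse(scan)
  }

  def main(args: Array[String]): Unit = {
    // "a" is the partition column, "b" a normal column; only "b" is selected, but the
    // mixed filter needs both, so the scan reads both and a Project restores the output.
    println(plan(Seq("b"), Set("a", "b"), Some("a > 1 OR b < 2")))
    // An empty projection (e.g. a bare count) only needs the Filter on top of the scan.
    println(plan(Seq.empty, Set("a", "b"), Some("a > 1 OR b < 2")))
  }
}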