From 2333c6d3dffd580529705e33f5ccdc8871670c0f Mon Sep 17 00:00:00 2001
From: Kevin Yu
Date: Mon, 14 Dec 2015 11:51:35 -0800
Subject: [PATCH 1/2] Include attributes referenced by pushed-down filters in
 the partitioned table scan projection

---
 .../datasources/DataSourceStrategy.scala      |  4 ++-
 .../spark/sql/DataFrameNaFunctionsSuite.scala | 27 +++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 544d5eccec03..a6b17305d5da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -85,9 +85,11 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
       }
 
+      // Add any attributes referenced by combineFilters to the projected columns.
+      val combineProjections = projects.toSet.union(combineFilters.flatMap(_.references).toSet).toSeq
       val scan = buildPartitionedTableScan(
         l,
-        projects,
+        combineProjections,
         pushedFilters,
         t.partitionSpec.partitionColumns,
         selectedPartitions)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala
index e34875471f09..0b4c59038dd9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala
@@ -194,4 +194,31 @@ class DataFrameNaFunctionsSuite extends QueryTest with SharedSQLContext {
     assert(out1(4) === Row("Amy", null, null))
     assert(out1(5) === Row(null, null, null))
   }
+
+  test("dropna with partitionBy and groupBy") {
+    withTempPath { dir =>
+      val df = sqlContext.range(10)
+      val df1 = df.withColumn("a", $"id".cast("int"))
+      df1.write.partitionBy("id").parquet(dir.getCanonicalPath)
+
+      val df2 = sqlContext.read.parquet(dir.getCanonicalPath)
+
+      val group = df2.na.drop().groupBy().count()
+      val rows = group.collect()
+      assert(rows.head === Row(10L))
+    }
+  }
+
+  test("dropna with partitionBy") {
+    withTempPath { dir =>
+      val df = sqlContext.range(10)
+      val df1 = df.withColumn("a", $"id".cast("int"))
+      df1.write.partitionBy("id").parquet(dir.getCanonicalPath)
+
+      val df2 = sqlContext.read.parquet(dir.getCanonicalPath)
+
+      val count = df2.na.drop().count()
+      assert(count === 10)
+    }
+  }
 }

From 4d470769ae9194ed646d40db65a0c7c3c762d064 Mon Sep 17 00:00:00 2001
From: Kevin Yu
Date: Mon, 14 Dec 2015 16:25:44 -0800
Subject: [PATCH 2/2] Fix Scala style: wrap line exceeding the maximum length

---
 .../spark/sql/execution/datasources/DataSourceStrategy.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index a6b17305d5da..41a5f03db5c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -86,7 +86,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       }
 
       // Add any attributes referenced by combineFilters to the projected columns.
-      val combineProjections = projects.toSet.union(combineFilters.flatMap(_.references).toSet).toSeq
+      val combineProjections =
+        projects.toSet.union(combineFilters.flatMap(_.references).toSet).toSeq
       val scan = buildPartitionedTableScan(
         l,
         combineProjections,