From f2ca6d0e05a7c7c65213a05781b1d69cdc991bea Mon Sep 17 00:00:00 2001
From: tedyu
Date: Fri, 20 Nov 2015 10:26:01 -0800
Subject: [PATCH 01/17] Drop multiple columns in the DataFrame API

---
 .../org/apache/spark/sql/DataFrame.scala | 24 +++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 98358127e2709..662741b0a6e2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1254,6 +1254,30 @@ class DataFrame private[sql](
     }
   }
 
+  /**
+   * Returns a new [[DataFrame]] with columns dropped.
+   * This is a no-op if schema doesn't contain column name(s).
+   * @group dfops
+   * @since 1.6.0
+   */
+  def drop(colNames: String*): DataFrame = {
+    val resolver = sqlContext.analyzer.resolver
+    val iter = colNames.iterator
+    var df = this
+    while (iter.hasNext) {
+      val colName = iter.next()
+      val shouldDrop = df.schema.exists(f => resolver(f.name, colName))
+      if (shouldDrop) {
+        val colsAfterDrop = df.schema.filter { field =>
+          val name = field.name
+          !resolver(name, colName)
+        }.map(f => Column(f.name))
+        df = df.select(colsAfterDrop : _*)
+      }
+    }
+    df
+  }
+
   /**
    * Returns a new [[DataFrame]] with a column dropped.
    * This version of drop accepts a Column rather than a name.

From 01686ad259b66715e6ed5dcabc15ab3fa548b48a Mon Sep 17 00:00:00 2001
From: tedyu
Date: Fri, 20 Nov 2015 12:04:53 -0800
Subject: [PATCH 02/17] Add varargs annotation

---
 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 662741b0a6e2d..ca2274858b91f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1260,6 +1260,7 @@ class DataFrame private[sql](
    * @group dfops
    * @since 1.6.0
    */
+  @scala.annotation.varargs
   def drop(colNames: String*): DataFrame = {
     val resolver = sqlContext.analyzer.resolver
     val iter = colNames.iterator

From e1612caa2beaf7ee989f3c5663a5e62d8488f897 Mon Sep 17 00:00:00 2001
From: tedyu
Date: Fri, 20 Nov 2015 12:56:46 -0800
Subject: [PATCH 03/17] have the single column version delegate to the new method

---
 .../main/scala/org/apache/spark/sql/DataFrame.scala | 12 +-----------
 1 file changed, 1 insertion(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index ca2274858b91f..5caee2c8ad800 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1241,17 +1241,7 @@ class DataFrame private[sql](
    * @since 1.4.0
    */
   def drop(colName: String): DataFrame = {
-    val resolver = sqlContext.analyzer.resolver
-    val shouldDrop = schema.exists(f => resolver(f.name, colName))
-    if (shouldDrop) {
-      val colsAfterDrop = schema.filter { field =>
-        val name = field.name
-        !resolver(name, colName)
-      }.map(f => Column(f.name))
-      select(colsAfterDrop : _*)
-    } else {
-      this
-    }
+    drop(colName.toSeq)
   }
 
   /**

From 2c23f90de7e9a158e905dc840a980507bcce7c5e Mon Sep 17 00:00:00 2001
From: tedyu
Date: Fri, 20 Nov 2015 13:02:56 -0800
Subject: [PATCH 04/17] have the single column version delegate to the new method

---
 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 5caee2c8ad800..d7e8af91765b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1241,7 +1241,7 @@ class DataFrame private[sql](
    * @since 1.4.0
    */
   def drop(colName: String): DataFrame = {
-    drop(colName.toSeq)
+    drop(Seq(colName))
   }
 
   /**
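Patch 04 still hands the new varargs overload a Seq[String], but a Seq does not match a String* parameter on its own; it has to be expanded with the ": _*" type ascription, which is what patch 05 below applies. A minimal standalone sketch of the two call shapes (the object and names here are illustrative, not part of the patch):

    object VarargsSketch {
      // Same shape as the new API: a method taking repeated String arguments.
      def drop(colNames: String*): Unit = println(colNames.mkString(", "))

      def main(args: Array[String]): Unit = {
        drop("a", "b")          // direct varargs call
        val names = Seq("a", "b")
        drop(names: _*)         // a Seq must be expanded explicitly
        // drop(names)          // does not compile: Seq[String] is not String*
      }
    }
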
From 64a959b8ab2cdce647b260f1ee8c6e3bf5b69bda Mon Sep 17 00:00:00 2001
From: tedyu
Date: Fri, 20 Nov 2015 13:17:21 -0800
Subject: [PATCH 05/17] Correct syntax for passing varargs

---
 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index d7e8af91765b1..3af8fc7481133 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1241,7 +1241,7 @@ class DataFrame private[sql](
    * @since 1.4.0
    */
   def drop(colName: String): DataFrame = {
-    drop(Seq(colName))
+    drop(Seq(colName) : _*)
   }
 
   /**

From 4541231c88cabb08af96c81a01c011190b620945 Mon Sep 17 00:00:00 2001
From: tedyu
Date: Fri, 20 Nov 2015 15:28:08 -0800
Subject: [PATCH 06/17] Add test for dropping multiple columns

---
 .../test/scala/org/apache/spark/sql/DataFrameSuite.scala | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index dd6d06512ff60..4fedd56e34f40 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -378,6 +378,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     assert(df.schema.map(_.name) === Seq("value"))
   }
 
+  test("drop columns using drop") {
+    val src = Seq((1,2,3)).toDF("a", "b", "c")
+    val df = src.drop("a", "b")
+    checkAnswer(
+      df,
+      src.collect().map(x => Row(x.getInt(2))).toSeq)
+    assert(df.schema.map(_.name) === Seq("c"))
+  }
+
   test("drop unknown column (no-op)") {
     val df = testData.drop("random")
     checkAnswer(

From 6bbe12f97c5e0a377df03707d44755408d64c41d Mon Sep 17 00:00:00 2001
From: tedyu
Date: Fri, 20 Nov 2015 16:05:50 -0800
Subject: [PATCH 07/17] Formatting

---
 .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 4fedd56e34f40..754878183eb68 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -382,8 +382,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     val src = Seq((1,2,3)).toDF("a", "b", "c")
     val df = src.drop("a", "b")
     checkAnswer(
-      df,
-      src.collect().map(x => Row(x.getInt(2))).toSeq)
+      df, src.collect().map(x => Row(x.getInt(2))).toSeq)
     assert(df.schema.map(_.name) === Seq("c"))
   }

From de0ec018a584713a3ee1438045885dbeb6ce2c6c Mon Sep 17 00:00:00 2001
From: tedyu
Date: Fri, 20 Nov 2015 16:15:59 -0800
Subject: [PATCH 08/17] Fix scalastyle warning

---
 .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 754878183eb68..6a16c15cb4bae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -379,7 +379,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }
 
   test("drop columns using drop") {
-    val src = Seq((1,2,3)).toDF("a", "b", "c")
+    val src = Seq((1, 2, 3)).toDF("a", "b", "c")
     val df = src.drop("a", "b")
     checkAnswer(
       df, src.collect().map(x => Row(x.getInt(2))).toSeq)
From c512662466f3f7da991a736b713edaa7dad877fc Mon Sep 17 00:00:00 2001
From: tedyu
Date: Fri, 20 Nov 2015 16:26:39 -0800
Subject: [PATCH 09/17] Minor change

---
 .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 6a16c15cb4bae..5d12171ebc394 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -379,7 +379,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }
 
   test("drop columns using drop") {
-    val src = Seq((1, 2, 3)).toDF("a", "b", "c")
+    val src = Seq((0, 2, 3)).toDF("a", "b", "c")
     val df = src.drop("a", "b")
     checkAnswer(
       df, src.collect().map(x => Row(x.getInt(2))).toSeq)

From 34ccee00119214ecb2b86567a94cc2a9e56e681d Mon Sep 17 00:00:00 2001
From: tedyu
Date: Thu, 3 Dec 2015 16:53:20 -0800
Subject: [PATCH 10/17] Address Michael's review comments

---
 .../org/apache/spark/sql/DataFrame.scala | 19 +++++--------------
 1 file changed, 5 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 3af8fc7481133..8d6e9b5107613 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1252,21 +1252,12 @@ class DataFrame private[sql](
    */
   @scala.annotation.varargs
   def drop(colNames: String*): DataFrame = {
-    val resolver = sqlContext.analyzer.resolver
-    val iter = colNames.iterator
-    var df = this
-    while (iter.hasNext) {
-      val colName = iter.next()
-      val shouldDrop = df.schema.exists(f => resolver(f.name, colName))
-      if (shouldDrop) {
-        val colsAfterDrop = df.schema.filter { field =>
-          val name = field.name
-          !resolver(name, colName)
-        }.map(f => Column(f.name))
-        df = df.select(colsAfterDrop : _*)
-      }
+    val remainingColumns = df.schema.filter(f => colNames.contains(f.name)).map(f=>Column(f.name))
+    if (remainingColumns.size == df.schema.size) {
+      this
+    } else {
+      df.select(remainingColumns: _*)
     }
-    df
   }

From 1e4555a0836664a96f5f50f9995a44e44611a8b1 Mon Sep 17 00:00:00 2001
From: tedyu
Date: Thu, 3 Dec 2015 17:40:04 -0800
Subject: [PATCH 11/17] Address Scalastyle check warnings

---
 .../src/main/scala/org/apache/spark/sql/DataFrame.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 8d6e9b5107613..24d894fab2fc2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1252,11 +1252,11 @@ class DataFrame private[sql](
    */
   @scala.annotation.varargs
   def drop(colNames: String*): DataFrame = {
-    val remainingColumns = df.schema.filter(f => colNames.contains(f.name)).map(f=>Column(f.name))
-    if (remainingColumns.size == df.schema.size) {
+    val remainingCols = df.schema.filter(f => colNames.contains(f.name)).map(f=>Column(f.name))
+    if (remainingCols.size == df.schema.size) {
       this
     } else {
-      df.select(remainingColumns: _*)
+      df.select(remainingCols: _*)
     }
   }
From 394632f5eca777e9178463defa47e9c855c9098b Mon Sep 17 00:00:00 2001
From: tedyu
Date: Thu, 3 Dec 2015 17:59:40 -0800
Subject: [PATCH 12/17] Address Scalastyle check warnings

---
 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 24d894fab2fc2..9c1575ab072a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1252,7 +1252,7 @@ class DataFrame private[sql](
    */
   @scala.annotation.varargs
   def drop(colNames: String*): DataFrame = {
-    val remainingCols = df.schema.filter(f => colNames.contains(f.name)).map(f=>Column(f.name))
+    val remainingCols = df.schema.filter(f => colNames.contains(f.name)).map(f => Column(f.name))
     if (remainingCols.size == df.schema.size) {
       this
     } else {

From 86c68bce5a5b444fe6f787f022db276794d34105 Mon Sep 17 00:00:00 2001
From: tedyu
Date: Thu, 3 Dec 2015 18:00:38 -0800
Subject: [PATCH 13/17] Address Scalastyle check warnings

---
 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 9c1575ab072a6..e8c092bb7077c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1253,7 +1253,7 @@ class DataFrame private[sql](
   @scala.annotation.varargs
   def drop(colNames: String*): DataFrame = {
     val remainingCols = df.schema.filter(f => colNames.contains(f.name)).map(f => Column(f.name))
-    if (remainingCols.size == df.schema.size) { 
+    if (remainingCols.size == df.schema.size) {
       this
     } else {
       df.select(remainingCols: _*)

From b18fc6ec08dd9fae4d65075aff629b1a42f5e7ee Mon Sep 17 00:00:00 2001
From: tedyu
Date: Thu, 3 Dec 2015 18:53:00 -0800
Subject: [PATCH 14/17] Address compilation error

---
 .../src/main/scala/org/apache/spark/sql/DataFrame.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index e8c092bb7077c..6e581f742e15a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1252,11 +1252,11 @@ class DataFrame private[sql](
    */
   @scala.annotation.varargs
   def drop(colNames: String*): DataFrame = {
-    val remainingCols = df.schema.filter(f => colNames.contains(f.name)).map(f => Column(f.name))
-    if (remainingCols.size == df.schema.size) {
+    val remainingCols = this.schema.filter(f => colNames.contains(f.name)).map(f => Column(f.name))
+    if (remainingCols.size == this.schema.size) {
       this
     } else {
-      df.select(remainingCols: _*)
+      this.select(remainingCols: _*)
     }
   }
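With patch 14 the method compiles, but colNames.contains(f.name) keeps exactly the columns that were asked to be dropped and compares names with case-sensitive string equality; patches 15 and 16 below invert the predicate and route the comparison through the analyzer's resolver. A standalone sketch of the intended selection, with a case-insensitive resolver standing in for sqlContext.analyzer.resolver (the object, names, and data are illustrative, not from the patch):

    object RemainingColumnsSketch {
      type Resolver = (String, String) => Boolean
      val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

      // Keep every field whose name matches none of the names being dropped.
      def remainingNames(fieldNames: Seq[String], toDrop: Seq[String], resolver: Resolver): Seq[String] =
        fieldNames.filter(name => toDrop.forall(d => !resolver(name, d)))

      def main(args: Array[String]): Unit = {
        // Prints List(c): "a" and "b" resolve against the drop list and are removed.
        println(remainingNames(Seq("a", "b", "c"), Seq("A", "b"), caseInsensitive))
      }
    }
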
From 331b892e0a87193337c60608dd5d32b46d51e25c Mon Sep 17 00:00:00 2001
From: tedyu
Date: Thu, 3 Dec 2015 21:23:28 -0800
Subject: [PATCH 15/17] Address Wenchen's comment

---
 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 6e581f742e15a..44ca26a3d95d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1252,7 +1252,9 @@ class DataFrame private[sql](
    */
   @scala.annotation.varargs
   def drop(colNames: String*): DataFrame = {
-    val remainingCols = this.schema.filter(f => colNames.contains(f.name)).map(f => Column(f.name))
+    val resolver = sqlContext.analyzer.resolver
+    val remainingCols =
+      schema.filter(f => colNames.forall(n => !resolver(f.name))).map(f => Column(f.name))
     if (remainingCols.size == this.schema.size) {
       this
     } else {

From 04adf769b2e0371fa98cf51107a0fa7f3bfde9c7 Mon Sep 17 00:00:00 2001
From: tedyu
Date: Thu, 3 Dec 2015 21:48:23 -0800
Subject: [PATCH 16/17] Add missing parameter for resolver

---
 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 44ca26a3d95d1..06baa3acab81c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1254,7 +1254,7 @@ class DataFrame private[sql](
   def drop(colNames: String*): DataFrame = {
     val resolver = sqlContext.analyzer.resolver
     val remainingCols =
-      schema.filter(f => colNames.forall(n => !resolver(f.name))).map(f => Column(f.name))
+      schema.filter(f => colNames.forall(n => !resolver(f.name, n))).map(f => Column(f.name))
     if (remainingCols.size == this.schema.size) {
       this
     } else {

From a28313b12a8c2bb3819e8a02c052597e8747d67b Mon Sep 17 00:00:00 2001
From: tedyu
Date: Fri, 4 Dec 2015 19:40:48 -0800
Subject: [PATCH 17/17] Address Wenchen's comment

---
 .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 5d12171ebc394..7763da486b017 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -381,8 +381,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   test("drop columns using drop") {
     val src = Seq((0, 2, 3)).toDF("a", "b", "c")
     val df = src.drop("a", "b")
-    checkAnswer(
-      df, src.collect().map(x => Row(x.getInt(2))).toSeq)
+    checkAnswer(df, Row(3))
     assert(df.schema.map(_.name) === Seq("c"))
   }
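After the final patch, the multi-column drop resolves each requested name against the schema and silently ignores names that do not resolve, matching the scaladoc's "no-op if schema doesn't contain column name(s)". A short usage sketch against the same toy data the test uses, assuming a Spark 1.6-style sqlContext with its implicits in scope (as in spark-shell):

    import sqlContext.implicits._

    val src = Seq((0, 2, 3)).toDF("a", "b", "c")
    src.drop("a", "b").show()       // only column "c" remains
    src.drop("a", "b", "x").show()  // unknown column "x" is ignored; still only "c"
    src.drop("A").show()            // name resolution goes through the analyzer's
                                    // resolver rather than plain string equality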