
Conversation

@HyukjinKwon
Member

https://issues.apache.org/jira/browse/SPARK-12227

In this PR, I added support for dropping multiple columns by reference (`Column`); previously this was supported only with string names.

Since overloaded varargs methods cannot be distinguished by their element type, I changed drop(colNames: String*) to drop(colName: String, colNames: String*), following orderBy(sortCol: String, sortCols: String*) and sort(sortCol: String, sortCols: String*).

Accordingly, the delegating call inside drop(colName: String) was changed as well.
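
For illustration, here is a rough, self-contained sketch of the overload shape this change aims for; `SketchFrame` and `Col` are stand-ins, not the actual `Dataset`/`Column` code, and resolution/analysis details are omitted:

```scala
// Illustrative sketch only, not the actual patch. It shows why the first
// parameter is typed explicitly: two pure-varargs overloads (String* vs Column*)
// would erase to the same JVM signature.
case class Col(name: String) // hypothetical stand-in for org.apache.spark.sql.Column

class SketchFrame(val columns: Seq[String]) {
  // Existing single-argument form delegates to the new varargs form.
  def drop(colName: String): SketchFrame = drop(colName, Seq.empty[String]: _*)

  // Drop by name(s).
  def drop(colName: String, colNames: String*): SketchFrame =
    dropAll(colName +: colNames)

  // Drop by column reference(s).
  def drop(col: Col, cols: Col*): SketchFrame =
    dropAll((col +: cols).map(_.name))

  private def dropAll(names: Seq[String]): SketchFrame =
    new SketchFrame(columns.filterNot(names.contains))
}
```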

@SparkQA

SparkQA commented Dec 9, 2015

Test build #47412 has finished for PR 10218 at commit 2223526.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

Doesn't this break binary compatibility?

Member Author

Could you tell me a bit more about what you mean? I am not very familiar with binary compatibility issues, and the compatibility between code compiled against different Spark versions looks fine to me.

One thing I am a bit worried about is parameter ambiguity (in terms of Java interoperability), which I could fix by adding an additional parameter if it turns out to be a problem.

Member Author

Just FYI, I am not sure about all the versions, but at least this works fine with Java 7 and Scala 2.10.4.

  • Scala
import scala.annotation.varargs

class TestCompatibility {
  // Fixed-arity String overload delegating to the varargs one.
  def test(a: String): Unit = {
    test(a, Seq.empty[String]: _*)
  }
  // @varargs also generates a Java-callable test(String...) overload.
  @varargs
  def test(a: String, b: String*): Unit = {
    (a +: b).foreach(println)
  }
  // Fixed-arity Int overload delegating to the varargs one.
  def test(a: Int): Unit = {
    test(Seq(a): _*)
  }
  @varargs
  def test(a: Int*): Unit = {
    a.foreach(println)
  }
}
  • Java
public class Test {
    public static void main(String[] args) {
        new TestCompatibility().test("a");
        new TestCompatibility().test("a", "b");
        new TestCompatibility().test("a", "b", "c");
        new TestCompatibility().test(1);
        new TestCompatibility().test(1, 2);
        new TestCompatibility().test(1, 2, 3);
    }
}

Contributor

Never mind, the annotation is wrong; it was added in #9862, which did not get merged into 1.6, so there is no worry about changing the method signature.

@marmbrus
Contributor

I'm not sure this is worth the complexity. I think most users will only ever drop by name (since dropping a complex expression doesn't really make sense), and in that case constructing a column is strictly more typing.

@HyukjinKwon
Member Author

I see. I added this because sort, cube, select, etc. do the same thing, supporting both names and expressions. Although I do not have deep insight into the DataFrame APIs, I feel some users would find it inconsistent if most APIs support both names and expressions but this one does not.

Also, I think this might not be too complicated: I just added a loop and made drop(col: Column) call this, though I accept that it might look complicated in terms of readability.

So, it looks like consistency vs. readability to me.

@marmbrus
Contributor

More functions for the sake of more functions do not make the API easier to use. We should not add functions just to be consistent; we should only add them if they are going to be used.

The complexity comes from the fact that users who have a Seq of strings now have to do something like this:

val toDrop = Seq("col1", "col2", "col3")
df.drop(toDrop.head, toDrop.tail: _*)

Can you construct a case where you would have columns instead of strings that you wanted to drop? In the test case, it is strictly more typing to use this API compared to the one that already exists. That doesn't seem worth complicating the case above.

val df = src.drop("a", "b")
val df = src.drop(src("a"), src("b"))
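
(For comparison, a minimal standalone sketch of the two call shapes being discussed; `dropByNames` and the column names are hypothetical, not Spark API:)

```scala
// Illustrative only: contrasts the head/tail splat required by a
// drop(colName: String, colNames: String*) signature with the simpler call
// that a plain drop(colNames: String*) signature would allow.
object DropCallShapes {
  def dropByNames(first: String, rest: String*): Unit =
    println(("dropping" +: first +: rest).mkString(" "))

  def main(args: Array[String]): Unit = {
    val toDrop = Seq("col1", "col2", "col3")
    // With the (String, String*) signature, a Seq must be split:
    dropByNames(toDrop.head, toDrop.tail: _*)
    // With a plain String* signature, it would simply be dropByNames(toDrop: _*).
  }
}
```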

@HyukjinKwon
Member Author

Thanks for your detailed explanation! Does this mean I should close this?

@marmbrus
Contributor

yes please.

@HyukjinKwon deleted the SPARK-12227 branch September 23, 2016 18:28
dongjoon-hyun pushed a commit that referenced this pull request Nov 22, 2019
…ValueGroupedDataset

### What changes were proposed in this pull request?

This PR proposes to add an `as` API to RelationalGroupedDataset. It creates a KeyValueGroupedDataset instance using the given grouping expressions instead of a typed function as in the groupByKey API. Because it can leverage existing columns, it can reuse existing data partitioning, if any, when doing operations like cogroup.

### Why are the changes needed?

Currently, if users want to do cogroup on DataFrames, there is no good way to do it other than through KeyValueGroupedDataset.

1. KeyValueGroupedDataset ignores existing data partitioning, if any. That is a problem.
2. groupByKey calls a typed function to create additional keys. You cannot reuse existing columns if you just need to group by them.

```scala
// df1 and df2 are certainly partitioned and sorted.
val df1 = Seq((1, 2, 3), (2, 3, 4)).toDF("a", "b", "c")
  .repartition($"a").sortWithinPartitions("a")
val df2 = Seq((1, 2, 4), (2, 3, 5)).toDF("a", "b", "c")
  .repartition($"a").sortWithinPartitions("a")
```
```scala
// This groupBy.as.cogroup won't unnecessarily repartition the data
val df3 = df1.groupBy("a").as[Int]
  .cogroup(df2.groupBy("a").as[Int]) { case (key, data1, data2) =>
    data1.zip(data2).map { p =>
      p._1.getInt(2) + p._2.getInt(2)
    }
}
```

```
== Physical Plan ==
*(5) SerializeFromObject [input[0, int, false] AS value#11247]
+- CoGroup org.apache.spark.sql.DataFrameSuite$$Lambda$4922/12067092816eec1b6f, a#11209: int, createexternalrow(a#11209, b#11210, c#11211, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), createexternalrow(a#11225, b#11226, c#11227, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [a#11209], [a#11225], [a#11209, b#11210, c#11211], [a#11225, b#11226, c#11227], obj#11246: int
   :- *(2) Sort [a#11209 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#11209, 5), false, [id=#10218]
   :     +- *(1) Project [_1#11202 AS a#11209, _2#11203 AS b#11210, _3#11204 AS c#11211]
   :        +- *(1) LocalTableScan [_1#11202, _2#11203, _3#11204]
   +- *(4) Sort [a#11225 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#11225, 5), false, [id=#10223]
         +- *(3) Project [_1#11218 AS a#11225, _2#11219 AS b#11226, _3#11220 AS c#11227]
            +- *(3) LocalTableScan [_1#11218, _2#11219, _3#11220]
```

```scala
// The current approach creates an additional AppendColumns and repartitions the data again
val df4 = df1.groupByKey(r => r.getInt(0)).cogroup(df2.groupByKey(r => r.getInt(0))) {
  case (key, data1, data2) =>
    data1.zip(data2).map { p =>
      p._1.getInt(2) + p._2.getInt(2)
    }
}
```

```
== Physical Plan ==
*(7) SerializeFromObject [input[0, int, false] AS value#11257]
+- CoGroup org.apache.spark.sql.DataFrameSuite$$Lambda$4933/138102700737171997, value#11252: int, createexternalrow(a#11209, b#11210, c#11211, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), createexternalrow(a#11225, b#11226, c#11227, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [value#11252], [value#11254], [a#11209, b#11210, c#11211], [a#11225, b#11226, c#11227], obj#11256: int
   :- *(3) Sort [value#11252 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(value#11252, 5), true, [id=#10302]
   :     +- AppendColumns org.apache.spark.sql.DataFrameSuite$$Lambda$4930/19529195347ce07f47, createexternalrow(a#11209, b#11210, c#11211, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [input[0, int, false] AS value#11252]
   :        +- *(2) Sort [a#11209 ASC NULLS FIRST], false, 0
   :           +- Exchange hashpartitioning(a#11209, 5), false, [id=#10297]
   :              +- *(1) Project [_1#11202 AS a#11209, _2#11203 AS b#11210, _3#11204 AS c#11211]
   :                 +- *(1) LocalTableScan [_1#11202, _2#11203, _3#11204]
   +- *(6) Sort [value#11254 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(value#11254, 5), true, [id=#10312]
         +- AppendColumns org.apache.spark.sql.DataFrameSuite$$Lambda$4932/15265288491f0e0c1f, createexternalrow(a#11225, b#11226, c#11227, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [input[0, int, false] AS value#11254]
            +- *(5) Sort [a#11225 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(a#11225, 5), false, [id=#10307]
                  +- *(4) Project [_1#11218 AS a#11225, _2#11219 AS b#11226, _3#11220 AS c#11227]
                     +- *(4) LocalTableScan [_1#11218, _2#11219, _3#11220]
```

### Does this PR introduce any user-facing change?

Yes, this adds a new `as` API to RelationalGroupedDataset. Users can use it to create a KeyValueGroupedDataset and do cogroup.

### How was this patch tested?

Unit tests.

Closes #26509 from viirya/SPARK-29427-2.

Lead-authored-by: Liang-Chi Hsieh <[email protected]>
Co-authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>